You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/01/23 20:40:47 UTC

svn commit: r371636 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: AbstractConnection.java TransportConnection.java TransportConnector.java TransportStatusDetector.java jmx/ManagedTransportConnector.java

Author: chirino
Date: Mon Jan 23 11:40:42 2006
New Revision: 371636

URL: http://svn.apache.org/viewcvs?rev=371636&view=rev
Log:
The Transport Connection now notifies it's Connector of lifecycle events so that the Connector does not have to wrap the broker in yet another filter to notice the connection's lifecycle events.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Mon Jan 23 11:40:42 2006
@@ -77,7 +77,7 @@
     
     protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
     protected final TaskRunner taskRunner;
-    protected final Connector connector;
+    protected final TransportConnector connector;
     protected BrokerInfo brokerInfo;
     private ConnectionStatistics statistics = new ConnectionStatistics();
     private boolean inServiceException=false;
@@ -107,7 +107,7 @@
      * @param broker
      * @param taskRunnerFactory - can be null if you want direct dispatch to the transport else commands are sent async.
      */
-    public AbstractConnection(Connector connector, Broker broker, TaskRunnerFactory taskRunnerFactory) {
+    public AbstractConnection(TransportConnector connector, Broker broker, TaskRunnerFactory taskRunnerFactory) {
         
         this.connector = connector;
         this.broker = broker;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Jan 23 11:40:42 2006
@@ -73,9 +73,11 @@
         transport.start();
         active = true;
         super.start();
+        connector.onStarted(this);
     }
 
     public void stop() throws Exception {
+        connector.onStopped(this);
         try {
             if (masterBroker != null){
                 masterBroker.stop();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Mon Jan 23 11:40:42 2006
@@ -27,7 +27,6 @@
 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
 import org.apache.activemq.broker.region.ConnectorStatistics;
 import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
@@ -51,7 +50,6 @@
     private static final Log log = LogFactory.getLog(TransportConnector.class);
 
     private Broker broker;
-    private BrokerFilter brokerFilter;
     private TransportServer server;
     private URI uri;
     private BrokerInfo brokerInfo = new BrokerInfo();
@@ -195,8 +193,8 @@
         }
         this.statusDector.stop();
         for (Iterator iter = connections.iterator(); iter.hasNext();) {
-            ConnectionContext context = (ConnectionContext) iter.next();
-            ss.stop(context.getConnection());
+            TransportConnection c = (TransportConnection) iter.next();
+            ss.stop(c);
         }
         ss.throwFirstException();
     }
@@ -204,28 +202,7 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     protected Connection createConnection(Transport transport) throws IOException {
-        return new TransportConnection(this, transport, getBrokerFilter(), taskRunnerFactory);
-    }
-
-    protected BrokerFilter getBrokerFilter() {
-        if (brokerFilter == null) {
-            if (broker == null) {
-                throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?");
-            }
-            this.brokerFilter = new BrokerFilter(broker) {
-                public void addConnection(ConnectionContext context, ConnectionInfo info) throws Throwable {
-                    connections.add(context);
-                    super.addConnection(context, info);
-                }
-
-                public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Throwable {
-                    connections.remove(context);
-                    super.removeConnection(context, info, error);
-                }
-            };
-
-        }
-        return brokerFilter;
+        return new TransportConnection(this, transport, broker, taskRunnerFactory);
     }
 
     protected TransportServer createTransportServer() throws IOException, URISyntaxException {
@@ -276,6 +253,14 @@
 
     public void setConnectUri(URI transportUri) {
         this.connectUri = transportUri;
+    }
+
+    public void onStarted(TransportConnection connection) {
+        connections.add(connection);
+    }
+
+    public void onStopped(TransportConnection connection) {
+        connections.remove(connection);
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java Mon Jan 23 11:40:42 2006
@@ -75,14 +75,10 @@
     }
     protected void doSweep(){
         for(Iterator i=connector.getConnections().iterator();i.hasNext();){
-            ConnectionContext cc=(ConnectionContext) i.next();
-            Connection connection=cc.getConnection();
-            if(connection instanceof TransportConnection){
-                TransportConnection tc=(TransportConnection) connection;
-                if(tc.isMarkedCandidate()){
-                    tc.doMark();
-                    collectionCandidates.add(tc);
-                }
+            TransportConnection connection=(TransportConnection) i.next();
+            if(connection.isMarkedCandidate()){
+                connection.doMark();
+                collectionCandidates.add(connection);
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java?rev=371636&r1=371635&r2=371636&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java Mon Jan 23 11:40:42 2006
@@ -57,7 +57,7 @@
             connectionId = "" + (nextConnectionId++);
         }
 
-        return new ManagedTransportConnection(this, transport, getBrokerFilter(), getTaskRunnerFactory(), mbeanServer, connectorName, connectionId);
+        return new ManagedTransportConnection(this, transport, getBroker(), getTaskRunnerFactory(), mbeanServer, connectorName, connectionId);
     }
 
 }