You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/09/25 13:26:53 UTC

svn commit: r449654 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: TransportConnection.java jmx/ManagedTransportConnection.java

Author: jstrachan
Date: Mon Sep 25 04:26:53 2006
New Revision: 449654

URL: http://svn.apache.org/viewvc?view=rev&rev=449654
Log:
Applied patch from John Heitmann for AMQ-932 - many thanks!

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=449654&r1=449653&r2=449654
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Sep 25 04:26:53 2006
@@ -17,8 +17,6 @@
  */
 package org.apache.activemq.broker;
 
-import java.io.IOException;
-
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
@@ -30,8 +28,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.io.IOException;
+
 /**
- * 
  * @version $Revision: 1.8 $
  */
 public class TransportConnection extends AbstractConnection {
@@ -43,7 +42,9 @@
     private boolean blocked;
     private boolean connected;
     private boolean active;
-    private long timeStamp=0;
+    private boolean starting;
+    private boolean pendingStop;
+    private long timeStamp = 0;
     private MasterBroker masterBroker; //used if this connection is used by a Slave
 
     /**
@@ -59,10 +60,11 @@
         this.transport.setTransportListener(new DefaultTransportListener() {
             public void onCommand(Command command) {
                 Response response = service(command);
-                if( response!=null ) {
+                if (response != null) {
                     dispatch(response);
                 }
             }
+
             public void onException(IOException exception) {
                 serviceTransportException(exception);
             }
@@ -70,26 +72,48 @@
         connected = true;
     }
 
-    public void start() throws Exception {
-        transport.start();
-        active = true;
-        super.start();
-        connector.onStarted(this);
+    public synchronized void start() throws Exception {
+        starting = true;
+        try {
+            transport.start();
+            active = true;
+            super.start();
+            connector.onStarted(this);
+        }
+        finally {
+            // stop() can be called from within the above block,
+            // but we want to be sure start() completes before
+            // stop() runs, so queue the stop until right now:
+            starting = false;
+            if (pendingStop) {
+                log.debug("Calling the delayed stop()");
+                stop();
+            }
+        }
     }
 
-    public void stop() throws Exception {
+    public synchronized void stop() throws Exception {
+        // If we're in the middle of starting
+        // then go no further... for now.
+        pendingStop = true;
+        if (starting) {
+            log.debug("stop() called in the middle of start(). Delaying...");
+            return;
+        }
+
         connector.onStopped(this);
         try {
-            if (masterBroker != null){
+            if (masterBroker != null) {
                 masterBroker.stop();
             }
-            
+
             // If the transport has not failed yet,
             // notify the peer that we are doing a normal shutdown.
-            if( transportException == null ) {
-            	transport.oneway(new ShutdownInfo());
+            if (transportException == null) {
+                transport.oneway(new ShutdownInfo());
             }
-        } catch (Exception ignore) {
+        }
+        catch (Exception ignore) {
             //ignore.printStackTrace();
         }
 
@@ -97,131 +121,161 @@
         active = false;
         super.stop();
     }
-    
-    
+
+
     /**
      * @return Returns the blockedCandidate.
      */
-    public boolean isBlockedCandidate(){
+    public boolean isBlockedCandidate() {
         return blockedCandidate;
     }
+
     /**
-     * @param blockedCandidate
-     *            The blockedCandidate to set.
+     * @param blockedCandidate The blockedCandidate to set.
      */
-    public void setBlockedCandidate(boolean blockedCandidate){
-        this.blockedCandidate=blockedCandidate;
+    public void setBlockedCandidate(boolean blockedCandidate) {
+        this.blockedCandidate = blockedCandidate;
     }
+
     /**
      * @return Returns the markedCandidate.
      */
-    public boolean isMarkedCandidate(){
+    public boolean isMarkedCandidate() {
         return markedCandidate;
     }
+
     /**
-     * @param markedCandidate
-     *            The markedCandidate to set.
+     * @param markedCandidate The markedCandidate to set.
      */
-    public void setMarkedCandidate(boolean markedCandidate){
-        this.markedCandidate=markedCandidate;
-        if(!markedCandidate){
-            timeStamp=0;
-            blockedCandidate=false;
+    public void setMarkedCandidate(boolean markedCandidate) {
+        this.markedCandidate = markedCandidate;
+        if (!markedCandidate) {
+            timeStamp = 0;
+            blockedCandidate = false;
         }
     }
+
     /**
-     * @param slow
-     *            The slow to set.
+     * @param slow The slow to set.
      */
-    public void setSlow(boolean slow){
-        this.slow=slow;
+    public void setSlow(boolean slow) {
+        this.slow = slow;
     }
+
     /**
      * @return true if the Connection is slow
      */
-    public boolean isSlow(){
+    public boolean isSlow() {
         return slow;
     }
+
     /**
      * @return true if the Connection is potentially blocked
      */
-    public boolean isMarkedBlockedCandidate(){
+    public boolean isMarkedBlockedCandidate() {
         return markedCandidate;
     }
-    
+
     /**
      * Mark the Connection, so we can deem if it's collectable on the next sweep
      */
-    public void doMark(){
-        if(timeStamp==0){
-            timeStamp=System.currentTimeMillis();
+    public void doMark() {
+        if (timeStamp == 0) {
+            timeStamp = System.currentTimeMillis();
         }
     }
+
     /**
      * @return if after being marked, the Connection is still writing
      */
-    public boolean isBlocked(){
+    public boolean isBlocked() {
         return blocked;
     }
+
     /**
      * @return true if the Connection is connected
      */
-    public boolean isConnected(){
+    public boolean isConnected() {
         return connected;
     }
+
     /**
-     * @param blocked
-     *            The blocked to set.
+     * @param blocked The blocked to set.
      */
-    public void setBlocked(boolean blocked){
-        this.blocked=blocked;
+    public void setBlocked(boolean blocked) {
+        this.blocked = blocked;
     }
+
     /**
-     * @param connected
-     *            The connected to set.
+     * @param connected The connected to set.
      */
-    public void setConnected(boolean connected){
-        this.connected=connected;
+    public void setConnected(boolean connected) {
+        this.connected = connected;
     }
+
     /**
      * @return true if the Connection is active
      */
-    public boolean isActive(){
+    public boolean isActive() {
         return active;
     }
+
+    /**
+     * @param active The active to set.
+     */
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    /**
+     * @return true if the Connection is starting
+     */
+    public synchronized boolean isStarting() {
+        return starting;
+    }
+
+    synchronized protected void setStarting(boolean starting) {
+        this.starting = starting;
+    }
+
     /**
-     * @param active
-     *            The active to set.
+     * @return true if the Connection needs to stop
      */
-    public void setActive(boolean active){
-        this.active=active;
+    public synchronized boolean isPendingStop() {
+        return pendingStop;
+    }
+
+    protected synchronized void setPendingStop(boolean pendingStop) {
+        this.pendingStop = pendingStop;
     }
-    
-    public Response processBrokerInfo(BrokerInfo info){
-        if(info.isSlaveBroker()){
+
+    public Response processBrokerInfo(BrokerInfo info) {
+        if (info.isSlaveBroker()) {
             //stream messages from this broker (the master) to 
             //the slave
-            MutableBrokerFilter parent=(MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
-            masterBroker=new MasterBroker(parent,transport);
+            MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
+            masterBroker = new MasterBroker(parent, transport);
             masterBroker.startProcessing();
-            log.info("Slave Broker "+info.getBrokerName()+" is attached");
+            log.info("Slave Broker " + info.getBrokerName() + " is attached");
         }
         return super.processBrokerInfo(info);
     }
 
-    protected void dispatch(Command command){
-        try{
+    protected void dispatch(Command command) {
+        try {
             setMarkedCandidate(true);
             transport.oneway(command);
             getStatistics().onCommand(command);
-        }catch(IOException e){
+        }
+        catch (IOException e) {
             serviceException(e);
-        }finally{
+        }
+        finally {
             setMarkedCandidate(false);
         }
     }
 
-	public String getRemoteAddress() {
-		return transport.getRemoteAddress();
-	}
+    public String getRemoteAddress() {
+        return transport.getRemoteAddress();
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?view=diff&rev=449654&r1=449653&r2=449654
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Mon Sep 25 04:26:53 2006
@@ -58,7 +58,11 @@
         setConnectionId(connectionId);
     }
 
-    public void stop() throws Exception {
+    public synchronized void stop() throws Exception {
+        if (isStarting()) {
+            setPendingStop(true);
+            return;
+        }
         unregisterMBean();
         super.stop();
     }