You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/09/25 13:26:53 UTC
svn commit: r449654 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker:
TransportConnection.java jmx/ManagedTransportConnection.java
Author: jstrachan
Date: Mon Sep 25 04:26:53 2006
New Revision: 449654
URL: http://svn.apache.org/viewvc?view=rev&rev=449654
Log:
applied patch from John Heitmann for AMQ-932 - many thanks!
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=449654&r1=449653&r2=449654
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Sep 25 04:26:53 2006
@@ -17,8 +17,6 @@
*/
package org.apache.activemq.broker;
-import java.io.IOException;
-
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
@@ -30,8 +28,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+
/**
- *
* @version $Revision: 1.8 $
*/
public class TransportConnection extends AbstractConnection {
@@ -43,7 +42,9 @@
private boolean blocked;
private boolean connected;
private boolean active;
- private long timeStamp=0;
+ private boolean starting;
+ private boolean pendingStop;
+ private long timeStamp = 0;
private MasterBroker masterBroker; //used if this connection is used by a Slave
/**
@@ -59,10 +60,11 @@
this.transport.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) {
Response response = service(command);
- if( response!=null ) {
+ if (response != null) {
dispatch(response);
}
}
+
public void onException(IOException exception) {
serviceTransportException(exception);
}
@@ -70,26 +72,48 @@
connected = true;
}
- public void start() throws Exception {
- transport.start();
- active = true;
- super.start();
- connector.onStarted(this);
+ public synchronized void start() throws Exception {
+ starting = true;
+ try {
+ transport.start();
+ active = true;
+ super.start();
+ connector.onStarted(this);
+ }
+ finally {
+ // stop() can be called from within the above block,
+ // but we want to be sure start() completes before
+ // stop() runs, so queue the stop until right now:
+ starting = false;
+ if (pendingStop) {
+ log.debug("Calling the delayed stop()");
+ stop();
+ }
+ }
}
- public void stop() throws Exception {
+ public synchronized void stop() throws Exception {
+ // If we're in the middle of starting
+ // then go no further... for now.
+ pendingStop = true;
+ if (starting) {
+ log.debug("stop() called in the middle of start(). Delaying...");
+ return;
+ }
+
connector.onStopped(this);
try {
- if (masterBroker != null){
+ if (masterBroker != null) {
masterBroker.stop();
}
-
+
// If the transport has not failed yet,
// notify the peer that we are doing a normal shutdown.
- if( transportException == null ) {
- transport.oneway(new ShutdownInfo());
+ if (transportException == null) {
+ transport.oneway(new ShutdownInfo());
}
- } catch (Exception ignore) {
+ }
+ catch (Exception ignore) {
//ignore.printStackTrace();
}
@@ -97,131 +121,161 @@
active = false;
super.stop();
}
-
-
+
+
/**
* @return Returns the blockedCandidate.
*/
- public boolean isBlockedCandidate(){
+ public boolean isBlockedCandidate() {
return blockedCandidate;
}
+
/**
- * @param blockedCandidate
- * The blockedCandidate to set.
+ * @param blockedCandidate The blockedCandidate to set.
*/
- public void setBlockedCandidate(boolean blockedCandidate){
- this.blockedCandidate=blockedCandidate;
+ public void setBlockedCandidate(boolean blockedCandidate) {
+ this.blockedCandidate = blockedCandidate;
}
+
/**
* @return Returns the markedCandidate.
*/
- public boolean isMarkedCandidate(){
+ public boolean isMarkedCandidate() {
return markedCandidate;
}
+
/**
- * @param markedCandidate
- * The markedCandidate to set.
+ * @param markedCandidate The markedCandidate to set.
*/
- public void setMarkedCandidate(boolean markedCandidate){
- this.markedCandidate=markedCandidate;
- if(!markedCandidate){
- timeStamp=0;
- blockedCandidate=false;
+ public void setMarkedCandidate(boolean markedCandidate) {
+ this.markedCandidate = markedCandidate;
+ if (!markedCandidate) {
+ timeStamp = 0;
+ blockedCandidate = false;
}
}
+
/**
- * @param slow
- * The slow to set.
+ * @param slow The slow to set.
*/
- public void setSlow(boolean slow){
- this.slow=slow;
+ public void setSlow(boolean slow) {
+ this.slow = slow;
}
+
/**
* @return true if the Connection is slow
*/
- public boolean isSlow(){
+ public boolean isSlow() {
return slow;
}
+
/**
* @return true if the Connection is potentially blocked
*/
- public boolean isMarkedBlockedCandidate(){
+ public boolean isMarkedBlockedCandidate() {
return markedCandidate;
}
-
+
/**
* Mark the Connection, so we can deem if it's collectable on the next sweep
*/
- public void doMark(){
- if(timeStamp==0){
- timeStamp=System.currentTimeMillis();
+ public void doMark() {
+ if (timeStamp == 0) {
+ timeStamp = System.currentTimeMillis();
}
}
+
/**
* @return if after being marked, the Connection is still writing
*/
- public boolean isBlocked(){
+ public boolean isBlocked() {
return blocked;
}
+
/**
* @return true if the Connection is connected
*/
- public boolean isConnected(){
+ public boolean isConnected() {
return connected;
}
+
/**
- * @param blocked
- * The blocked to set.
+ * @param blocked The blocked to set.
*/
- public void setBlocked(boolean blocked){
- this.blocked=blocked;
+ public void setBlocked(boolean blocked) {
+ this.blocked = blocked;
}
+
/**
- * @param connected
- * The connected to set.
+ * @param connected The connected to set.
*/
- public void setConnected(boolean connected){
- this.connected=connected;
+ public void setConnected(boolean connected) {
+ this.connected = connected;
}
+
/**
* @return true if the Connection is active
*/
- public boolean isActive(){
+ public boolean isActive() {
return active;
}
+
+ /**
+ * @param active The active to set.
+ */
+ public void setActive(boolean active) {
+ this.active = active;
+ }
+
+ /**
+ * @return true if the Connection is starting
+ */
+ public synchronized boolean isStarting() {
+ return starting;
+ }
+
+ synchronized protected void setStarting(boolean starting) {
+ this.starting = starting;
+ }
+
/**
- * @param active
- * The active to set.
+ * @return true if the Connection needs to stop
*/
- public void setActive(boolean active){
- this.active=active;
+ public synchronized boolean isPendingStop() {
+ return pendingStop;
+ }
+
+ protected synchronized void setPendingStop(boolean pendingStop) {
+ this.pendingStop = pendingStop;
}
-
- public Response processBrokerInfo(BrokerInfo info){
- if(info.isSlaveBroker()){
+
+ public Response processBrokerInfo(BrokerInfo info) {
+ if (info.isSlaveBroker()) {
//stream messages from this broker (the master) to
//the slave
- MutableBrokerFilter parent=(MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
- masterBroker=new MasterBroker(parent,transport);
+ MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
+ masterBroker = new MasterBroker(parent, transport);
masterBroker.startProcessing();
- log.info("Slave Broker "+info.getBrokerName()+" is attached");
+ log.info("Slave Broker " + info.getBrokerName() + " is attached");
}
return super.processBrokerInfo(info);
}
- protected void dispatch(Command command){
- try{
+ protected void dispatch(Command command) {
+ try {
setMarkedCandidate(true);
transport.oneway(command);
getStatistics().onCommand(command);
- }catch(IOException e){
+ }
+ catch (IOException e) {
serviceException(e);
- }finally{
+ }
+ finally {
setMarkedCandidate(false);
}
}
- public String getRemoteAddress() {
- return transport.getRemoteAddress();
- }
+ public String getRemoteAddress() {
+ return transport.getRemoteAddress();
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?view=diff&rev=449654&r1=449653&r2=449654
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Mon Sep 25 04:26:53 2006
@@ -58,7 +58,11 @@
setConnectionId(connectionId);
}
- public void stop() throws Exception {
+ public synchronized void stop() throws Exception {
+ if (isStarting()) {
+ setPendingStop(true);
+ return;
+ }
unregisterMBean();
super.stop();
}