You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/29 21:19:25 UTC

svn commit: r491088 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Author: rajdavies
Date: Fri Dec 29 12:19:25 2006
New Revision: 491088

URL: http://svn.apache.org/viewvc?view=rev&rev=491088
Log:
put the synchronization back for now

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=491088&r1=491087&r2=491088
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Dec 29 12:19:25 2006
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker;
 
 import java.io.IOException;
@@ -24,7 +21,10 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -73,42 +73,32 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * @version $Revision: 1.8 $
  */
-public class TransportConnection implements Service, Connection, Task, CommandVisitor {
-	
-    private static final Log log = LogFactory.getLog(TransportConnection.class);
-    private static final Log transportLog = LogFactory.getLog(TransportConnection.class.getName() + ".Transport");
-    private static final Log serviceLog = LogFactory.getLog(TransportConnection.class.getName() + ".Service");
-    
+public class TransportConnection implements Service,Connection,Task,CommandVisitor{
+
+    private static final Log log=LogFactory.getLog(TransportConnection.class);
+    private static final Log transportLog=LogFactory.getLog(TransportConnection.class.getName()+".Transport");
+    private static final Log serviceLog=LogFactory.getLog(TransportConnection.class.getName()+".Service");
     // Keeps track of the broker and connector that created this connection.
     protected final Broker broker;
-    private MasterBroker masterBroker; 
+    private MasterBroker masterBroker;
     protected final TransportConnector connector;
-    private final Transport transport;    
+    private final Transport transport;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
-    
     // Keeps track of the state of the connections.
-    protected final ConcurrentHashMap localConnectionStates = new ConcurrentHashMap();
+    protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap();
     protected final Map brokerConnectionStates;
-    
     // The broker and wireformat info that was exchanged.
     protected BrokerInfo brokerInfo;
-    private WireFormatInfo wireFormatInfo;    
-
-    // Used to do async dispatch..  this should perhaps be pushed down into the transport layer..
-    protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
+    private WireFormatInfo wireFormatInfo;
+    // Used to do async dispatch.. this should perhaps be pushed down into the transport layer..
+    protected final List dispatchQueue=Collections.synchronizedList(new LinkedList());
     protected final TaskRunner taskRunner;
-    protected IOException transportException;        
+    protected IOException transportException;
     private boolean inServiceException=false;
-
-    private ConnectionStatistics statistics = new ConnectionStatistics();
+    private ConnectionStatistics statistics=new ConnectionStatistics();
     private boolean manageable;
     private boolean slow;
     private boolean markedCandidate;
@@ -118,382 +108,357 @@
     private boolean active;
     private boolean starting;
     private boolean pendingStop;
-    private long timeStamp = 0;
-    
-    private AtomicBoolean stopped = new AtomicBoolean(false);
+    private long timeStamp=0;
+    private AtomicBoolean stopped=new AtomicBoolean(false);
     protected final AtomicBoolean disposed=new AtomicBoolean(false);
-    private CountDownLatch stopLatch = new CountDownLatch(1);
-    protected final AtomicBoolean asyncException = new AtomicBoolean(false);
-    
-    static class ConnectionState extends org.apache.activemq.state.ConnectionState {
+    private CountDownLatch stopLatch=new CountDownLatch(1);
+    protected final AtomicBoolean asyncException=new AtomicBoolean(false);
+
+    static class ConnectionState extends org.apache.activemq.state.ConnectionState{
+
         private final ConnectionContext context;
         TransportConnection connection;
 
-        public ConnectionState(ConnectionInfo info, ConnectionContext context, TransportConnection connection) {
+        public ConnectionState(ConnectionInfo info,ConnectionContext context,TransportConnection connection){
             super(info);
-            this.context = context;
+            this.context=context;
             this.connection=connection;
         }
-        
-        public ConnectionContext getContext() {
+
+        public ConnectionContext getContext(){
             return context;
         }
-        
-        public TransportConnection getConnection() {
+
+        public TransportConnection getConnection(){
             return connection;
         }
-        
     }
-    
+
     /**
      * @param connector
      * @param transport
      * @param broker
      * @param taskRunnerFactory - can be null if you want direct dispatch to the transport else commands are sent async.
      */
-    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, TaskRunnerFactory taskRunnerFactory) {
-
-        this.connector = connector;
-        this.broker = broker;
-        
-        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
-        brokerConnectionStates = rb.getConnectionStates();
-        
-        if (connector != null) {
+    public TransportConnection(TransportConnector connector,final Transport transport,Broker broker,
+            TaskRunnerFactory taskRunnerFactory){
+        this.connector=connector;
+        this.broker=broker;
+        RegionBroker rb=(RegionBroker)broker.getAdaptor(RegionBroker.class);
+        brokerConnectionStates=rb.getConnectionStates();
+        if(connector!=null){
             this.statistics.setParent(connector.getStatistics());
         }
-        
-        if( taskRunnerFactory != null ) {
-            taskRunner = taskRunnerFactory.createTaskRunner( this, "ActiveMQ Connection Dispatcher: "+System.identityHashCode(this) );
-        }
-        else { 
-            taskRunner = null;
-        }        
-        
+        if(taskRunnerFactory!=null){
+            taskRunner=taskRunnerFactory.createTaskRunner(this,"ActiveMQ Connection Dispatcher: "
+                    +System.identityHashCode(this));
+        }else{
+            taskRunner=null;
+        }
         connector.setBrokerName(broker.getBrokerName());
-        this.transport = transport;
-        this.transport.setTransportListener(new DefaultTransportListener() {
-            public void onCommand(Object o) {
-            	Command command = (Command) o;
-                Response response = service(command);
-                if (response != null) {
+        this.transport=transport;
+        this.transport.setTransportListener(new DefaultTransportListener(){
+
+            public void onCommand(Object o){
+                Command command=(Command)o;
+                Response response=service(command);
+                if(response!=null){
                     dispatch(response);
                 }
             }
 
-            public void onException(IOException exception) {
+            public void onException(IOException exception){
                 serviceTransportException(exception);
             }
         });
-        connected = true;
+        connected=true;
     }
 
-    
-    
-    
-    
-    
     /**
      * Returns the number of messages to be dispatched to this connection
      */
-    public int getDispatchQueueSize() {
+    public int getDispatchQueueSize(){
         return dispatchQueue.size();
     }
-    
 
-    public void serviceTransportException(IOException e) {
-        if( !disposed.get() ) {
-            transportException = e; 
-            if( transportLog.isDebugEnabled() )
+    public void serviceTransportException(IOException e){
+        if(!disposed.get()){
+            transportException=e;
+            if(transportLog.isDebugEnabled())
                 transportLog.debug("Transport failed: "+e,e);
             ServiceSupport.dispose(this);
         }
     }
-    
+
     /**
-     * Calls the serviceException method in an async thread.  Since 
-     * handling a service exception closes a socket, we should not tie 
-     * up broker threads since client sockets may hang or cause deadlocks.
+     * Calls the serviceException method in an async thread. Since handling a service exception closes a socket, we
+     * should not tie up broker threads since client sockets may hang or cause deadlocks.
      * 
      * @param e
      */
-	public void serviceExceptionAsync(final IOException e) {
-		if( asyncException.compareAndSet(false, true) ) {
-			new Thread("Async Exception Handler") {
-				public void run() {
-					serviceException(e);
-				}
-			}.start();
-		}
-	}
-
-	/**
-	 * Closes a clients connection due to a detected error.
-	 * 
-	 * Errors are ignored if: the client is closing or broker is closing.
-	 * Otherwise, the connection error transmitted to the client before stopping it's
-	 * transport.
-	 */
-    public void serviceException(Throwable e) {
+    public void serviceExceptionAsync(final IOException e){
+        if(asyncException.compareAndSet(false,true)){
+            new Thread("Async Exception Handler"){
+
+                public void run(){
+                    serviceException(e);
+                }
+            }.start();
+        }
+    }
+
+    /**
+     * Closes a clients connection due to a detected error.
+     * 
+     * Errors are ignored if: the client is closing or broker is closing. Otherwise, the connection error transmitted to
+     * the client before stopping it's transport.
+     */
+    public void serviceException(Throwable e){
         // are we a transport exception such as not being able to dispatch
         // synchronously to a transport
-        if (e instanceof IOException) {
-            serviceTransportException((IOException) e);
+        if(e instanceof IOException){
+            serviceTransportException((IOException)e);
         }
-        
-        // Handle the case where the broker is stopped 
+        // Handle the case where the broker is stopped
         // But the client is still connected.
-        else if (e.getClass() == BrokerStoppedException.class ) {
-            if( !disposed.get() ) {
-                if( serviceLog.isDebugEnabled() )
+        else if(e.getClass()==BrokerStoppedException.class){
+            if(!disposed.get()){
+                if(serviceLog.isDebugEnabled())
                     serviceLog.debug("Broker has been stopped.  Notifying client and closing his connection.");
-                
-                ConnectionError ce = new ConnectionError();
+                ConnectionError ce=new ConnectionError();
                 ce.setException(e);
                 dispatchSync(ce);
-                
                 // Wait a little bit to try to get the output buffer to flush the exption notification to the client.
-                try {
+                try{
                     Thread.sleep(500);
-                } catch (InterruptedException ie) {
+                }catch(InterruptedException ie){
                     Thread.currentThread().interrupt();
                 }
-                
                 // Worst case is we just kill the connection before the notification gets to him.
                 ServiceSupport.dispose(this);
             }
-        }
-        
-        else if( !disposed.get() && !inServiceException ) {
-            inServiceException = true;
-                try {
-                
+        }else if(!disposed.get()&&!inServiceException){
+            inServiceException=true;
+            try{
                 serviceLog.error("Async error occurred: "+e,e);
-                ConnectionError ce = new ConnectionError();
+                ConnectionError ce=new ConnectionError();
                 ce.setException(e);
                 dispatchAsync(ce);
-            } finally {
-                inServiceException = false;
+            }finally{
+                inServiceException=false;
             }
-        } 
+        }
     }
 
-    public Response service(Command command) {
-        
+    public Response service(Command command){
         Response response=null;
-        boolean responseRequired = command.isResponseRequired();
-        int commandId = command.getCommandId();
-        try {
-            response = command.visit(this);
-        } catch ( Throwable e ) {
-            if( responseRequired ) {
-                if( serviceLog.isDebugEnabled() && e.getClass()!=BrokerStoppedException.class )
+        boolean responseRequired=command.isResponseRequired();
+        int commandId=command.getCommandId();
+        try{
+            response=command.visit(this);
+        }catch(Throwable e){
+            if(responseRequired){
+                if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class)
                     serviceLog.debug("Error occured while processing sync command: "+e,e);
-                response = new ExceptionResponse(e);
-            } else {
+                response=new ExceptionResponse(e);
+            }else{
                 serviceException(e);
             }
-        }        
-        if( responseRequired ) {
-            if( response == null ) {
-                response = new Response();                
+        }
+        if(responseRequired){
+            if(response==null){
+                response=new Response();
             }
             response.setCorrelationId(commandId);
         }
         return response;
-        
     }
-    
-    protected ConnectionState lookupConnectionState(ConsumerId id) {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
-        if( cs== null )
-            throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "+id.getParentId().getParentId());
+
+    protected ConnectionState lookupConnectionState(ConsumerId id){
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
+        if(cs==null)
+            throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "
+                    +id.getParentId().getParentId());
         return cs;
     }
-    protected ConnectionState lookupConnectionState(ProducerId id) {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
-        if( cs== null )
-            throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "+id.getParentId().getParentId());        
+
+    protected ConnectionState lookupConnectionState(ProducerId id){
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
+        if(cs==null)
+            throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "
+                    +id.getParentId().getParentId());
         return cs;
     }
-    protected ConnectionState lookupConnectionState(SessionId id) {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId());
-        if( cs== null )
-            throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "+id.getParentId());        
+
+    protected ConnectionState lookupConnectionState(SessionId id){
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId());
+        if(cs==null)
+            throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "
+                    +id.getParentId());
         return cs;
     }
-    protected ConnectionState lookupConnectionState(ConnectionId connectionId) {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(connectionId);
-        if( cs== null )
+
+    protected ConnectionState lookupConnectionState(ConnectionId connectionId){
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(connectionId);
+        if(cs==null)
             throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
         return cs;
     }
 
-    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+    public Response processKeepAlive(KeepAliveInfo info) throws Exception{
         return null;
     }
 
-    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
-        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
+    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception{
+        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(),info);
         return null;
     }
-    
-    public Response processWireFormat(WireFormatInfo info) throws Exception {
-        wireFormatInfo = info;
+
+    public Response processWireFormat(WireFormatInfo info) throws Exception{
+        wireFormatInfo=info;
         return null;
     }
-    
-    public Response processShutdown(ShutdownInfo info) throws Exception {
+
+    public Response processShutdown(ShutdownInfo info) throws Exception{
         stop();
         return null;
     }
-     
-    public Response processFlush(FlushCommand command) throws Exception {
+
+    public Response processFlush(FlushCommand command) throws Exception{
         return null;
     }
 
-    public Response processBeginTransaction(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+    synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception{
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
-        if( cs!=null ) {
-           context = cs.getContext();
+        if(cs!=null){
+            context=cs.getContext();
         }
-        
         // Avoid replaying dup commands
-        if( cs.getTransactionState(info.getTransactionId())==null ) {
+        if(cs.getTransactionState(info.getTransactionId())==null){
             cs.addTransactionState(info.getTransactionId());
-            broker.beginTransaction(context, info.getTransactionId());
+            broker.beginTransaction(context,info.getTransactionId());
         }
         return null;
     }
-    
-    public Response processEndTransaction(TransactionInfo info) throws Exception {
-        // No need to do anything.  This packet is just sent by the client
+
+    synchronized public Response processEndTransaction(TransactionInfo info) throws Exception{
+        // No need to do anything. This packet is just sent by the client
         // make sure he is synced with the server as commit command could
         // come from a different connection.
         return null;
     }
-    
-    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+
+    synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception{
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
-        if( cs!=null ) {
-           context = cs.getContext();
+        if(cs!=null){
+            context=cs.getContext();
         }
-        
-        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
-        if( transactionState == null )
-            throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId());
-
+        TransactionState transactionState=cs.getTransactionState(info.getTransactionId());
+        if(transactionState==null)
+            throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
+                    +info.getTransactionId());
         // Avoid dups.
-        if( !transactionState.isPrepared() ) {
+        if(!transactionState.isPrepared()){
             transactionState.setPrepared(true);
-            int result = broker.prepareTransaction(context, info.getTransactionId());
+            int result=broker.prepareTransaction(context,info.getTransactionId());
             transactionState.setPreparedResult(result);
-            IntegerResponse response = new IntegerResponse(result);
+            IntegerResponse response=new IntegerResponse(result);
             return response;
-        } else {
-            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
+        }else{
+            IntegerResponse response=new IntegerResponse(transactionState.getPreparedResult());
             return response;
         }
     }
 
-    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+    synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception{
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
-        if( cs!=null ) {
-           context = cs.getContext();
+        if(cs!=null){
+            context=cs.getContext();
         }
-        
         cs.removeTransactionState(info.getTransactionId());
-        broker.commitTransaction(context, info.getTransactionId(), true);
-
+        broker.commitTransaction(context,info.getTransactionId(),true);
         return null;
-        
     }
 
-    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+    synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception{
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
-        if( cs!=null ) {
-           context = cs.getContext();
+        if(cs!=null){
+            context=cs.getContext();
         }
-        
         cs.removeTransactionState(info.getTransactionId());
-        broker.commitTransaction(context, info.getTransactionId(), false);
+        broker.commitTransaction(context,info.getTransactionId(),false);
         return null;
     }
 
-    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+    synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception{
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
-        if( cs!=null ) {
-           context = cs.getContext();
+        if(cs!=null){
+            context=cs.getContext();
         }
-        
         cs.removeTransactionState(info.getTransactionId());
-        broker.rollbackTransaction(context, info.getTransactionId());
+        broker.rollbackTransaction(context,info.getTransactionId());
         return null;
     }
-    
-    public Response processForgetTransaction(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+
+    synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception{
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
-        if( cs!=null ) {
-           context = cs.getContext();
+        if(cs!=null){
+            context=cs.getContext();
         }
-        broker.forgetTransaction(context, info.getTransactionId());
+        broker.forgetTransaction(context,info.getTransactionId());
         return null;
     }
-    
-    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
-        ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+
+    synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception{
+        ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
         ConnectionContext context=null;
-        if( cs!=null ) {
-           context = cs.getContext();
+        if(cs!=null){
+            context=cs.getContext();
         }
-        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
+        TransactionId[] preparedTransactions=broker.getPreparedTransactions(context);
         return new DataArrayResponse(preparedTransactions);
     }
 
-
-    public Response processMessage(Message messageSend) throws Exception {
-        
-        ProducerId producerId = messageSend.getProducerId();
-        ConnectionState state = lookupConnectionState(producerId);
-        ConnectionContext context = state.getContext();
-        
-        // If the message originates from this client connection, 
+    public Response processMessage(Message messageSend) throws Exception{
+        ProducerId producerId=messageSend.getProducerId();
+        ConnectionState state=lookupConnectionState(producerId);
+        ConnectionContext context=state.getContext();
+        // If the message originates from this client connection,
         // then, finde the associated producer state so we can do some dup detection.
-        ProducerState producerState=null;        
-        if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
-            SessionState ss = state.getSessionState(producerId.getParentId());
-            if( ss == null )
-                throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
-            producerState = ss.getProducerState(producerId); 
-        }
-        
-        if( producerState == null ) {
-            broker.send(context, messageSend);
-        } else {
+        ProducerState producerState=null;
+        if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){
+            SessionState ss=state.getSessionState(producerId.getParentId());
+            if(ss==null)
+                throw new IllegalStateException("Cannot send from a session that had not been registered: "
+                        +producerId.getParentId());
+            producerState=ss.getProducerState(producerId);
+        }
+        if(producerState==null){
+            broker.send(context,messageSend);
+        }else{
             // Avoid Dups.
-            long seq = messageSend.getMessageId().getProducerSequenceId();
-            if( seq > producerState.getLastSequenceId() ) {
+            long seq=messageSend.getMessageId().getProducerSequenceId();
+            if(seq>producerState.getLastSequenceId()){
                 producerState.setLastSequenceId(seq);
-                broker.send(context, messageSend);
+                broker.send(context,messageSend);
             }
         }
-        
         return null;
     }
 
-    public Response processMessageAck(MessageAck ack) throws Exception {
-        broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(), ack);
+    public Response processMessageAck(MessageAck ack) throws Exception{
+        broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(),ack);
         return null;
     }
-    
-    public Response processMessagePull(MessagePull pull) throws Exception {
-        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
+
+    public Response processMessagePull(MessagePull pull) throws Exception{
+        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(),pull);
     }
 
     public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{
@@ -501,177 +466,157 @@
         return null;
     }
 
-    public Response processAddDestination(DestinationInfo info) throws Exception {
-        ConnectionState cs = lookupConnectionState(info.getConnectionId());
-        broker.addDestinationInfo(cs.getContext(), info);
-        if( info.getDestination().isTemporary() ) {
+    synchronized public Response processAddDestination(DestinationInfo info) throws Exception{
+        ConnectionState cs=lookupConnectionState(info.getConnectionId());
+        broker.addDestinationInfo(cs.getContext(),info);
+        if(info.getDestination().isTemporary()){
             cs.addTempDestination(info);
         }
         return null;
     }
 
-    public Response processRemoveDestination(DestinationInfo info) throws Exception {
-        ConnectionState cs = lookupConnectionState(info.getConnectionId());
-        broker.removeDestinationInfo(cs.getContext(), info);
-        if( info.getDestination().isTemporary() ) {
+    synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception{
+        ConnectionState cs=lookupConnectionState(info.getConnectionId());
+        broker.removeDestinationInfo(cs.getContext(),info);
+        if(info.getDestination().isTemporary()){
             cs.removeTempDestination(info.getDestination());
         }
         return null;
     }
 
-
-    public Response processAddProducer(ProducerInfo info) throws Exception {
-        SessionId sessionId = info.getProducerId().getParentId();
-        ConnectionId connectionId = sessionId.getParentId();
-        
-        ConnectionState cs = lookupConnectionState(connectionId);
-        SessionState ss = cs.getSessionState(sessionId);
-        if( ss == null )
-            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId);
-
+    synchronized public Response processAddProducer(ProducerInfo info) throws Exception{
+        SessionId sessionId=info.getProducerId().getParentId();
+        ConnectionId connectionId=sessionId.getParentId();
+        ConnectionState cs=lookupConnectionState(connectionId);
+        SessionState ss=cs.getSessionState(sessionId);
+        if(ss==null)
+            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
+                    +sessionId);
         // Avoid replaying dup commands
-        if( !ss.getProducerIds().contains(info.getProducerId()) ) {
-            broker.addProducer(cs.getContext(), info);
-            try {
+        if(!ss.getProducerIds().contains(info.getProducerId())){
+            broker.addProducer(cs.getContext(),info);
+            try{
                 ss.addProducer(info);
-            } catch (IllegalStateException e) {
-                broker.removeProducer(cs.getContext(), info);
+            }catch(IllegalStateException e){
+                broker.removeProducer(cs.getContext(),info);
             }
         }
         return null;
     }
-    
-    public Response processRemoveProducer(ProducerId id) throws Exception {
-        SessionId sessionId = id.getParentId();
-        ConnectionId connectionId = sessionId.getParentId();
-        
-        ConnectionState cs = lookupConnectionState(connectionId);
-        SessionState ss = cs.getSessionState(sessionId);
-        if( ss == null )
-            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "+sessionId);
-        ProducerState ps = ss.removeProducer(id);
-        if( ps == null )
+
+    synchronized public Response processRemoveProducer(ProducerId id) throws Exception{
+        SessionId sessionId=id.getParentId();
+        ConnectionId connectionId=sessionId.getParentId();
+        ConnectionState cs=lookupConnectionState(connectionId);
+        SessionState ss=cs.getSessionState(sessionId);
+        if(ss==null)
+            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
+                    +sessionId);
+        ProducerState ps=ss.removeProducer(id);
+        if(ps==null)
             throw new IllegalStateException("Cannot remove a producer that had not been registered: "+id);
-        
-        broker.removeProducer(cs.getContext(), ps.getInfo());
+        broker.removeProducer(cs.getContext(),ps.getInfo());
         return null;
     }
 
-    public Response processAddConsumer(ConsumerInfo info) throws Exception {
-        SessionId sessionId = info.getConsumerId().getParentId();
-        ConnectionId connectionId = sessionId.getParentId();
-        
-        ConnectionState cs = lookupConnectionState(connectionId);
-        SessionState ss = cs.getSessionState(sessionId);
-        if( ss == null )
-            throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId);
-
+    synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception{
+        SessionId sessionId=info.getConsumerId().getParentId();
+        ConnectionId connectionId=sessionId.getParentId();
+        ConnectionState cs=lookupConnectionState(connectionId);
+        SessionState ss=cs.getSessionState(sessionId);
+        if(ss==null)
+            throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "
+                    +sessionId);
         // Avoid replaying dup commands
-        if( !ss.getConsumerIds().contains(info.getConsumerId()) ) {
-            broker.addConsumer(cs.getContext(), info);
-            try {
+        if(!ss.getConsumerIds().contains(info.getConsumerId())){
+            broker.addConsumer(cs.getContext(),info);
+            try{
                 ss.addConsumer(info);
-            } catch (IllegalStateException e) {
-                broker.removeConsumer(cs.getContext(), info);
+            }catch(IllegalStateException e){
+                broker.removeConsumer(cs.getContext(),info);
             }
         }
-        
         return null;
     }
-    
-    public Response processRemoveConsumer(ConsumerId id) throws Exception {
-        
-        SessionId sessionId = id.getParentId();
-        ConnectionId connectionId = sessionId.getParentId();
-        
-        ConnectionState cs = lookupConnectionState(connectionId);
-        SessionState ss = cs.getSessionState(sessionId);
-        if( ss == null )
-            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "+sessionId);
-        ConsumerState consumerState = ss.removeConsumer(id);
-        if( consumerState == null )
+
+    synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception{
+        SessionId sessionId=id.getParentId();
+        ConnectionId connectionId=sessionId.getParentId();
+        ConnectionState cs=lookupConnectionState(connectionId);
+        SessionState ss=cs.getSessionState(sessionId);
+        if(ss==null)
+            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
+                    +sessionId);
+        ConsumerState consumerState=ss.removeConsumer(id);
+        if(consumerState==null)
             throw new IllegalStateException("Cannot remove a consumer that had not been registered: "+id);
-        
-        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
+        broker.removeConsumer(cs.getContext(),consumerState.getInfo());
         return null;
     }
-    
-    public Response processAddSession(SessionInfo info) throws Exception {
-        ConnectionId connectionId = info.getSessionId().getParentId();
-        ConnectionState cs = lookupConnectionState(connectionId);
-        
+
+    synchronized public Response processAddSession(SessionInfo info) throws Exception{
+        ConnectionId connectionId=info.getSessionId().getParentId();
+        ConnectionState cs=lookupConnectionState(connectionId);
         // Avoid replaying dup commands
-        if( !cs.getSessionIds().contains(info.getSessionId()) ) {
-            broker.addSession(cs.getContext(), info);
-            try {
+        if(!cs.getSessionIds().contains(info.getSessionId())){
+            broker.addSession(cs.getContext(),info);
+            try{
                 cs.addSession(info);
-            } catch (IllegalStateException e) {
-                broker.removeSession(cs.getContext(), info);
+            }catch(IllegalStateException e){
+                broker.removeSession(cs.getContext(),info);
             }
         }
         return null;
     }
-    
-    public Response processRemoveSession(SessionId id) throws Exception {
-        
-        ConnectionId connectionId = id.getParentId();
-        
-        ConnectionState cs = lookupConnectionState(connectionId);
-        SessionState session = cs.getSessionState(id);
 
-        if( session == null )
+    synchronized public Response processRemoveSession(SessionId id) throws Exception{
+        ConnectionId connectionId=id.getParentId();
+        ConnectionState cs=lookupConnectionState(connectionId);
+        SessionState session=cs.getSessionState(id);
+        if(session==null)
             throw new IllegalStateException("Cannot remove session that had not been registered: "+id);
-
         // Don't let new consumers or producers get added while we are closing this down.
         session.shutdown();
-        
         // Cascade the connection stop to the consumers and producers.
-        for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
-            ConsumerId consumerId = (ConsumerId) iter.next();
-            try {
+        for(Iterator iter=session.getConsumerIds().iterator();iter.hasNext();){
+            ConsumerId consumerId=(ConsumerId)iter.next();
+            try{
                 processRemoveConsumer(consumerId);
-            }
-            catch (Throwable e) {
-                log.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
+            }catch(Throwable e){
+                log.warn("Failed to remove consumer: "+consumerId+". Reason: "+e,e);
             }
         }
-        for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
-            ProducerId producerId = (ProducerId) iter.next();
-            try {
+        for(Iterator iter=session.getProducerIds().iterator();iter.hasNext();){
+            ProducerId producerId=(ProducerId)iter.next();
+            try{
                 processRemoveProducer(producerId);
-            }
-            catch (Throwable e) {
-                log.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
+            }catch(Throwable e){
+                log.warn("Failed to remove producer: "+producerId+". Reason: "+e,e);
             }
         }
         cs.removeSession(id);
-        broker.removeSession(cs.getContext(), session.getInfo());
+        broker.removeSession(cs.getContext(),session.getInfo());
         return null;
     }
-    
-    public Response processAddConnection(ConnectionInfo info) throws Exception {
-
-    	ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
-    	
-    	if( state !=null ) {
-    		// ConnectionInfo replay??  Chances are that it's a client reconnecting,
-    		// and we have not detected that that old connection died.. Kill the old connection
-    		// to make sure our state is in sync with the client.
-    		if( this != state.getConnection() ) {
-    			log.debug("Killing previous stale connection: "+state.getConnection());
-    			state.getConnection().stop();
-    			if( !state.getConnection().stopLatch.await(15, TimeUnit.SECONDS) ) {
-    				throw new Exception("Previous connection could not be clean up.");
-    			}
-    		}
-    	}
-    	
-		log.debug("Setting up new connection: "+this);
 
-    	
+    synchronized public Response processAddConnection(ConnectionInfo info) throws Exception{
+        ConnectionState state=(ConnectionState)brokerConnectionStates.get(info.getConnectionId());
+        if(state!=null){
+            // ConnectionInfo replay?? Chances are that it's a client reconnecting,
+            // and we have not detected that that old connection died.. Kill the old connection
+            // to make sure our state is in sync with the client.
+            if(this!=state.getConnection()){
+                log.debug("Killing previous stale connection: "+state.getConnection());
+                state.getConnection().stop();
+                if(!state.getConnection().stopLatch.await(15,TimeUnit.SECONDS)){
+                    throw new Exception("Previous connection could not be clean up.");
+                }
+            }
+        }
+        log.debug("Setting up new connection: "+this);
         // Setup the context.
-        String clientId = info.getClientId();
-        ConnectionContext context = new ConnectionContext();
+        String clientId=info.getClientId();
+        ConnectionContext context=new ConnectionContext();
         context.setConnection(this);
         context.setBroker(broker);
         context.setConnector(connector);
@@ -681,95 +626,84 @@
         context.setConnectionId(info.getConnectionId());
         context.setWireFormatInfo(wireFormatInfo);
         context.incrementReference();
-        this.manageable = info.isManageable();
-        
-        state = new ConnectionState(info, context, this);
-        brokerConnectionStates.put(info.getConnectionId(), state);
-        localConnectionStates.put(info.getConnectionId(), state);           
-        
-        broker.addConnection(context, info);
-        if (info.isManageable() && broker.isFaultTolerantConfiguration()){
-            //send ConnectionCommand
-            ConnectionControl command = new ConnectionControl();
+        this.manageable=info.isManageable();
+        state=new ConnectionState(info,context,this);
+        brokerConnectionStates.put(info.getConnectionId(),state);
+        localConnectionStates.put(info.getConnectionId(),state);
+        broker.addConnection(context,info);
+        if(info.isManageable()&&broker.isFaultTolerantConfiguration()){
+            // send ConnectionCommand
+            ConnectionControl command=new ConnectionControl();
             command.setFaultTolerant(broker.isFaultTolerantConfiguration());
             dispatchAsync(command);
         }
-
         return null;
     }
-    
-    public Response processRemoveConnection(ConnectionId id)  {
-        
-        ConnectionState cs = lookupConnectionState(id);
-        
+
+    synchronized public Response processRemoveConnection(ConnectionId id){
+        ConnectionState cs=lookupConnectionState(id);
         // Don't allow things to be added to the connection state while we are shutting down.
         cs.shutdown();
-        
         // Cascade the connection stop to the sessions.
-        for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
-           
-                SessionId sessionId = (SessionId) iter.next();
-                try{
+        for(Iterator iter=cs.getSessionIds().iterator();iter.hasNext();){
+            SessionId sessionId=(SessionId)iter.next();
+            try{
                 processRemoveSession(sessionId);
             }catch(Throwable e){
-                serviceLog.warn("Failed to remove session " + sessionId,e);
+                serviceLog.warn("Failed to remove session "+sessionId,e);
             }
         }
-        
         // Cascade the connection stop to temp destinations.
-        for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) {
-            DestinationInfo di = (DestinationInfo) iter.next();
+        for(Iterator iter=cs.getTempDesinations().iterator();iter.hasNext();){
+            DestinationInfo di=(DestinationInfo)iter.next();
             try{
-                broker.removeDestination(cs.getContext(), di.getDestination(), 0);
+                broker.removeDestination(cs.getContext(),di.getDestination(),0);
             }catch(Throwable e){
-               serviceLog.warn("Failed to remove tmp destination " + di.getDestination(), e);
+                serviceLog.warn("Failed to remove tmp destination "+di.getDestination(),e);
             }
             iter.remove();
         }
-        
         try{
-            broker.removeConnection(cs.getContext(), cs.getInfo(), null);
+            broker.removeConnection(cs.getContext(),cs.getInfo(),null);
         }catch(Throwable e){
-            serviceLog.warn("Failed to remove connection " +  cs.getInfo(),e);
+            serviceLog.warn("Failed to remove connection "+cs.getInfo(),e);
         }
-        ConnectionState state = (ConnectionState) localConnectionStates.remove(id);
-        if( state != null ) {
+        ConnectionState state=(ConnectionState)localConnectionStates.remove(id);
+        if(state!=null){
             // If we are the last reference, we should remove the state
             // from the broker.
-            if( state.getContext().decrementReference() == 0 ){ 
+            if(state.getContext().decrementReference()==0){
                 brokerConnectionStates.remove(id);
             }
         }
         return null;
     }
 
-    
-    public Connector getConnector() {
+    public Connector getConnector(){
         return connector;
     }
 
-    public void dispatchSync(Command message) {
+    public void dispatchSync(Command message){
         processDispatch(message);
     }
-    
-    
-    public void dispatchAsync(Command message) {
-        if( taskRunner==null ) {
-            dispatchSync( message );
-        } else {
+
+    public void dispatchAsync(Command message){
+        if(taskRunner==null){
+            dispatchSync(message);
+        }else{
             dispatchQueue.add(message);
-            try {
+            try{
                 taskRunner.wakeup();
-            } catch (InterruptedException e) {
+            }catch(InterruptedException e){
                 Thread.currentThread().interrupt();
             }
-        }        
+        }
     }
-    
+
     protected void processDispatch(Command command){
         if(command.isMessageDispatch()){
-            MessageDispatch md=(MessageDispatch) command;
-            Runnable sub=(Runnable) md.getConsumer();
+            MessageDispatch md=(MessageDispatch)command;
+            Runnable sub=(Runnable)md.getConsumer();
             broker.processDispatch(md);
             try{
                 dispatch(command);
@@ -781,288 +715,270 @@
         }else{
             dispatch(command);
         }
-    }       
-    
-    public boolean iterate() {
-        if( dispatchQueue.isEmpty() || broker.isStopped()) {
+    }
+
+    public boolean iterate(){
+        if(dispatchQueue.isEmpty()||broker.isStopped()){
             return false;
-        } else {
-            Command command = (Command) dispatchQueue.remove(0);
-            processDispatch( command );
+        }else{
+            Command command=(Command)dispatchQueue.remove(0);
+            processDispatch(command);
             return true;
         }
-    }    
-            
+    }
+
     /**
      * Returns the statistics for this connection
      */
-    public ConnectionStatistics getStatistics() {
+    public ConnectionStatistics getStatistics(){
         return statistics;
     }
 
-    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
+    public MessageAuthorizationPolicy getMessageAuthorizationPolicy(){
         return messageAuthorizationPolicy;
     }
 
-    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
-        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
+    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy){
+        this.messageAuthorizationPolicy=messageAuthorizationPolicy;
     }
-    
+
     public boolean isManageable(){
         return manageable;
     }
-    
-    
-    public synchronized void start() throws Exception {
-        starting = true;
-        try {
+
+    public synchronized void start() throws Exception{
+        starting=true;
+        try{
             transport.start();
-            active = true;
+            active=true;
             this.processDispatch(connector.getBrokerInfo());
             connector.onStarted(this);
-        }
-        finally {
+        }finally{
             // stop() can be called from within the above block,
             // but we want to be sure start() completes before
             // stop() runs, so queue the stop until right now:
-            starting = false;
-            if (pendingStop) {
+            starting=false;
+            if(pendingStop){
                 log.debug("Calling the delayed stop()");
                 stop();
             }
         }
     }
 
-    public void stop() throws Exception {
+    public void stop() throws Exception{
         // If we're in the middle of starting
         // then go no further... for now.
-        synchronized(this) { 
-	        pendingStop = true;
-	        if (starting) {
-	            log.debug("stop() called in the middle of start(). Delaying...");
-	            return;
-	        }
-        }
-
-    	
-    	if( stopped.compareAndSet(false, true) ) {
-
-    		log.debug("Stopping connection: "+transport.getRemoteAddress());
-	        connector.onStopped(this);
-	        try {
-	            if (masterBroker != null){
-	                masterBroker.stop();
-	            }
-	            
-	            // If the transport has not failed yet,
-	            // notify the peer that we are doing a normal shutdown.
-	            if( transportException == null ) {
-	            	transport.oneway(new ShutdownInfo());
-	            }
-	        } catch (Exception ignore) {
-	            //ignore.printStackTrace();
-	        }
-	
-	        transport.stop();
-	        active = false;
-	       
-	        if(disposed.compareAndSet(false, true)) {
-		        
-		        if( taskRunner!=null )
-		            taskRunner.shutdown();
-		        
-		        // Clear out the dispatch queue to release any memory that
-		        // is being held on to.
-		        dispatchQueue.clear();
-		        
-		        //
-		        // Remove all logical connection associated with this connection
-		        // from the broker.
-		        if(!broker.isStopped()){
-		            ArrayList l=new ArrayList(localConnectionStates.keySet());
-		            for(Iterator iter=l.iterator();iter.hasNext();){
-		                ConnectionId connectionId=(ConnectionId) iter.next();
-		                try{
-		                	log.debug("Cleaning up connection resources.");
-		                    processRemoveConnection(connectionId);
-		                }catch(Throwable ignore){
-		                	ignore.printStackTrace();
-		                }
-		            }
-		            if(brokerInfo!=null){
-		                broker.removeBroker(this,brokerInfo);
-		            }
-		        }
-				stopLatch.countDown();
-	        }
-	        
-	        
-    		log.debug("Stopped connection: "+transport.getRemoteAddress());
-    	}
+        synchronized(this){
+            pendingStop=true;
+            if(starting){
+                log.debug("stop() called in the middle of start(). Delaying...");
+                return;
+            }
+        }
+        if(stopped.compareAndSet(false,true)){
+            log.debug("Stopping connection: "+transport.getRemoteAddress());
+            connector.onStopped(this);
+            try{
+                if(masterBroker!=null){
+                    masterBroker.stop();
+                }
+                // If the transport has not failed yet,
+                // notify the peer that we are doing a normal shutdown.
+                if(transportException==null){
+                    transport.oneway(new ShutdownInfo());
+                }
+            }catch(Exception ignore){
+                // ignore.printStackTrace();
+            }
+            transport.stop();
+            active=false;
+            if(disposed.compareAndSet(false,true)){
+                if(taskRunner!=null)
+                    taskRunner.shutdown();
+                // Clear out the dispatch queue to release any memory that
+                // is being held on to.
+                dispatchQueue.clear();
+                //
+                // Remove all logical connection associated with this connection
+                // from the broker.
+                if(!broker.isStopped()){
+                    ArrayList l=new ArrayList(localConnectionStates.keySet());
+                    for(Iterator iter=l.iterator();iter.hasNext();){
+                        ConnectionId connectionId=(ConnectionId)iter.next();
+                        try{
+                            log.debug("Cleaning up connection resources.");
+                            processRemoveConnection(connectionId);
+                        }catch(Throwable ignore){
+                            ignore.printStackTrace();
+                        }
+                    }
+                    if(brokerInfo!=null){
+                        broker.removeBroker(this,brokerInfo);
+                    }
+                }
+                stopLatch.countDown();
+            }
+            log.debug("Stopped connection: "+transport.getRemoteAddress());
+        }
     }
 
-
     /**
      * @return Returns the blockedCandidate.
      */
-    public boolean isBlockedCandidate() {
+    public boolean isBlockedCandidate(){
         return blockedCandidate;
     }
 
     /**
      * @param blockedCandidate The blockedCandidate to set.
      */
-    public void setBlockedCandidate(boolean blockedCandidate) {
-        this.blockedCandidate = blockedCandidate;
+    public void setBlockedCandidate(boolean blockedCandidate){
+        this.blockedCandidate=blockedCandidate;
     }
 
     /**
      * @return Returns the markedCandidate.
      */
-    public boolean isMarkedCandidate() {
+    public boolean isMarkedCandidate(){
         return markedCandidate;
     }
 
     /**
      * @param markedCandidate The markedCandidate to set.
      */
-    public void setMarkedCandidate(boolean markedCandidate) {
-        this.markedCandidate = markedCandidate;
-        if (!markedCandidate) {
-            timeStamp = 0;
-            blockedCandidate = false;
+    public void setMarkedCandidate(boolean markedCandidate){
+        this.markedCandidate=markedCandidate;
+        if(!markedCandidate){
+            timeStamp=0;
+            blockedCandidate=false;
         }
     }
 
     /**
      * @param slow The slow to set.
      */
-    public void setSlow(boolean slow) {
-        this.slow = slow;
+    public void setSlow(boolean slow){
+        this.slow=slow;
     }
 
     /**
      * @return true if the Connection is slow
      */
-    public boolean isSlow() {
+    public boolean isSlow(){
         return slow;
     }
 
     /**
      * @return true if the Connection is potentially blocked
      */
-    public boolean isMarkedBlockedCandidate() {
+    public boolean isMarkedBlockedCandidate(){
         return markedCandidate;
     }
 
     /**
      * Mark the Connection, so we can deem if it's collectable on the next sweep
      */
-    public void doMark() {
-        if (timeStamp == 0) {
-            timeStamp = System.currentTimeMillis();
+    public void doMark(){
+        if(timeStamp==0){
+            timeStamp=System.currentTimeMillis();
         }
     }
 
     /**
      * @return if after being marked, the Connection is still writing
      */
-    public boolean isBlocked() {
+    public boolean isBlocked(){
         return blocked;
     }
 
     /**
      * @return true if the Connection is connected
      */
-    public boolean isConnected() {
+    public boolean isConnected(){
         return connected;
     }
 
     /**
      * @param blocked The blocked to set.
      */
-    public void setBlocked(boolean blocked) {
-        this.blocked = blocked;
+    public void setBlocked(boolean blocked){
+        this.blocked=blocked;
     }
 
     /**
      * @param connected The connected to set.
      */
-    public void setConnected(boolean connected) {
-        this.connected = connected;
+    public void setConnected(boolean connected){
+        this.connected=connected;
     }
 
     /**
      * @return true if the Connection is active
      */
-    public boolean isActive() {
+    public boolean isActive(){
         return active;
     }
 
     /**
      * @param active The active to set.
      */
-    public void setActive(boolean active) {
-        this.active = active;
+    public void setActive(boolean active){
+        this.active=active;
     }
 
     /**
      * @return true if the Connection is starting
      */
-    public synchronized boolean isStarting() {
+    public synchronized boolean isStarting(){
         return starting;
     }
 
-    synchronized protected void setStarting(boolean starting) {
-        this.starting = starting;
+    synchronized protected void setStarting(boolean starting){
+        this.starting=starting;
     }
 
     /**
      * @return true if the Connection needs to stop
      */
-    public synchronized boolean isPendingStop() {
+    public synchronized boolean isPendingStop(){
         return pendingStop;
     }
 
-    protected synchronized void setPendingStop(boolean pendingStop) {
-        this.pendingStop = pendingStop;
+    protected synchronized void setPendingStop(boolean pendingStop){
+        this.pendingStop=pendingStop;
     }
 
-    public Response processBrokerInfo(BrokerInfo info) {
-        if (info.isSlaveBroker()) {
-            //stream messages from this broker (the master) to 
-            //the slave
-            MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
-            masterBroker = new MasterBroker(parent, transport);
+    public Response processBrokerInfo(BrokerInfo info){
+        if(info.isSlaveBroker()){
+            // stream messages from this broker (the master) to
+            // the slave
+            MutableBrokerFilter parent=(MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class);
+            masterBroker=new MasterBroker(parent,transport);
             masterBroker.startProcessing();
-            log.info("Slave Broker " + info.getBrokerName() + " is attached");
+            log.info("Slave Broker "+info.getBrokerName()+" is attached");
         }
-        
         // We only expect to get one broker info command per connection
-        if( this.brokerInfo!=null ) {
+        if(this.brokerInfo!=null){
             log.warn("Unexpected extra broker info command received: "+info);
         }
-        
-        this.brokerInfo = info;
-        broker.addBroker(this, info);
+        this.brokerInfo=info;
+        broker.addBroker(this,info);
         return null;
     }
 
-    protected void dispatch(Command command) {
-        try {
+    protected void dispatch(Command command){
+        try{
             setMarkedCandidate(true);
             transport.oneway(command);
             getStatistics().onCommand(command);
-        }
-        catch (IOException e) {
+        }catch(IOException e){
             serviceExceptionAsync(e);
-        }
-        finally {
+        }finally{
             setMarkedCandidate(false);
         }
     }
 
-    public String getRemoteAddress() {
+    public String getRemoteAddress(){
         return transport.getRemoteAddress();
     }
 }