You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/29 21:19:25 UTC
svn commit: r491088 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Author: rajdavies
Date: Fri Dec 29 12:19:25 2006
New Revision: 491088
URL: http://svn.apache.org/viewvc?view=rev&rev=491088
Log:
put the synchronization back for now
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=491088&r1=491087&r2=491088
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Dec 29 12:19:25 2006
@@ -1,20 +1,17 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.broker;
import java.io.IOException;
@@ -24,7 +21,10 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -73,42 +73,32 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* @version $Revision: 1.8 $
*/
-public class TransportConnection implements Service, Connection, Task, CommandVisitor {
-
- private static final Log log = LogFactory.getLog(TransportConnection.class);
- private static final Log transportLog = LogFactory.getLog(TransportConnection.class.getName() + ".Transport");
- private static final Log serviceLog = LogFactory.getLog(TransportConnection.class.getName() + ".Service");
-
+public class TransportConnection implements Service,Connection,Task,CommandVisitor{
+
+ private static final Log log=LogFactory.getLog(TransportConnection.class);
+ private static final Log transportLog=LogFactory.getLog(TransportConnection.class.getName()+".Transport");
+ private static final Log serviceLog=LogFactory.getLog(TransportConnection.class.getName()+".Service");
// Keeps track of the broker and connector that created this connection.
protected final Broker broker;
- private MasterBroker masterBroker;
+ private MasterBroker masterBroker;
protected final TransportConnector connector;
- private final Transport transport;
+ private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
-
// Keeps track of the state of the connections.
- protected final ConcurrentHashMap localConnectionStates = new ConcurrentHashMap();
+ protected final ConcurrentHashMap localConnectionStates=new ConcurrentHashMap();
protected final Map brokerConnectionStates;
-
// The broker and wireformat info that was exchanged.
protected BrokerInfo brokerInfo;
- private WireFormatInfo wireFormatInfo;
-
- // Used to do async dispatch.. this should perhaps be pushed down into the transport layer..
- protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
+ private WireFormatInfo wireFormatInfo;
+ // Used to do async dispatch.. this should perhaps be pushed down into the transport layer..
+ protected final List dispatchQueue=Collections.synchronizedList(new LinkedList());
protected final TaskRunner taskRunner;
- protected IOException transportException;
+ protected IOException transportException;
private boolean inServiceException=false;
-
- private ConnectionStatistics statistics = new ConnectionStatistics();
+ private ConnectionStatistics statistics=new ConnectionStatistics();
private boolean manageable;
private boolean slow;
private boolean markedCandidate;
@@ -118,382 +108,357 @@
private boolean active;
private boolean starting;
private boolean pendingStop;
- private long timeStamp = 0;
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
+ private long timeStamp=0;
+ private AtomicBoolean stopped=new AtomicBoolean(false);
protected final AtomicBoolean disposed=new AtomicBoolean(false);
- private CountDownLatch stopLatch = new CountDownLatch(1);
- protected final AtomicBoolean asyncException = new AtomicBoolean(false);
-
- static class ConnectionState extends org.apache.activemq.state.ConnectionState {
+ private CountDownLatch stopLatch=new CountDownLatch(1);
+ protected final AtomicBoolean asyncException=new AtomicBoolean(false);
+
+ static class ConnectionState extends org.apache.activemq.state.ConnectionState{
+
private final ConnectionContext context;
TransportConnection connection;
- public ConnectionState(ConnectionInfo info, ConnectionContext context, TransportConnection connection) {
+ public ConnectionState(ConnectionInfo info,ConnectionContext context,TransportConnection connection){
super(info);
- this.context = context;
+ this.context=context;
this.connection=connection;
}
-
- public ConnectionContext getContext() {
+
+ public ConnectionContext getContext(){
return context;
}
-
- public TransportConnection getConnection() {
+
+ public TransportConnection getConnection(){
return connection;
}
-
}
-
+
/**
* @param connector
* @param transport
* @param broker
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport else commands are sent async.
*/
- public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, TaskRunnerFactory taskRunnerFactory) {
-
- this.connector = connector;
- this.broker = broker;
-
- RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
- brokerConnectionStates = rb.getConnectionStates();
-
- if (connector != null) {
+ public TransportConnection(TransportConnector connector,final Transport transport,Broker broker,
+ TaskRunnerFactory taskRunnerFactory){
+ this.connector=connector;
+ this.broker=broker;
+ RegionBroker rb=(RegionBroker)broker.getAdaptor(RegionBroker.class);
+ brokerConnectionStates=rb.getConnectionStates();
+ if(connector!=null){
this.statistics.setParent(connector.getStatistics());
}
-
- if( taskRunnerFactory != null ) {
- taskRunner = taskRunnerFactory.createTaskRunner( this, "ActiveMQ Connection Dispatcher: "+System.identityHashCode(this) );
- }
- else {
- taskRunner = null;
- }
-
+ if(taskRunnerFactory!=null){
+ taskRunner=taskRunnerFactory.createTaskRunner(this,"ActiveMQ Connection Dispatcher: "
+ +System.identityHashCode(this));
+ }else{
+ taskRunner=null;
+ }
connector.setBrokerName(broker.getBrokerName());
- this.transport = transport;
- this.transport.setTransportListener(new DefaultTransportListener() {
- public void onCommand(Object o) {
- Command command = (Command) o;
- Response response = service(command);
- if (response != null) {
+ this.transport=transport;
+ this.transport.setTransportListener(new DefaultTransportListener(){
+
+ public void onCommand(Object o){
+ Command command=(Command)o;
+ Response response=service(command);
+ if(response!=null){
dispatch(response);
}
}
- public void onException(IOException exception) {
+ public void onException(IOException exception){
serviceTransportException(exception);
}
});
- connected = true;
+ connected=true;
}
-
-
-
-
-
/**
* Returns the number of messages to be dispatched to this connection
*/
- public int getDispatchQueueSize() {
+ public int getDispatchQueueSize(){
return dispatchQueue.size();
}
-
- public void serviceTransportException(IOException e) {
- if( !disposed.get() ) {
- transportException = e;
- if( transportLog.isDebugEnabled() )
+ public void serviceTransportException(IOException e){
+ if(!disposed.get()){
+ transportException=e;
+ if(transportLog.isDebugEnabled())
transportLog.debug("Transport failed: "+e,e);
ServiceSupport.dispose(this);
}
}
-
+
/**
- * Calls the serviceException method in an async thread. Since
- * handling a service exception closes a socket, we should not tie
- * up broker threads since client sockets may hang or cause deadlocks.
+ * Calls the serviceException method in an async thread. Since handling a service exception closes a socket, we
+ * should not tie up broker threads since client sockets may hang or cause deadlocks.
*
* @param e
*/
- public void serviceExceptionAsync(final IOException e) {
- if( asyncException.compareAndSet(false, true) ) {
- new Thread("Async Exception Handler") {
- public void run() {
- serviceException(e);
- }
- }.start();
- }
- }
-
- /**
- * Closes a clients connection due to a detected error.
- *
- * Errors are ignored if: the client is closing or broker is closing.
- * Otherwise, the connection error transmitted to the client before stopping it's
- * transport.
- */
- public void serviceException(Throwable e) {
+ public void serviceExceptionAsync(final IOException e){
+ if(asyncException.compareAndSet(false,true)){
+ new Thread("Async Exception Handler"){
+
+ public void run(){
+ serviceException(e);
+ }
+ }.start();
+ }
+ }
+
+ /**
+ * Closes a client's connection due to a detected error.
+ *
+ * Errors are ignored if the client is closing or the broker is closing. Otherwise, the connection error is
+ * transmitted to the client before stopping its transport.
+ */
+ public void serviceException(Throwable e){
// are we a transport exception such as not being able to dispatch
// synchronously to a transport
- if (e instanceof IOException) {
- serviceTransportException((IOException) e);
+ if(e instanceof IOException){
+ serviceTransportException((IOException)e);
}
-
- // Handle the case where the broker is stopped
+ // Handle the case where the broker is stopped
// But the client is still connected.
- else if (e.getClass() == BrokerStoppedException.class ) {
- if( !disposed.get() ) {
- if( serviceLog.isDebugEnabled() )
+ else if(e.getClass()==BrokerStoppedException.class){
+ if(!disposed.get()){
+ if(serviceLog.isDebugEnabled())
serviceLog.debug("Broker has been stopped. Notifying client and closing his connection.");
-
- ConnectionError ce = new ConnectionError();
+ ConnectionError ce=new ConnectionError();
ce.setException(e);
dispatchSync(ce);
-
// Wait a little bit to try to get the output buffer to flush the exception notification to the client.
- try {
+ try{
Thread.sleep(500);
- } catch (InterruptedException ie) {
+ }catch(InterruptedException ie){
Thread.currentThread().interrupt();
}
-
// Worst case is we just kill the connection before the notification gets to him.
ServiceSupport.dispose(this);
}
- }
-
- else if( !disposed.get() && !inServiceException ) {
- inServiceException = true;
- try {
-
+ }else if(!disposed.get()&&!inServiceException){
+ inServiceException=true;
+ try{
serviceLog.error("Async error occurred: "+e,e);
- ConnectionError ce = new ConnectionError();
+ ConnectionError ce=new ConnectionError();
ce.setException(e);
dispatchAsync(ce);
- } finally {
- inServiceException = false;
+ }finally{
+ inServiceException=false;
}
- }
+ }
}
- public Response service(Command command) {
-
+ public Response service(Command command){
Response response=null;
- boolean responseRequired = command.isResponseRequired();
- int commandId = command.getCommandId();
- try {
- response = command.visit(this);
- } catch ( Throwable e ) {
- if( responseRequired ) {
- if( serviceLog.isDebugEnabled() && e.getClass()!=BrokerStoppedException.class )
+ boolean responseRequired=command.isResponseRequired();
+ int commandId=command.getCommandId();
+ try{
+ response=command.visit(this);
+ }catch(Throwable e){
+ if(responseRequired){
+ if(serviceLog.isDebugEnabled()&&e.getClass()!=BrokerStoppedException.class)
serviceLog.debug("Error occured while processing sync command: "+e,e);
- response = new ExceptionResponse(e);
- } else {
+ response=new ExceptionResponse(e);
+ }else{
serviceException(e);
}
- }
- if( responseRequired ) {
- if( response == null ) {
- response = new Response();
+ }
+ if(responseRequired){
+ if(response==null){
+ response=new Response();
}
response.setCorrelationId(commandId);
}
return response;
-
}
-
- protected ConnectionState lookupConnectionState(ConsumerId id) {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
- if( cs== null )
- throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "+id.getParentId().getParentId());
+
+ protected ConnectionState lookupConnectionState(ConsumerId id){
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
+ if(cs==null)
+ throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: "
+ +id.getParentId().getParentId());
return cs;
}
- protected ConnectionState lookupConnectionState(ProducerId id) {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId().getParentId());
- if( cs== null )
- throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "+id.getParentId().getParentId());
+
+ protected ConnectionState lookupConnectionState(ProducerId id){
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId().getParentId());
+ if(cs==null)
+ throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: "
+ +id.getParentId().getParentId());
return cs;
}
- protected ConnectionState lookupConnectionState(SessionId id) {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(id.getParentId());
- if( cs== null )
- throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "+id.getParentId());
+
+ protected ConnectionState lookupConnectionState(SessionId id){
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(id.getParentId());
+ if(cs==null)
+ throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: "
+ +id.getParentId());
return cs;
}
- protected ConnectionState lookupConnectionState(ConnectionId connectionId) {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(connectionId);
- if( cs== null )
+
+ protected ConnectionState lookupConnectionState(ConnectionId connectionId){
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(connectionId);
+ if(cs==null)
throw new IllegalStateException("Cannot lookup a connection that had not been registered: "+connectionId);
return cs;
}
- public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+ public Response processKeepAlive(KeepAliveInfo info) throws Exception{
return null;
}
- public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
- broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
+ public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception{
+ broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(),info);
return null;
}
-
- public Response processWireFormat(WireFormatInfo info) throws Exception {
- wireFormatInfo = info;
+
+ public Response processWireFormat(WireFormatInfo info) throws Exception{
+ wireFormatInfo=info;
return null;
}
-
- public Response processShutdown(ShutdownInfo info) throws Exception {
+
+ public Response processShutdown(ShutdownInfo info) throws Exception{
stop();
return null;
}
-
- public Response processFlush(FlushCommand command) throws Exception {
+
+ public Response processFlush(FlushCommand command) throws Exception{
return null;
}
- public Response processBeginTransaction(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+ synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception{
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
- if( cs!=null ) {
- context = cs.getContext();
+ if(cs!=null){
+ context=cs.getContext();
}
-
// Avoid replaying dup commands
- if( cs.getTransactionState(info.getTransactionId())==null ) {
+ if(cs.getTransactionState(info.getTransactionId())==null){
cs.addTransactionState(info.getTransactionId());
- broker.beginTransaction(context, info.getTransactionId());
+ broker.beginTransaction(context,info.getTransactionId());
}
return null;
}
-
- public Response processEndTransaction(TransactionInfo info) throws Exception {
- // No need to do anything. This packet is just sent by the client
+
+ synchronized public Response processEndTransaction(TransactionInfo info) throws Exception{
+ // No need to do anything. This packet is just sent by the client to
// make sure it is synced with the server, as the commit command could
// come from a different connection.
return null;
}
-
- public Response processPrepareTransaction(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+
+ synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception{
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
- if( cs!=null ) {
- context = cs.getContext();
+ if(cs!=null){
+ context=cs.getContext();
}
-
- TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
- if( transactionState == null )
- throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId());
-
+ TransactionState transactionState=cs.getTransactionState(info.getTransactionId());
+ if(transactionState==null)
+ throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
+ +info.getTransactionId());
// Avoid dups.
- if( !transactionState.isPrepared() ) {
+ if(!transactionState.isPrepared()){
transactionState.setPrepared(true);
- int result = broker.prepareTransaction(context, info.getTransactionId());
+ int result=broker.prepareTransaction(context,info.getTransactionId());
transactionState.setPreparedResult(result);
- IntegerResponse response = new IntegerResponse(result);
+ IntegerResponse response=new IntegerResponse(result);
return response;
- } else {
- IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
+ }else{
+ IntegerResponse response=new IntegerResponse(transactionState.getPreparedResult());
return response;
}
}
- public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+ synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception{
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
- if( cs!=null ) {
- context = cs.getContext();
+ if(cs!=null){
+ context=cs.getContext();
}
-
cs.removeTransactionState(info.getTransactionId());
- broker.commitTransaction(context, info.getTransactionId(), true);
-
+ broker.commitTransaction(context,info.getTransactionId(),true);
return null;
-
}
- public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+ synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception{
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
- if( cs!=null ) {
- context = cs.getContext();
+ if(cs!=null){
+ context=cs.getContext();
}
-
cs.removeTransactionState(info.getTransactionId());
- broker.commitTransaction(context, info.getTransactionId(), false);
+ broker.commitTransaction(context,info.getTransactionId(),false);
return null;
}
- public Response processRollbackTransaction(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+ synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception{
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
- if( cs!=null ) {
- context = cs.getContext();
+ if(cs!=null){
+ context=cs.getContext();
}
-
cs.removeTransactionState(info.getTransactionId());
- broker.rollbackTransaction(context, info.getTransactionId());
+ broker.rollbackTransaction(context,info.getTransactionId());
return null;
}
-
- public Response processForgetTransaction(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+
+ synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception{
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
- if( cs!=null ) {
- context = cs.getContext();
+ if(cs!=null){
+ context=cs.getContext();
}
- broker.forgetTransaction(context, info.getTransactionId());
+ broker.forgetTransaction(context,info.getTransactionId());
return null;
}
-
- public Response processRecoverTransactions(TransactionInfo info) throws Exception {
- ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
+
+ synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception{
+ ConnectionState cs=(ConnectionState)localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null;
- if( cs!=null ) {
- context = cs.getContext();
+ if(cs!=null){
+ context=cs.getContext();
}
- TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
+ TransactionId[] preparedTransactions=broker.getPreparedTransactions(context);
return new DataArrayResponse(preparedTransactions);
}
-
- public Response processMessage(Message messageSend) throws Exception {
-
- ProducerId producerId = messageSend.getProducerId();
- ConnectionState state = lookupConnectionState(producerId);
- ConnectionContext context = state.getContext();
-
- // If the message originates from this client connection,
+ public Response processMessage(Message messageSend) throws Exception{
+ ProducerId producerId=messageSend.getProducerId();
+ ConnectionState state=lookupConnectionState(producerId);
+ ConnectionContext context=state.getContext();
+ // If the message originates from this client connection,
// then find the associated producer state so we can do some dup detection.
- ProducerState producerState=null;
- if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
- SessionState ss = state.getSessionState(producerId.getParentId());
- if( ss == null )
- throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
- producerState = ss.getProducerState(producerId);
- }
-
- if( producerState == null ) {
- broker.send(context, messageSend);
- } else {
+ ProducerState producerState=null;
+ if(messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())){
+ SessionState ss=state.getSessionState(producerId.getParentId());
+ if(ss==null)
+ throw new IllegalStateException("Cannot send from a session that had not been registered: "
+ +producerId.getParentId());
+ producerState=ss.getProducerState(producerId);
+ }
+ if(producerState==null){
+ broker.send(context,messageSend);
+ }else{
// Avoid Dups.
- long seq = messageSend.getMessageId().getProducerSequenceId();
- if( seq > producerState.getLastSequenceId() ) {
+ long seq=messageSend.getMessageId().getProducerSequenceId();
+ if(seq>producerState.getLastSequenceId()){
producerState.setLastSequenceId(seq);
- broker.send(context, messageSend);
+ broker.send(context,messageSend);
}
}
-
return null;
}
- public Response processMessageAck(MessageAck ack) throws Exception {
- broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(), ack);
+ public Response processMessageAck(MessageAck ack) throws Exception{
+ broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(),ack);
return null;
}
-
- public Response processMessagePull(MessagePull pull) throws Exception {
- return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
+
+ public Response processMessagePull(MessagePull pull) throws Exception{
+ return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(),pull);
}
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{
@@ -501,177 +466,157 @@
return null;
}
- public Response processAddDestination(DestinationInfo info) throws Exception {
- ConnectionState cs = lookupConnectionState(info.getConnectionId());
- broker.addDestinationInfo(cs.getContext(), info);
- if( info.getDestination().isTemporary() ) {
+ synchronized public Response processAddDestination(DestinationInfo info) throws Exception{
+ ConnectionState cs=lookupConnectionState(info.getConnectionId());
+ broker.addDestinationInfo(cs.getContext(),info);
+ if(info.getDestination().isTemporary()){
cs.addTempDestination(info);
}
return null;
}
- public Response processRemoveDestination(DestinationInfo info) throws Exception {
- ConnectionState cs = lookupConnectionState(info.getConnectionId());
- broker.removeDestinationInfo(cs.getContext(), info);
- if( info.getDestination().isTemporary() ) {
+ synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception{
+ ConnectionState cs=lookupConnectionState(info.getConnectionId());
+ broker.removeDestinationInfo(cs.getContext(),info);
+ if(info.getDestination().isTemporary()){
cs.removeTempDestination(info.getDestination());
}
return null;
}
-
- public Response processAddProducer(ProducerInfo info) throws Exception {
- SessionId sessionId = info.getProducerId().getParentId();
- ConnectionId connectionId = sessionId.getParentId();
-
- ConnectionState cs = lookupConnectionState(connectionId);
- SessionState ss = cs.getSessionState(sessionId);
- if( ss == null )
- throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "+sessionId);
-
+ synchronized public Response processAddProducer(ProducerInfo info) throws Exception{
+ SessionId sessionId=info.getProducerId().getParentId();
+ ConnectionId connectionId=sessionId.getParentId();
+ ConnectionState cs=lookupConnectionState(connectionId);
+ SessionState ss=cs.getSessionState(sessionId);
+ if(ss==null)
+ throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
+ +sessionId);
// Avoid replaying dup commands
- if( !ss.getProducerIds().contains(info.getProducerId()) ) {
- broker.addProducer(cs.getContext(), info);
- try {
+ if(!ss.getProducerIds().contains(info.getProducerId())){
+ broker.addProducer(cs.getContext(),info);
+ try{
ss.addProducer(info);
- } catch (IllegalStateException e) {
- broker.removeProducer(cs.getContext(), info);
+ }catch(IllegalStateException e){
+ broker.removeProducer(cs.getContext(),info);
}
}
return null;
}
-
- public Response processRemoveProducer(ProducerId id) throws Exception {
- SessionId sessionId = id.getParentId();
- ConnectionId connectionId = sessionId.getParentId();
-
- ConnectionState cs = lookupConnectionState(connectionId);
- SessionState ss = cs.getSessionState(sessionId);
- if( ss == null )
- throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "+sessionId);
- ProducerState ps = ss.removeProducer(id);
- if( ps == null )
+
+ synchronized public Response processRemoveProducer(ProducerId id) throws Exception{
+ SessionId sessionId=id.getParentId();
+ ConnectionId connectionId=sessionId.getParentId();
+ ConnectionState cs=lookupConnectionState(connectionId);
+ SessionState ss=cs.getSessionState(sessionId);
+ if(ss==null)
+ throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
+ +sessionId);
+ ProducerState ps=ss.removeProducer(id);
+ if(ps==null)
throw new IllegalStateException("Cannot remove a producer that had not been registered: "+id);
-
- broker.removeProducer(cs.getContext(), ps.getInfo());
+ broker.removeProducer(cs.getContext(),ps.getInfo());
return null;
}
- public Response processAddConsumer(ConsumerInfo info) throws Exception {
- SessionId sessionId = info.getConsumerId().getParentId();
- ConnectionId connectionId = sessionId.getParentId();
-
- ConnectionState cs = lookupConnectionState(connectionId);
- SessionState ss = cs.getSessionState(sessionId);
- if( ss == null )
- throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "+sessionId);
-
+ synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception{
+ SessionId sessionId=info.getConsumerId().getParentId();
+ ConnectionId connectionId=sessionId.getParentId();
+ ConnectionState cs=lookupConnectionState(connectionId);
+ SessionState ss=cs.getSessionState(sessionId);
+ if(ss==null)
+ throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: "
+ +sessionId);
// Avoid replaying dup commands
- if( !ss.getConsumerIds().contains(info.getConsumerId()) ) {
- broker.addConsumer(cs.getContext(), info);
- try {
+ if(!ss.getConsumerIds().contains(info.getConsumerId())){
+ broker.addConsumer(cs.getContext(),info);
+ try{
ss.addConsumer(info);
- } catch (IllegalStateException e) {
- broker.removeConsumer(cs.getContext(), info);
+ }catch(IllegalStateException e){
+ broker.removeConsumer(cs.getContext(),info);
}
}
-
return null;
}
-
- public Response processRemoveConsumer(ConsumerId id) throws Exception {
-
- SessionId sessionId = id.getParentId();
- ConnectionId connectionId = sessionId.getParentId();
-
- ConnectionState cs = lookupConnectionState(connectionId);
- SessionState ss = cs.getSessionState(sessionId);
- if( ss == null )
- throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "+sessionId);
- ConsumerState consumerState = ss.removeConsumer(id);
- if( consumerState == null )
+
+ synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception{
+ SessionId sessionId=id.getParentId();
+ ConnectionId connectionId=sessionId.getParentId();
+ ConnectionState cs=lookupConnectionState(connectionId);
+ SessionState ss=cs.getSessionState(sessionId);
+ if(ss==null)
+ throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
+ +sessionId);
+ ConsumerState consumerState=ss.removeConsumer(id);
+ if(consumerState==null)
throw new IllegalStateException("Cannot remove a consumer that had not been registered: "+id);
-
- broker.removeConsumer(cs.getContext(), consumerState.getInfo());
+ broker.removeConsumer(cs.getContext(),consumerState.getInfo());
return null;
}
-
- public Response processAddSession(SessionInfo info) throws Exception {
- ConnectionId connectionId = info.getSessionId().getParentId();
- ConnectionState cs = lookupConnectionState(connectionId);
-
+
+ synchronized public Response processAddSession(SessionInfo info) throws Exception{
+ ConnectionId connectionId=info.getSessionId().getParentId();
+ ConnectionState cs=lookupConnectionState(connectionId);
// Avoid replaying dup commands
- if( !cs.getSessionIds().contains(info.getSessionId()) ) {
- broker.addSession(cs.getContext(), info);
- try {
+ if(!cs.getSessionIds().contains(info.getSessionId())){
+ broker.addSession(cs.getContext(),info);
+ try{
cs.addSession(info);
- } catch (IllegalStateException e) {
- broker.removeSession(cs.getContext(), info);
+ }catch(IllegalStateException e){
+ broker.removeSession(cs.getContext(),info);
}
}
return null;
}
-
- public Response processRemoveSession(SessionId id) throws Exception {
-
- ConnectionId connectionId = id.getParentId();
-
- ConnectionState cs = lookupConnectionState(connectionId);
- SessionState session = cs.getSessionState(id);
- if( session == null )
+ synchronized public Response processRemoveSession(SessionId id) throws Exception{
+ ConnectionId connectionId=id.getParentId();
+ ConnectionState cs=lookupConnectionState(connectionId);
+ SessionState session=cs.getSessionState(id);
+ if(session==null)
throw new IllegalStateException("Cannot remove session that had not been registered: "+id);
-
// Don't let new consumers or producers get added while we are closing this down.
session.shutdown();
-
// Cascade the connection stop to the consumers and producers.
- for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
- ConsumerId consumerId = (ConsumerId) iter.next();
- try {
+ for(Iterator iter=session.getConsumerIds().iterator();iter.hasNext();){
+ ConsumerId consumerId=(ConsumerId)iter.next();
+ try{
processRemoveConsumer(consumerId);
- }
- catch (Throwable e) {
- log.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
+ }catch(Throwable e){
+ log.warn("Failed to remove consumer: "+consumerId+". Reason: "+e,e);
}
}
- for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
- ProducerId producerId = (ProducerId) iter.next();
- try {
+ for(Iterator iter=session.getProducerIds().iterator();iter.hasNext();){
+ ProducerId producerId=(ProducerId)iter.next();
+ try{
processRemoveProducer(producerId);
- }
- catch (Throwable e) {
- log.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
+ }catch(Throwable e){
+ log.warn("Failed to remove producer: "+producerId+". Reason: "+e,e);
}
}
cs.removeSession(id);
- broker.removeSession(cs.getContext(), session.getInfo());
+ broker.removeSession(cs.getContext(),session.getInfo());
return null;
}
-
- public Response processAddConnection(ConnectionInfo info) throws Exception {
-
- ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
-
- if( state !=null ) {
- // ConnectionInfo replay?? Chances are that it's a client reconnecting,
- // and we have not detected that that old connection died.. Kill the old connection
- // to make sure our state is in sync with the client.
- if( this != state.getConnection() ) {
- log.debug("Killing previous stale connection: "+state.getConnection());
- state.getConnection().stop();
- if( !state.getConnection().stopLatch.await(15, TimeUnit.SECONDS) ) {
- throw new Exception("Previous connection could not be clean up.");
- }
- }
- }
-
- log.debug("Setting up new connection: "+this);
-
+ synchronized public Response processAddConnection(ConnectionInfo info) throws Exception{
+ ConnectionState state=(ConnectionState)brokerConnectionStates.get(info.getConnectionId());
+ if(state!=null){
+ // ConnectionInfo replay?? Chances are that it's a client reconnecting,
+ // and we have not detected that that old connection died.. Kill the old connection
+ // to make sure our state is in sync with the client.
+ if(this!=state.getConnection()){
+ log.debug("Killing previous stale connection: "+state.getConnection());
+ state.getConnection().stop();
+ if(!state.getConnection().stopLatch.await(15,TimeUnit.SECONDS)){
+ throw new Exception("Previous connection could not be clean up.");
+ }
+ }
+ }
+ log.debug("Setting up new connection: "+this);
// Setup the context.
- String clientId = info.getClientId();
- ConnectionContext context = new ConnectionContext();
+ String clientId=info.getClientId();
+ ConnectionContext context=new ConnectionContext();
context.setConnection(this);
context.setBroker(broker);
context.setConnector(connector);
@@ -681,95 +626,84 @@
context.setConnectionId(info.getConnectionId());
context.setWireFormatInfo(wireFormatInfo);
context.incrementReference();
- this.manageable = info.isManageable();
-
- state = new ConnectionState(info, context, this);
- brokerConnectionStates.put(info.getConnectionId(), state);
- localConnectionStates.put(info.getConnectionId(), state);
-
- broker.addConnection(context, info);
- if (info.isManageable() && broker.isFaultTolerantConfiguration()){
- //send ConnectionCommand
- ConnectionControl command = new ConnectionControl();
+ this.manageable=info.isManageable();
+ state=new ConnectionState(info,context,this);
+ brokerConnectionStates.put(info.getConnectionId(),state);
+ localConnectionStates.put(info.getConnectionId(),state);
+ broker.addConnection(context,info);
+ if(info.isManageable()&&broker.isFaultTolerantConfiguration()){
+ // send ConnectionCommand
+ ConnectionControl command=new ConnectionControl();
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
dispatchAsync(command);
}
-
return null;
}
-
- public Response processRemoveConnection(ConnectionId id) {
-
- ConnectionState cs = lookupConnectionState(id);
-
+
+ synchronized public Response processRemoveConnection(ConnectionId id){
+ ConnectionState cs=lookupConnectionState(id);
// Don't allow things to be added to the connection state while we are shutting down.
cs.shutdown();
-
// Cascade the connection stop to the sessions.
- for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
-
- SessionId sessionId = (SessionId) iter.next();
- try{
+ for(Iterator iter=cs.getSessionIds().iterator();iter.hasNext();){
+ SessionId sessionId=(SessionId)iter.next();
+ try{
processRemoveSession(sessionId);
}catch(Throwable e){
- serviceLog.warn("Failed to remove session " + sessionId,e);
+ serviceLog.warn("Failed to remove session "+sessionId,e);
}
}
-
// Cascade the connection stop to temp destinations.
- for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) {
- DestinationInfo di = (DestinationInfo) iter.next();
+ for(Iterator iter=cs.getTempDesinations().iterator();iter.hasNext();){
+ DestinationInfo di=(DestinationInfo)iter.next();
try{
- broker.removeDestination(cs.getContext(), di.getDestination(), 0);
+ broker.removeDestination(cs.getContext(),di.getDestination(),0);
}catch(Throwable e){
- serviceLog.warn("Failed to remove tmp destination " + di.getDestination(), e);
+ serviceLog.warn("Failed to remove tmp destination "+di.getDestination(),e);
}
iter.remove();
}
-
try{
- broker.removeConnection(cs.getContext(), cs.getInfo(), null);
+ broker.removeConnection(cs.getContext(),cs.getInfo(),null);
}catch(Throwable e){
- serviceLog.warn("Failed to remove connection " + cs.getInfo(),e);
+ serviceLog.warn("Failed to remove connection "+cs.getInfo(),e);
}
- ConnectionState state = (ConnectionState) localConnectionStates.remove(id);
- if( state != null ) {
+ ConnectionState state=(ConnectionState)localConnectionStates.remove(id);
+ if(state!=null){
// If we are the last reference, we should remove the state
// from the broker.
- if( state.getContext().decrementReference() == 0 ){
+ if(state.getContext().decrementReference()==0){
brokerConnectionStates.remove(id);
}
}
return null;
}
-
- public Connector getConnector() {
+ public Connector getConnector(){
return connector;
}
- public void dispatchSync(Command message) {
+ public void dispatchSync(Command message){
processDispatch(message);
}
-
-
- public void dispatchAsync(Command message) {
- if( taskRunner==null ) {
- dispatchSync( message );
- } else {
+
+ public void dispatchAsync(Command message){
+ if(taskRunner==null){
+ dispatchSync(message);
+ }else{
dispatchQueue.add(message);
- try {
+ try{
taskRunner.wakeup();
- } catch (InterruptedException e) {
+ }catch(InterruptedException e){
Thread.currentThread().interrupt();
}
- }
+ }
}
-
+
protected void processDispatch(Command command){
if(command.isMessageDispatch()){
- MessageDispatch md=(MessageDispatch) command;
- Runnable sub=(Runnable) md.getConsumer();
+ MessageDispatch md=(MessageDispatch)command;
+ Runnable sub=(Runnable)md.getConsumer();
broker.processDispatch(md);
try{
dispatch(command);
@@ -781,288 +715,270 @@
}else{
dispatch(command);
}
- }
-
- public boolean iterate() {
- if( dispatchQueue.isEmpty() || broker.isStopped()) {
+ }
+
+ public boolean iterate(){
+ if(dispatchQueue.isEmpty()||broker.isStopped()){
return false;
- } else {
- Command command = (Command) dispatchQueue.remove(0);
- processDispatch( command );
+ }else{
+ Command command=(Command)dispatchQueue.remove(0);
+ processDispatch(command);
return true;
}
- }
-
+ }
+
/**
* Returns the statistics for this connection
*/
- public ConnectionStatistics getStatistics() {
+ public ConnectionStatistics getStatistics(){
return statistics;
}
- public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
+ public MessageAuthorizationPolicy getMessageAuthorizationPolicy(){
return messageAuthorizationPolicy;
}
- public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
- this.messageAuthorizationPolicy = messageAuthorizationPolicy;
+ public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy){
+ this.messageAuthorizationPolicy=messageAuthorizationPolicy;
}
-
+
public boolean isManageable(){
return manageable;
}
-
-
- public synchronized void start() throws Exception {
- starting = true;
- try {
+
+ public synchronized void start() throws Exception{
+ starting=true;
+ try{
transport.start();
- active = true;
+ active=true;
this.processDispatch(connector.getBrokerInfo());
connector.onStarted(this);
- }
- finally {
+ }finally{
// stop() can be called from within the above block,
// but we want to be sure start() completes before
// stop() runs, so queue the stop until right now:
- starting = false;
- if (pendingStop) {
+ starting=false;
+ if(pendingStop){
log.debug("Calling the delayed stop()");
stop();
}
}
}
- public void stop() throws Exception {
+ public void stop() throws Exception{
// If we're in the middle of starting
// then go no further... for now.
- synchronized(this) {
- pendingStop = true;
- if (starting) {
- log.debug("stop() called in the middle of start(). Delaying...");
- return;
- }
- }
-
-
- if( stopped.compareAndSet(false, true) ) {
-
- log.debug("Stopping connection: "+transport.getRemoteAddress());
- connector.onStopped(this);
- try {
- if (masterBroker != null){
- masterBroker.stop();
- }
-
- // If the transport has not failed yet,
- // notify the peer that we are doing a normal shutdown.
- if( transportException == null ) {
- transport.oneway(new ShutdownInfo());
- }
- } catch (Exception ignore) {
- //ignore.printStackTrace();
- }
-
- transport.stop();
- active = false;
-
- if(disposed.compareAndSet(false, true)) {
-
- if( taskRunner!=null )
- taskRunner.shutdown();
-
- // Clear out the dispatch queue to release any memory that
- // is being held on to.
- dispatchQueue.clear();
-
- //
- // Remove all logical connection associated with this connection
- // from the broker.
- if(!broker.isStopped()){
- ArrayList l=new ArrayList(localConnectionStates.keySet());
- for(Iterator iter=l.iterator();iter.hasNext();){
- ConnectionId connectionId=(ConnectionId) iter.next();
- try{
- log.debug("Cleaning up connection resources.");
- processRemoveConnection(connectionId);
- }catch(Throwable ignore){
- ignore.printStackTrace();
- }
- }
- if(brokerInfo!=null){
- broker.removeBroker(this,brokerInfo);
- }
- }
- stopLatch.countDown();
- }
-
-
- log.debug("Stopped connection: "+transport.getRemoteAddress());
- }
+ synchronized(this){
+ pendingStop=true;
+ if(starting){
+ log.debug("stop() called in the middle of start(). Delaying...");
+ return;
+ }
+ }
+ if(stopped.compareAndSet(false,true)){
+ log.debug("Stopping connection: "+transport.getRemoteAddress());
+ connector.onStopped(this);
+ try{
+ if(masterBroker!=null){
+ masterBroker.stop();
+ }
+ // If the transport has not failed yet,
+ // notify the peer that we are doing a normal shutdown.
+ if(transportException==null){
+ transport.oneway(new ShutdownInfo());
+ }
+ }catch(Exception ignore){
+ // ignore.printStackTrace();
+ }
+ transport.stop();
+ active=false;
+ if(disposed.compareAndSet(false,true)){
+ if(taskRunner!=null)
+ taskRunner.shutdown();
+ // Clear out the dispatch queue to release any memory that
+ // is being held on to.
+ dispatchQueue.clear();
+ //
+ // Remove all logical connection associated with this connection
+ // from the broker.
+ if(!broker.isStopped()){
+ ArrayList l=new ArrayList(localConnectionStates.keySet());
+ for(Iterator iter=l.iterator();iter.hasNext();){
+ ConnectionId connectionId=(ConnectionId)iter.next();
+ try{
+ log.debug("Cleaning up connection resources.");
+ processRemoveConnection(connectionId);
+ }catch(Throwable ignore){
+ ignore.printStackTrace();
+ }
+ }
+ if(brokerInfo!=null){
+ broker.removeBroker(this,brokerInfo);
+ }
+ }
+ stopLatch.countDown();
+ }
+ log.debug("Stopped connection: "+transport.getRemoteAddress());
+ }
}
-
/**
* @return Returns the blockedCandidate.
*/
- public boolean isBlockedCandidate() {
+ public boolean isBlockedCandidate(){
return blockedCandidate;
}
/**
* @param blockedCandidate The blockedCandidate to set.
*/
- public void setBlockedCandidate(boolean blockedCandidate) {
- this.blockedCandidate = blockedCandidate;
+ public void setBlockedCandidate(boolean blockedCandidate){
+ this.blockedCandidate=blockedCandidate;
}
/**
* @return Returns the markedCandidate.
*/
- public boolean isMarkedCandidate() {
+ public boolean isMarkedCandidate(){
return markedCandidate;
}
/**
* @param markedCandidate The markedCandidate to set.
*/
- public void setMarkedCandidate(boolean markedCandidate) {
- this.markedCandidate = markedCandidate;
- if (!markedCandidate) {
- timeStamp = 0;
- blockedCandidate = false;
+ public void setMarkedCandidate(boolean markedCandidate){
+ this.markedCandidate=markedCandidate;
+ if(!markedCandidate){
+ timeStamp=0;
+ blockedCandidate=false;
}
}
/**
* @param slow The slow to set.
*/
- public void setSlow(boolean slow) {
- this.slow = slow;
+ public void setSlow(boolean slow){
+ this.slow=slow;
}
/**
* @return true if the Connection is slow
*/
- public boolean isSlow() {
+ public boolean isSlow(){
return slow;
}
/**
* @return true if the Connection is potentially blocked
*/
- public boolean isMarkedBlockedCandidate() {
+ public boolean isMarkedBlockedCandidate(){
return markedCandidate;
}
/**
* Mark the Connection, so we can deem if it's collectable on the next sweep
*/
- public void doMark() {
- if (timeStamp == 0) {
- timeStamp = System.currentTimeMillis();
+ public void doMark(){
+ if(timeStamp==0){
+ timeStamp=System.currentTimeMillis();
}
}
/**
* @return if after being marked, the Connection is still writing
*/
- public boolean isBlocked() {
+ public boolean isBlocked(){
return blocked;
}
/**
* @return true if the Connection is connected
*/
- public boolean isConnected() {
+ public boolean isConnected(){
return connected;
}
/**
* @param blocked The blocked to set.
*/
- public void setBlocked(boolean blocked) {
- this.blocked = blocked;
+ public void setBlocked(boolean blocked){
+ this.blocked=blocked;
}
/**
* @param connected The connected to set.
*/
- public void setConnected(boolean connected) {
- this.connected = connected;
+ public void setConnected(boolean connected){
+ this.connected=connected;
}
/**
* @return true if the Connection is active
*/
- public boolean isActive() {
+ public boolean isActive(){
return active;
}
/**
* @param active The active to set.
*/
- public void setActive(boolean active) {
- this.active = active;
+ public void setActive(boolean active){
+ this.active=active;
}
/**
* @return true if the Connection is starting
*/
- public synchronized boolean isStarting() {
+ public synchronized boolean isStarting(){
return starting;
}
- synchronized protected void setStarting(boolean starting) {
- this.starting = starting;
+ synchronized protected void setStarting(boolean starting){
+ this.starting=starting;
}
/**
* @return true if the Connection needs to stop
*/
- public synchronized boolean isPendingStop() {
+ public synchronized boolean isPendingStop(){
return pendingStop;
}
- protected synchronized void setPendingStop(boolean pendingStop) {
- this.pendingStop = pendingStop;
+ protected synchronized void setPendingStop(boolean pendingStop){
+ this.pendingStop=pendingStop;
}
- public Response processBrokerInfo(BrokerInfo info) {
- if (info.isSlaveBroker()) {
- //stream messages from this broker (the master) to
- //the slave
- MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
- masterBroker = new MasterBroker(parent, transport);
+ public Response processBrokerInfo(BrokerInfo info){
+ if(info.isSlaveBroker()){
+ // stream messages from this broker (the master) to
+ // the slave
+ MutableBrokerFilter parent=(MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class);
+ masterBroker=new MasterBroker(parent,transport);
masterBroker.startProcessing();
- log.info("Slave Broker " + info.getBrokerName() + " is attached");
+ log.info("Slave Broker "+info.getBrokerName()+" is attached");
}
-
// We only expect to get one broker info command per connection
- if( this.brokerInfo!=null ) {
+ if(this.brokerInfo!=null){
log.warn("Unexpected extra broker info command received: "+info);
}
-
- this.brokerInfo = info;
- broker.addBroker(this, info);
+ this.brokerInfo=info;
+ broker.addBroker(this,info);
return null;
}
- protected void dispatch(Command command) {
- try {
+ protected void dispatch(Command command){
+ try{
setMarkedCandidate(true);
transport.oneway(command);
getStatistics().onCommand(command);
- }
- catch (IOException e) {
+ }catch(IOException e){
serviceExceptionAsync(e);
- }
- finally {
+ }finally{
setMarkedCandidate(false);
}
}
- public String getRemoteAddress() {
+ public String getRemoteAddress(){
return transport.getRemoteAddress();
}
}