You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/27 08:40:14 UTC
svn commit: r490452 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
Author: rajdavies
Date: Tue Dec 26 23:40:13 2006
New Revision: 490452
URL: http://svn.apache.org/viewvc?view=rev&rev=490452
Log:
Somehow, somewhere, the ResponseCorrelator got dropped from the transport used by the
MasterBroker, so I've added it back here.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?view=diff&rev=490452&r1=490451&r2=490452
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Tue Dec 26 23:40:13 2006
@@ -1,24 +1,20 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.broker.ft;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.InsertableMutableBrokerFilter;
@@ -40,6 +36,8 @@
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,23 +48,28 @@
* @version $Revision: 1.8 $
*/
public class MasterBroker extends InsertableMutableBrokerFilter{
+
private static final Log log=LogFactory.getLog(MasterBroker.class);
private Transport slave;
private AtomicBoolean started=new AtomicBoolean(false);
/**
* Constructor
+ *
* @param parent
- * @param slave
+ * @param transport
*/
- public MasterBroker(MutableBrokerFilter parent,Transport slave){
+ public MasterBroker(MutableBrokerFilter parent,Transport transport){
super(parent);
- this.slave=slave;
+ this.slave=transport;
+ this.slave=new MutexTransport(slave);
+ this.slave=new ResponseCorrelator(slave);
+ this.slave.setTransportListener(transport.getTransportListener());
}
/**
* start processing this broker
- *
+ *
*/
public void startProcessing(){
started.set(true);
@@ -95,251 +98,240 @@
super.stop();
stopProcessing();
}
-
+
/**
* stop processing this broker
- *
+ *
*/
public void stopProcessing(){
- if (started.compareAndSet(true,false)){
+ if(started.compareAndSet(true,false)){
remove();
}
}
-
-
/**
* A client is establishing a connection with the broker.
+ *
* @param context
- * @param info
- * @throws Exception
+ * @param info
+ * @throws Exception
*/
- public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception{
+ public void addConnection(ConnectionContext context,ConnectionInfo info) throws Exception{
super.addConnection(context,info);
sendAsyncToSlave(info);
}
-
+
/**
* A client is disconnecting from the broker.
+ *
* @param context the environment the operation is being executed under.
- * @param info
+ * @param info
* @param error null if the client requested the disconnect or the error that caused the client to disconnect.
- * @throws Exception
+ * @throws Exception
*/
- public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception{
+ public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable error) throws Exception{
super.removeConnection(context,info,error);
sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
}
/**
* Adds a session.
+ *
* @param context
* @param info
- * @throws Exception
+ * @throws Exception
*/
- public void addSession(ConnectionContext context, SessionInfo info) throws Exception{
-
- super.addSession(context, info);
+ public void addSession(ConnectionContext context,SessionInfo info) throws Exception{
+ super.addSession(context,info);
sendAsyncToSlave(info);
}
/**
* Removes a session.
+ *
* @param context
* @param info
- * @throws Exception
+ * @throws Exception
*/
- public void removeSession(ConnectionContext context, SessionInfo info) throws Exception{
- super.removeSession(context, info);
+ public void removeSession(ConnectionContext context,SessionInfo info) throws Exception{
+ super.removeSession(context,info);
sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
-
-
}
/**
* Adds a producer.
+ *
* @param context the enviorment the operation is being executed under.
- * @param info
- * @throws Exception
+ * @param info
+ * @throws Exception
*/
- public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+ public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception{
super.addProducer(context,info);
sendAsyncToSlave(info);
}
/**
* Removes a producer.
+ *
* @param context the enviorment the operation is being executed under.
- * @param info
- * @throws Exception
+ * @param info
+ * @throws Exception
*/
- public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
- super.removeProducer(context, info);
+ public void removeProducer(ConnectionContext context,ProducerInfo info) throws Exception{
+ super.removeProducer(context,info);
sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
}
-
+
/**
* add a consumer
- * @param context
- * @param info
+ *
+ * @param context
+ * @param info
* @return the assocated subscription
- * @throws Exception
+ * @throws Exception
*/
- public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+ public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
sendAsyncToSlave(info);
- Subscription answer = super.addConsumer(context, info);
-
+ Subscription answer=super.addConsumer(context,info);
return answer;
}
/**
* remove a subscription
- * @param context
- * @param info
- * @throws Exception
+ *
+ * @param context
+ * @param info
+ * @throws Exception
*/
- public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
- super.removeSubscription(context, info);
+ public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Exception{
+ super.removeSubscription(context,info);
sendAsyncToSlave(info);
}
-
-
/**
* begin a transaction
- * @param context
- * @param xid
- * @throws Exception
+ *
+ * @param context
+ * @param xid
+ * @throws Exception
*/
- public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception{
- TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
+ public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+ TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
sendAsyncToSlave(info);
- super.beginTransaction(context, xid);
-
-
+ super.beginTransaction(context,xid);
}
/**
* Prepares a transaction. Only valid for xa transactions.
- * @param context
+ *
+ * @param context
* @param xid
* @return the state
- * @throws Exception
+ * @throws Exception
*/
- public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception{
- TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
+ public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+ TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
sendAsyncToSlave(info);
- int result = super.prepareTransaction(context, xid);
-
+ int result=super.prepareTransaction(context,xid);
return result;
}
/**
* Rollsback a transaction.
- * @param context
+ *
+ * @param context
* @param xid
- * @throws Exception
+ * @throws Exception
*/
-
- public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception{
- TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
+ public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+ TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
sendAsyncToSlave(info);
- super.rollbackTransaction(context, xid);
-
-
+ super.rollbackTransaction(context,xid);
}
/**
* Commits a transaction.
- * @param context
+ *
+ * @param context
* @param xid
* @param onePhase
- * @throws Exception
+ * @throws Exception
*/
- public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception{
-
- TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
+ public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Exception{
+ TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
sendSyncToSlave(info);
- super.commitTransaction(context, xid,onePhase);
+ super.commitTransaction(context,xid,onePhase);
}
/**
* Forgets a transaction.
- * @param context
- * @param xid
- * @throws Exception
- */
- public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception{
-
- TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
+ *
+ * @param context
+ * @param xid
+ * @throws Exception
+ */
+ public void forgetTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+ TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
sendAsyncToSlave(info);
- super.forgetTransaction(context, xid);
+ super.forgetTransaction(context,xid);
}
-
+
/**
* Notifiy the Broker that a dispatch has happened
+ *
* @param messageDispatch
*/
public void processDispatch(MessageDispatch messageDispatch){
-
- MessageDispatchNotification mdn = new MessageDispatchNotification();
+ MessageDispatchNotification mdn=new MessageDispatchNotification();
mdn.setConsumerId(messageDispatch.getConsumerId());
mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
mdn.setDestination(messageDispatch.getDestination());
- if( messageDispatch.getMessage() != null )
+ if(messageDispatch.getMessage()!=null)
mdn.setMessageId(messageDispatch.getMessage().getMessageId());
sendAsyncToSlave(mdn);
super.processDispatch(messageDispatch);
}
-
+
/**
- * @param context
- * @param message
- * @throws Exception
+ * @param context
+ * @param message
+ * @throws Exception
*
*/
- public void send(ConnectionContext context, Message message) throws Exception{
+ public void send(ConnectionContext context,Message message) throws Exception{
/**
- * A message can be dispatched before the super.send() method returns
- * so - here the order is switched to avoid problems on the slave
- * with receiving acks for messages not received yey
+ * A message can be dispatched before the super.send() method returns, so the order is switched here to avoid
+ * problems on the slave with receiving acks for messages it has not received yet
*/
sendToSlave(message);
super.send(context,message);
-
}
-
-
+
/**
- * @param context
- * @param ack
- * @throws Exception
+ * @param context
+ * @param ack
+ * @throws Exception
*
*/
- public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception{
+ public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{
sendToSlave(ack);
- super.acknowledge(context, ack);
-
+ super.acknowledge(context,ack);
}
-
+
public boolean isFaultTolerantConfiguration(){
return true;
}
-
protected void sendToSlave(Message message){
-
- if ( message.isResponseRequired() ){
+ if(message.isResponseRequired()){
sendSyncToSlave(message);
}else{
sendAsyncToSlave(message);
}
-
-
}
-
+
protected void sendToSlave(MessageAck ack){
- if ( ack.isResponseRequired() ){
+ if(ack.isResponseRequired()){
sendAsyncToSlave(ack);
}else{
sendSyncToSlave(ack);
@@ -348,9 +340,7 @@
protected void sendAsyncToSlave(Command command){
try{
-
slave.oneway(command);
-
}catch(Throwable e){
log.error("Slave Failed",e);
stopProcessing();
@@ -359,17 +349,15 @@
protected void sendSyncToSlave(Command command){
try{
-
- Response response=(Response) slave.request(command);
- if (response.isException()){
+ System.err.println("SEMNDING SYNC "+command);
+ Response response=(Response)slave.request(command);
+ System.out.println("GOT RESPONSE "+response);
+ if(response.isException()){
ExceptionResponse er=(ExceptionResponse)response;
log.error("Slave Failed",er.getException());
}
-
}catch(Throwable e){
log.error("Slave Failed",e);
-
}
}
-
}