You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/03 16:12:13 UTC
svn commit: r391049 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
Author: rajdavies
Date: Mon Apr 3 07:12:11 2006
New Revision: 391049
URL: http://svn.apache.org/viewcvs?rev=391049&view=rev
Log:
resolve some timing issues
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=391049&r1=391048&r2=391049&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Mon Apr 3 07:12:11 2006
@@ -51,19 +51,37 @@
private Transport slave;
private AtomicBoolean started=new AtomicBoolean(false);
+ /**
+ * Constructor
+ * @param parent
+ * @param slave
+ */
public MasterBroker(MutableBrokerFilter parent,Transport slave){
super(parent);
this.slave=slave;
}
+ /**
+ * start processing this broker
+ *
+ */
public void startProcessing(){
started.set(true);
}
+ /**
+ * stop the broker
+ * @throws Exception
+ */
public void stop() throws Exception{
super.stop();
stopProcessing();
}
+
+ /**
+ * stop processing this broker
+ *
+ */
public void stopProcessing(){
if (started.compareAndSet(true,false)){
remove();
@@ -76,7 +94,7 @@
* A client is establishing a connection with the broker.
* @param context
* @param info
- * @param client
+ * @throws Exception
*/
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception{
super.addConnection(context,info);
@@ -87,8 +105,8 @@
* A client is disconnecting from the broker.
* @param context the environment the operation is being executed under.
* @param info
- * @param client
* @param error null if the client requested the disconnect or the error that caused the client to disconnect.
+ * @throws Exception
*/
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception{
super.removeConnection(context,info,error);
@@ -99,8 +117,10 @@
* Adds a session.
* @param context
* @param info
+ * @throws Exception
*/
public void addSession(ConnectionContext context, SessionInfo info) throws Exception{
+
super.addSession(context, info);
sendAsyncToSlave(info);
}
@@ -109,15 +129,20 @@
* Removes a session.
* @param context
* @param info
+ * @throws Exception
*/
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception{
super.removeSession(context, info);
sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
+
+
}
/**
* Adds a producer.
* @param context the enviorment the operation is being executed under.
+ * @param info
+ * @throws Exception
*/
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
super.addProducer(context,info);
@@ -127,19 +152,34 @@
/**
* Removes a producer.
* @param context the enviorment the operation is being executed under.
+ * @param info
+ * @throws Exception
*/
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
super.removeProducer(context, info);
sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
}
+ /**
+ * add a consumer
+ * @param context
+ * @param info
+ * @return the assocated subscription
+ * @throws Exception
+ */
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
- Subscription answer = super.addConsumer(context, info);
sendAsyncToSlave(info);
+ Subscription answer = super.addConsumer(context, info);
+
return answer;
}
-
+ /**
+ * remove a subscription
+ * @param context
+ * @param info
+ * @throws Exception
+ */
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
super.removeSubscription(context, info);
sendAsyncToSlave(info);
@@ -147,60 +187,75 @@
+ /**
+ * begin a transaction
+ * @param context
+ * @param xid
+ * @throws Exception
+ */
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception{
- super.beginTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
sendAsyncToSlave(info);
+ super.beginTransaction(context, xid);
+
+
}
/**
* Prepares a transaction. Only valid for xa transactions.
- * @param client
+ * @param context
* @param xid
- * @return
+ * @return the state
+ * @throws Exception
*/
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception{
- int result = super.prepareTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
sendAsyncToSlave(info);
+ int result = super.prepareTransaction(context, xid);
+
return result;
}
/**
* Rollsback a transaction.
- * @param client
+ * @param context
* @param xid
+ * @throws Exception
*/
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception{
- super.rollbackTransaction(context, xid);
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
sendAsyncToSlave(info);
+ super.rollbackTransaction(context, xid);
+
}
/**
* Commits a transaction.
- * @param client
+ * @param context
* @param xid
* @param onePhase
+ * @throws Exception
*/
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception{
- super.commitTransaction(context, xid,onePhase);
+
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
sendSyncToSlave(info);
+ super.commitTransaction(context, xid,onePhase);
}
/**
* Forgets a transaction.
- * @param client
- * @param xid
- * @param onePhase
+ * @param context
+ * @param xid
+ * @throws Exception
*/
public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception{
- super.forgetTransaction(context, xid);
+
TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
sendAsyncToSlave(info);
+ super.forgetTransaction(context, xid);
}
/**
@@ -208,15 +263,22 @@
* @param messageDispatch
*/
public void processDispatch(MessageDispatch messageDispatch){
- super.processDispatch(messageDispatch);
+
MessageDispatchNotification mdn = new MessageDispatchNotification();
mdn.setConsumerId(messageDispatch.getConsumerId());
mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
mdn.setDestination(messageDispatch.getDestination());
mdn.setMessageId(messageDispatch.getMessage().getMessageId());
sendAsyncToSlave(mdn);
+ super.processDispatch(messageDispatch);
}
+ /**
+ * @param context
+ * @param message
+ * @throws Exception
+ *
+ */
public void send(ConnectionContext context, Message message) throws Exception{
/**
* A message can be dispatched before the super.send() method returns
@@ -225,12 +287,20 @@
*/
sendToSlave(message);
super.send(context,message);
+
}
+ /**
+ * @param context
+ * @param ack
+ * @throws Exception
+ *
+ */
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception{
- super.acknowledge(context, ack);
sendToSlave(ack);
+ super.acknowledge(context, ack);
+
}