You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/27 08:40:14 UTC

svn commit: r490452 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java

Author: rajdavies
Date: Tue Dec 26 23:40:13 2006
New Revision: 490452

URL: http://svn.apache.org/viewvc?view=rev&rev=490452
Log:
Somehow, somewhere, the ResponseCorrelator got dropped from the transport used by the
MasterBroker, so I've added it back here.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?view=diff&rev=490452&r1=490451&r2=490452
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Tue Dec 26 23:40:13 2006
@@ -1,24 +1,20 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.ft;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.InsertableMutableBrokerFilter;
@@ -40,6 +36,8 @@
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,23 +48,28 @@
  * @version $Revision: 1.8 $
  */
 public class MasterBroker extends InsertableMutableBrokerFilter{
+
     private static final Log log=LogFactory.getLog(MasterBroker.class);
     private Transport slave;
     private AtomicBoolean started=new AtomicBoolean(false);
 
     /**
      * Constructor
+     * 
      * @param parent
-     * @param slave
+     * @param transport
      */
-    public MasterBroker(MutableBrokerFilter parent,Transport slave){
+    public MasterBroker(MutableBrokerFilter parent,Transport transport){
         super(parent);
-        this.slave=slave;
+        this.slave=transport;
+        this.slave=new MutexTransport(slave);
+        this.slave=new ResponseCorrelator(slave);
+        this.slave.setTransportListener(transport.getTransportListener());
     }
 
     /**
      * start processing this broker
-     *
+     * 
      */
     public void startProcessing(){
         started.set(true);
@@ -95,251 +98,240 @@
         super.stop();
         stopProcessing();
     }
-    
+
     /**
      * stop processing this broker
-     *
+     * 
      */
     public void stopProcessing(){
-        if (started.compareAndSet(true,false)){
+        if(started.compareAndSet(true,false)){
             remove();
         }
     }
 
-    
-    
     /**
      * A client is establishing a connection with the broker.
+     * 
      * @param context
-     * @param info 
-     * @throws Exception 
+     * @param info
+     * @throws Exception
      */
-    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception{
+    public void addConnection(ConnectionContext context,ConnectionInfo info) throws Exception{
         super.addConnection(context,info);
         sendAsyncToSlave(info);
     }
-    
+
     /**
      * A client is disconnecting from the broker.
+     * 
      * @param context the environment the operation is being executed under.
-     * @param info 
+     * @param info
      * @param error null if the client requested the disconnect or the error that caused the client to disconnect.
-     * @throws Exception 
+     * @throws Exception
      */
-    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception{
+    public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable error) throws Exception{
         super.removeConnection(context,info,error);
         sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
     }
 
     /**
      * Adds a session.
+     * 
      * @param context
      * @param info
-     * @throws Exception 
+     * @throws Exception
      */
-    public void addSession(ConnectionContext context, SessionInfo info) throws Exception{
-       
-        super.addSession(context, info);
+    public void addSession(ConnectionContext context,SessionInfo info) throws Exception{
+        super.addSession(context,info);
         sendAsyncToSlave(info);
     }
 
     /**
      * Removes a session.
+     * 
      * @param context
      * @param info
-     * @throws Exception 
+     * @throws Exception
      */
-    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception{
-        super.removeSession(context, info);
+    public void removeSession(ConnectionContext context,SessionInfo info) throws Exception{
+        super.removeSession(context,info);
         sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
-       
-        
     }
 
     /**
      * Adds a producer.
+     * 
      * @param context the enviorment the operation is being executed under.
-     * @param info 
-     * @throws Exception 
+     * @param info
+     * @throws Exception
      */
-    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+    public void addProducer(ConnectionContext context,ProducerInfo info) throws Exception{
         super.addProducer(context,info);
         sendAsyncToSlave(info);
     }
 
     /**
      * Removes a producer.
+     * 
      * @param context the enviorment the operation is being executed under.
-     * @param info 
-     * @throws Exception 
+     * @param info
+     * @throws Exception
      */
-    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
-        super.removeProducer(context, info);
+    public void removeProducer(ConnectionContext context,ProducerInfo info) throws Exception{
+        super.removeProducer(context,info);
         sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
     }
-    
+
     /**
      * add a consumer
-     * @param context 
-     * @param info 
+     * 
+     * @param context
+     * @param info
      * @return the assocated subscription
-     * @throws Exception 
+     * @throws Exception
      */
-    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+    public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
         sendAsyncToSlave(info);
-        Subscription answer = super.addConsumer(context, info);
-        
+        Subscription answer=super.addConsumer(context,info);
         return answer;
     }
 
     /**
      * remove a subscription
-     * @param context 
-     * @param info 
-     * @throws Exception 
+     * 
+     * @param context
+     * @param info
+     * @throws Exception
      */
-    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
-        super.removeSubscription(context, info);
+    public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Exception{
+        super.removeSubscription(context,info);
         sendAsyncToSlave(info);
     }
-      
-    
 
     /**
      * begin a transaction
-     * @param context 
-     * @param xid 
-     * @throws Exception 
+     * 
+     * @param context
+     * @param xid
+     * @throws Exception
      */
-    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception{
-        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
+    public void beginTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+        TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
         sendAsyncToSlave(info);
-        super.beginTransaction(context, xid);
-       
-        
+        super.beginTransaction(context,xid);
     }
 
     /**
      * Prepares a transaction. Only valid for xa transactions.
-     * @param context 
+     * 
+     * @param context
      * @param xid
      * @return the state
-     * @throws Exception 
+     * @throws Exception
      */
-    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception{
-        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
+    public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+        TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
         sendAsyncToSlave(info);
-        int result = super.prepareTransaction(context, xid);
-        
+        int result=super.prepareTransaction(context,xid);
         return result;
     }
 
     /**
      * Rollsback a transaction.
-     * @param context 
+     * 
+     * @param context
      * @param xid
-     * @throws Exception 
+     * @throws Exception
      */
-
-    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception{
-        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
+    public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+        TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
         sendAsyncToSlave(info);
-        super.rollbackTransaction(context, xid);
-        
-        
+        super.rollbackTransaction(context,xid);
     }
 
     /**
      * Commits a transaction.
-     * @param context 
+     * 
+     * @param context
      * @param xid
      * @param onePhase
-     * @throws Exception 
+     * @throws Exception
      */
-    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception{
-        
-        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
+    public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Exception{
+        TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
         sendSyncToSlave(info);
-        super.commitTransaction(context, xid,onePhase);
+        super.commitTransaction(context,xid,onePhase);
     }
 
     /**
      * Forgets a transaction.
-     * @param context 
-     * @param xid 
-     * @throws Exception 
-     */
-    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception{
-       
-        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
+     * 
+     * @param context
+     * @param xid
+     * @throws Exception
+     */
+    public void forgetTransaction(ConnectionContext context,TransactionId xid) throws Exception{
+        TransactionInfo info=new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
         sendAsyncToSlave(info);
-        super.forgetTransaction(context, xid);
+        super.forgetTransaction(context,xid);
     }
-    
+
     /**
      * Notifiy the Broker that a dispatch has happened
+     * 
      * @param messageDispatch
      */
     public void processDispatch(MessageDispatch messageDispatch){
-        
-        MessageDispatchNotification mdn = new MessageDispatchNotification();
+        MessageDispatchNotification mdn=new MessageDispatchNotification();
         mdn.setConsumerId(messageDispatch.getConsumerId());
         mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
         mdn.setDestination(messageDispatch.getDestination());
-        if( messageDispatch.getMessage() != null )
+        if(messageDispatch.getMessage()!=null)
             mdn.setMessageId(messageDispatch.getMessage().getMessageId());
         sendAsyncToSlave(mdn);
         super.processDispatch(messageDispatch);
     }
-    
+
     /**
-     * @param context 
-     * @param message 
-     * @throws Exception 
+     * @param context
+     * @param message
+     * @throws Exception
      * 
      */
-    public void send(ConnectionContext context, Message message) throws Exception{
+    public void send(ConnectionContext context,Message message) throws Exception{
         /**
-         * A message can be dispatched before the super.send() method returns
-         * so - here the order is switched to avoid problems on the slave
-         * with receiving acks for messages not received yey
+         * A message can be dispatched before the super.send() method returns, so the order is switched here to avoid
+         * problems on the slave with receiving acks for messages not received yet
          */
         sendToSlave(message);
         super.send(context,message);
-        
     }
-    
-   
+
     /**
-     * @param context 
-     * @param ack 
-     * @throws Exception 
+     * @param context
+     * @param ack
+     * @throws Exception
      * 
      */
-    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception{
+    public void acknowledge(ConnectionContext context,MessageAck ack) throws Exception{
         sendToSlave(ack);
-        super.acknowledge(context, ack);
-       
+        super.acknowledge(context,ack);
     }
-    
+
     public boolean isFaultTolerantConfiguration(){
         return true;
     }
-    
 
     protected void sendToSlave(Message message){
-        
-        if ( message.isResponseRequired() ){
+        if(message.isResponseRequired()){
             sendSyncToSlave(message);
         }else{
             sendAsyncToSlave(message);
         }
-        
-        
     }
-    
+
     protected void sendToSlave(MessageAck ack){
-        if ( ack.isResponseRequired() ){
+        if(ack.isResponseRequired()){
             sendAsyncToSlave(ack);
         }else{
             sendSyncToSlave(ack);
@@ -348,9 +340,7 @@
 
     protected void sendAsyncToSlave(Command command){
         try{
-
             slave.oneway(command);
-
         }catch(Throwable e){
             log.error("Slave Failed",e);
             stopProcessing();
@@ -359,17 +349,15 @@
 
     protected void sendSyncToSlave(Command command){
         try{
-
-            Response response=(Response) slave.request(command);
-            if (response.isException()){
+            System.err.println("SEMNDING SYNC "+command);
+            Response response=(Response)slave.request(command);
+            System.out.println("GOT RESPONSE "+response);
+            if(response.isException()){
                 ExceptionResponse er=(ExceptionResponse)response;
                 log.error("Slave Failed",er.getException());
             }
-
         }catch(Throwable e){
             log.error("Slave Failed",e);
-           
         }
     }
-
 }