You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/09 19:23:45 UTC

svn commit: r516492 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/ command/ state/

Author: chirino
Date: Fri Mar  9 10:23:44 2007
New Revision: 516492

URL: http://svn.apache.org/viewvc?view=rev&rev=516492
Log:
Enhanced the ActiveMQConnection to use the CommandVisitor instead of using a big if/else switch
when handling commands from the broker.  This should be slightly more efficient.


Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Mar  9 10:23:44 2007
@@ -87,6 +87,7 @@
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.state.CommandVisitorAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
@@ -1540,53 +1541,81 @@
     public void onCommand(final Object o) {
     	final Command command = (Command) o;
         if (!closed.get() && command != null) {
-            if (command.isMessageDispatch()) {
-                MessageDispatch md = (MessageDispatch) command;
-                ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId());
-                if (dispatcher != null) {
-                    // Copy in case a embedded broker is dispatching via vm://
-                    // md.getMessage() == null to signal end of queue browse.
-                    Message msg = md.getMessage();
-                    if( msg!=null ) {
-                        msg = msg.copy();
-                        msg.setReadOnlyBody(true);
-                        msg.setReadOnlyProperties(true);
-                        msg.setRedeliveryCounter(md.getRedeliveryCounter());
-                        msg.setConnection(this);
-                        md.setMessage( msg );
-                    }
-                    dispatcher.dispatch(md);
-                }
-            } else if (command.getDataStructureType() == ProducerAck.DATA_STRUCTURE_TYPE ) {
-            	ProducerAck pa = (ProducerAck) command;
-            	ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
-            	if( producer!=null ) {
-            		producer.onProducerAck(pa);
-            	}
-            } else if ( command.isBrokerInfo() ) {
-                this.brokerInfo = (BrokerInfo)command;
-                brokerInfoReceived.countDown();
-                this.optimizeAcknowledge &= !this.brokerInfo.isFaultTolerantConfiguration();
-                getBlobTransferPolicy().setBrokerUploadUrl(brokerInfo.getBrokerUploadUrl());
-            }
-            else if (command instanceof ControlCommand) {
-                onControlCommand((ControlCommand) command);
-            }
-            else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
-                asyncConnectionThread.execute(new Runnable(){
-                    public void run() {
-                        onAsyncException(((ConnectionError)command).getException());
-                    }
-                });
-                new Thread("Async error worker") {
-                }.start();
-            }else if (command instanceof ConnectionControl){
-                onConnectionControl((ConnectionControl) command);
-            }else if (command instanceof ConsumerControl){
-                onConsumerControl((ConsumerControl) command);
-            }else if ( command.isWireFormatInfo() ) {
-            	onWireFormatInfo((WireFormatInfo)command);
-            }
+        	try {
+				command.visit(new CommandVisitorAdapter(){
+					@Override
+					public Response processMessageDispatch(MessageDispatch md) throws Exception {
+		                ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId());
+		                if (dispatcher != null) {
+		                    // Copy in case a embedded broker is dispatching via vm://
+		                    // md.getMessage() == null to signal end of queue browse.
+		                    Message msg = md.getMessage();
+		                    if( msg!=null ) {
+		                        msg = msg.copy();
+		                        msg.setReadOnlyBody(true);
+		                        msg.setReadOnlyProperties(true);
+		                        msg.setRedeliveryCounter(md.getRedeliveryCounter());
+		                        msg.setConnection(ActiveMQConnection.this);
+		                        md.setMessage( msg );
+		                    }
+		                    dispatcher.dispatch(md);
+		                }
+		                return null;
+					}
+					
+					@Override
+					public Response processProducerAck(ProducerAck pa) throws Exception {
+		            	ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
+		            	if( producer!=null ) {
+		            		producer.onProducerAck(pa);
+		            	}
+		            	return null;
+					}
+					
+					/**
+					 * Records the broker's details and applies the connection settings that
+					 * depend on them.
+					 */
+					@Override
+					public Response processBrokerInfo(BrokerInfo info) throws Exception {
+		                // Store the received info before anything reads the field: the if/else
+		                // chain this visitor replaces assigned this.brokerInfo first, and the
+		                // optimizeAcknowledge line below would otherwise consult a stale
+		                // (possibly null) value.
+		                brokerInfo = info;
+		                brokerInfoReceived.countDown();
+		                optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
+		                getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
+		                return null;
+					}
+					
+					/**
+					 * Passes the exception carried by the ConnectionError to onAsyncException,
+					 * scheduled through asyncConnectionThread.execute instead of being run
+					 * inline by the caller of onCommand.
+					 */
+					@Override
+					public Response processConnectionError(final ConnectionError error) throws Exception {
+		                asyncConnectionThread.execute(new Runnable(){
+		                    public void run() {
+		                        onAsyncException(error.getException());
+		                    }
+		                });
+		                // The empty "Async error worker" thread that used to be started here did
+		                // no work (no Runnable, no run() override), so it has been dropped.
+		                return null;
+					}
+					@Override
+					public Response processControlCommand(ControlCommand command) throws Exception {
+		                onControlCommand(command);
+		                return null;
+					}
+					/**
+					 * Forwards a ConnectionControl command to onConnectionControl.
+					 */
+					@Override
+					public Response processConnectionControl(ConnectionControl control) throws Exception {
+		                // Use the typed argument supplied by the visitor rather than
+		                // down-casting the captured outer command.
+		                onConnectionControl(control);
+		                return null;
+					}
+					/**
+					 * Forwards a ConsumerControl command to onConsumerControl.
+					 */
+					@Override
+					public Response processConsumerControl(ConsumerControl control) throws Exception {
+		                // Use the typed argument supplied by the visitor rather than
+		                // down-casting the captured outer command.
+		                onConsumerControl(control);
+		                return null;
+					}
+					/**
+					 * Forwards a WireFormatInfo command to onWireFormatInfo.
+					 */
+					@Override
+					public Response processWireFormat(WireFormatInfo info) throws Exception {
+		                // A WireFormatInfo must go to onWireFormatInfo, as the replaced if/else
+		                // chain did; casting it to ConsumerControl throws ClassCastException,
+		                // which the enclosing catch would report through onAsyncException.
+		                onWireFormatInfo(info);
+		                return null;
+					}
+				});
+			} catch (Exception e) {
+				onAsyncException(e);
+			}
+			
         }
         for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
             TransportListener listener = (TransportListener) iter.next();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Mar  9 10:23:44 2007
@@ -40,8 +40,10 @@
 import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
 import org.apache.activemq.command.DataArrayResponse;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
@@ -1176,6 +1178,28 @@
 	
 	public int getProtocolVersion() {
 		return protocolVersion.get();
+	}
+
+	/**
+	 * Handles a ControlCommand: the "shutdown" command terminates the JVM,
+	 * any other command is ignored.
+	 *
+	 * @param command the control command to act on
+	 * @return always null
+	 */
+	public Response processControlCommand(ControlCommand command) throws Exception {
+		// Compare the command's text, not the ControlCommand object itself: an
+		// object-vs-String equals() is always false, so "shutdown" was never honoured
+		// once this check moved out of ControlCommand.visit().
+		String control = command.getCommand();
+		if ("shutdown".equals(control)) {
+			// NOTE(review): any peer able to send this command can stop the process;
+			// confirm that is acceptable for this connection type.
+			System.exit(0);
+		}
+		return null;
+	}
+
+	public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
+		return null;
+	}
+
+	public Response processConnectionControl(ConnectionControl control) throws Exception {
+		return null;
+	}
+
+	public Response processConnectionError(ConnectionError error) throws Exception {
+		return null;
+	}
+
+	public Response processConsumerControl(ConsumerControl control) throws Exception {
+		return null;
 	}
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java Fri Mar  9 10:23:44 2007
@@ -38,7 +38,7 @@
     }
 
     public Response visit(CommandVisitor visitor) throws Exception{
-        return null;
+        return visitor.processConnectionControl(this);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java Fri Mar  9 10:23:44 2007
@@ -37,7 +37,7 @@
     }
        
     public Response visit(CommandVisitor visitor) throws Exception {
-        return null;
+        return visitor.processConnectionError(this);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java Fri Mar  9 10:23:44 2007
@@ -41,16 +41,9 @@
     }
 
     
-
-   
-
     public Response visit(CommandVisitor visitor) throws Exception {
-    return null;
+    	return visitor.processConsumerControl(this);
     }
-
-
-
-
 
     /**
      * @openwire:property version=1

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java Fri Mar  9 10:23:44 2007
@@ -48,8 +48,6 @@
     }
 
     public Response visit(CommandVisitor visitor) throws Exception {
-    if (command.equals("shutdown"))
-        System.exit(0);
-        return null;
+        return visitor.processControlCommand(this);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java Fri Mar  9 10:23:44 2007
@@ -102,7 +102,7 @@
     }
 
     public Response visit(CommandVisitor visitor) throws Exception {
-        return null;
+        return visitor.processMessageDispatch(this);
     }
 
 	public Runnable getTransmitCallback() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java Fri Mar  9 10:23:44 2007
@@ -18,15 +18,20 @@
 package org.apache.activemq.state;
 
 import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.FlushCommand;
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerAck;
@@ -77,6 +82,11 @@
     Response processEndTransaction(TransactionInfo info) throws Exception;
     Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;
 	Response processProducerAck(ProducerAck ack) throws Exception;
+	Response processMessageDispatch(MessageDispatch dispatch) throws Exception;
+	Response processControlCommand(ControlCommand command) throws Exception;
+	Response processConnectionError(ConnectionError error) throws Exception;
+	Response processConnectionControl(ConnectionControl control) throws Exception;
+	Response processConsumerControl(ConsumerControl control) throws Exception;
     
 }
 

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java?view=auto&rev=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java Fri Mar  9 10:23:44 2007
@@ -0,0 +1,186 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.FlushCommand;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+
+/**
+ * No-op implementation of {@link CommandVisitor}: every <code>process*</code>
+ * callback simply returns <code>null</code>. Extend this class and override
+ * only the callbacks of interest instead of implementing the whole interface.
+ */
+public class CommandVisitorAdapter implements CommandVisitor {
+
+	public Response processAddConnection(ConnectionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processAddConsumer(ConsumerInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processAddDestination(DestinationInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processAddProducer(ProducerInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processAddSession(SessionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processBeginTransaction(TransactionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processBrokerInfo(BrokerInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processEndTransaction(TransactionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processFlush(FlushCommand command) throws Exception {
+		return null;
+	}
+
+	public Response processForgetTransaction(TransactionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processMessage(Message send) throws Exception {
+		return null;
+	}
+
+	public Response processMessageAck(MessageAck ack) throws Exception {
+		return null;
+	}
+
+	public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
+		return null;
+	}
+
+	public Response processMessagePull(MessagePull pull) throws Exception {
+		return null;
+	}
+
+	public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processProducerAck(ProducerAck ack) throws Exception {
+		return null;
+	}
+
+	public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processRemoveConnection(ConnectionId id) throws Exception {
+		return null;
+	}
+
+	public Response processRemoveConsumer(ConsumerId id) throws Exception {
+		return null;
+	}
+
+	public Response processRemoveDestination(DestinationInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processRemoveProducer(ProducerId id) throws Exception {
+		return null;
+	}
+
+	public Response processRemoveSession(SessionId id) throws Exception {
+		return null;
+	}
+
+	public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processShutdown(ShutdownInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processWireFormat(WireFormatInfo info) throws Exception {
+		return null;
+	}
+
+	public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
+		return null;
+	}
+
+	public Response processControlCommand(ControlCommand command) throws Exception {
+		return null;
+	}
+
+	public Response processConnectionControl(ConnectionControl control) throws Exception {
+		return null;
+	}
+
+	public Response processConnectionError(ConnectionError error) throws Exception {
+		return null;
+	}
+
+	public Response processConsumerControl(ConsumerControl control) throws Exception {
+		return null;
+	}
+
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Fri Mar  9 10:23:44 2007
@@ -19,42 +19,32 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.FlushCommand;
-import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.IOExceptionSupport;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Tracks the state of a connection so a newly established transport can 
  * be re-initialized to the state that was tracked.
  * 
  * @version $Revision$
  */
-public class ConnectionStateTracker implements CommandVisitor {
+public class ConnectionStateTracker extends CommandVisitorAdapter {
 
 	private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
     
@@ -311,9 +301,6 @@
         return TRACKED_RESPONSE_MARKER;
     }
 
-    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
-        return null;
-    }
     
     public Response processMessage(Message send) throws Exception{
         if(trackTransactions&&send!=null&&send.getTransactionId()!=null){
@@ -448,43 +435,6 @@
         return null;
     }
     
-    public Response processRecoverTransactions(TransactionInfo info) {
-        return null;
-    }
-    public Response processForgetTransaction(TransactionInfo info) throws Exception {
-        return null;
-    }
-
-    
-    public Response processWireFormat(WireFormatInfo info) throws Exception {
-        return null;
-    }
-    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
-        return null;
-    }
-    public Response processShutdown(ShutdownInfo info) throws Exception {
-        return null;
-    }
-    public Response processBrokerInfo(BrokerInfo info) throws Exception {
-        return null;
-    }
-
-    public Response processFlush(FlushCommand command) throws Exception {
-        return null;
-    }
-    
-    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{
-        return null;
-    }
-    
-    public Response processMessagePull(MessagePull pull) throws Exception {
-        return null;
-    }
-
-    public Response processProducerAck(ProducerAck ack) throws Exception {
-		return null;
-	}
-
     public boolean isRestoreConsumers() {
         return restoreConsumers;
     }
@@ -524,6 +474,5 @@
 	public void setRestoreTransaction(boolean restoreTransaction) {
 		this.restoreTransaction = restoreTransaction;
 	}
-
 
 }