You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/09 19:23:45 UTC
svn commit: r516492 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/
command/ state/
Author: chirino
Date: Fri Mar 9 10:23:44 2007
New Revision: 516492
URL: http://svn.apache.org/viewvc?view=rev&rev=516492
Log:
Enhanced the ActiveMQConnection to use the CommandVisitor instead of using a big if swtich
when handling commands from the broker. This should be slightly more efficient.
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Mar 9 10:23:44 2007
@@ -87,6 +87,7 @@
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.state.CommandVisitorAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@@ -1540,53 +1541,81 @@
public void onCommand(final Object o) {
final Command command = (Command) o;
if (!closed.get() && command != null) {
- if (command.isMessageDispatch()) {
- MessageDispatch md = (MessageDispatch) command;
- ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId());
- if (dispatcher != null) {
- // Copy in case a embedded broker is dispatching via vm://
- // md.getMessage() == null to signal end of queue browse.
- Message msg = md.getMessage();
- if( msg!=null ) {
- msg = msg.copy();
- msg.setReadOnlyBody(true);
- msg.setReadOnlyProperties(true);
- msg.setRedeliveryCounter(md.getRedeliveryCounter());
- msg.setConnection(this);
- md.setMessage( msg );
- }
- dispatcher.dispatch(md);
- }
- } else if (command.getDataStructureType() == ProducerAck.DATA_STRUCTURE_TYPE ) {
- ProducerAck pa = (ProducerAck) command;
- ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
- if( producer!=null ) {
- producer.onProducerAck(pa);
- }
- } else if ( command.isBrokerInfo() ) {
- this.brokerInfo = (BrokerInfo)command;
- brokerInfoReceived.countDown();
- this.optimizeAcknowledge &= !this.brokerInfo.isFaultTolerantConfiguration();
- getBlobTransferPolicy().setBrokerUploadUrl(brokerInfo.getBrokerUploadUrl());
- }
- else if (command instanceof ControlCommand) {
- onControlCommand((ControlCommand) command);
- }
- else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
- asyncConnectionThread.execute(new Runnable(){
- public void run() {
- onAsyncException(((ConnectionError)command).getException());
- }
- });
- new Thread("Async error worker") {
- }.start();
- }else if (command instanceof ConnectionControl){
- onConnectionControl((ConnectionControl) command);
- }else if (command instanceof ConsumerControl){
- onConsumerControl((ConsumerControl) command);
- }else if ( command.isWireFormatInfo() ) {
- onWireFormatInfo((WireFormatInfo)command);
- }
+ try {
+ command.visit(new CommandVisitorAdapter(){
+ @Override
+ public Response processMessageDispatch(MessageDispatch md) throws Exception {
+ ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId());
+ if (dispatcher != null) {
+ // Copy in case a embedded broker is dispatching via vm://
+ // md.getMessage() == null to signal end of queue browse.
+ Message msg = md.getMessage();
+ if( msg!=null ) {
+ msg = msg.copy();
+ msg.setReadOnlyBody(true);
+ msg.setReadOnlyProperties(true);
+ msg.setRedeliveryCounter(md.getRedeliveryCounter());
+ msg.setConnection(ActiveMQConnection.this);
+ md.setMessage( msg );
+ }
+ dispatcher.dispatch(md);
+ }
+ return null;
+ }
+
+ @Override
+ public Response processProducerAck(ProducerAck pa) throws Exception {
+ ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
+ if( producer!=null ) {
+ producer.onProducerAck(pa);
+ }
+ return null;
+ }
+
+ @Override
+ public Response processBrokerInfo(BrokerInfo info) throws Exception {
+ brokerInfoReceived.countDown();
+ optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
+ getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
+ return null;
+ }
+
+ @Override
+ public Response processConnectionError(final ConnectionError error) throws Exception {
+ asyncConnectionThread.execute(new Runnable(){
+ public void run() {
+ onAsyncException(error.getException());
+ }
+ });
+ new Thread("Async error worker") {
+ }.start();
+ return null;
+ }
+ @Override
+ public Response processControlCommand(ControlCommand command) throws Exception {
+ onControlCommand(command);
+ return null;
+ }
+ @Override
+ public Response processConnectionControl(ConnectionControl control) throws Exception {
+ onConnectionControl((ConnectionControl) command);
+ return null;
+ }
+ @Override
+ public Response processConsumerControl(ConsumerControl control) throws Exception {
+ onConsumerControl((ConsumerControl) command);
+ return null;
+ }
+ @Override
+ public Response processWireFormat(WireFormatInfo info) throws Exception {
+ onConsumerControl((ConsumerControl) command);
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ onAsyncException(e);
+ }
+
}
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = (TransportListener) iter.next();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Mar 9 10:23:44 2007
@@ -40,8 +40,10 @@
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
@@ -1176,6 +1178,28 @@
public int getProtocolVersion() {
return protocolVersion.get();
+ }
+
+ public Response processControlCommand(ControlCommand command) throws Exception {
+ if (command.equals("shutdown"))
+ System.exit(0);
+ return null;
+ }
+
+ public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
+ return null;
+ }
+
+ public Response processConnectionControl(ConnectionControl control) throws Exception {
+ return null;
+ }
+
+ public Response processConnectionError(ConnectionError error) throws Exception {
+ return null;
+ }
+
+ public Response processConsumerControl(ConsumerControl control) throws Exception {
+ return null;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java Fri Mar 9 10:23:44 2007
@@ -38,7 +38,7 @@
}
public Response visit(CommandVisitor visitor) throws Exception{
- return null;
+ return visitor.processConnectionControl(this);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionError.java Fri Mar 9 10:23:44 2007
@@ -37,7 +37,7 @@
}
public Response visit(CommandVisitor visitor) throws Exception {
- return null;
+ return visitor.processConnectionError(this);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConsumerControl.java Fri Mar 9 10:23:44 2007
@@ -41,16 +41,9 @@
}
-
-
-
public Response visit(CommandVisitor visitor) throws Exception {
- return null;
+ return visitor.processConsumerControl(this);
}
-
-
-
-
/**
* @openwire:property version=1
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ControlCommand.java Fri Mar 9 10:23:44 2007
@@ -48,8 +48,6 @@
}
public Response visit(CommandVisitor visitor) throws Exception {
- if (command.equals("shutdown"))
- System.exit(0);
- return null;
+ return visitor.processControlCommand(this);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java Fri Mar 9 10:23:44 2007
@@ -102,7 +102,7 @@
}
public Response visit(CommandVisitor visitor) throws Exception {
- return null;
+ return visitor.processMessageDispatch(this);
}
public Runnable getTransmitCallback() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java Fri Mar 9 10:23:44 2007
@@ -18,15 +18,20 @@
package org.apache.activemq.state;
import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
@@ -77,6 +82,11 @@
Response processEndTransaction(TransactionInfo info) throws Exception;
Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;
Response processProducerAck(ProducerAck ack) throws Exception;
+ Response processMessageDispatch(MessageDispatch dispatch) throws Exception;
+ Response processControlCommand(ControlCommand command) throws Exception;
+ Response processConnectionError(ConnectionError error) throws Exception;
+ Response processConnectionControl(ConnectionControl control) throws Exception;
+ Response processConsumerControl(ConsumerControl control) throws Exception;
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java?view=auto&rev=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java Fri Mar 9 10:23:44 2007
@@ -0,0 +1,186 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ControlCommand;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.FlushCommand;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+
+public class CommandVisitorAdapter implements CommandVisitor {
+
+ public Response processAddConnection(ConnectionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processAddConsumer(ConsumerInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processAddDestination(DestinationInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processAddProducer(ProducerInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processAddSession(SessionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processBeginTransaction(TransactionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processBrokerInfo(BrokerInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processEndTransaction(TransactionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processFlush(FlushCommand command) throws Exception {
+ return null;
+ }
+
+ public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processMessage(Message send) throws Exception {
+ return null;
+ }
+
+ public Response processMessageAck(MessageAck ack) throws Exception {
+ return null;
+ }
+
+ public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
+ return null;
+ }
+
+ public Response processMessagePull(MessagePull pull) throws Exception {
+ return null;
+ }
+
+ public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processProducerAck(ProducerAck ack) throws Exception {
+ return null;
+ }
+
+ public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processRemoveConnection(ConnectionId id) throws Exception {
+ return null;
+ }
+
+ public Response processRemoveConsumer(ConsumerId id) throws Exception {
+ return null;
+ }
+
+ public Response processRemoveDestination(DestinationInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processRemoveProducer(ProducerId id) throws Exception {
+ return null;
+ }
+
+ public Response processRemoveSession(SessionId id) throws Exception {
+ return null;
+ }
+
+ public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processShutdown(ShutdownInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processWireFormat(WireFormatInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
+ return null;
+ }
+
+ public Response processControlCommand(ControlCommand command) throws Exception {
+ return null;
+ }
+
+ public Response processConnectionControl(ConnectionControl control) throws Exception {
+ return null;
+ }
+
+ public Response processConnectionError(ConnectionError error) throws Exception {
+ return null;
+ }
+
+ public Response processConsumerControl(ConsumerControl control) throws Exception {
+ return null;
+ }
+
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=516492&r1=516491&r2=516492
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Fri Mar 9 10:23:44 2007
@@ -19,42 +19,32 @@
import java.io.IOException;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
-import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.FlushCommand;
-import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IOExceptionSupport;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Tracks the state of a connection so a newly established transport can
* be re-initialized to the state that was tracked.
*
* @version $Revision$
*/
-public class ConnectionStateTracker implements CommandVisitor {
+public class ConnectionStateTracker extends CommandVisitorAdapter {
private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
@@ -311,9 +301,6 @@
return TRACKED_RESPONSE_MARKER;
}
- public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
- return null;
- }
public Response processMessage(Message send) throws Exception{
if(trackTransactions&&send!=null&&send.getTransactionId()!=null){
@@ -448,43 +435,6 @@
return null;
}
- public Response processRecoverTransactions(TransactionInfo info) {
- return null;
- }
- public Response processForgetTransaction(TransactionInfo info) throws Exception {
- return null;
- }
-
-
- public Response processWireFormat(WireFormatInfo info) throws Exception {
- return null;
- }
- public Response processKeepAlive(KeepAliveInfo info) throws Exception {
- return null;
- }
- public Response processShutdown(ShutdownInfo info) throws Exception {
- return null;
- }
- public Response processBrokerInfo(BrokerInfo info) throws Exception {
- return null;
- }
-
- public Response processFlush(FlushCommand command) throws Exception {
- return null;
- }
-
- public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{
- return null;
- }
-
- public Response processMessagePull(MessagePull pull) throws Exception {
- return null;
- }
-
- public Response processProducerAck(ProducerAck ack) throws Exception {
- return null;
- }
-
public boolean isRestoreConsumers() {
return restoreConsumers;
}
@@ -524,6 +474,5 @@
public void setRestoreTransaction(boolean restoreTransaction) {
this.restoreTransaction = restoreTransaction;
}
-
}