You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 20:31:09 UTC
svn commit: r786196 -
/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Author: chirino
Date: Thu Jun 18 18:31:08 2009
New Revision: 786196
URL: http://svn.apache.org/viewvc?rev=786196&view=rev
Log:
Fix for the testConsumerClose test.
Modified:
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786196&r1=786195&r2=786196&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 18 18:31:08 2009
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.LinkedList;
+import org.apache.activemq.Service;
import org.apache.activemq.apollo.WindowLimiter;
import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.BrokerConnection;
@@ -127,6 +128,7 @@
public Response processAddConsumer(ConsumerInfo info) throws Exception {
ConsumerContext ctx = new ConsumerContext(info);
consumers.put(info.getConsumerId(), ctx);
+ ctx.start();
return ack(info);
}
@@ -150,7 +152,7 @@
public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long arg1) throws Exception {
ConsumerContext ctx = consumers.remove(info);
if (ctx != null) {
- //TODO add close logic
+ ctx.stop();
}
ack(remove);
return null;
@@ -434,7 +436,7 @@
}
}
- class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements ProtocolHandler.ConsumerContext {
+ class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements ProtocolHandler.ConsumerContext, Service {
private final ConsumerInfo info;
private String name;
@@ -445,8 +447,9 @@
private final FlowController<MessageDelivery> controller;
private final WindowLimiter<MessageDelivery> limiter;
- HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
- LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
+ private HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
+ private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
+ private BrokerSubscription brokerSubscription;
public ConsumerContext(final ConsumerInfo info) throws Exception {
this.info = info;
@@ -465,11 +468,19 @@
controller.useOverFlowQueue(false);
controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities() - 1));
super.onFlowOpened(controller);
-
- BrokerSubscription sub = host.createSubscription(this);
- sub.connect(this);
}
+
+ public void start() throws Exception {
+ brokerSubscription = host.createSubscription(this);
+ brokerSubscription.connect(this);
+ }
+
+ public void stop() throws Exception {
+ brokerSubscription.disconnect(this);
+ }
+
+
public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
if (!controller.offer(message, source)) {
return false;