You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 20:31:09 UTC

svn commit: r786196 - /activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java

Author: chirino
Date: Thu Jun 18 18:31:08 2009
New Revision: 786196

URL: http://svn.apache.org/viewvc?rev=786196&view=rev
Log:
Fix for the testConsumerClose test.

Modified:
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786196&r1=786195&r2=786196&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 18 18:31:08 2009
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.LinkedList;
 
+import org.apache.activemq.Service;
 import org.apache.activemq.apollo.WindowLimiter;
 import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerConnection;
@@ -127,6 +128,7 @@
             public Response processAddConsumer(ConsumerInfo info) throws Exception {
                 ConsumerContext ctx = new ConsumerContext(info);
                 consumers.put(info.getConsumerId(), ctx);
+                ctx.start();
                 return ack(info);
             }
 
@@ -150,7 +152,7 @@
             public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long arg1) throws Exception {
                 ConsumerContext ctx = consumers.remove(info);
                 if (ctx != null) {
-                    //TODO add close logic
+                	ctx.stop();
                 }
                 ack(remove);
                 return null;
@@ -434,7 +436,7 @@
         }
     }
 
-    class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements ProtocolHandler.ConsumerContext {
+    class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements ProtocolHandler.ConsumerContext, Service {
 
         private final ConsumerInfo info;
         private String name;
@@ -445,8 +447,9 @@
         private final FlowController<MessageDelivery> controller;
         private final WindowLimiter<MessageDelivery> limiter;
 
-        HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
-        LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
+        private HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
+        private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
+		private BrokerSubscription brokerSubscription;
 
         public ConsumerContext(final ConsumerInfo info) throws Exception {
             this.info = info;
@@ -465,11 +468,19 @@
             controller.useOverFlowQueue(false);
             controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities() - 1));
             super.onFlowOpened(controller);
-
-            BrokerSubscription sub = host.createSubscription(this);
-            sub.connect(this);
         }
 
+        
+		public void start() throws Exception {
+            brokerSubscription = host.createSubscription(this);
+            brokerSubscription.connect(this);
+		}
+
+		public void stop() throws Exception {
+			brokerSubscription.disconnect(this);
+		}
+
+
         public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
             if (!controller.offer(message, source)) {
                 return false;