You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/19 02:49:20 UTC

svn commit: r786344 - /activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java

Author: cmacnaug
Date: Fri Jun 19 00:49:19 2009
New Revision: 786344

URL: http://svn.apache.org/viewvc?rev=786344&view=rev
Log:
Changing isRemoveOnDispatch to return false for queue receivers. Otherwise messages are removed from the queue too soon.

Modified:
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786344&r1=786343&r2=786344&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Jun 19 00:49:19 2009
@@ -100,12 +100,12 @@
     private Router router;
     private VirtualHost host;
     private final CommandVisitor visitor;
-    
-    ArrayList<ActiveMQDestination> temporaryDestinations = new ArrayList<ActiveMQDestination>(); 
+
+    ArrayList<ActiveMQDestination> temporaryDestinations = new ArrayList<ActiveMQDestination>();
 
     public OpenwireProtocolHandler() {
         setStoreWireFormat(new OpenWireFormat());
-        
+
         visitor = new CommandVisitor() {
 
             // /////////////////////////////////////////////////////////////////
@@ -152,7 +152,7 @@
             public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long arg1) throws Exception {
                 ConsumerContext ctx = consumers.remove(info);
                 if (ctx != null) {
-                	ctx.stop();
+                    ctx.stop();
                 }
                 ack(remove);
                 return null;
@@ -162,10 +162,10 @@
             // Message Processing Methods.
             // /////////////////////////////////////////////////////////////////
             public Response processMessage(Message info) throws Exception {
-            	if( info.getOriginalDestination() == null ) {
-            		info.setOriginalDestination(info.getDestination());
-            	}
-            	
+                if (info.getOriginalDestination() == null) {
+                    info.setOriginalDestination(info.getDestination());
+                }
+
                 ProducerId producerId = info.getProducerId();
                 ProducerContext producerContext = producers.get(producerId);
 
@@ -214,10 +214,10 @@
                 // broker.
                 BrokerInfo brokerInfo = new BrokerInfo();
                 Broker broker = connection.getBroker();
-				brokerInfo.setBrokerId(new BrokerId(broker.getName()));
+                brokerInfo.setBrokerId(new BrokerId(broker.getName()));
                 brokerInfo.setBrokerName(broker.getName());
-                if( !broker.getConnectUris().isEmpty() ) {
-                	brokerInfo.setBrokerURL(broker.getConnectUris().get(0));
+                if (!broker.getConnectUris().isEmpty()) {
+                    brokerInfo.setBrokerURL(broker.getConnectUris().get(0));
                 }
                 connection.write(brokerInfo);
                 return ack(info);
@@ -256,13 +256,13 @@
             // Methods for server management
             // /////////////////////////////////////////////////////////////////
             public Response processAddDestination(DestinationInfo info) throws Exception {
-            	ActiveMQDestination destination = info.getDestination();
-				if( destination.isTemporary() ) {
-					// Keep track of it so that we can remove them this connection 
-					// shuts down.
-            		temporaryDestinations.add(destination);
-            	}
-            	host.createQueue(destination);
+                ActiveMQDestination destination = info.getDestination();
+                if (destination.isTemporary()) {
+                    // Keep track of it so that we can remove them this connection 
+                    // shuts down.
+                    temporaryDestinations.add(destination);
+                }
+                host.createQueue(destination);
                 return ack(info);
             }
 
@@ -348,6 +348,7 @@
         Command command = (Command) o;
         boolean responseRequired = command.isResponseRequired();
         try {
+            //System.out.println(o);
             command.visit(visitor);
         } catch (Exception e) {
             if (responseRequired) {
@@ -449,7 +450,7 @@
 
         private HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
         private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
-		private BrokerSubscription brokerSubscription;
+        private BrokerSubscription brokerSubscription;
 
         public ConsumerContext(final ConsumerInfo info) throws Exception {
             this.info = info;
@@ -464,22 +465,22 @@
                 }
             };
 
+            isQueueReceiver = info.getDestination().isQueue();
+
             controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
             controller.useOverFlowQueue(false);
             controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities() - 1));
             super.onFlowOpened(controller);
         }
 
-        
-		public void start() throws Exception {
+        public void start() throws Exception {
             brokerSubscription = host.createSubscription(this);
             brokerSubscription.connect(this);
-		}
-
-		public void stop() throws Exception {
-			brokerSubscription.disconnect(this);
-		}
+        }
 
+        public void stop() throws Exception {
+            brokerSubscription.disconnect(this);
+        }
 
         public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
             if (!controller.offer(message, source)) {
@@ -587,7 +588,10 @@
          * .Object)
          */
         public boolean isRemoveOnDispatch(MessageDelivery elem) {
-            return !elem.isPersistent() || !(isDurable || isQueueReceiver);
+            if (isQueueReceiver()) {
+                return false;
+            }
+            return !elem.isPersistent() || !isDurable;
         }
 
         /*
@@ -696,9 +700,9 @@
             return offer(message, source, null);
         }
 
-		public boolean autoCreateDestination() {
-			return true;
-		}
+        public boolean autoCreateDestination() {
+            return true;
+        }
 
     }