You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/11 09:03:33 UTC

svn commit: r783648 - in /activemq/sandbox/activemq-flow/activemq-openwire: ./ src/main/java/org/apache/activemq/broker/openwire/ src/main/java/org/apache/activemq/command/ src/main/java/org/apache/activemq/state/

Author: cmacnaug
Date: Thu Jun 11 07:03:33 2009
New Revision: 783648

URL: http://svn.apache.org/viewvc?rev=783648&view=rev
Log:
OpenWire protocol handler was acking too early.

Modified:
    activemq/sandbox/activemq-flow/activemq-openwire/   (props changed)
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Jun 11 07:03:33 2009
@@ -1,4 +1,3 @@
-
 .project
 .classpath
 .settings
@@ -7,3 +6,4 @@
 junit*.properties
 eclipse-classes
 target
+test-data

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=783648&r1=783647&r2=783648&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 11 07:03:33 2009
@@ -57,6 +57,7 @@
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
@@ -126,24 +127,29 @@
                 return ack(info);
             }
 
-            public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
+            public Response processRemoveConnection(RemoveInfo remove, ConnectionId info, long arg1) throws Exception {
+                ack(remove);
                 return null;
             }
 
-            public Response processRemoveSession(SessionId info, long arg1) throws Exception {
+            public Response processRemoveSession(RemoveInfo remove, SessionId info, long arg1) throws Exception {
+                ack(remove);
                 return null;
             }
 
-            public Response processRemoveProducer(ProducerId info) throws Exception {
+            public Response processRemoveProducer(RemoveInfo remove, ProducerId info) throws Exception {
                 producers.remove(info);
+                //TODO add close logic?
+                ack(remove);
                 return null;
             }
 
-            public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
+            public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long arg1) throws Exception {
                 ConsumerContext ctx = consumers.remove(info);
-                if (ctx == null) {
-                    
+                if (ctx != null) {
+                    //TODO add close logic
                 }
+                ack(remove);
                 return null;
             }
 
@@ -320,14 +326,12 @@
 
     public void onCommand(Object o) {
 
-        final Command command = (Command) o;
+        Command command = (Command) o;
         boolean responseRequired = command.isResponseRequired();
         try {
-            Response response = command.visit(visitor);
             
-            if (responseRequired && response == null) {
-                ack(command);
-            }
+            command.visit(visitor);
+            
             
         } catch (Exception e) {
             if (responseRequired) {

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java?rev=783648&r1=783647&r2=783648&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java Thu Jun 11 07:03:33 2009
@@ -69,13 +69,13 @@
     public Response visit(CommandVisitor visitor) throws Exception {
         switch (objectId.getDataStructureType()) {
         case ConnectionId.DATA_STRUCTURE_TYPE:
-            return visitor.processRemoveConnection((ConnectionId)objectId, lastDeliveredSequenceId);
+            return visitor.processRemoveConnection(this, (ConnectionId)objectId, lastDeliveredSequenceId);
         case SessionId.DATA_STRUCTURE_TYPE:
-            return visitor.processRemoveSession((SessionId)objectId, lastDeliveredSequenceId);
+            return visitor.processRemoveSession(this, (SessionId)objectId, lastDeliveredSequenceId);
         case ConsumerId.DATA_STRUCTURE_TYPE:
-            return visitor.processRemoveConsumer((ConsumerId)objectId, lastDeliveredSequenceId);
+            return visitor.processRemoveConsumer(this, (ConsumerId)objectId, lastDeliveredSequenceId);
         case ProducerId.DATA_STRUCTURE_TYPE:
-            return visitor.processRemoveProducer((ProducerId)objectId);
+            return visitor.processRemoveProducer(this, (ProducerId)objectId);
         default:
             throw new IOException("Unknown remove command type: " + objectId.getDataStructureType());
         }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java?rev=783648&r1=783647&r2=783648&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java Thu Jun 11 07:03:33 2009
@@ -36,6 +36,7 @@
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
@@ -54,13 +55,13 @@
 
     Response processAddConsumer(ConsumerInfo info) throws Exception;
 
-    Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception;
+    Response processRemoveConnection(RemoveInfo info, ConnectionId id, long lastDeliveredSequenceId) throws Exception;
 
-    Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception;
+    Response processRemoveSession(RemoveInfo info, SessionId id, long lastDeliveredSequenceId) throws Exception;
 
-    Response processRemoveProducer(ProducerId id) throws Exception;
+    Response processRemoveProducer(RemoveInfo info,ProducerId id) throws Exception;
 
-    Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception;
+    Response processRemoveConsumer(RemoveInfo info,ConsumerId id, long lastDeliveredSequenceId) throws Exception;
 
     Response processAddDestination(DestinationInfo info) throws Exception;
 

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java?rev=783648&r1=783647&r2=783648&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java Thu Jun 11 07:03:33 2009
@@ -36,6 +36,7 @@
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
@@ -127,11 +128,11 @@
         return null;
     }
 
-    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
+    public Response processRemoveConnection(RemoveInfo info, ConnectionId id, long lastDeliveredSequenceId) throws Exception {
         return null;
     }
 
-    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
+    public Response processRemoveConsumer(RemoveInfo info, ConsumerId id, long lastDeliveredSequenceId) throws Exception {
         return null;
     }
 
@@ -139,11 +140,11 @@
         return null;
     }
 
-    public Response processRemoveProducer(ProducerId id) throws Exception {
+    public Response processRemoveProducer(RemoveInfo info, ProducerId id) throws Exception {
         return null;
     }
 
-    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
+    public Response processRemoveSession(RemoveInfo info, SessionId id, long lastDeliveredSequenceId) throws Exception {
         return null;
     }