You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/11 09:03:33 UTC
svn commit: r783648 - in /activemq/sandbox/activemq-flow/activemq-openwire:
./ src/main/java/org/apache/activemq/broker/openwire/
src/main/java/org/apache/activemq/command/
src/main/java/org/apache/activemq/state/
Author: cmacnaug
Date: Thu Jun 11 07:03:33 2009
New Revision: 783648
URL: http://svn.apache.org/viewvc?rev=783648&view=rev
Log:
Openwire protocol handler was acking too early
Modified:
activemq/sandbox/activemq-flow/activemq-openwire/ (props changed)
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java
Propchange: activemq/sandbox/activemq-flow/activemq-openwire/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Jun 11 07:03:33 2009
@@ -1,4 +1,3 @@
-
.project
.classpath
.settings
@@ -7,3 +6,4 @@
junit*.properties
eclipse-classes
target
+test-data
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=783648&r1=783647&r2=783648&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 11 07:03:33 2009
@@ -57,6 +57,7 @@
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
@@ -126,24 +127,29 @@
return ack(info);
}
- public Response processRemoveConnection(ConnectionId info, long arg1) throws Exception {
+ public Response processRemoveConnection(RemoveInfo remove, ConnectionId info, long arg1) throws Exception {
+ ack(remove);
return null;
}
- public Response processRemoveSession(SessionId info, long arg1) throws Exception {
+ public Response processRemoveSession(RemoveInfo remove, SessionId info, long arg1) throws Exception {
+ ack(remove);
return null;
}
- public Response processRemoveProducer(ProducerId info) throws Exception {
+ public Response processRemoveProducer(RemoveInfo remove, ProducerId info) throws Exception {
producers.remove(info);
+ //TODO add close logic?
+ ack(remove);
return null;
}
- public Response processRemoveConsumer(ConsumerId info, long arg1) throws Exception {
+ public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long arg1) throws Exception {
ConsumerContext ctx = consumers.remove(info);
- if (ctx == null) {
-
+ if (ctx != null) {
+ //TODO add close logic
}
+ ack(remove);
return null;
}
@@ -320,14 +326,12 @@
public void onCommand(Object o) {
- final Command command = (Command) o;
+ Command command = (Command) o;
boolean responseRequired = command.isResponseRequired();
try {
- Response response = command.visit(visitor);
- if (responseRequired && response == null) {
- ack(command);
- }
+ command.visit(visitor);
+
} catch (Exception e) {
if (responseRequired) {
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java?rev=783648&r1=783647&r2=783648&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java Thu Jun 11 07:03:33 2009
@@ -69,13 +69,13 @@
public Response visit(CommandVisitor visitor) throws Exception {
switch (objectId.getDataStructureType()) {
case ConnectionId.DATA_STRUCTURE_TYPE:
- return visitor.processRemoveConnection((ConnectionId)objectId, lastDeliveredSequenceId);
+ return visitor.processRemoveConnection(this, (ConnectionId)objectId, lastDeliveredSequenceId);
case SessionId.DATA_STRUCTURE_TYPE:
- return visitor.processRemoveSession((SessionId)objectId, lastDeliveredSequenceId);
+ return visitor.processRemoveSession(this, (SessionId)objectId, lastDeliveredSequenceId);
case ConsumerId.DATA_STRUCTURE_TYPE:
- return visitor.processRemoveConsumer((ConsumerId)objectId, lastDeliveredSequenceId);
+ return visitor.processRemoveConsumer(this, (ConsumerId)objectId, lastDeliveredSequenceId);
case ProducerId.DATA_STRUCTURE_TYPE:
- return visitor.processRemoveProducer((ProducerId)objectId);
+ return visitor.processRemoveProducer(this, (ProducerId)objectId);
default:
throw new IOException("Unknown remove command type: " + objectId.getDataStructureType());
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java?rev=783648&r1=783647&r2=783648&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitor.java Thu Jun 11 07:03:33 2009
@@ -36,6 +36,7 @@
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
@@ -54,13 +55,13 @@
Response processAddConsumer(ConsumerInfo info) throws Exception;
- Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception;
+ Response processRemoveConnection(RemoveInfo info, ConnectionId id, long lastDeliveredSequenceId) throws Exception;
- Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception;
+ Response processRemoveSession(RemoveInfo info, SessionId id, long lastDeliveredSequenceId) throws Exception;
- Response processRemoveProducer(ProducerId id) throws Exception;
+ Response processRemoveProducer(RemoveInfo info,ProducerId id) throws Exception;
- Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception;
+ Response processRemoveConsumer(RemoveInfo info,ConsumerId id, long lastDeliveredSequenceId) throws Exception;
Response processAddDestination(DestinationInfo info) throws Exception;
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java?rev=783648&r1=783647&r2=783648&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/state/CommandVisitorAdapter.java Thu Jun 11 07:03:33 2009
@@ -36,6 +36,7 @@
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
@@ -127,11 +128,11 @@
return null;
}
- public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
+ public Response processRemoveConnection(RemoveInfo info, ConnectionId id, long lastDeliveredSequenceId) throws Exception {
return null;
}
- public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
+ public Response processRemoveConsumer(RemoveInfo info, ConsumerId id, long lastDeliveredSequenceId) throws Exception {
return null;
}
@@ -139,11 +140,11 @@
return null;
}
- public Response processRemoveProducer(ProducerId id) throws Exception {
+ public Response processRemoveProducer(RemoveInfo info, ProducerId id) throws Exception {
return null;
}
- public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
+ public Response processRemoveSession(RemoveInfo info, SessionId id, long lastDeliveredSequenceId) throws Exception {
return null;
}