You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/19 02:49:20 UTC
svn commit: r786344 -
/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Author: cmacnaug
Date: Fri Jun 19 00:49:19 2009
New Revision: 786344
URL: http://svn.apache.org/viewvc?rev=786344&view=rev
Log:
Changing isRemoveOnDispatch to return false for qeueue receivers. Otherwise messages are removed from the queue too soon.
Modified:
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786344&r1=786343&r2=786344&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Jun 19 00:49:19 2009
@@ -100,12 +100,12 @@
private Router router;
private VirtualHost host;
private final CommandVisitor visitor;
-
- ArrayList<ActiveMQDestination> temporaryDestinations = new ArrayList<ActiveMQDestination>();
+
+ ArrayList<ActiveMQDestination> temporaryDestinations = new ArrayList<ActiveMQDestination>();
public OpenwireProtocolHandler() {
setStoreWireFormat(new OpenWireFormat());
-
+
visitor = new CommandVisitor() {
// /////////////////////////////////////////////////////////////////
@@ -152,7 +152,7 @@
public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long arg1) throws Exception {
ConsumerContext ctx = consumers.remove(info);
if (ctx != null) {
- ctx.stop();
+ ctx.stop();
}
ack(remove);
return null;
@@ -162,10 +162,10 @@
// Message Processing Methods.
// /////////////////////////////////////////////////////////////////
public Response processMessage(Message info) throws Exception {
- if( info.getOriginalDestination() == null ) {
- info.setOriginalDestination(info.getDestination());
- }
-
+ if (info.getOriginalDestination() == null) {
+ info.setOriginalDestination(info.getDestination());
+ }
+
ProducerId producerId = info.getProducerId();
ProducerContext producerContext = producers.get(producerId);
@@ -214,10 +214,10 @@
// broker.
BrokerInfo brokerInfo = new BrokerInfo();
Broker broker = connection.getBroker();
- brokerInfo.setBrokerId(new BrokerId(broker.getName()));
+ brokerInfo.setBrokerId(new BrokerId(broker.getName()));
brokerInfo.setBrokerName(broker.getName());
- if( !broker.getConnectUris().isEmpty() ) {
- brokerInfo.setBrokerURL(broker.getConnectUris().get(0));
+ if (!broker.getConnectUris().isEmpty()) {
+ brokerInfo.setBrokerURL(broker.getConnectUris().get(0));
}
connection.write(brokerInfo);
return ack(info);
@@ -256,13 +256,13 @@
// Methods for server management
// /////////////////////////////////////////////////////////////////
public Response processAddDestination(DestinationInfo info) throws Exception {
- ActiveMQDestination destination = info.getDestination();
- if( destination.isTemporary() ) {
- // Keep track of it so that we can remove them this connection
- // shuts down.
- temporaryDestinations.add(destination);
- }
- host.createQueue(destination);
+ ActiveMQDestination destination = info.getDestination();
+ if (destination.isTemporary()) {
+ // Keep track of it so that we can remove them this connection
+ // shuts down.
+ temporaryDestinations.add(destination);
+ }
+ host.createQueue(destination);
return ack(info);
}
@@ -348,6 +348,7 @@
Command command = (Command) o;
boolean responseRequired = command.isResponseRequired();
try {
+ //System.out.println(o);
command.visit(visitor);
} catch (Exception e) {
if (responseRequired) {
@@ -449,7 +450,7 @@
private HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
- private BrokerSubscription brokerSubscription;
+ private BrokerSubscription brokerSubscription;
public ConsumerContext(final ConsumerInfo info) throws Exception {
this.info = info;
@@ -464,22 +465,22 @@
}
};
+ isQueueReceiver = info.getDestination().isQueue();
+
controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
controller.useOverFlowQueue(false);
controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities() - 1));
super.onFlowOpened(controller);
}
-
- public void start() throws Exception {
+ public void start() throws Exception {
brokerSubscription = host.createSubscription(this);
brokerSubscription.connect(this);
- }
-
- public void stop() throws Exception {
- brokerSubscription.disconnect(this);
- }
+ }
+ public void stop() throws Exception {
+ brokerSubscription.disconnect(this);
+ }
public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
if (!controller.offer(message, source)) {
@@ -587,7 +588,10 @@
* .Object)
*/
public boolean isRemoveOnDispatch(MessageDelivery elem) {
- return !elem.isPersistent() || !(isDurable || isQueueReceiver);
+ if (isQueueReceiver()) {
+ return false;
+ }
+ return !elem.isPersistent() || !isDurable;
}
/*
@@ -696,9 +700,9 @@
return offer(message, source, null);
}
- public boolean autoCreateDestination() {
- return true;
- }
+ public boolean autoCreateDestination() {
+ return true;
+ }
}