You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/04/07 20:00:26 UTC
[1/3] activemq-artemis git commit: ARTEMIS-468 Fix openwire
redelivery related regressions under integration-tests
Repository: activemq-artemis
Updated Branches:
refs/heads/master cf4636e96 -> c4a092c1c
ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8a998ad8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8a998ad8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8a998ad8
Branch: refs/heads/master
Commit: 8a998ad805e88ef2bf4cdb01c19c1d0ba1217a30
Parents: cf4636e
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Apr 6 20:59:57 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 7 12:12:28 2016 -0400
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 8 ++++--
.../core/protocol/openwire/amq/AMQConsumer.java | 30 ++++++++++++++++----
.../artemis/core/server/ConsumerListener.java | 23 +++++++++++++++
.../core/server/impl/ServerConsumerImpl.java | 6 ++++
.../core/server/impl/ServerSessionImpl.java | 1 +
5 files changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 53464cc..4516253 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -87,7 +87,7 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
- private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
+ public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID";
@@ -446,14 +446,16 @@ public class OpenWireMessageConverter implements MessageConverter {
}
public static MessageDispatch createMessageDispatch(ServerMessage message,
- int deliveryCount,
AMQConsumer consumer) throws IOException, JMSException {
ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
+ //we can use core message id for sequenceId
+ amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId());
+ md.setRedeliveryCounter(amqMessage.getRedeliveryCounter());
+ md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
md.setMessage(amqMessage);
- md.setRedeliveryCounter(deliveryCount);
ActiveMQDestination destination = amqMessage.getDestination();
md.setDestination(destination);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 3093ed8..81cdec8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.ConsumerListener;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -43,9 +44,10 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.wireformat.WireFormat;
-public class AMQConsumer {
+public class AMQConsumer implements ConsumerListener {
private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private ConsumerInfo info;
@@ -108,7 +110,6 @@ public class AMQConsumer {
}
serverConsumer.setProtocolData(this);
-
}
private SimpleString createTopicSubscription(boolean isDurable,
@@ -184,8 +185,8 @@ public class AMQConsumer {
if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {
return 0;
}
- //decrement deliveryCount as AMQ client tends to add 1.
- dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this);
+
+ dispatch = OpenWireMessageConverter.createMessageDispatch(message, this);
int size = dispatch.getMessage().getSize();
reference.setProtocolData(dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
@@ -215,7 +216,6 @@ public class AMQConsumer {
* Notice that we will start a new transaction on the cases where there is no transaction. */
public void acknowledge(MessageAck ack) throws Exception {
-
MessageId first = ack.getFirstMessageId();
MessageId last = ack.getLastMessageId();
@@ -252,6 +252,10 @@ public class AMQConsumer {
}
else if (ack.isPoisonAck()) {
for (MessageReference ref : ackList) {
+ Throwable poisonCause = ack.getPoisonCause();
+ if (poisonCause != null) {
+ ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString());
+ }
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
}
}
@@ -303,6 +307,22 @@ public class AMQConsumer {
}
}
+ @Override
+ public void updateForCanceledRef(MessageReference ref) {
+ long seqId = ref.getMessage().getMessageID();
+ long lastDelSeqId = info.getLastDeliveredSequenceId();
+ ServerMessage coreMessage = ref.getMessage();
+ int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER);
+ if (openwireDestination.isTopic()) {
+ redeliveryCounter++;
+ coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
+ }
+ else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) {
+ redeliveryCounter++;
+ coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
+ }
+ }
+
/**
* The MessagePullHandler is used with slow consumer policies.
* */
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
new file mode 100644
index 0000000..2b2be9c
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/**
+ */
+public interface ConsumerListener {
+ void updateForCanceledRef(MessageReference ref);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 7860ed8..12ca54c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ConsumerListener;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -65,6 +66,7 @@ import org.apache.activemq.artemis.utils.TypedProperties;
* Concrete implementation of a ClientConsumer.
*/
public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
+ //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log");
// Constants ------------------------------------------------------------------------------------
private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -600,6 +602,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// before failure
ref.decrementDeliveryCount();
}
+
+ if (this.protocolData instanceof ConsumerListener) {
+ ((ConsumerListener)protocolData).updateForCanceledRef(ref);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 31102aa..7a817f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -317,6 +317,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected void doClose(final boolean failed) throws Exception {
synchronized (this) {
+ this.setStarted(false);
if (closed)
return;
[2/3] activemq-artemis git commit: ARTEMIS-468 Amendments to how
redelivery count is handled on openwire
Posted by cl...@apache.org.
ARTEMIS-468 Amendments to how redelivery count is handled on openwire
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/50eac7c8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/50eac7c8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/50eac7c8
Branch: refs/heads/master
Commit: 50eac7c824e586aa858fb1f56676feffd4be7523
Parents: 8a998ad
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 7 12:12:16 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 7 13:56:31 2016 -0400
----------------------------------------------------------------------
.../plug/ProtonSessionIntegrationCallback.java | 5 +++++
.../core/protocol/mqtt/MQTTSessionCallback.java | 6 +++++
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../openwire/OpenWireMessageConverter.java | 16 +++++---------
.../core/protocol/openwire/amq/AMQConsumer.java | 22 ++++++-------------
.../core/protocol/openwire/amq/AMQSession.java | 13 ++++++++++-
.../core/protocol/stomp/StompSession.java | 5 +++++
.../protocol/core/impl/CoreSessionCallback.java | 5 +++++
.../artemis/core/server/ConsumerListener.java | 23 --------------------
.../core/server/impl/ServerConsumerImpl.java | 21 +++++++++---------
.../spi/core/protocol/SessionCallback.java | 9 ++++++++
.../integration/client/HangConsumerTest.java | 5 +++++
12 files changed, 71 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 55bade9..1c6ea01 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -344,6 +344,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
+ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+ return false;
+ }
+
+ @Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 82b1ed6..57cb7fe 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -54,6 +54,12 @@ public class MQTTSessionCallback implements SessionCallback {
}
@Override
+ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+ return false;
+ }
+
+
+ @Override
public int sendLargeMessageContinuation(ServerConsumer consumerID,
byte[] body,
boolean continues,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 3ccb98d..818d305 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -745,7 +745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
throw new IllegalStateException("Session not exist! : " + sessionId);
}
- List<AMQConsumer> consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
+ List<AMQConsumer> consumersList = amqSession.createConsumer(info, new SlowConsumerDetection());
this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
ss.addConsumer(info);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 4516253..cfe5b47 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
@@ -87,7 +88,6 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
- public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID";
@@ -373,7 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter {
}
}
- coreMessage.putIntProperty(AMQ_MSG_REDELIVER_COUNTER, messageSend.getRedeliveryCounter());
ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) {
ByteSequence replyToBytes = marshaller.marshal(replyTo);
@@ -445,15 +444,15 @@ public class OpenWireMessageConverter implements MessageConverter {
}
}
- public static MessageDispatch createMessageDispatch(ServerMessage message,
+ public static MessageDispatch createMessageDispatch(MessageReference reference, ServerMessage message,
AMQConsumer consumer) throws IOException, JMSException {
- ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
+ ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
//we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId());
- md.setRedeliveryCounter(amqMessage.getRedeliveryCounter());
+ md.setRedeliveryCounter(reference.getDeliveryCount() - 1);
md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
md.setMessage(amqMessage);
ActiveMQDestination destination = amqMessage.getDestination();
@@ -462,7 +461,7 @@ public class OpenWireMessageConverter implements MessageConverter {
return md;
}
- private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
+ private static ActiveMQMessage toAMQMessage(MessageReference refernce, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
ActiveMQMessage amqMsg = null;
byte coreType = coreMessage.getType();
switch (coreType) {
@@ -762,10 +761,7 @@ public class OpenWireMessageConverter implements MessageConverter {
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
}
- Integer redeliveryCounter = (Integer) coreMessage.getObjectProperty(AMQ_MSG_REDELIVER_COUNTER);
- if (redeliveryCounter != null) {
- amqMsg.setRedeliveryCounter(redeliveryCounter);
- }
+ amqMsg.setRedeliveryCounter(refernce.getDeliveryCount() - 1);
byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
if (replyToBytes != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 81cdec8..01820d6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
-import org.apache.activemq.artemis.core.server.ConsumerListener;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -44,10 +43,9 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.wireformat.WireFormat;
-public class AMQConsumer implements ConsumerListener {
+public class AMQConsumer {
private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private ConsumerInfo info;
@@ -186,7 +184,7 @@ public class AMQConsumer implements ConsumerListener {
return 0;
}
- dispatch = OpenWireMessageConverter.createMessageDispatch(message, this);
+ dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);
int size = dispatch.getMessage().getSize();
reference.setProtocolData(dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
@@ -307,19 +305,13 @@ public class AMQConsumer implements ConsumerListener {
}
}
- @Override
- public void updateForCanceledRef(MessageReference ref) {
+ public void updateDeliveryCountAfterCancel(MessageReference ref) {
long seqId = ref.getMessage().getMessageID();
long lastDelSeqId = info.getLastDeliveredSequenceId();
- ServerMessage coreMessage = ref.getMessage();
- int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER);
- if (openwireDestination.isTopic()) {
- redeliveryCounter++;
- coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
- }
- else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) {
- redeliveryCounter++;
- coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
+
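+ // OpenWire-specific rule: undo the delivery-count increment only when the consumer reported a last-delivered sequence id (>= 0) and this message's id is greater than it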
+ if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) {
+ ref.decrementDeliveryCount();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 74dd951..84354cd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -118,8 +118,19 @@ public class AMQSession implements SessionCallback {
}
+ @Override
+ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+ if (consumer.getProtocolData() != null) {
+ ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
+ return true;
+ }
+ else {
+ return false;
+ }
+
+ }
+
public List<AMQConsumer> createConsumer(ConsumerInfo info,
- AMQSession amqSession,
SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
//check destination
ActiveMQDestination dest = info.getDestination();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 9b5c70d..8db5720 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -119,6 +119,11 @@ public class StompSession implements SessionCallback {
}
@Override
+ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+ return false;
+ }
+
+ @Override
public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
LargeServerMessageImpl largeMessage = null;
ServerMessage newServerMessage = serverMessage;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 9d6125b..f4d69d1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -57,6 +57,11 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
+ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+ return false;
+ }
+
+ @Override
public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
deleted file mode 100644
index 2b2be9c..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-/**
- */
-public interface ConsumerListener {
- void updateForCanceledRef(MessageReference ref);
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 12ca54c..5fb6018 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -43,7 +43,6 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ConsumerListener;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -367,7 +366,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref.incrementDeliveryCount();
// If updateDeliveries = false (set by strict-update),
- // the updateDeliveryCount would still be updated after c
+ // the delivery count would still be updated after cancel
if (strictUpdateDeliveryCount && !ref.isPaged()) {
if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
!ref.getQueue().isInternalQueue() &&
@@ -596,15 +595,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
- if (!failed) {
- // We don't decrement delivery count if the client failed, since there's a possibility that refs
- // were actually delivered but we just didn't get any acks for them
- // before failure
- ref.decrementDeliveryCount();
- }
-
- if (this.protocolData instanceof ConsumerListener) {
- ((ConsumerListener)protocolData).updateForCanceledRef(ref);
+ // We first update the deliveryCount at the protocol callback...
+ // if that wasn't updated (i.e. there is no protocol-specific logic), then we apply the default logic used on most protocols
+ if (!callback.updateDeliveryCountAfterCancel(this, ref, failed)) {
+ if (!failed) {
+ // We don't decrement delivery count if the client failed, since there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for them
+ // before failure
+ ref.decrementDeliveryCount();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index cf0ec69..9f23f80 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -33,6 +33,15 @@ public interface SessionCallback {
* like acks or other operations. */
void afterDelivery() throws Exception;
+ /**
+ * Use this to update specifics on the message after a redelivery happened.
+ * Return true if there was specific logic applied on the protocol, so the ServerConsumer won't make any adjustments.
+ * @param consumer
+ * @param ref
+ * @param failed
+ */
+ boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed);
+
void sendProducerCreditsMessage(int credits, SimpleString address);
void sendProducerCreditsFailMessage(int credits, SimpleString address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index a3bae65..2c88896 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -484,6 +484,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
@Override
+ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+ return false;
+ }
+
+ @Override
public void browserFinished(ServerConsumer consumer) {
}
[3/3] activemq-artemis git commit: This closes #440
Posted by cl...@apache.org.
This closes #440
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c4a092c1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c4a092c1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c4a092c1
Branch: refs/heads/master
Commit: c4a092c1c80fe83c475cddba8ae4ad5dba37c5ce
Parents: cf4636e 50eac7c
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 7 14:00:06 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 7 14:00:06 2016 -0400
----------------------------------------------------------------------
.../plug/ProtonSessionIntegrationCallback.java | 5 +++++
.../core/protocol/mqtt/MQTTSessionCallback.java | 6 ++++++
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../openwire/OpenWireMessageConverter.java | 20 +++++++++-----------
.../core/protocol/openwire/amq/AMQConsumer.java | 20 ++++++++++++++++----
.../core/protocol/openwire/amq/AMQSession.java | 13 ++++++++++++-
.../core/protocol/stomp/StompSession.java | 5 +++++
.../protocol/core/impl/CoreSessionCallback.java | 5 +++++
.../core/server/impl/ServerConsumerImpl.java | 17 +++++++++++------
.../core/server/impl/ServerSessionImpl.java | 1 +
.../spi/core/protocol/SessionCallback.java | 9 +++++++++
.../integration/client/HangConsumerTest.java | 5 +++++
12 files changed, 85 insertions(+), 23 deletions(-)
----------------------------------------------------------------------