You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/04/07 20:00:26 UTC

[1/3] activemq-artemis git commit: ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests

Repository: activemq-artemis
Updated Branches:
  refs/heads/master cf4636e96 -> c4a092c1c


ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8a998ad8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8a998ad8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8a998ad8

Branch: refs/heads/master
Commit: 8a998ad805e88ef2bf4cdb01c19c1d0ba1217a30
Parents: cf4636e
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Apr 6 20:59:57 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 7 12:12:28 2016 -0400

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      |  8 ++++--
 .../core/protocol/openwire/amq/AMQConsumer.java | 30 ++++++++++++++++----
 .../artemis/core/server/ConsumerListener.java   | 23 +++++++++++++++
 .../core/server/impl/ServerConsumerImpl.java    |  6 ++++
 .../core/server/impl/ServerSessionImpl.java     |  1 +
 5 files changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 53464cc..4516253 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -87,7 +87,7 @@ public class OpenWireMessageConverter implements MessageConverter {
    private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
    private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
    private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
-   private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
+   public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
    private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
 
    private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID";
@@ -446,14 +446,16 @@ public class OpenWireMessageConverter implements MessageConverter {
    }
 
    public static MessageDispatch createMessageDispatch(ServerMessage message,
-                                                       int deliveryCount,
                                                        AMQConsumer consumer) throws IOException, JMSException {
       ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
 
+      //we can use core message id for sequenceId
+      amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(consumer.getId());
+      md.setRedeliveryCounter(amqMessage.getRedeliveryCounter());
+      md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
       md.setMessage(amqMessage);
-      md.setRedeliveryCounter(deliveryCount);
       ActiveMQDestination destination = amqMessage.getDestination();
       md.setDestination(destination);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 3093ed8..81cdec8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.ConsumerListener;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -43,9 +44,10 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.wireformat.WireFormat;
 
-public class AMQConsumer {
+public class AMQConsumer implements ConsumerListener {
    private AMQSession session;
    private org.apache.activemq.command.ActiveMQDestination openwireDestination;
    private ConsumerInfo info;
@@ -108,7 +110,6 @@ public class AMQConsumer {
       }
 
       serverConsumer.setProtocolData(this);
-
    }
 
    private SimpleString createTopicSubscription(boolean isDurable,
@@ -184,8 +185,8 @@ public class AMQConsumer {
          if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {
             return 0;
          }
-         //decrement deliveryCount as AMQ client tends to add 1.
-         dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this);
+
+         dispatch = OpenWireMessageConverter.createMessageDispatch(message, this);
          int size = dispatch.getMessage().getSize();
          reference.setProtocolData(dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
@@ -215,7 +216,6 @@ public class AMQConsumer {
     *  Notice that we will start a new transaction on the cases where there is no transaction. */
    public void acknowledge(MessageAck ack) throws Exception {
 
-
       MessageId first = ack.getFirstMessageId();
       MessageId last = ack.getLastMessageId();
 
@@ -252,6 +252,10 @@ public class AMQConsumer {
          }
          else if (ack.isPoisonAck()) {
             for (MessageReference ref : ackList) {
+               Throwable poisonCause = ack.getPoisonCause();
+               if (poisonCause != null) {
+                  ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString());
+               }
                ref.getQueue().sendToDeadLetterAddress(transaction, ref);
             }
          }
@@ -303,6 +307,22 @@ public class AMQConsumer {
       }
    }
 
+   @Override
+   public void updateForCanceledRef(MessageReference ref) {
+      long seqId = ref.getMessage().getMessageID();
+      long lastDelSeqId = info.getLastDeliveredSequenceId();
+      ServerMessage coreMessage = ref.getMessage();
+      int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER);
+      if (openwireDestination.isTopic()) {
+         redeliveryCounter++;
+         coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
+      }
+      else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) {
+         redeliveryCounter++;
+         coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
+      }
+   }
+
    /**
     * The MessagePullHandler is used with slow consumer policies.
     * */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
new file mode 100644
index 0000000..2b2be9c
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/**
+ */
+public interface ConsumerListener {
+   void updateForCanceledRef(MessageReference ref);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 7860ed8..12ca54c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ConsumerListener;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -65,6 +66,7 @@ import org.apache.activemq.artemis.utils.TypedProperties;
  * Concrete implementation of a ClientConsumer.
  */
 public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
+   //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log");
    // Constants ------------------------------------------------------------------------------------
 
    private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -600,6 +602,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          // before failure
          ref.decrementDeliveryCount();
       }
+
+      if (this.protocolData instanceof ConsumerListener) {
+         ((ConsumerListener)protocolData).updateForCanceledRef(ref);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a998ad8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 31102aa..7a817f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -317,6 +317,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    protected void doClose(final boolean failed) throws Exception {
       synchronized (this) {
+         this.setStarted(false);
          if (closed)
             return;
 


[2/3] activemq-artemis git commit: ARTEMIS-468 Amendments to how redelivery count is handled on openwire

Posted by cl...@apache.org.
ARTEMIS-468 Amendments to how redelivery count is handled on openwire


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/50eac7c8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/50eac7c8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/50eac7c8

Branch: refs/heads/master
Commit: 50eac7c824e586aa858fb1f56676feffd4be7523
Parents: 8a998ad
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 7 12:12:16 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 7 13:56:31 2016 -0400

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  |  5 +++++
 .../core/protocol/mqtt/MQTTSessionCallback.java |  6 +++++
 .../protocol/openwire/OpenWireConnection.java   |  2 +-
 .../openwire/OpenWireMessageConverter.java      | 16 +++++---------
 .../core/protocol/openwire/amq/AMQConsumer.java | 22 ++++++-------------
 .../core/protocol/openwire/amq/AMQSession.java  | 13 ++++++++++-
 .../core/protocol/stomp/StompSession.java       |  5 +++++
 .../protocol/core/impl/CoreSessionCallback.java |  5 +++++
 .../artemis/core/server/ConsumerListener.java   | 23 --------------------
 .../core/server/impl/ServerConsumerImpl.java    | 21 +++++++++---------
 .../spi/core/protocol/SessionCallback.java      |  9 ++++++++
 .../integration/client/HangConsumerTest.java    |  5 +++++
 12 files changed, 71 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 55bade9..1c6ea01 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -344,6 +344,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      return false;
+   }
+
+   @Override
    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 82b1ed6..57cb7fe 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -54,6 +54,12 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      return false;
+   }
+
+
+   @Override
    public int sendLargeMessageContinuation(ServerConsumer consumerID,
                                            byte[] body,
                                            boolean continues,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 3ccb98d..818d305 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -745,7 +745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             throw new IllegalStateException("Session not exist! : " + sessionId);
          }
 
-         List<AMQConsumer> consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
+         List<AMQConsumer> consumersList = amqSession.createConsumer(info, new SlowConsumerDetection());
 
          this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
          ss.addConsumer(info);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 4516253..cfe5b47 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
@@ -87,7 +88,6 @@ public class OpenWireMessageConverter implements MessageConverter {
    private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
    private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
    private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
-   public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
    private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
 
    private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID";
@@ -373,7 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter {
          }
       }
 
-      coreMessage.putIntProperty(AMQ_MSG_REDELIVER_COUNTER, messageSend.getRedeliveryCounter());
       ActiveMQDestination replyTo = messageSend.getReplyTo();
       if (replyTo != null) {
          ByteSequence replyToBytes = marshaller.marshal(replyTo);
@@ -445,15 +444,15 @@ public class OpenWireMessageConverter implements MessageConverter {
       }
    }
 
-   public static MessageDispatch createMessageDispatch(ServerMessage message,
+   public static MessageDispatch createMessageDispatch(MessageReference reference, ServerMessage message,
                                                        AMQConsumer consumer) throws IOException, JMSException {
-      ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
+      ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
 
       //we can use core message id for sequenceId
       amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(consumer.getId());
-      md.setRedeliveryCounter(amqMessage.getRedeliveryCounter());
+      md.setRedeliveryCounter(reference.getDeliveryCount() - 1);
       md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
       md.setMessage(amqMessage);
       ActiveMQDestination destination = amqMessage.getDestination();
@@ -462,7 +461,7 @@ public class OpenWireMessageConverter implements MessageConverter {
       return md;
    }
 
-   private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
+   private static ActiveMQMessage toAMQMessage(MessageReference refernce, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
       ActiveMQMessage amqMsg = null;
       byte coreType = coreMessage.getType();
       switch (coreType) {
@@ -762,10 +761,7 @@ public class OpenWireMessageConverter implements MessageConverter {
          amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
       }
 
-      Integer redeliveryCounter = (Integer) coreMessage.getObjectProperty(AMQ_MSG_REDELIVER_COUNTER);
-      if (redeliveryCounter != null) {
-         amqMsg.setRedeliveryCounter(redeliveryCounter);
-      }
+      amqMsg.setRedeliveryCounter(refernce.getDeliveryCount() - 1);
 
       byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
       if (replyToBytes != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 81cdec8..01820d6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
-import org.apache.activemq.artemis.core.server.ConsumerListener;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -44,10 +43,9 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.wireformat.WireFormat;
 
-public class AMQConsumer implements ConsumerListener {
+public class AMQConsumer {
    private AMQSession session;
    private org.apache.activemq.command.ActiveMQDestination openwireDestination;
    private ConsumerInfo info;
@@ -186,7 +184,7 @@ public class AMQConsumer implements ConsumerListener {
             return 0;
          }
 
-         dispatch = OpenWireMessageConverter.createMessageDispatch(message, this);
+         dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);
          int size = dispatch.getMessage().getSize();
          reference.setProtocolData(dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
@@ -307,19 +305,13 @@ public class AMQConsumer implements ConsumerListener {
       }
    }
 
-   @Override
-   public void updateForCanceledRef(MessageReference ref) {
+   public void updateDeliveryCountAfterCancel(MessageReference ref) {
       long seqId = ref.getMessage().getMessageID();
       long lastDelSeqId = info.getLastDeliveredSequenceId();
-      ServerMessage coreMessage = ref.getMessage();
-      int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER);
-      if (openwireDestination.isTopic()) {
-         redeliveryCounter++;
-         coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
-      }
-      else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) {
-         redeliveryCounter++;
-         coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
+
+      // This is a specific rule of the protocol
+      if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) {
+         ref.decrementDeliveryCount();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 74dd951..84354cd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -118,8 +118,19 @@ public class AMQSession implements SessionCallback {
 
    }
 
+   @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      if (consumer.getProtocolData() != null) {
+         ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
+         return true;
+      }
+      else {
+         return false;
+      }
+
+   }
+
    public List<AMQConsumer> createConsumer(ConsumerInfo info,
-                                           AMQSession amqSession,
                                            SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
       //check destination
       ActiveMQDestination dest = info.getDestination();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 9b5c70d..8db5720 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -119,6 +119,11 @@ public class StompSession implements SessionCallback {
    }
 
    @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      return false;
+   }
+
+   @Override
    public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
       LargeServerMessageImpl largeMessage = null;
       ServerMessage newServerMessage = serverMessage;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 9d6125b..f4d69d1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -57,6 +57,11 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
+   public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+      return false;
+   }
+
+   @Override
    public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
       Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
deleted file mode 100644
index 2b2be9c..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-/**
- */
-public interface ConsumerListener {
-   void updateForCanceledRef(MessageReference ref);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 12ca54c..5fb6018 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -43,7 +43,6 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ConsumerListener;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -367,7 +366,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
             ref.incrementDeliveryCount();
 
             // If updateDeliveries = false (set by strict-update),
-            // the updateDeliveryCount would still be updated after c
+            // the delivery count would still be updated after cancel
             if (strictUpdateDeliveryCount && !ref.isPaged()) {
                if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
                   !ref.getQueue().isInternalQueue() &&
@@ -596,15 +595,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
-      if (!failed) {
-         // We don't decrement delivery count if the client failed, since there's a possibility that refs
-         // were actually delivered but we just didn't get any acks for them
-         // before failure
-         ref.decrementDeliveryCount();
-      }
-
-      if (this.protocolData instanceof ConsumerListener) {
-         ((ConsumerListener)protocolData).updateForCanceledRef(ref);
+      // We first give the protocol callback a chance to update the deliveryCount;
+      // if it did not (i.e. there is no protocol-specific logic), we apply the default logic used on most protocols.
+      if (!callback.updateDeliveryCountAfterCancel(this, ref, failed)) {
+         if (!failed) {
+            // We don't decrement delivery count if the client failed, since there's a possibility that refs
+            // were actually delivered but we just didn't get any acks for them
+            // before failure
+            ref.decrementDeliveryCount();
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index cf0ec69..9f23f80 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -33,6 +33,15 @@ public interface SessionCallback {
     *  like acks or other operations. */
    void afterDelivery() throws Exception;
 
+   /**
+    * Use this to update specifics on the message after a redelivery happened.
+    * Return true if there was specific logic applied on the protocol, so the ServerConsumer won't make any adjustments.
+    * @param consumer
+    * @param ref
+    * @param failed
+    */
+   boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed);
+
    void sendProducerCreditsMessage(int credits, SimpleString address);
 
    void sendProducerCreditsFailMessage(int credits, SimpleString address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/50eac7c8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index a3bae65..2c88896 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -484,6 +484,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       @Override
+      public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
+         return false;
+      }
+
+      @Override
       public void browserFinished(ServerConsumer consumer) {
 
       }


[3/3] activemq-artemis git commit: This closes #440

Posted by cl...@apache.org.
This closes #440


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c4a092c1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c4a092c1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c4a092c1

Branch: refs/heads/master
Commit: c4a092c1c80fe83c475cddba8ae4ad5dba37c5ce
Parents: cf4636e 50eac7c
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 7 14:00:06 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 7 14:00:06 2016 -0400

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  |  5 +++++
 .../core/protocol/mqtt/MQTTSessionCallback.java |  6 ++++++
 .../protocol/openwire/OpenWireConnection.java   |  2 +-
 .../openwire/OpenWireMessageConverter.java      | 20 +++++++++-----------
 .../core/protocol/openwire/amq/AMQConsumer.java | 20 ++++++++++++++++----
 .../core/protocol/openwire/amq/AMQSession.java  | 13 ++++++++++++-
 .../core/protocol/stomp/StompSession.java       |  5 +++++
 .../protocol/core/impl/CoreSessionCallback.java |  5 +++++
 .../core/server/impl/ServerConsumerImpl.java    | 17 +++++++++++------
 .../core/server/impl/ServerSessionImpl.java     |  1 +
 .../spi/core/protocol/SessionCallback.java      |  9 +++++++++
 .../integration/client/HangConsumerTest.java    |  5 +++++
 12 files changed, 85 insertions(+), 23 deletions(-)
----------------------------------------------------------------------