You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/08/03 22:40:18 UTC

[1/2] activemq-artemis git commit: [ARTEMIS-642] Disable slow client reconnecting with KILL slow client policy

Repository: activemq-artemis
Updated Branches:
  refs/heads/master c13a9764c -> d871dfe62


[ARTEMIS-642] Disable slow client reconnecting with KILL slow client policy


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a741642a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a741642a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a741642a

Branch: refs/heads/master
Commit: a741642a489d36dadb972feec39afbc4176be1e3
Parents: c13a976
Author: bayern39 <mc...@redhat.com>
Authored: Fri Jul 15 15:33:42 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 3 18:39:57 2016 -0400

----------------------------------------------------------------------
 .../client/impl/ClientSessionFactoryImpl.java   | 15 +++++
 .../core/client/impl/ClientSessionImpl.java     |  5 ++
 .../core/client/impl/ClientSessionInternal.java |  3 +
 .../core/impl/ActiveMQSessionContext.java       | 21 +++++++
 .../core/protocol/core/impl/PacketImpl.java     |  2 +
 .../core/impl/RemotingConnectionImpl.java       | 12 ++++
 .../DisconnectConsumerWithKillMessage.java      | 60 ++++++++++++++++++++
 .../spi/core/protocol/RemotingConnection.java   |  5 ++
 .../main/resources/activemq-version.properties  |  2 +-
 .../ActiveMQProtonRemotingConnection.java       |  6 ++
 .../core/protocol/mqtt/MQTTConnection.java      |  6 ++
 .../protocol/openwire/OpenWireConnection.java   |  5 ++
 .../core/protocol/stomp/StompConnection.java    |  5 ++
 .../artemis/core/server/impl/QueueImpl.java     |  5 +-
 pom.xml                                         |  2 +-
 .../integration/client/SlowConsumerTest.java    | 38 +++++++++++++
 16 files changed, 189 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 96b4059..c944fa1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -528,6 +529,16 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                                     String scaleDownTargetNodeID) {
       ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);
 
+      for (ClientSessionInternal session : sessions) {
+         SessionContext context = session.getSessionContext();
+         if (context instanceof ActiveMQSessionContext) {
+            ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)context;
+            if (sessionContext.isKilled()) {
+               setReconnectAttempts(0);
+            }
+         }
+      }
+
       Set<ClientSessionInternal> sessionsToClose = null;
       if (!clientProtocolManager.isAlive())
          return;
@@ -1028,6 +1039,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       reconnectAttempts = attempts;
    }
 
+   public int getReconnectAttempts() {
+      return reconnectAttempts;
+   }
+
    @Override
    public Object getConnector() {
       return connector;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index bec10fb..8b0fc8e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -1787,4 +1787,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
          }
       });
    }
+
+   @Override
+   public SessionContext getSessionContext() {
+      return sessionContext;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index b25e1a8..b082786 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 
 public interface ClientSessionInternal extends ClientSession {
 
@@ -130,4 +131,6 @@ public interface ClientSessionInternal extends ClientSession {
    String getNodeId();
 
    boolean isWritable(ReadyListener callback);
+
+   SessionContext getSessionContext();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index f49a22a..32f2d14 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
@@ -117,6 +118,7 @@ public class ActiveMQSessionContext extends SessionContext {
    private final int serverVersion;
    private int confirmationWindow;
    private String name;
+   private boolean killed;
 
    protected Channel getSessionChannel() {
       return sessionChannel;
@@ -162,6 +164,14 @@ public class ActiveMQSessionContext extends SessionContext {
       return sessionChannel.getReconnectID();
    }
 
+   public boolean isKilled() {
+      return killed;
+   }
+
+   public void kill() {
+      this.killed = true;
+   }
+
    private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
       @Override
       public void commandConfirmed(final Packet packet) {
@@ -759,6 +769,12 @@ public class ActiveMQSessionContext extends SessionContext {
       handleReceiveProducerFailCredits(message.getAddress(), message.getCredits());
    }
 
+   protected void handleReceiveSlowConsumerKillMessage(DisconnectConsumerWithKillMessage message) {
+      if (message.getNodeID() != null) {
+         kill();
+      }
+   }
+
    class ClientSessionPacketHandler implements ChannelHandler {
 
       @Override
@@ -796,6 +812,11 @@ public class ActiveMQSessionContext extends SessionContext {
 
                   break;
                }
+               case PacketImpl.DISCONNECT_CONSUMER_KILL: {
+                  handleReceiveSlowConsumerKillMessage((DisconnectConsumerWithKillMessage) packet);
+
+                  break;
+               }
                case EXCEPTION: {
                   // We can only log these exceptions
                   // maybe we should cache it on SessionContext and throw an exception on any next calls

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index ac1edf7..6dddf3b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -46,6 +46,8 @@ public class PacketImpl implements Packet {
 
    public static final byte DISCONNECT_CONSUMER = 12;
 
+   public static final byte DISCONNECT_CONSUMER_KILL = 13;
+
    // Miscellaneous
    public static final byte EXCEPTION = 20;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index b051519..f7dfa32 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
 import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
@@ -387,4 +388,15 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
    public String getClientID() {
       return clientID;
    }
+
+   @Override
+   public void killMessage(SimpleString nodeID) {
+      if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
+         return;
+      }
+      Channel clientChannel = getChannel(1, -1);
+      DisconnectConsumerWithKillMessage response = new DisconnectConsumerWithKillMessage(nodeID);
+
+      clientChannel.send(response, -1);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java
new file mode 100644
index 0000000..ce55bbd
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+
+public class DisconnectConsumerWithKillMessage extends PacketImpl {
+
+   private SimpleString nodeID;
+
+   public  static final int VERSION_INTRODUCED = 128;
+
+   public DisconnectConsumerWithKillMessage(final SimpleString nodeID) {
+      super(DISCONNECT_CONSUMER_KILL);
+      this.nodeID = nodeID;
+   }
+
+   public DisconnectConsumerWithKillMessage() {
+      super(DISCONNECT_CONSUMER_KILL);
+   }
+
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      buffer.writeNullableSimpleString(nodeID);
+   }
+
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      nodeID = buffer.readNullableSimpleString();
+   }
+
+   @Override
+   public String toString() {
+      StringBuffer buff = new StringBuffer(getParentString());
+      buff.append(", nodeID=" + nodeID);
+      buff.append("]");
+      return buff.toString();
+   }
+
+   public SimpleString getNodeID() {
+      return nodeID;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index 078e42e..fe1a087 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -20,6 +20,7 @@ import java.util.List;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@@ -184,4 +185,8 @@ public interface RemotingConnection extends BufferHandler {
 
    boolean isWritable(ReadyListener callback);
 
+   /**
+    * If a slow consumer is killed, send the message to the client.
+    */
+   void killMessage(SimpleString nodeID);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/resources/activemq-version.properties
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties
index 07685e1..b6f4af2 100644
--- a/artemis-core-client/src/main/resources/activemq-version.properties
+++ b/artemis-core-client/src/main/resources/activemq-version.properties
@@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
 activemq.version.microVersion=${activemq.version.microVersion}
 activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
 activemq.version.versionTag=${activemq.version.versionTag}
-activemq.version.compatibleVersionList=121,122,123,124,125,126,127
+activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java
index 0edd6b9..670ca5b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -133,4 +134,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
       // We close the underlying transport connection
       getTransportConnection().close();
    }
+
+   @Override
+   public void killMessage(SimpleString nodeID) {
+      //unsupported
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index 6126bb3..f651d3d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -210,4 +211,9 @@ public class MQTTConnection implements RemotingConnection {
    public boolean getConnected() {
       return connected;
    }
+
+   @Override
+   public void killMessage(SimpleString nodeID) {
+      //unsupported
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index a6f00df..7c52b27 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1412,4 +1412,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       return xaException;
    }
 
+   @Override
+   public void killMessage(SimpleString nodeID) {
+      //unsupported
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 28d0de4..475a34c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -728,4 +728,9 @@ public final class StompConnection implements RemotingConnection {
       return manager;
    }
 
+   @Override
+   public void killMessage(SimpleString nodeID) {
+      //unsupported
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index b8d917a..16b8880 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.HandleStatus;
@@ -2989,7 +2990,8 @@ public class QueueImpl implements Queue {
                }
                else if (consumerRate < threshold) {
                   RemotingConnection connection = null;
-                  RemotingService remotingService = ((PostOfficeImpl) postOffice).getServer().getRemotingService();
+                  ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
+                  RemotingService remotingService = server.getRemotingService();
 
                   for (RemotingConnection potentialConnection : remotingService.getConnections()) {
                      if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
@@ -3002,6 +3004,7 @@ public class QueueImpl implements Queue {
                   if (connection != null) {
                      ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
                      if (policy.equals(SlowConsumerPolicy.KILL)) {
+                        connection.killMessage(server.getNodeID());
                         remotingService.removeConnection(connection.getID());
                         connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
                      }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9607ead..386a95e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,7 +91,7 @@
       <activemq.version.majorVersion>1</activemq.version.majorVersion>
       <activemq.version.minorVersion>0</activemq.version.minorVersion>
       <activemq.version.microVersion>0</activemq.version.microVersion>
-      <activemq.version.incrementingVersion>127,126,125,124,123,122</activemq.version.incrementingVersion>
+      <activemq.version.incrementingVersion>128,127,126,125,124,123,122</activemq.version.incrementingVersion>
       <activemq.version.versionTag>${project.version}</activemq.version.versionTag>
       <ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index 79eac1c..788a810 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -36,9 +37,12 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq.artemis.core.remoting.server.RemotingService;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Before;
@@ -115,6 +119,40 @@ public class SlowConsumerTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testDisableSlowConsumerReconnectWithKilled() throws Exception {
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+      session.createQueue(QUEUE, QUEUE, null, false);
+
+      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+      final int numMessages = 25;
+
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(createTextMessage(session, "m" + i));
+      }
+
+      ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
+      session.start();
+
+      Thread.sleep(3000);
+
+      RemotingService service = server.getRemotingService();
+      Set<RemotingConnection> connections = service.getConnections();
+      assertTrue(connections.isEmpty());
+
+      if (sf instanceof ClientSessionFactoryImpl) {
+         int reconnectAttemps = ((ClientSessionFactoryImpl)sf).getReconnectAttempts();
+         assertEquals(0, reconnectAttemps);
+      }
+      else {
+         fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl");
+      }
+   }
+
+   @Test
    public void testSlowConsumerNotification() throws Exception {
 
       ClientSessionFactory sf = createSessionFactory(locator);


[2/2] activemq-artemis git commit: This closes #672

Posted by cl...@apache.org.
This closes #672


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d871dfe6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d871dfe6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d871dfe6

Branch: refs/heads/master
Commit: d871dfe6225dc4f3c08d9fb9c41069b0849eff2f
Parents: c13a976 a741642
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Aug 3 18:39:58 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 3 18:39:58 2016 -0400

----------------------------------------------------------------------
 .../client/impl/ClientSessionFactoryImpl.java   | 15 +++++
 .../core/client/impl/ClientSessionImpl.java     |  5 ++
 .../core/client/impl/ClientSessionInternal.java |  3 +
 .../core/impl/ActiveMQSessionContext.java       | 21 +++++++
 .../core/protocol/core/impl/PacketImpl.java     |  2 +
 .../core/impl/RemotingConnectionImpl.java       | 12 ++++
 .../DisconnectConsumerWithKillMessage.java      | 60 ++++++++++++++++++++
 .../spi/core/protocol/RemotingConnection.java   |  5 ++
 .../main/resources/activemq-version.properties  |  2 +-
 .../ActiveMQProtonRemotingConnection.java       |  6 ++
 .../core/protocol/mqtt/MQTTConnection.java      |  6 ++
 .../protocol/openwire/OpenWireConnection.java   |  5 ++
 .../core/protocol/stomp/StompConnection.java    |  5 ++
 .../artemis/core/server/impl/QueueImpl.java     |  5 +-
 pom.xml                                         |  2 +-
 .../integration/client/SlowConsumerTest.java    | 38 +++++++++++++
 16 files changed, 189 insertions(+), 3 deletions(-)
----------------------------------------------------------------------