You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/08/03 22:40:18 UTC
[1/2] activemq-artemis git commit: [ARTEMIS-642] Disable slow client
reconnecting with KILL slow client policy
Repository: activemq-artemis
Updated Branches:
refs/heads/master c13a9764c -> d871dfe62
[ARTEMIS-642] Disable slow client reconnecting with KILL slow client policy
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a741642a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a741642a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a741642a
Branch: refs/heads/master
Commit: a741642a489d36dadb972feec39afbc4176be1e3
Parents: c13a976
Author: bayern39 <mc...@redhat.com>
Authored: Fri Jul 15 15:33:42 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 3 18:39:57 2016 -0400
----------------------------------------------------------------------
.../client/impl/ClientSessionFactoryImpl.java | 15 +++++
.../core/client/impl/ClientSessionImpl.java | 5 ++
.../core/client/impl/ClientSessionInternal.java | 3 +
.../core/impl/ActiveMQSessionContext.java | 21 +++++++
.../core/protocol/core/impl/PacketImpl.java | 2 +
.../core/impl/RemotingConnectionImpl.java | 12 ++++
.../DisconnectConsumerWithKillMessage.java | 60 ++++++++++++++++++++
.../spi/core/protocol/RemotingConnection.java | 5 ++
.../main/resources/activemq-version.properties | 2 +-
.../ActiveMQProtonRemotingConnection.java | 6 ++
.../core/protocol/mqtt/MQTTConnection.java | 6 ++
.../protocol/openwire/OpenWireConnection.java | 5 ++
.../core/protocol/stomp/StompConnection.java | 5 ++
.../artemis/core/server/impl/QueueImpl.java | 5 +-
pom.xml | 2 +-
.../integration/client/SlowConsumerTest.java | 38 +++++++++++++
16 files changed, 189 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 96b4059..c944fa1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -528,6 +529,16 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
String scaleDownTargetNodeID) {
ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);
+ for (ClientSessionInternal session : sessions) {
+ SessionContext context = session.getSessionContext();
+ if (context instanceof ActiveMQSessionContext) {
+ ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)context;
+ if (sessionContext.isKilled()) {
+ setReconnectAttempts(0);
+ }
+ }
+ }
+
Set<ClientSessionInternal> sessionsToClose = null;
if (!clientProtocolManager.isAlive())
return;
@@ -1028,6 +1039,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
reconnectAttempts = attempts;
}
+ public int getReconnectAttempts() {
+ return reconnectAttempts;
+ }
+
@Override
public Object getConnector() {
return connector;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index bec10fb..8b0fc8e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -1787,4 +1787,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
}
});
}
+
+ @Override
+ public SessionContext getSessionContext() {
+ return sessionContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index b25e1a8..b082786 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
public interface ClientSessionInternal extends ClientSession {
@@ -130,4 +131,6 @@ public interface ClientSessionInternal extends ClientSession {
String getNodeId();
boolean isWritable(ReadyListener callback);
+
+ SessionContext getSessionContext();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index f49a22a..32f2d14 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
@@ -117,6 +118,7 @@ public class ActiveMQSessionContext extends SessionContext {
private final int serverVersion;
private int confirmationWindow;
private String name;
+ private boolean killed;
protected Channel getSessionChannel() {
return sessionChannel;
@@ -162,6 +164,14 @@ public class ActiveMQSessionContext extends SessionContext {
return sessionChannel.getReconnectID();
}
+ public boolean isKilled() {
+ return killed;
+ }
+
+ public void kill() {
+ this.killed = true;
+ }
+
private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() {
@Override
public void commandConfirmed(final Packet packet) {
@@ -759,6 +769,12 @@ public class ActiveMQSessionContext extends SessionContext {
handleReceiveProducerFailCredits(message.getAddress(), message.getCredits());
}
+ protected void handleReceiveSlowConsumerKillMessage(DisconnectConsumerWithKillMessage message) {
+ if (message.getNodeID() != null) {
+ kill();
+ }
+ }
+
class ClientSessionPacketHandler implements ChannelHandler {
@Override
@@ -796,6 +812,11 @@ public class ActiveMQSessionContext extends SessionContext {
break;
}
+ case PacketImpl.DISCONNECT_CONSUMER_KILL: {
+ handleReceiveSlowConsumerKillMessage((DisconnectConsumerWithKillMessage) packet);
+
+ break;
+ }
case EXCEPTION: {
// We can only log these exceptions
// maybe we should cache it on SessionContext and throw an exception on any next calls
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index ac1edf7..6dddf3b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -46,6 +46,8 @@ public class PacketImpl implements Packet {
public static final byte DISCONNECT_CONSUMER = 12;
+ public static final byte DISCONNECT_CONSUMER_KILL = 13;
+
// Miscellaneous
public static final byte EXCEPTION = 20;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index b051519..f7dfa32 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
@@ -387,4 +388,15 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
public String getClientID() {
return clientID;
}
+
+ @Override
+ public void killMessage(SimpleString nodeID) {
+ if (clientVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
+ return;
+ }
+ Channel clientChannel = getChannel(1, -1);
+ DisconnectConsumerWithKillMessage response = new DisconnectConsumerWithKillMessage(nodeID);
+
+ clientChannel.send(response, -1);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java
new file mode 100644
index 0000000..ce55bbd
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/DisconnectConsumerWithKillMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+
+public class DisconnectConsumerWithKillMessage extends PacketImpl {
+
+ private SimpleString nodeID;
+
+ public static final int VERSION_INTRODUCED = 128;
+
+ public DisconnectConsumerWithKillMessage(final SimpleString nodeID) {
+ super(DISCONNECT_CONSUMER_KILL);
+ this.nodeID = nodeID;
+ }
+
+ public DisconnectConsumerWithKillMessage() {
+ super(DISCONNECT_CONSUMER_KILL);
+ }
+
+ @Override
+ public void encodeRest(final ActiveMQBuffer buffer) {
+ buffer.writeNullableSimpleString(nodeID);
+ }
+
+ @Override
+ public void decodeRest(final ActiveMQBuffer buffer) {
+ nodeID = buffer.readNullableSimpleString();
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer buff = new StringBuffer(getParentString());
+ buff.append(", nodeID=" + nodeID);
+ buff.append("]");
+ return buff.toString();
+ }
+
+ public SimpleString getNodeID() {
+ return nodeID;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index 078e42e..fe1a087 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -20,6 +20,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@@ -184,4 +185,8 @@ public interface RemotingConnection extends BufferHandler {
boolean isWritable(ReadyListener callback);
+ /**
+ *if slow consumer is killed,send the msessage to client.
+ */
+ void killMessage(SimpleString nodeID);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-core-client/src/main/resources/activemq-version.properties
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties
index 07685e1..b6f4af2 100644
--- a/artemis-core-client/src/main/resources/activemq-version.properties
+++ b/artemis-core-client/src/main/resources/activemq-version.properties
@@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
-activemq.version.compatibleVersionList=121,122,123,124,125,126,127
+activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java
index 0edd6b9..670ca5b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ActiveMQProtonRemotingConnection.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -133,4 +134,9 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
// We close the underlying transport connection
getTransportConnection().close();
}
+
+ @Override
+ public void killMessage(SimpleString nodeID) {
+ //unsupported
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index 6126bb3..f651d3d 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -210,4 +211,9 @@ public class MQTTConnection implements RemotingConnection {
public boolean getConnected() {
return connected;
}
+
+ @Override
+ public void killMessage(SimpleString nodeID) {
+ //unsupported
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index a6f00df..7c52b27 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1412,4 +1412,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return xaException;
}
+ @Override
+ public void killMessage(SimpleString nodeID) {
+ //unsupported
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 28d0de4..475a34c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -728,4 +728,9 @@ public final class StompConnection implements RemotingConnection {
return manager;
}
+ @Override
+ public void killMessage(SimpleString nodeID) {
+ //unsupported
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index b8d917a..16b8880 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
@@ -2989,7 +2990,8 @@ public class QueueImpl implements Queue {
}
else if (consumerRate < threshold) {
RemotingConnection connection = null;
- RemotingService remotingService = ((PostOfficeImpl) postOffice).getServer().getRemotingService();
+ ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
+ RemotingService remotingService = server.getRemotingService();
for (RemotingConnection potentialConnection : remotingService.getConnections()) {
if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
@@ -3002,6 +3004,7 @@ public class QueueImpl implements Queue {
if (connection != null) {
ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
if (policy.equals(SlowConsumerPolicy.KILL)) {
+ connection.killMessage(server.getNodeID());
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9607ead..386a95e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,7 +91,7 @@
<activemq.version.majorVersion>1</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
- <activemq.version.incrementingVersion>127,126,125,124,123,122</activemq.version.incrementingVersion>
+ <activemq.version.incrementingVersion>128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a741642a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index 79eac1c..788a810 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -36,9 +37,12 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Before;
@@ -115,6 +119,40 @@ public class SlowConsumerTest extends ActiveMQTestBase {
}
@Test
+ public void testDisableSlowConsumerReconnectWithKilled() throws Exception {
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+ final int numMessages = 25;
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(createTextMessage(session, "m" + i));
+ }
+
+ ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
+ session.start();
+
+ Thread.sleep(3000);
+
+ RemotingService service = server.getRemotingService();
+ Set<RemotingConnection> connections = service.getConnections();
+ assertTrue(connections.isEmpty());
+
+ if (sf instanceof ClientSessionFactoryImpl) {
+ int reconnectAttemps = ((ClientSessionFactoryImpl)sf).getReconnectAttempts();
+ assertEquals(0, reconnectAttemps);
+ }
+ else {
+ fail("ClientSessionFactory is not the instance of ClientSessionFactoryImpl");
+ }
+ }
+
+ @Test
public void testSlowConsumerNotification() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
[2/2] activemq-artemis git commit: This closes #672
Posted by cl...@apache.org.
This closes #672
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d871dfe6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d871dfe6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d871dfe6
Branch: refs/heads/master
Commit: d871dfe6225dc4f3c08d9fb9c41069b0849eff2f
Parents: c13a976 a741642
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Aug 3 18:39:58 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 3 18:39:58 2016 -0400
----------------------------------------------------------------------
.../client/impl/ClientSessionFactoryImpl.java | 15 +++++
.../core/client/impl/ClientSessionImpl.java | 5 ++
.../core/client/impl/ClientSessionInternal.java | 3 +
.../core/impl/ActiveMQSessionContext.java | 21 +++++++
.../core/protocol/core/impl/PacketImpl.java | 2 +
.../core/impl/RemotingConnectionImpl.java | 12 ++++
.../DisconnectConsumerWithKillMessage.java | 60 ++++++++++++++++++++
.../spi/core/protocol/RemotingConnection.java | 5 ++
.../main/resources/activemq-version.properties | 2 +-
.../ActiveMQProtonRemotingConnection.java | 6 ++
.../core/protocol/mqtt/MQTTConnection.java | 6 ++
.../protocol/openwire/OpenWireConnection.java | 5 ++
.../core/protocol/stomp/StompConnection.java | 5 ++
.../artemis/core/server/impl/QueueImpl.java | 5 +-
pom.xml | 2 +-
.../integration/client/SlowConsumerTest.java | 38 +++++++++++++
16 files changed, 189 insertions(+), 3 deletions(-)
----------------------------------------------------------------------