You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/09/26 22:36:55 UTC
activemq-artemis git commit: NO-JIRA Removing Thread usage for Pings on Stomp
Repository: activemq-artemis
Updated Branches:
refs/heads/master d09aba4cb -> 1548a4e21
NO-JIRA Removing Thread usage for Pings on Stomp
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1548a4e2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1548a4e2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1548a4e2
Branch: refs/heads/master
Commit: 1548a4e21772ff70690136e5d7653630cef7a851
Parents: d09aba4
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Sep 26 18:32:17 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 26 18:32:25 2016 -0400
----------------------------------------------------------------------
.../core/protocol/stomp/StompConnection.java | 20 ++++++--
.../protocol/stomp/StompProtocolManager.java | 2 +-
.../stomp/VersionedStompFrameHandler.java | 17 +++++--
.../stomp/v10/StompFrameHandlerV10.java | 6 ++-
.../stomp/v11/StompFrameHandlerV11.java | 53 ++++++++------------
.../stomp/v12/StompFrameHandlerV12.java | 7 ++-
6 files changed, 59 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index b918b75..f1605cb 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -44,6 +45,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -96,6 +98,10 @@ public final class StompConnection implements RemotingConnection {
private final int minLargeMessageSize;
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ private final ExecutorFactory factory;
+
@Override
public boolean isSupportReconnect() {
return false;
@@ -111,7 +117,7 @@ public final class StompConnection implements RemotingConnection {
case ActiveMQStompException.INVALID_EOL_V10:
if (version != null)
throw e;
- frameHandler = new StompFrameHandlerV12(this);
+ frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, factory);
buffer.resetReaderIndex();
frame = decode(buffer);
break;
@@ -136,12 +142,18 @@ public final class StompConnection implements RemotingConnection {
StompConnection(final Acceptor acceptorUsed,
final Connection transportConnection,
- final StompProtocolManager manager) {
+ final StompProtocolManager manager,
+ final ScheduledExecutorService scheduledExecutorService,
+ final ExecutorFactory factory) {
+ this.scheduledExecutorService = scheduledExecutorService;
+
+ this.factory = factory;
+
this.transportConnection = transportConnection;
this.manager = manager;
- this.frameHandler = new StompFrameHandlerV10(this);
+ this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, factory);
this.creationTime = System.currentTimeMillis();
@@ -452,7 +464,7 @@ public final class StompConnection implements RemotingConnection {
}
if (this.version != (StompVersions.V1_0)) {
- VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version);
+ VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, factory);
newHandler.initDecoder(this.frameHandler);
this.frameHandler = newHandler;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 5de63d3..235cddd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -117,7 +117,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
@Override
public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) {
- StompConnection conn = new StompConnection(acceptorUsed, connection, this);
+ StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getScheduledPool(), server.getExecutorFactory());
// Note that STOMP 1.0 has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
// will be timed out and closed!
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 185f81f..25d32be 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.stomp;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
@@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -37,21 +39,26 @@ public abstract class VersionedStompFrameHandler {
protected StompConnection connection;
protected StompDecoder decoder;
- public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version) {
+ protected final ScheduledExecutorService scheduledExecutorService;
+ protected final ExecutorFactory executorFactory;
+
+ public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) {
if (version == StompVersions.V1_0) {
- return new StompFrameHandlerV10(connection);
+ return new StompFrameHandlerV10(connection, scheduledExecutorService, executorFactory);
}
if (version == StompVersions.V1_1) {
- return new StompFrameHandlerV11(connection);
+ return new StompFrameHandlerV11(connection, scheduledExecutorService, executorFactory);
}
if (version == StompVersions.V1_2) {
- return new StompFrameHandlerV12(connection);
+ return new StompFrameHandlerV12(connection, scheduledExecutorService, executorFactory);
}
return null;
}
- protected VersionedStompFrameHandler(StompConnection connection) {
+ protected VersionedStompFrameHandler(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) {
this.connection = connection;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.executorFactory = executorFactory;
}
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
index 25db3b0..0e32881 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v10;
import javax.security.cert.X509Certificate;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.protocol.stomp.FrameEventListener;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
@@ -30,13 +31,14 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.utils.CertificateUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements FrameEventListener {
- public StompFrameHandlerV10(StompConnection connection) {
- super(connection);
+ public StompFrameHandlerV10(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory factory) {
+ super(connection, scheduledExecutorService, factory);
decoder = new StompDecoder(this);
decoder.init();
connection.addStompEventListener(this);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
index 974a889..35de63c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
@@ -18,6 +18,9 @@ package org.apache.activemq.artemis.core.protocol.stomp.v11;
import javax.security.cert.X509Certificate;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
@@ -31,9 +34,11 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.utils.CertificateUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -43,8 +48,8 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
private HeartBeater heartBeater;
- public StompFrameHandlerV11(StompConnection connection) {
- super(connection);
+ public StompFrameHandlerV11(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) {
+ super(connection, scheduledExecutorService, executorFactory);
connection.addStompEventListener(this);
decoder = new StompDecoderV11(this);
decoder.init();
@@ -127,19 +132,13 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
//client receive ping
long minAcceptInterval = Long.valueOf(params[1]);
- heartBeater = new HeartBeater(minPingInterval, minAcceptInterval);
+ heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval);
}
@Override
public StompFrame onDisconnect(StompFrame frame) {
if (this.heartBeater != null) {
heartBeater.shutdown();
- try {
- heartBeater.join();
- }
- catch (InterruptedException e) {
- ActiveMQServerLogger.LOGGER.errorOnStompHeartBeat(e);
- }
}
return null;
}
@@ -250,7 +249,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
* (b) configure connection ttl so that org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.FailureCheckAndFlushThread
* can deal with closing connections which go stale
*/
- private class HeartBeater extends Thread {
+ private class HeartBeater extends ActiveMQScheduledComponent {
private static final int MIN_SERVER_PING = 500;
@@ -260,7 +259,13 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
AtomicLong lastPingTimestamp = new AtomicLong(0);
ConnectionEntry connectionEntry;
- private HeartBeater(final long clientPing, final long clientAcceptPing) {
+ private HeartBeater(ScheduledExecutorService scheduledExecutorService, Executor executor, final long clientPing, final long clientAcceptPing) {
+ super(scheduledExecutorService, executor, clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING, TimeUnit.MILLISECONDS, false);
+
+ if (clientAcceptPing != 0) {
+ serverPingPeriod = super.getPeriod();
+ }
+
connectionEntry = ((RemotingServiceImpl)connection.getManager().getServer().getRemotingService()).getConnectionEntry(connection.getID());
if (connectionEntry != null) {
@@ -299,14 +304,11 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
}
}
- if (clientAcceptPing != 0) {
- serverPingPeriod = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
- }
}
- public synchronized void shutdown() {
- shutdown = true;
- this.notify();
+ public void shutdown() {
+ this.stop();
+
}
public void pinged() {
@@ -315,21 +317,8 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
@Override
public void run() {
- synchronized (this) {
- while (!shutdown) {
- long lastPingPeriod = System.currentTimeMillis() - lastPingTimestamp.get();
- if (lastPingPeriod >= serverPingPeriod) {
- lastPingTimestamp.set(System.currentTimeMillis());
- connection.ping(createPingFrame());
- lastPingPeriod = 0;
- }
- try {
- this.wait(serverPingPeriod - lastPingPeriod);
- }
- catch (InterruptedException e) {
- }
- }
- }
+ lastPingTimestamp.set(System.currentTimeMillis());
+ connection.ping(createPingFrame());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 5127dd0..19149bf 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp.v12;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
@@ -26,13 +28,14 @@ import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
- public StompFrameHandlerV12(StompConnection connection) {
- super(connection);
+ public StompFrameHandlerV12(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory factory) {
+ super(connection, scheduledExecutorService, factory);
decoder = new StompDecoderV12(this);
decoder.init();
}