You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/09/26 22:36:55 UTC

activemq-artemis git commit: NO-JIRA Removing Thread usage for Pings on Stomp

Repository: activemq-artemis
Updated Branches:
  refs/heads/master d09aba4cb -> 1548a4e21


NO-JIRA Removing Thread usage for Pings on Stomp


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1548a4e2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1548a4e2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1548a4e2

Branch: refs/heads/master
Commit: 1548a4e21772ff70690136e5d7653630cef7a851
Parents: d09aba4
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Sep 26 18:32:17 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 26 18:32:25 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/stomp/StompConnection.java    | 20 ++++++--
 .../protocol/stomp/StompProtocolManager.java    |  2 +-
 .../stomp/VersionedStompFrameHandler.java       | 17 +++++--
 .../stomp/v10/StompFrameHandlerV10.java         |  6 ++-
 .../stomp/v11/StompFrameHandlerV11.java         | 53 ++++++++------------
 .../stomp/v12/StompFrameHandlerV12.java         |  7 ++-
 6 files changed, 59 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index b918b75..f1605cb 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@@ -44,6 +45,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.VersionLoader;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
@@ -96,6 +98,10 @@ public final class StompConnection implements RemotingConnection {
 
    private final int minLargeMessageSize;
 
+   private final ScheduledExecutorService scheduledExecutorService;
+
+   private final ExecutorFactory factory;
+
    @Override
    public boolean isSupportReconnect() {
       return false;
@@ -111,7 +117,7 @@ public final class StompConnection implements RemotingConnection {
             case ActiveMQStompException.INVALID_EOL_V10:
                if (version != null)
                   throw e;
-               frameHandler = new StompFrameHandlerV12(this);
+               frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, factory);
                buffer.resetReaderIndex();
                frame = decode(buffer);
                break;
@@ -136,12 +142,18 @@ public final class StompConnection implements RemotingConnection {
 
    StompConnection(final Acceptor acceptorUsed,
                    final Connection transportConnection,
-                   final StompProtocolManager manager) {
+                   final StompProtocolManager manager,
+                   final ScheduledExecutorService scheduledExecutorService,
+                   final ExecutorFactory factory) {
+      this.scheduledExecutorService = scheduledExecutorService;
+
+      this.factory = factory;
+
       this.transportConnection = transportConnection;
 
       this.manager = manager;
 
-      this.frameHandler = new StompFrameHandlerV10(this);
+      this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, factory);
 
       this.creationTime = System.currentTimeMillis();
 
@@ -452,7 +464,7 @@ public final class StompConnection implements RemotingConnection {
       }
 
       if (this.version != (StompVersions.V1_0)) {
-         VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version);
+         VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, factory);
          newHandler.initDecoder(this.frameHandler);
          this.frameHandler = newHandler;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 5de63d3..235cddd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -117,7 +117,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
 
    @Override
    public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) {
-      StompConnection conn = new StompConnection(acceptorUsed, connection, this);
+      StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getScheduledPool(), server.getExecutorFactory());
 
       // Note that STOMP 1.0 has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
       // will be timed out and closed!

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 185f81f..25d32be 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol.stomp;
 
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
@@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
 
@@ -37,21 +39,26 @@ public abstract class VersionedStompFrameHandler {
    protected StompConnection connection;
    protected StompDecoder decoder;
 
-   public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version) {
+   protected final ScheduledExecutorService scheduledExecutorService;
+   protected final ExecutorFactory executorFactory;
+
+   public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) {
       if (version == StompVersions.V1_0) {
-         return new StompFrameHandlerV10(connection);
+         return new StompFrameHandlerV10(connection, scheduledExecutorService, executorFactory);
       }
       if (version == StompVersions.V1_1) {
-         return new StompFrameHandlerV11(connection);
+         return new StompFrameHandlerV11(connection, scheduledExecutorService, executorFactory);
       }
       if (version == StompVersions.V1_2) {
-         return new StompFrameHandlerV12(connection);
+         return new StompFrameHandlerV12(connection, scheduledExecutorService, executorFactory);
       }
       return null;
    }
 
-   protected VersionedStompFrameHandler(StompConnection connection) {
+   protected VersionedStompFrameHandler(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) {
       this.connection = connection;
+      this.scheduledExecutorService = scheduledExecutorService;
+      this.executorFactory = executorFactory;
    }
 
    public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
index 25db3b0..0e32881 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v10;
 
 import javax.security.cert.X509Certificate;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.core.protocol.stomp.FrameEventListener;
 import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
@@ -30,13 +31,14 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.utils.CertificateUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
 
 public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements FrameEventListener {
 
-   public StompFrameHandlerV10(StompConnection connection) {
-      super(connection);
+   public StompFrameHandlerV10(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory factory) {
+      super(connection, scheduledExecutorService, factory);
       decoder = new StompDecoder(this);
       decoder.init();
       connection.addStompEventListener(this);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
index 974a889..35de63c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
@@ -18,6 +18,9 @@ package org.apache.activemq.artemis.core.protocol.stomp.v11;
 
 import javax.security.cert.X509Certificate;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
@@ -31,9 +34,11 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.utils.CertificateUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
 
@@ -43,8 +48,8 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
 
    private HeartBeater heartBeater;
 
-   public StompFrameHandlerV11(StompConnection connection) {
-      super(connection);
+   public StompFrameHandlerV11(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) {
+      super(connection, scheduledExecutorService, executorFactory);
       connection.addStompEventListener(this);
       decoder = new StompDecoderV11(this);
       decoder.init();
@@ -127,19 +132,13 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
       //client receive ping
       long minAcceptInterval = Long.valueOf(params[1]);
 
-      heartBeater = new HeartBeater(minPingInterval, minAcceptInterval);
+      heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval);
    }
 
    @Override
    public StompFrame onDisconnect(StompFrame frame) {
       if (this.heartBeater != null) {
          heartBeater.shutdown();
-         try {
-            heartBeater.join();
-         }
-         catch (InterruptedException e) {
-            ActiveMQServerLogger.LOGGER.errorOnStompHeartBeat(e);
-         }
       }
       return null;
    }
@@ -250,7 +249,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
     * (b) configure connection ttl so that org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.FailureCheckAndFlushThread
     *     can deal with closing connections which go stale
     */
-   private class HeartBeater extends Thread {
+   private class HeartBeater extends ActiveMQScheduledComponent {
 
       private static final int MIN_SERVER_PING = 500;
 
@@ -260,7 +259,13 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
       AtomicLong lastPingTimestamp = new AtomicLong(0);
       ConnectionEntry connectionEntry;
 
-      private HeartBeater(final long clientPing, final long clientAcceptPing) {
+      private HeartBeater(ScheduledExecutorService scheduledExecutorService, Executor executor, final long clientPing, final long clientAcceptPing) {
+         super(scheduledExecutorService, executor, clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING, TimeUnit.MILLISECONDS, false);
+
+         if (clientAcceptPing != 0) {
+            serverPingPeriod = super.getPeriod();
+         }
+
          connectionEntry = ((RemotingServiceImpl)connection.getManager().getServer().getRemotingService()).getConnectionEntry(connection.getID());
 
          if (connectionEntry != null) {
@@ -299,14 +304,11 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
             }
          }
 
-         if (clientAcceptPing != 0) {
-            serverPingPeriod = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
-         }
       }
 
-      public synchronized void shutdown() {
-         shutdown = true;
-         this.notify();
+      public void shutdown() {
+         this.stop();
+
       }
 
       public void pinged() {
@@ -315,21 +317,8 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
 
       @Override
       public void run() {
-         synchronized (this) {
-            while (!shutdown) {
-               long lastPingPeriod = System.currentTimeMillis() - lastPingTimestamp.get();
-               if (lastPingPeriod >= serverPingPeriod) {
-                  lastPingTimestamp.set(System.currentTimeMillis());
-                  connection.ping(createPingFrame());
-                  lastPingPeriod = 0;
-               }
-               try {
-                  this.wait(serverPingPeriod - lastPingPeriod);
-               }
-               catch (InterruptedException e) {
-               }
-            }
-         }
+         lastPingTimestamp.set(System.currentTimeMillis());
+         connection.ping(createPingFrame());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 5127dd0..19149bf 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp.v12;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
@@ -26,13 +28,14 @@ import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
 import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
 
 public class StompFrameHandlerV12 extends StompFrameHandlerV11 {
 
-   public StompFrameHandlerV12(StompConnection connection) {
-      super(connection);
+   public StompFrameHandlerV12(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory factory) {
+      super(connection, scheduledExecutorService, factory);
       decoder = new StompDecoderV12(this);
       decoder.init();
    }