You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/31 04:31:31 UTC

[49/69] [abbrv] activemq-artemis git commit: fixing SlowConsumerDetection

fixing SlowConsumerDetection


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c3c034f4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c3c034f4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c3c034f4

Branch: refs/heads/refactor-openwire
Commit: c3c034f472f4b30938c75c1d00016233029a46e7
Parents: 46f5d2c
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Feb 25 14:40:04 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 30 22:29:44 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/OpenWireConnection.java | 12 +++++++++---
 .../protocol/openwire/OpenWireProtocolManager.java | 17 ++++-------------
 .../core/server/impl/ServerConsumerImpl.java       | 17 +++++++++++++++++
 3 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3c034f4/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6f2e3be..dc2a8a6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -204,6 +204,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          boolean responseRequired = command.isResponseRequired();
          int commandId = command.getCommandId();
+
+
+         // TODO-NOW: the server should send packets to the client based on the requested times
+         //           need to look at what Andy did on AMQP
+
          // the connection handles pings, negotiations directly.
          // and delegate all other commands to manager.
          if (command.getClass() == KeepAliveInfo.class) {
@@ -1196,12 +1201,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processMessageDispatch(MessageDispatch arg0) throws Exception {
-         throw new IllegalStateException("not implemented! ");
+         return null;
       }
 
       @Override
       public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception {
-         throw new IllegalStateException("not implemented! ");
+         return null;
       }
 
       @Override
@@ -1222,7 +1227,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processProducerAck(ProducerAck arg0) throws Exception {
-         throw new IllegalStateException("not implemented! ");
+         // a broker doesn't do producers.. this shouldn't happen
+         return null;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3c034f4/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index bdf27f8..514a2b9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,14 +17,12 @@
 package org.apache.activemq.artemis.core.protocol.openwire;
 
 import javax.jms.InvalidClientIDException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -44,7 +42,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -60,7 +57,6 @@ import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageDispatch;
@@ -91,21 +87,14 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
    private OpenWireFormatFactory wireFactory;
 
-   private boolean tightEncodingEnabled = true;
-
    private boolean prefixPacketSize = true;
 
    private BrokerId brokerId;
    protected final ProducerId advisoryProducerId = new ProducerId();
 
-   // from broker
-   protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, OpenWireConnection>());
-
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
 
-   protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>();
-
-   // Clebert TODO: use ConcurrentHashMap, or maybe use the schema that's already available on Artemis upstream (unique-client-id)
+   // TODO-NOW: this can probably go away
    private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
 
    private String brokerName;
@@ -133,11 +122,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       // preferred prop, should be done via config
       wireFactory.setCacheEnabled(false);
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
-      ManagementService service = server.getManagementService();
       scheduledPool = server.getScheduledPool();
 
       final ClusterManager clusterManager = this.server.getClusterManager();
+
+      // TODO-NOW: use a property name for the cluster connection
       ClusterConnection cc = clusterManager.getDefaultConnection(null);
+
       if (cc != null) {
          cc.addClusterTopologyListener(this);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3c034f4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 545b4dc..b5ea5d9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -89,6 +89,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private Object protocolContext;
 
+   private SlowConsumerDetectionListener slowConsumerListener;
+
    /**
     * We get a readLock when a message is handled, and return the readLock when the message is finally delivered
     * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
@@ -223,6 +225,21 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    // ----------------------------------------------------------------------
 
    @Override
+   public void setlowConsumerDetection(SlowConsumerDetectionListener listener) {
+      this.slowConsumerListener = listener;
+   }
+
+   @Override
+   public SlowConsumerDetectionListener getSlowConsumerDetecion() {
+      return slowConsumerListener;
+   }
+
+   @Override
+   public void fireSlowConsumer() {
+      slowConsumerListener.onSlowConsumer(this);
+   }
+
+   @Override
    public Object getProtocolContext() {
       return protocolContext;
    }