You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/11/02 18:29:45 UTC

[1/3] activemq-artemis git commit: ARTEMIS-814: Refactor client connection and allow adding custom event handlers

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 28645b1b8 -> 8b57516e1


ARTEMIS-814: Refactor client connection and allow adding custom event handlers


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/52a462d1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/52a462d1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/52a462d1

Branch: refs/heads/master
Commit: 52a462d15518423456c10ffca577b8b33ee42ed3
Parents: 28645b1
Author: Ulf Lilleengen <lu...@redhat.com>
Authored: Mon Oct 31 11:30:29 2016 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 2 14:24:44 2016 -0400

----------------------------------------------------------------------
 .../client/AMQPClientConnectionFactory.java     |  61 ++++++++++
 ...ProtonClientConnectionLifeCycleListener.java | 110 -------------------
 .../client/ProtonClientConnectionManager.java   |  97 ++++++++++++++++
 .../amqp/proton/AMQPConnectionContext.java      |   8 ++
 .../amqp/proton/AMQPSessionContext.java         |   5 +-
 .../amqp/proton/ProtonClientSenderContext.java  |  34 ++++++
 .../amqp/proton/ProtonServerSenderContext.java  |  13 ++-
 .../tests/integration/amqp/ProtonTest.java      |   6 +-
 8 files changed, 217 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52a462d1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
new file mode 100644
index 0000000..5807809
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.client;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+
+import java.util.Optional;
+import java.util.concurrent.Executor;
+
+/**
+ * Connection factory for outgoing AMQP connections.
+ */
+public class AMQPClientConnectionFactory {
+
+   private final ActiveMQServer server;
+   private final String containerId;
+   private final int ttl;
+
+   public AMQPClientConnectionFactory(ActiveMQServer server, String containerId, int ttl) {
+      this.server = server;
+      this.containerId = containerId;
+      this.ttl = ttl;
+   }
+
+   public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) {
+      AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
+
+      Executor executor = server.getExecutorFactory().getExecutor();
+
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool());
+      eventHandler.ifPresent(amqpConnection::addEventHandler);
+
+      ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
+
+      connectionCallback.setProtonConnectionDelegate(delegate);
+
+      amqpConnection.open();
+      return delegate;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52a462d1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionLifeCycleListener.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionLifeCycleListener.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionLifeCycleListener.java
deleted file mode 100644
index 12da243..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionLifeCycleListener.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.client;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.server.ActiveMQComponent;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
-import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
-import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
-import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
-import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
-import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.jboss.logging.Logger;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-/**
- * Manages the lifecycle of a proton client connection.
- */
-public class ProtonClientConnectionLifeCycleListener implements BaseConnectionLifeCycleListener<ProtonProtocolManager>, BufferHandler {
-   private final Map<Object, ConnectionEntry> connectionMap = new ConcurrentHashMap<>();
-   private final ActiveMQServer server;
-   private final int ttl;
-   private static final Logger log = Logger.getLogger(ProtonClientConnectionLifeCycleListener.class);
-
-   public ProtonClientConnectionLifeCycleListener(ActiveMQServer server, int ttl) {
-      this.server = server;
-      this.ttl = ttl;
-   }
-
-   @Override
-   public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
-      AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
-
-      String id = server.getConfiguration().getName();
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
-      Executor executor = server.getExecutorFactory().getExecutor();
-      amqpConnection.open();
-
-      ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
-
-      connectionCallback.setProtonConnectionDelegate(delegate);
-
-      ConnectionEntry connectionEntry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
-      connectionMap.put(connection.getID(), connectionEntry);
-      log.info("Connection " + connection.getRemoteAddress() + " created");
-   }
-
-   @Override
-   public void connectionDestroyed(Object connectionID) {
-      ConnectionEntry connection = connectionMap.remove(connectionID);
-      if (connection != null) {
-         log.info("Connection " + connection.connection.getRemoteAddress() + " destroyed");
-         connection.connection.disconnect(false);
-      }
-   }
-
-   @Override
-   public void connectionException(Object connectionID, ActiveMQException me) {
-      ConnectionEntry connection = connectionMap.get(connectionID);
-      if (connection != null) {
-         log.info("Connection " + connection.connection.getRemoteAddress() + " exception: " + me.getMessage());
-         connection.connection.fail(me);
-      }
-   }
-
-   @Override
-   public void connectionReadyForWrites(Object connectionID, boolean ready) {
-      ConnectionEntry connection = connectionMap.get(connectionID);
-      if (connection != null) {
-         log.info("Connection " + connection.connection.getRemoteAddress() + " ready");
-         connection.connection.getTransportConnection().fireReady(true);
-      }
-   }
-
-   public void stop() {
-      for (ConnectionEntry entry : connectionMap.values()) {
-         entry.connection.disconnect(false);
-      }
-   }
-
-   @Override
-   public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
-      ConnectionEntry entry = connectionMap.get(connectionID);
-      if (entry != null) {
-         entry.connection.bufferReceived(connectionID, buffer);
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52a462d1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
new file mode 100644
index 0000000..ec9136f
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.client;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.jboss.logging.Logger;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manages the lifecycle of a proton client connection.
+ */
+public class ProtonClientConnectionManager implements BaseConnectionLifeCycleListener<ProtonProtocolManager>, BufferHandler {
+   private final Map<Object, ActiveMQProtonRemotingConnection> connectionMap = new ConcurrentHashMap<>();
+   private static final Logger log = Logger.getLogger(ProtonClientConnectionManager.class);
+   private final AMQPClientConnectionFactory connectionFactory;
+   private final Optional<EventHandler> eventHandler;
+
+   public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler) {
+      this.connectionFactory = connectionFactory;
+      this.eventHandler = eventHandler;
+   }
+
+   @Override
+   public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
+      ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler);
+      connectionMap.put(connection.getID(), amqpConnection);
+
+      log.info("Connection " + amqpConnection.getRemoteAddress() + " created");
+   }
+
+   @Override
+   public void connectionDestroyed(Object connectionID) {
+      RemotingConnection connection = connectionMap.remove(connectionID);
+      if (connection != null) {
+         log.info("Connection " + connection.getRemoteAddress() + " destroyed");
+         connection.disconnect(false);
+      }
+   }
+
+   @Override
+   public void connectionException(Object connectionID, ActiveMQException me) {
+      RemotingConnection connection = connectionMap.get(connectionID);
+      if (connection != null) {
+         log.info("Connection " + connection.getRemoteAddress() + " exception: " + me.getMessage());
+         connection.fail(me);
+      }
+   }
+
+   @Override
+   public void connectionReadyForWrites(Object connectionID, boolean ready) {
+      RemotingConnection connection = connectionMap.get(connectionID);
+      if (connection != null) {
+         log.info("Connection " + connection.getRemoteAddress() + " ready");
+         connection.getTransportConnection().fireReady(true);
+      }
+   }
+
+   public void stop() {
+      for (RemotingConnection connection : connectionMap.values()) {
+         connection.disconnect(false);
+      }
+   }
+
+   @Override
+   public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+      RemotingConnection connection = connectionMap.get(connectionID);
+      if (connection != null) {
+         connection.bufferReceived(connectionID, buffer);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52a462d1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 8957d99..bdccd96 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -209,6 +209,14 @@ public class AMQPConnectionContext extends ProtonInitializable {
       handler.open(containerId);
    }
 
+   public String getContainer() {
+      return containerId;
+   }
+
+   public void addEventHandler(EventHandler eventHandler) {
+      handler.addEventHandler(eventHandler);
+   }
+
    // This listener will perform a bunch of things here
    class LocalListener implements EventHandler {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52a462d1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index 6a6c1fa..dfc8031 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -166,7 +166,9 @@ public class AMQPSessionContext extends ProtonInitializable {
    }
 
    public void addSender(Sender sender) throws Exception {
-      ProtonServerSenderContext protonSender = new ProtonServerSenderContext(connection, sender, this, sessionSPI);
+      // TODO: Remove this check when we have support for global link names
+      boolean outgoing = (sender.getContext() != null && sender.getContext().equals(true));
+      ProtonServerSenderContext protonSender = outgoing ? new ProtonClientSenderContext(connection, sender, this, sessionSPI) : new ProtonServerSenderContext(connection, sender, this, sessionSPI);
 
       try {
          protonSender.initialise();
@@ -205,5 +207,4 @@ public class AMQPSessionContext extends ProtonInitializable {
          receiver.close();
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52a462d1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonClientSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonClientSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonClientSenderContext.java
new file mode 100644
index 0000000..10dc87f
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonClientSenderContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * TODO: Merge {@link ProtonServerSenderContext} and {@link ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
+ */
+public class ProtonClientSenderContext extends ProtonServerSenderContext {
+   public ProtonClientSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext amqpSessionContext, AMQPSessionCallback sessionSPI) {
+      super(connection, sender, amqpSessionContext, sessionSPI);
+   }
+
+   @Override
+   protected String getClientId() {
+      return connection.getContainer();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52a462d1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index adb7acc..b8f0f2a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -56,6 +56,9 @@ import org.jboss.logging.Logger;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 
+/**
+ * TODO: Merge {@link ProtonServerSenderContext} and {@link ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
+ */
 public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
 
    private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
@@ -167,7 +170,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       if (source == null) {
          // Attempt to recover a previous subscription happens when a link reattach happens on a
          // subscription queue
-         String clientId = connection.getRemoteContainer();
+         String clientId = getClientId();
          String pubId = sender.getName();
          queue = createQueueName(clientId, pubId);
          QueueQueryResult result = sessionSPI.queueQuery(queue, false);
@@ -232,7 +235,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             // if we are a subscription and durable create a durable queue using the container
             // id and link name
             if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
-               String clientId = connection.getRemoteContainer();
+               String clientId = getClientId();
                String pubId = sender.getName();
                queue = createQueueName(clientId, pubId);
                QueueQueryResult result = sessionSPI.queueQuery(queue, false);
@@ -295,6 +298,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }
 
+   protected String getClientId() {
+      return connection.getRemoteContainer();
+   }
+
    private boolean isPubSub(Source source) {
       String pubSubPrefix = sessionSPI.getPubSubPrefix();
       return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
@@ -337,7 +344,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                if (result.isExists() && source.getDynamic()) {
                   sessionSPI.deleteQueue(queueName);
                } else {
-                  String clientId = connection.getRemoteContainer();
+                  String clientId = getClientId();
                   String pubId = sender.getName();
                   String queue = createQueueName(clientId, pubId);
                   result = sessionSPI.queueQuery(queue, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/52a462d1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 53ced4e..e5d2f64 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.Enumeration;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -69,7 +70,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionLifeCycleListener;
+import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
+import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
 import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
@@ -975,7 +977,7 @@ public class ProtonTest extends ProtonTestBase {
       final Map<String, Object> config = new LinkedHashMap<>();
       config.put(TransportConstants.HOST_PROP_NAME, "localhost");
       config.put(TransportConstants.PORT_PROP_NAME, "5673");
-      ProtonClientConnectionLifeCycleListener lifeCycleListener = new ProtonClientConnectionLifeCycleListener(server, 5000);
+      ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, server.getConfiguration().getName(), 5000), Optional.empty());
       ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
       NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
       connector.start();


[2/3] activemq-artemis git commit: ARTEMIS-814 Moving ProtonClientSenderContext towards the client package

Posted by cl...@apache.org.
ARTEMIS-814 Moving ProtonClientSenderContext towards the client package

I have done it this way to be consistent with what is documented in package-info.java for the client package.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ab7adbbd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ab7adbbd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ab7adbbd

Branch: refs/heads/master
Commit: ab7adbbd7a6e6e389c0a36a82bf05824fbd9af7e
Parents: 52a462d
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Nov 2 14:19:51 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 2 14:24:51 2016 -0400

----------------------------------------------------------------------
 .../amqp/client/ProtonClientSenderContext.java  | 37 ++++++++++++++++++++
 .../amqp/proton/AMQPSessionContext.java         |  1 +
 .../amqp/proton/ProtonClientSenderContext.java  | 34 ------------------
 .../amqp/proton/ProtonServerSenderContext.java  |  2 +-
 4 files changed, 39 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab7adbbd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientSenderContext.java
new file mode 100644
index 0000000..c942fe5
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientSenderContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.client;
+
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * TODO: Merge {@link ProtonServerSenderContext} and {@link ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
+ */
+public class ProtonClientSenderContext extends ProtonServerSenderContext {
+   public ProtonClientSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext amqpSessionContext, AMQPSessionCallback sessionSPI) {
+      super(connection, sender, amqpSessionContext, sessionSPI);
+   }
+
+   @Override
+   protected String getClientId() {
+      return connection.getContainer();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab7adbbd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index dfc8031..b45f4bb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.qpid.proton.amqp.Symbol;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab7adbbd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonClientSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonClientSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonClientSenderContext.java
deleted file mode 100644
index 10dc87f..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonClientSenderContext.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.proton;
-
-import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
-import org.apache.qpid.proton.engine.Sender;
-
-/**
- * TODO: Merge {@link ProtonServerSenderContext} and {@link ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
- */
-public class ProtonClientSenderContext extends ProtonServerSenderContext {
-   public ProtonClientSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext amqpSessionContext, AMQPSessionCallback sessionSPI) {
-      super(connection, sender, amqpSessionContext, sessionSPI);
-   }
-
-   @Override
-   protected String getClientId() {
-      return connection.getContainer();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ab7adbbd/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index b8f0f2a..ef075fc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -57,7 +57,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 
 /**
- * TODO: Merge {@link ProtonServerSenderContext} and {@link ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
+ * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
  */
 public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
 


[3/3] activemq-artemis git commit: This closes #876

Posted by cl...@apache.org.
This closes #876


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8b57516e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8b57516e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8b57516e

Branch: refs/heads/master
Commit: 8b57516e1fc6910af0b497a4b1551a034212571f
Parents: 28645b1 ab7adbb
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Nov 2 14:29:39 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 2 14:29:39 2016 -0400

----------------------------------------------------------------------
 .../client/AMQPClientConnectionFactory.java     |  61 ++++++++++
 ...ProtonClientConnectionLifeCycleListener.java | 110 -------------------
 .../client/ProtonClientConnectionManager.java   |  97 ++++++++++++++++
 .../amqp/client/ProtonClientSenderContext.java  |  37 +++++++
 .../amqp/proton/AMQPConnectionContext.java      |   8 ++
 .../amqp/proton/AMQPSessionContext.java         |   6 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  13 ++-
 .../tests/integration/amqp/ProtonTest.java      |   6 +-
 8 files changed, 221 insertions(+), 117 deletions(-)
----------------------------------------------------------------------