You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/24 13:42:42 UTC

[1/5] activemq-artemis git commit: This closes #855

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 224770168 -> 0f69bbdf0


This closes #855


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0f69bbdf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0f69bbdf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0f69bbdf

Branch: refs/heads/master
Commit: 0f69bbdf0360f1a4f7057a29e939bd0dcffed353
Parents: 2247701 2d1bdcd
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Oct 24 09:42:22 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 24 09:42:22 2016 -0400

----------------------------------------------------------------------
 .../remoting/impl/netty/NettyConnector.java     |   1 +
 ...ProtonClientConnectionLifeCycleListener.java | 110 +++++++++++++++++
 .../client/ProtonClientProtocolManager.java     | 119 +++++++++++++++++++
 .../protocol/amqp/client/package-info.java      |  22 ++++
 .../amqp/proton/AMQPConnectionContext.java      |   8 +-
 .../amqp/proton/handler/ProtonHandler.java      |   7 ++
 .../tests/integration/amqp/ProtonTest.java      |  55 +++++++++
 .../tests/integration/amqp/ProtonTestBase.java  |  19 +--
 8 files changed, 333 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[2/5] activemq-artemis git commit: ARTEMIS-814: Add support for outgoing AMQP connections

Posted by cl...@apache.org.
ARTEMIS-814: Add support for outgoing AMQP connections


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e65fd5d6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e65fd5d6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e65fd5d6

Branch: refs/heads/master
Commit: e65fd5d6748060ed8b54e05ef5b915964a0522b2
Parents: 6e5b917
Author: Ulf Lilleengen <lu...@redhat.com>
Authored: Fri Oct 21 15:24:16 2016 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 24 09:42:22 2016 -0400

----------------------------------------------------------------------
 ...ProtonClientConnectionLifeCycleListener.java | 107 +++++++++++++++++
 .../broker/ProtonClientProtocolManager.java     | 117 +++++++++++++++++++
 .../amqp/proton/AMQPConnectionContext.java      |   4 +
 .../amqp/proton/handler/ProtonHandler.java      |   7 ++
 .../tests/integration/amqp/ProtonTest.java      |  55 +++++++++
 .../tests/integration/amqp/ProtonTestBase.java  |  19 +--
 6 files changed, 302 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e65fd5d6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientConnectionLifeCycleListener.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientConnectionLifeCycleListener.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientConnectionLifeCycleListener.java
new file mode 100644
index 0000000..cf43444
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientConnectionLifeCycleListener.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.jboss.logging.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * Manages the lifecycle of a proton client connection.
+ */
+public class ProtonClientConnectionLifeCycleListener implements BaseConnectionLifeCycleListener<ProtonProtocolManager>, BufferHandler {
+   private final Map<Object, ConnectionEntry> connectionMap = new ConcurrentHashMap<>();
+   private final ActiveMQServer server;
+   private final int ttl;
+   private static final Logger log = Logger.getLogger(ProtonClientConnectionLifeCycleListener.class);
+
+   public ProtonClientConnectionLifeCycleListener(ActiveMQServer server, int ttl) {
+      this.server = server;
+      this.ttl = ttl;
+   }
+
+   @Override
+   public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
+      AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
+
+      String id = server.getConfiguration().getName();
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
+      Executor executor = server.getExecutorFactory().getExecutor();
+      amqpConnection.open();
+
+      ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
+
+      connectionCallback.setProtonConnectionDelegate(delegate);
+
+      ConnectionEntry connectionEntry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
+      connectionMap.put(connection.getID(), connectionEntry);
+      log.info("Connection " + connection.getRemoteAddress() + " created");
+   }
+
+   @Override
+   public void connectionDestroyed(Object connectionID) {
+      ConnectionEntry connection = connectionMap.remove(connectionID);
+      if (connection != null) {
+         log.info("Connection " + connection.connection.getRemoteAddress() + " destroyed");
+         connection.connection.disconnect(false);
+      }
+   }
+
+   @Override
+   public void connectionException(Object connectionID, ActiveMQException me) {
+      ConnectionEntry connection = connectionMap.get(connectionID);
+      if (connection != null) {
+         log.info("Connection " + connection.connection.getRemoteAddress() + " exception: " + me.getMessage());
+         connection.connection.fail(me);
+      }
+   }
+
+   @Override
+   public void connectionReadyForWrites(Object connectionID, boolean ready) {
+      ConnectionEntry connection = connectionMap.get(connectionID);
+      if (connection != null) {
+         log.info("Connection " + connection.connection.getRemoteAddress() + " ready");
+         connection.connection.getTransportConnection().fireReady(true);
+      }
+   }
+
+   public void stop() {
+      for (ConnectionEntry entry : connectionMap.values()) {
+         entry.connection.disconnect(false);
+      }
+   }
+
+   @Override
+   public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+      ConnectionEntry entry = connectionMap.get(connectionID);
+      if (entry != null) {
+         entry.connection.bufferReceived(connectionID, buffer);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e65fd5d6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientProtocolManager.java
new file mode 100644
index 0000000..eca02d7
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientProtocolManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.broker;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
+
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Handles proton protocol management for clients, mapping the {@link ProtonProtocolManager} to the {@link org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager} API.
+ *
+ * TODO: Find a common base for ProtocolManager and ClientProtocolManager.
+ */
+public class ProtonClientProtocolManager extends ProtonProtocolManager implements ClientProtocolManager {
+
+   public ProtonClientProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
+      super(factory, server);
+   }
+
+   @Override
+   public void stop() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, TopologyResponseHandler topologyResponseHandler) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public RemotingConnection getCurrentConnection() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public Lock lockSessionCreation() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean waitOnLatch(long milliseconds) throws InterruptedException {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean isAlive() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void addChannelHandlers(ChannelPipeline pipeline) {
+   }
+
+   @Override
+   public void sendSubscribeTopology(boolean isServer) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void ping(long connectionTTL) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public SessionContext createSessionContext(String name, String username, String password, boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean cleanupBeforeFailover(ActiveMQException cause) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean checkForFailover(String liveNodeID) throws ActiveMQException {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void setSessionFactory(ClientSessionFactory factory) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public ClientSessionFactory getSessionFactory() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public String getName() {
+      throw new UnsupportedOperationException();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e65fd5d6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 65ed836..8957d99 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -205,6 +205,10 @@ public class AMQPConnectionContext extends ProtonInitializable {
       return ExtCapability.getCapabilities();
    }
 
+   public void open() {
+      handler.open(containerId);
+   }
+
    // This listener will perform a bunch of things here
    class LocalListener implements EventHandler {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e65fd5d6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 3c088d5..b4ddda0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -357,4 +357,11 @@ public class ProtonHandler extends ProtonInitializable {
       }
 
    }
+
+   public void open(String containerId) {
+      this.transport.open();
+      this.connection.setContainer(containerId);
+      this.connection.open();
+      flush();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e65fd5d6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 15271f6..c1d1b71 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
@@ -62,9 +63,15 @@ import javax.jms.TopicSubscriber;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonClientConnectionLifeCycleListener;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonClientProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -949,6 +956,54 @@ public class ProtonTest extends ProtonTestBase {
 
    }
 
+   @Test
+   public void testOutboundConnection() throws Throwable {
+      final ActiveMQServer remote = createAMQPServer(5673);
+      remote.start();
+      try {
+         Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+               return remote.isActive();
+            }
+         });
+      } catch (Exception e) {
+         remote.stop();
+         throw e;
+      }
+
+      final Map<String, Object> config = new LinkedHashMap<>();
+      config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      config.put(TransportConstants.PORT_PROP_NAME, "5673");
+      ProtonClientConnectionLifeCycleListener lifeCycleListener = new ProtonClientConnectionLifeCycleListener(server, 5000);
+      ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+      NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
+      connector.start();
+      connector.createConnection();
+
+      try {
+         Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+               return remote.getConnectionCount() > 0;
+            }
+         });
+         assertEquals(1, remote.getConnectionCount());
+
+         lifeCycleListener.stop();
+         Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+               return remote.getConnectionCount() == 0;
+            }
+         });
+         assertEquals(0, remote.getConnectionCount());
+      } finally {
+         lifeCycleListener.stop();
+         remote.stop();
+      }
+   }
+
    /*
    // Uncomment testLoopBrowser to validate the hunging on the test
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e65fd5d6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
index 9dacd66..a57e2ea 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -42,23 +43,27 @@ public class ProtonTestBase extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
 
-      server = this.createServer(true, true);
+      server = this.createAMQPServer(5672);
+      server.start();
+   }
+
+   protected ActiveMQServer createAMQPServer(int port) throws Exception {
+      final ActiveMQServer amqpServer = this.createServer(true, true);
       HashMap<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PORT_PROP_NAME, "5672");
+      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
       params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
       HashMap<String, Object> amqpParams = new HashMap<>();
       configureAmqp(amqpParams);
       TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
 
-      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
-      server.getConfiguration().setName(brokerName);
+      amqpServer.getConfiguration().setAcceptorConfigurations(Collections.singleton(transportConfiguration));
+      amqpServer.getConfiguration().setName(brokerName);
 
       // Default Page
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
-      server.getConfiguration().getAddressesSettings().put("#", addressSettings);
-
-      server.start();
+      amqpServer.getConfiguration().getAddressesSettings().put("#", addressSettings);
+      return amqpServer;
    }
 
    protected void configureAmqp(Map<String, Object> params) {


[3/5] activemq-artemis git commit: ARTEMIS-814: Notify connection listener of connection created

Posted by cl...@apache.org.
ARTEMIS-814: Notify connection listener of connection created


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b2a5fe19
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b2a5fe19
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b2a5fe19

Branch: refs/heads/master
Commit: b2a5fe19d30ad6114db2c4cc1d560e8be9945cd2
Parents: 2247701
Author: Ulf Lilleengen <lu...@redhat.com>
Authored: Fri Oct 21 15:14:51 2016 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 24 09:42:22 2016 -0400

----------------------------------------------------------------------
 .../activemq/artemis/core/remoting/impl/netty/NettyConnector.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2a5fe19/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 346e866..c809b96 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -912,6 +912,7 @@ public class NettyConnector extends AbstractConnector {
          if (connections.putIfAbsent(connection.getID(), connection) != null) {
             throw ActiveMQClientMessageBundle.BUNDLE.connectionExists(connection.getID());
          }
+         listener.connectionCreated(component, connection, protocol);
       }
 
       @Override


[5/5] activemq-artemis git commit: ARTEMIS-814: Fix a bug where context could be null in case a connection was closed before the flow arrived

Posted by cl...@apache.org.
ARTEMIS-814: Fix a bug where context could be null in case a connection was closed before the flow arrived


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6e5b917c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6e5b917c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6e5b917c

Branch: refs/heads/master
Commit: 6e5b917cc5c83ba9771e8f884b07eb271affbeff
Parents: b2a5fe1
Author: Ulf Lilleengen <lu...@redhat.com>
Authored: Fri Oct 21 15:22:46 2016 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 24 09:42:22 2016 -0400

----------------------------------------------------------------------
 .../artemis/protocol/amqp/proton/AMQPConnectionContext.java      | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6e5b917c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 1f193eb..65ed836 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -372,7 +372,9 @@ public class AMQPConnectionContext extends ProtonInitializable {
 
       @Override
       public void onFlow(Link link) throws Exception {
-         ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
+         if (link.getContext() != null) {
+            ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
+         }
       }
 
       @Override


[4/5] activemq-artemis git commit: ARTEMIS-814: Moving classes around and adding docs

Posted by cl...@apache.org.
ARTEMIS-814: Moving classes around and adding docs


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2d1bdcd5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2d1bdcd5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2d1bdcd5

Branch: refs/heads/master
Commit: 2d1bdcd5bde39817b635e899061c6639ad62e754
Parents: e65fd5d
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Oct 24 09:00:52 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 24 09:42:22 2016 -0400

----------------------------------------------------------------------
 ...ProtonClientConnectionLifeCycleListener.java | 107 -----------------
 .../broker/ProtonClientProtocolManager.java     | 117 ------------------
 ...ProtonClientConnectionLifeCycleListener.java | 110 +++++++++++++++++
 .../client/ProtonClientProtocolManager.java     | 119 +++++++++++++++++++
 .../protocol/amqp/client/package-info.java      |  22 ++++
 .../tests/integration/amqp/ProtonTest.java      |   4 +-
 6 files changed, 253 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d1bdcd5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientConnectionLifeCycleListener.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientConnectionLifeCycleListener.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientConnectionLifeCycleListener.java
deleted file mode 100644
index cf43444..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientConnectionLifeCycleListener.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.broker;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.server.ActiveMQComponent;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
-import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
-import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
-import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.jboss.logging.Logger;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-/**
- * Manages the lifecycle of a proton client connection.
- */
-public class ProtonClientConnectionLifeCycleListener implements BaseConnectionLifeCycleListener<ProtonProtocolManager>, BufferHandler {
-   private final Map<Object, ConnectionEntry> connectionMap = new ConcurrentHashMap<>();
-   private final ActiveMQServer server;
-   private final int ttl;
-   private static final Logger log = Logger.getLogger(ProtonClientConnectionLifeCycleListener.class);
-
-   public ProtonClientConnectionLifeCycleListener(ActiveMQServer server, int ttl) {
-      this.server = server;
-      this.ttl = ttl;
-   }
-
-   @Override
-   public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
-      AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
-
-      String id = server.getConfiguration().getName();
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
-      Executor executor = server.getExecutorFactory().getExecutor();
-      amqpConnection.open();
-
-      ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
-
-      connectionCallback.setProtonConnectionDelegate(delegate);
-
-      ConnectionEntry connectionEntry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
-      connectionMap.put(connection.getID(), connectionEntry);
-      log.info("Connection " + connection.getRemoteAddress() + " created");
-   }
-
-   @Override
-   public void connectionDestroyed(Object connectionID) {
-      ConnectionEntry connection = connectionMap.remove(connectionID);
-      if (connection != null) {
-         log.info("Connection " + connection.connection.getRemoteAddress() + " destroyed");
-         connection.connection.disconnect(false);
-      }
-   }
-
-   @Override
-   public void connectionException(Object connectionID, ActiveMQException me) {
-      ConnectionEntry connection = connectionMap.get(connectionID);
-      if (connection != null) {
-         log.info("Connection " + connection.connection.getRemoteAddress() + " exception: " + me.getMessage());
-         connection.connection.fail(me);
-      }
-   }
-
-   @Override
-   public void connectionReadyForWrites(Object connectionID, boolean ready) {
-      ConnectionEntry connection = connectionMap.get(connectionID);
-      if (connection != null) {
-         log.info("Connection " + connection.connection.getRemoteAddress() + " ready");
-         connection.connection.getTransportConnection().fireReady(true);
-      }
-   }
-
-   public void stop() {
-      for (ConnectionEntry entry : connectionMap.values()) {
-         entry.connection.disconnect(false);
-      }
-   }
-
-   @Override
-   public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
-      ConnectionEntry entry = connectionMap.get(connectionID);
-      if (entry != null) {
-         entry.connection.bufferReceived(connectionID, buffer);
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d1bdcd5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientProtocolManager.java
deleted file mode 100644
index eca02d7..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonClientProtocolManager.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.broker;
-
-import io.netty.channel.ChannelPipeline;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Interceptor;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
-import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
-
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-
-/**
- * Handles proton protocol management for clients, mapping the {@link ProtonProtocolManager} to the {@link org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager} API.
- *
- * TODO: Find a common base for ProtocolManager and ClientProtocolManager.
- */
-public class ProtonClientProtocolManager extends ProtonProtocolManager implements ClientProtocolManager {
-
-   public ProtonClientProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
-      super(factory, server);
-   }
-
-   @Override
-   public void stop() {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, TopologyResponseHandler topologyResponseHandler) {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public RemotingConnection getCurrentConnection() {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public Lock lockSessionCreation() {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public boolean waitOnLatch(long milliseconds) throws InterruptedException {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public boolean isAlive() {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void addChannelHandlers(ChannelPipeline pipeline) {
-   }
-
-   @Override
-   public void sendSubscribeTopology(boolean isServer) {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void ping(long connectionTTL) {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public SessionContext createSessionContext(String name, String username, String password, boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public boolean cleanupBeforeFailover(ActiveMQException cause) {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public boolean checkForFailover(String liveNodeID) throws ActiveMQException {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void setSessionFactory(ClientSessionFactory factory) {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public ClientSessionFactory getSessionFactory() {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public String getName() {
-      throw new UnsupportedOperationException();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d1bdcd5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionLifeCycleListener.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionLifeCycleListener.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionLifeCycleListener.java
new file mode 100644
index 0000000..12da243
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionLifeCycleListener.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.client;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.jboss.logging.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * Manages the lifecycle of a proton client connection.
+ */
+public class ProtonClientConnectionLifeCycleListener implements BaseConnectionLifeCycleListener<ProtonProtocolManager>, BufferHandler {
+   private final Map<Object, ConnectionEntry> connectionMap = new ConcurrentHashMap<>();
+   private final ActiveMQServer server;
+   private final int ttl;
+   private static final Logger log = Logger.getLogger(ProtonClientConnectionLifeCycleListener.class);
+
+   public ProtonClientConnectionLifeCycleListener(ActiveMQServer server, int ttl) {
+      this.server = server;
+      this.ttl = ttl;
+   }
+
+   @Override
+   public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
+      AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
+
+      String id = server.getConfiguration().getName();
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
+      Executor executor = server.getExecutorFactory().getExecutor();
+      amqpConnection.open();
+
+      ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
+
+      connectionCallback.setProtonConnectionDelegate(delegate);
+
+      ConnectionEntry connectionEntry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
+      connectionMap.put(connection.getID(), connectionEntry);
+      log.info("Connection " + connection.getRemoteAddress() + " created");
+   }
+
+   @Override
+   public void connectionDestroyed(Object connectionID) {
+      ConnectionEntry connection = connectionMap.remove(connectionID);
+      if (connection != null) {
+         log.info("Connection " + connection.connection.getRemoteAddress() + " destroyed");
+         connection.connection.disconnect(false);
+      }
+   }
+
+   @Override
+   public void connectionException(Object connectionID, ActiveMQException me) {
+      ConnectionEntry connection = connectionMap.get(connectionID);
+      if (connection != null) {
+         log.info("Connection " + connection.connection.getRemoteAddress() + " exception: " + me.getMessage());
+         connection.connection.fail(me);
+      }
+   }
+
+   @Override
+   public void connectionReadyForWrites(Object connectionID, boolean ready) {
+      ConnectionEntry connection = connectionMap.get(connectionID);
+      if (connection != null) {
+         log.info("Connection " + connection.connection.getRemoteAddress() + " ready");
+         connection.connection.getTransportConnection().fireReady(true);
+      }
+   }
+
+   public void stop() {
+      for (ConnectionEntry entry : connectionMap.values()) {
+         entry.connection.disconnect(false);
+      }
+   }
+
+   @Override
+   public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+      ConnectionEntry entry = connectionMap.get(connectionID);
+      if (entry != null) {
+         entry.connection.bufferReceived(connectionID, buffer);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d1bdcd5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
new file mode 100644
index 0000000..d158841
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientProtocolManager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.client;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
+
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Handles proton protocol management for clients, mapping the {@link ProtonProtocolManager} to the {@link org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager} API.
+ * This is currently very basic and only supports Connecting to a broker,
+ * which will be useful in scenarios where the broker needs to connect to another broker through AMQP into another broker (like Interconnect) that will perform extra functionality.
+ */
+public class ProtonClientProtocolManager extends ProtonProtocolManager implements ClientProtocolManager {
+
+   public ProtonClientProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
+      super(factory, server);
+   }
+
+   @Override
+   public void stop() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, TopologyResponseHandler topologyResponseHandler) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public RemotingConnection getCurrentConnection() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public Lock lockSessionCreation() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean waitOnLatch(long milliseconds) throws InterruptedException {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean isAlive() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void addChannelHandlers(ChannelPipeline pipeline) {
+   }
+
+   @Override
+   public void sendSubscribeTopology(boolean isServer) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void ping(long connectionTTL) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public SessionContext createSessionContext(String name, String username, String password, boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean cleanupBeforeFailover(ActiveMQException cause) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean checkForFailover(String liveNodeID) throws ActiveMQException {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void setSessionFactory(ClientSessionFactory factory) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public ClientSessionFactory getSessionFactory() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public String getName() {
+      throw new UnsupportedOperationException();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d1bdcd5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/package-info.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/package-info.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/package-info.java
new file mode 100644
index 0000000..ec8855e
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This contains a very limited ClientProtocolmanager for AMQP / Proton
+ * Where it only satisfies very basic functionality.
+ */
+package org.apache.activemq.artemis.protocol.amqp.client;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d1bdcd5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index c1d1b71..53ced4e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -69,8 +69,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonClientConnectionLifeCycleListener;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonClientProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionLifeCycleListener;
+import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;