You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/16 22:51:53 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5666

Repository: activemq
Updated Branches:
  refs/heads/master 559f52a2a -> 934ad44ad


https://issues.apache.org/jira/browse/AMQ-5666

Add some initial tests for durable subscription handling.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/934ad44a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/934ad44a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/934ad44a

Branch: refs/heads/master
Commit: 934ad44add2a73b9bc3b5ac4e9ce2925f747f8a1
Parents: 559f52a
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Mar 16 17:51:45 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Mar 16 17:51:45 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpTestSupport.java         |   7 +-
 .../amqp/client/AmqpAbstractResource.java       |  60 ++++++--
 .../transport/amqp/client/AmqpReceiver.java     |  38 ++++-
 .../transport/amqp/client/AmqpResource.java     |   9 ++
 .../transport/amqp/client/AmqpSession.java      |  34 +++++
 .../amqp/interop/AmqpDurableReceiverTest.java   | 139 +++++++++++++++++++
 6 files changed, 268 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 33ae799..cf4fa95 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -42,6 +42,7 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.spring.SpringSslContext;
 import org.apache.activemq.store.kahadb.KahaDBStore;
@@ -320,10 +321,10 @@ public class AmqpTestSupport {
         return proxy;
     }
 
-    protected QueueViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
+    protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
         ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
-        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
-                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, TopicViewMBean.class, true);
         return proxy;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
index 8a5a587..b5a6324 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -66,25 +66,57 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
     }
 
     @Override
-    public void close(AsyncResult request) {
+    public void detach(AsyncResult request) {
         // If already closed signal success or else the caller might never get notified.
         if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
             getEndpoint().getRemoteState() == EndpointState.CLOSED) {
 
             if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
-                // Remote already closed this resource, close locally and free.
-                if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
-                    doClose();
-                    getEndpoint().free();
-                }
+                doDetach();
+                getEndpoint().free();
             }
 
             request.onSuccess();
-            return;
+        } else {
+            this.closeRequest = request;
+            doDetach();
         }
+    }
+
+    @Override
+    public void close(AsyncResult request) {
+        // If already closed signal success or else the caller might never get notified.
+        if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
+            getEndpoint().getRemoteState() == EndpointState.CLOSED) {
 
-        this.closeRequest = request;
-        doClose();
+            if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
+                doClose();
+                getEndpoint().free();
+            }
+
+            request.onSuccess();
+        } else {
+            this.closeRequest = request;
+            doClose();
+        }
+//        // If already closed signal success or else the caller might never get notified.
+//        if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
+//            getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+//
+//            if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
+//                // Remote already closed this resource, close locally and free.
+//                if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
+//                    doClose();
+//                    getEndpoint().free();
+//                }
+//            }
+//
+//            request.onSuccess();
+//            return;
+//        }
+//
+//        this.closeRequest = request;
+//        doClose();
     }
 
     @Override
@@ -278,6 +310,16 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
     }
 
     /**
+     * Perform the detach operation on the managed endpoint; invoked by detach(AsyncResult).
+     * A subclass must override this and do a detach if its resource supports that.
+     *
+     * @throws UnsupportedOperationException always, in this default implementation.
+     */
+    protected void doDetach() {
+        throw new UnsupportedOperationException("Endpoint cannot be detached.");
+    }
+
+    /**
      * Complete the open operation on the managed endpoint. A subclass may
      * override this method to provide additional verification actions or configuration
      * updates.

http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index ec37710..ff530b9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -94,10 +94,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     /**
-     * Close the sender, a closed sender will throw exceptions if any further send
+     * Close the receiver, a closed receiver will throw exceptions if any further receive
      * calls are made.
      *
-     * @throws IOException if an error occurs while closing the sender.
+     * @throws IOException if an error occurs while closing the receiver.
      */
     public void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
@@ -117,6 +117,29 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     /**
+     * Detach the receiver. The first call schedules the detach on the session's scheduler
+     * and then calls sync() on the request; repeat calls, or calls after close(), do nothing.
+     *
+     * @throws IOException if an error occurs while detaching the receiver.
+     */
+    public void detach() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            final ClientFuture request = new ClientFuture();
+            session.getScheduler().execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    checkClosed();
+                    detach(request);
+                    session.pumpToProtonTransport();
+                }
+            });
+
+            request.sync();
+        }
+    }
+
+    /**
      * @return this session's parent AmqpSession.
      */
     public AmqpSession getSession() {
@@ -442,11 +465,12 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
 
     @Override
     protected void doClose() {
-        if (isDurable()) {
-            getEndpoint().detach();
-        } else {
-            getEndpoint().close();
-        }
+        getEndpoint().close();
+    }
+
+    @Override
+    protected void doDetach() {
+        getEndpoint().detach();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
index b4e6215..f20fd7c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java
@@ -58,6 +58,15 @@ public interface AmqpResource {
     void close(AsyncResult request);
 
     /**
+     * Perform all work needed to detach this resource and store the request
+     * until such time as the remote peer indicates the resource has been detached.
+     *
+     * @param request
+     *        The initiating request that triggered this detach call.
+     */
+    void detach(AsyncResult request);
+
+    /**
      * @return if the resource has moved to the closed state on the remote.
      */
     boolean isClosed();

http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 9368b26..b2fc2f1 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -111,6 +111,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     }
 
     /**
+     * Create a receiver instance using the given address that creates a durable subscription.
+     *
+     * @param address
+     *        the address to which the receiver will subscribe for its messages.
+     * @param subscriptionName
+     *        the name of the subscription that is being created.
+     *
+     * @return a newly created receiver that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the receiver.
+     */
+    public AmqpReceiver createDurableReceiver(String address, String subscriptionName) throws Exception {
+        checkClosed();
+        // Set the subscription name up front; the open is only scheduled further down.
+        final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId());
+        receiver.setSubscriptionName(subscriptionName);
+        final ClientFuture request = new ClientFuture();
+        // Hand the open off to the connection's scheduler.
+        connection.getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+                checkClosed();
+                receiver.open(request);
+                pumpToProtonTransport();
+            }
+        });
+        // NOTE(review): sync() presumably blocks until the open request completes - confirm.
+        request.sync();
+
+        return receiver;
+    }
+
+    /**
      * @return this session's parent AmqpConnection.
      */
     public AmqpConnection getConnection() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
new file mode 100644
index 0000000..7fd6080
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+/**
+ * Tests for broker side support of the Durable Subscription mapping for JMS.
+ */
+public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
+
+    @Override
+    protected boolean isUseOpenWireConnector() {
+        return true;
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateDurableReceiver() throws Exception {
+        // A durable receiver on a topic must be reported by the broker as a durable subscriber.
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+
+        AmqpSession session = connection.createSession();
+        session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+        // Keep the wait bound below the 60s @Test timeout so a failure reports the assert message.
+        assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerView.getDurableTopicSubscribers().length == 1;
+            }
+
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testDetachedDurableReceiverRemainsActive() throws Exception {
+        // Detaching (rather than closing) the receiver must leave an inactive durable sub behind.
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+
+        AmqpSession session = connection.createSession();
+        AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+        // Keep the wait bound below the 60s @Test timeout so a failure reports the assert message.
+        assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerView.getDurableTopicSubscribers().length == 1;
+            }
+
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
+
+        receiver.detach();
+
+        assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerView.getInactiveDurableTopicSubscribers().length == 1;
+            }
+
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testCloseDurableReceiverRemovesSubscription() throws Exception {
+        // Closing the receiver must leave neither an active nor an inactive durable sub.
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+
+        AmqpSession session = connection.createSession();
+        AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+
+        assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerView.getDurableTopicSubscribers().length == 1;
+            }
+
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
+
+        receiver.close();
+
+        assertTrue("Should be no durable subs remaining", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerView.getDurableTopicSubscribers().length == 0 &&
+                       brokerView.getInactiveDurableTopicSubscribers().length == 0;
+            }
+
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
+
+        connection.close();
+    }
+}
\ No newline at end of file