You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/10/24 19:39:26 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6479

Repository: activemq
Updated Branches:
  refs/heads/master 566075309 -> 24a79413c


https://issues.apache.org/jira/browse/AMQ-6479

Allow a unit test to inspect AMQP frames as part of the test.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/24a79413
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/24a79413
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/24a79413

Branch: refs/heads/master
Commit: 24a79413c66013818795360cf3ecc4489d1e4d5f
Parents: 5660753
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Oct 24 15:39:01 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Oct 24 15:39:13 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpConnection.java   |  39 +++----
 .../amqp/client/AmqpFrameValidator.java         | 103 ++++++++++++++++
 .../amqp/client/AmqpProtocolTracer.java         | 116 +++++++++++++++++++
 .../transport/amqp/client/AmqpSession.java      |  36 +++++-
 .../amqp/interop/AmqpDurableReceiverTest.java   |  26 +++++
 5 files changed, 298 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 3328044..c6c994d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -49,9 +49,7 @@ import org.apache.qpid.proton.engine.Event.Type;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.impl.CollectorImpl;
-import org.apache.qpid.proton.engine.impl.ProtocolTracer;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.apache.qpid.proton.framing.TransportFrame;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,7 +60,6 @@ import io.netty.util.ReferenceCountUtil;
 public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
-    private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".FRAMES");
 
     private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
 
@@ -92,6 +89,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
     private List<Symbol> offeredCapabilities = Collections.emptyList();
     private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
 
+    private volatile AmqpFrameValidator sentFrameInspector;
+    private volatile AmqpFrameValidator receivedFrameInspector;
     private AmqpConnectionListener listener;
     private SaslAuthenticator authenticator;
     private String mechanismRestriction;
@@ -159,7 +158,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                         sasl.client();
                     }
                     authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
-                    updateTracer();
+                    ((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this));
                     open(future);
 
                     pumpToProtonTransport(future);
@@ -454,6 +453,22 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
         this.trace = trace;
     }
 
+    public AmqpFrameValidator getSentFrameInspector() {
+        return sentFrameInspector;
+    }
+
+    public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) {
+        this.sentFrameInspector = amqpFrameInspector;
+    }
+
+    public AmqpFrameValidator getReceivedFrameInspector() {
+        return receivedFrameInspector;
+    }
+
+    public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) {
+        this.receivedFrameInspector = amqpFrameInspector;
+    }
+
     //----- Internal getters used from the child AmqpResource classes --------//
 
     ScheduledExecutorService getScheduler() {
@@ -718,22 +733,6 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
         return containerId;
     }
 
-    private void updateTracer() {
-        if (isTraceFrames()) {
-            ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
-                @Override
-                public void receivedFrame(TransportFrame transportFrame) {
-                    TRACE_FRAMES.trace("{} | RECV: {}", getRemoteURI(), transportFrame.getBody());
-                }
-
-                @Override
-                public void sentFrame(TransportFrame transportFrame) {
-                    TRACE_FRAMES.trace("{} | SENT: {}", this, transportFrame.getBody());
-                }
-            });
-        }
-    }
-
     @Override
     public String toString() {
         return "AmqpConnection { " + connectionId + " }";

http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
new file mode 100644
index 0000000..7588572
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.transport.Attach;
+import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.End;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+
+/**
+ * Base class for test hooks that inspect AMQP frame bodies. Every inspect method is a no-op until
+ * overridden; a subclass calls markAsInvalid to record a failure that assertValid later reports.
+ */
+public class AmqpFrameValidator {
+    // NOTE(review): plain (non-volatile) fields; if inspection runs on another thread than assertValid(), confirm visibility.
+    private boolean valid = true;
+    private String errorMessage;
+
+    public void inspectOpen(Open open, Binary encoded) {
+        // Intentionally empty; override to examine an Open frame.
+    }
+
+    public void inspectBegin(Begin begin, Binary encoded) {
+        // Intentionally empty; override to examine a Begin frame.
+    }
+
+    public void inspectAttach(Attach attach, Binary encoded) {
+        // Intentionally empty; override to examine an Attach frame.
+    }
+
+    public void inspectFlow(Flow flow, Binary encoded) {
+        // Intentionally empty; override to examine a Flow frame.
+    }
+
+    public void inspectTransfer(Transfer transfer, Binary encoded) {
+        // Intentionally empty; override to examine a Transfer frame.
+    }
+
+    public void inspectDisposition(Disposition disposition, Binary encoded) {
+        // Intentionally empty; override to examine a Disposition frame.
+    }
+
+    public void inspectDetach(Detach detach, Binary encoded) {
+        // Intentionally empty; override to examine a Detach frame.
+    }
+
+    public void inspectEnd(End end, Binary encoded) {
+        // Intentionally empty; override to examine an End frame.
+    }
+
+    public void inspectClose(Close close, Binary encoded) {
+        // Intentionally empty; override to examine a Close frame.
+    }
+
+    public boolean isValid() {
+        return valid;
+    }
+
+    protected void setValid(boolean valid) {
+        this.valid = valid;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    protected void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    protected void markAsInvalid(String errorMessage) {
+        if (valid) { // skip when already invalid so the earlier message is kept
+            setValid(false);
+            setErrorMessage(errorMessage);
+        }
+    }
+
+    public void assertValid() {
+        if (!isValid()) {
+            throw new AssertionError(errorMessage); // carries whatever message was last stored
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
new file mode 100644
index 0000000..efda4d2
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.transport.Attach;
+import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.End;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracer that logs AMQP frames when frame tracing is enabled and passes each frame body to the connection's sent or received AmqpFrameValidator, if one is set.
+ */
+public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler<AmqpFrameValidator> {
+
+    private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES");
+
+    private final AmqpConnection connection;
+
+    public AmqpProtocolTracer(AmqpConnection connection) {
+        this.connection = connection;
+    }
+
+    @Override
+    public void receivedFrame(TransportFrame transportFrame) {
+        if (connection.isTraceFrames()) {
+            TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody());
+        }
+        // Inspector is read once per frame; invoke() is expected to call back into the matching handle method below.
+        AmqpFrameValidator inspector = connection.getReceivedFrameInspector();
+        if (inspector != null) {
+            transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
+        }
+    }
+
+    @Override
+    public void sentFrame(TransportFrame transportFrame) {
+        if (connection.isTraceFrames()) {
+            TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody());
+        }
+        // Same dispatch as receivedFrame, but using the inspector registered for outbound frames.
+        AmqpFrameValidator inspector = connection.getSentFrameInspector();
+        if (inspector != null) {
+            transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
+        }
+    }
+
+    @Override
+    public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) {
+        context.inspectOpen(open, payload); // context: presumably the validator given as invoke()'s last argument
+    }
+
+    @Override
+    public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) {
+        context.inspectBegin(begin, payload);
+    }
+
+    @Override
+    public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) {
+        context.inspectAttach(attach, payload);
+    }
+
+    @Override
+    public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) {
+        context.inspectFlow(flow, payload);
+    }
+
+    @Override
+    public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) {
+        context.inspectTransfer(transfer, payload);
+    }
+
+    @Override
+    public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) {
+        context.inspectDisposition(disposition, payload);
+    }
+
+    @Override
+    public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) {
+        context.inspectDetach(detach, payload);
+    }
+
+    @Override
+    public void handleEnd(End end, Binary payload, AmqpFrameValidator context) {
+        context.inspectEnd(end, payload);
+    }
+
+    @Override
+    public void handleClose(Close close, Binary payload, AmqpFrameValidator context) {
+        context.inspectClose(close, payload);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 7cb745c..b239dae 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -143,16 +143,32 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
      * Create a sender instance using the given Target
      *
      * @param target
-     *        the caller created and configured Traget used to create the sender link.
+     *        the caller created and configured Target used to create the sender link.
      *
      * @return a newly created sender that is ready for use.
      *
      * @throws Exception if an error occurs while creating the receiver.
      */
     public AmqpSender createSender(Target target) throws Exception {
+        return createSender(target, getNextSenderId());
+    }
+
+    /**
+     * Create a sender instance using the given Target
+     *
+     * @param target
+     *        the caller created and configured Target used to create the sender link.
+     * @param senderId
+     *        the sender ID to assign to the newly created Sender.
+     *
+     * @return a newly created sender that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the sender.
+     */
+    public AmqpSender createSender(Target target, String senderId) throws Exception {
         checkClosed();
 
-        final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
+        final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId);
         final ClientFuture request = new ClientFuture();
 
         connection.getScheduler().execute(new Runnable() {
@@ -274,6 +290,22 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
      * @throws Exception if an error occurs while creating the receiver.
      */
     public AmqpReceiver createReceiver(Source source) throws Exception {
+        return createReceiver(source, getNextReceiverId());
+    }
+
+    /**
+     * Create a receiver instance using the given Source
+     *
+     * @param source
+     *        the caller created and configured Source used to create the receiver link.
+     * @param receiverId
+     *        the ID value to assign to the newly created receiver
+     *
+     * @return a newly created receiver that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the receiver.
+     */
+    public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception {
         checkClosed();
 
         final ClientFuture request = new ClientFuture();

http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
index 3db3301..c5636db 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
@@ -29,12 +29,15 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.Detach;
 import org.apache.qpid.proton.engine.Receiver;
 import org.junit.Test;
 
@@ -81,6 +84,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
         connection.setContainerId(getTestName());
         connection.connect();
 
+        connection.setReceivedFrameInspector(new AmqpFrameValidator() {
+
+            @Override
+            public void inspectDetach(Detach detach, Binary encoded) {
+                if (detach.getClosed()) {
+                    markAsInvalid("Remote should have detached but closed instead.");
+                }
+            }
+        });
+
+        connection.setSentFrameInspector(new AmqpFrameValidator() {
+
+            @Override
+            public void inspectDetach(Detach detach, Binary encoded) {
+                if (detach.getClosed()) {
+                    markAsInvalid("Client should have detached but closed instead.");
+                }
+            }
+        });
+
         AmqpSession session = connection.createSession();
         AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
 
@@ -94,6 +117,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
         assertEquals(0, brokerView.getDurableTopicSubscribers().length);
         assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
 
+        connection.getSentFrameInspector().assertValid();
+        connection.getReceivedFrameInspector().assertValid();
+
         connection.close();
     }