You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:33 UTC

[04/34] activemq-artemis git commit: ARTEMIS-820 AMQP: Add frame inspection capability to the test client.

ARTEMIS-820 AMQP: Add frame inspection capability to the test client.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/490bd31c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/490bd31c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/490bd31c

Branch: refs/heads/ARTEMIS-780
Commit: 490bd31c4b07df1e3d8b9636afdab9e9f89e81d6
Parents: 1ac69fd
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Oct 25 17:15:43 2016 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Oct 25 14:15:28 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpConnection.java   |  31 ++++-
 .../amqp/client/AmqpFrameValidator.java         | 103 ++++++++++++++++
 .../amqp/client/AmqpProtocolTracer.java         | 116 +++++++++++++++++++
 .../transport/amqp/client/AmqpSession.java      |  57 ++++++++-
 .../amqp/AmqpDurableReceiverTest.java           |  26 +++++
 5 files changed, 328 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 53fb9f5..01c60bc 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -50,6 +50,7 @@ import org.apache.qpid.proton.engine.Event.Type;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,6 +88,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    private List<Symbol> offeredCapabilities = Collections.emptyList();
    private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
 
+   private volatile AmqpFrameValidator sentFrameInspector;
+   private volatile AmqpFrameValidator receivedFrameInspector;
    private AmqpConnectionListener listener;
    private SaslAuthenticator authenticator;
    private String mechanismRestriction;
@@ -100,6 +103,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
    private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
    private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
+   private boolean trace;
 
    public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
                          String username,
@@ -155,6 +159,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                   sasl.client();
                }
                authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
+               ((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this));
                open(future);
 
                pumpToProtonTransport(future);
@@ -439,6 +444,30 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
       return mechanismRestriction;
    }
 
+   public boolean isTraceFrames() {
+      return trace;
+   }
+
+   public void setTraceFrames(boolean trace) {
+      this.trace = trace;
+   }
+
+   public AmqpFrameValidator getSentFrameInspector() {
+      return sentFrameInspector;
+   }
+
+   public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) {
+      this.sentFrameInspector = amqpFrameInspector;
+   }
+
+   public AmqpFrameValidator getReceivedFrameInspector() {
+      return receivedFrameInspector;
+   }
+
+   public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) {
+      this.receivedFrameInspector = amqpFrameInspector;
+   }
+
    //----- Internal getters used from the child AmqpResource classes --------//
 
    ScheduledExecutorService getScheduler() {
@@ -706,4 +735,4 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    public String toString() {
       return "AmqpConnection { " + connectionId + " }";
    }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
new file mode 100644
index 0000000..4796110
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.transport.Attach;
+import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.End;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+
+/**
+ * Abstract base for a validation hook that is used in tests to check
+ * the state of a remote resource after a variety of lifecycle events.
+ */
+public class AmqpFrameValidator {
+
+   private boolean valid = true;
+   private String errorMessage;
+
+   public void inspectOpen(Open open, Binary encoded) {
+
+   }
+
+   public void inspectBegin(Begin begin, Binary encoded) {
+
+   }
+
+   public void inspectAttach(Attach attach, Binary encoded) {
+
+   }
+
+   public void inspectFlow(Flow flow, Binary encoded) {
+
+   }
+
+   public void inspectTransfer(Transfer transfer, Binary encoded) {
+
+   }
+
+   public void inspectDisposition(Disposition disposition, Binary encoded) {
+
+   }
+
+   public void inspectDetach(Detach detach, Binary encoded) {
+
+   }
+
+   public void inspectEnd(End end, Binary encoded) {
+
+   }
+
+   public void inspectClose(Close close, Binary encoded) {
+
+   }
+
+   public boolean isValid() {
+      return valid;
+   }
+
+   protected void setValid(boolean valid) {
+      this.valid = valid;
+   }
+
+   public String getErrorMessage() {
+      return errorMessage;
+   }
+
+   protected void setErrorMessage(String errorMessage) {
+      this.errorMessage = errorMessage;
+   }
+
+   protected void markAsInvalid(String errorMessage) {
+      if (valid) {
+         setValid(false);
+         setErrorMessage(errorMessage);
+      }
+   }
+
+   public void assertValid() {
+      if (!isValid()) {
+         throw new AssertionError(errorMessage);
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
new file mode 100644
index 0000000..68fcd85
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.transport.Attach;
+import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.End;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracer used to spy on AMQP traffic
+ */
+public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler<AmqpFrameValidator> {
+
+   private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES");
+
+   private final AmqpConnection connection;
+
+   public AmqpProtocolTracer(AmqpConnection connection) {
+      this.connection = connection;
+   }
+
+   @Override
+   public void receivedFrame(TransportFrame transportFrame) {
+      if (connection.isTraceFrames()) {
+         TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody());
+      }
+
+      AmqpFrameValidator inspector = connection.getReceivedFrameInspector();
+      if (inspector != null) {
+         transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
+      }
+   }
+
+   @Override
+   public void sentFrame(TransportFrame transportFrame) {
+      if (connection.isTraceFrames()) {
+         TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody());
+      }
+
+      AmqpFrameValidator inspector = connection.getSentFrameInspector();
+      if (inspector != null) {
+         transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
+      }
+   }
+
+   @Override
+   public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) {
+      context.inspectOpen(open, payload);
+   }
+
+   @Override
+   public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) {
+      context.inspectBegin(begin, payload);
+   }
+
+   @Override
+   public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) {
+      context.inspectAttach(attach, payload);
+   }
+
+   @Override
+   public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) {
+      context.inspectFlow(flow, payload);
+   }
+
+   @Override
+   public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) {
+      context.inspectTransfer(transfer, payload);
+   }
+
+   @Override
+   public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) {
+      context.inspectDisposition(disposition, payload);
+   }
+
+   @Override
+   public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) {
+      context.inspectDetach(detach, payload);
+   }
+
+   @Override
+   public void handleEnd(End end, Binary payload, AmqpFrameValidator context) {
+      context.inspectEnd(end, payload);
+   }
+
+   @Override
+   public void handleClose(Close close, Binary payload, AmqpFrameValidator context) {
+      context.inspectClose(close, payload);
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 936d4ef..fc3fdf7 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.transport.amqp.client;
 
+import java.io.IOException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
@@ -38,6 +40,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
    private final AmqpConnection connection;
    private final String sessionId;
    private final AmqpTransactionContext txContext;
+   private final AtomicBoolean closed = new AtomicBoolean();
 
    /**
     * Create a new session instance.
@@ -52,6 +55,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
    }
 
    /**
+    * Close the receiver, a closed receiver will throw exceptions if any further send
+    * calls are made.
+    *
+    * @throws IOException if an error occurs while closing the receiver.
+    */
+   public void close() throws IOException {
+      if (closed.compareAndSet(false, true)) {
+         final ClientFuture request = new ClientFuture();
+         getScheduler().execute(new Runnable() {
+
+            @Override
+            public void run() {
+               checkClosed();
+               close(request);
+               pumpToProtonTransport(request);
+            }
+         });
+
+         request.sync();
+      }
+   }
+
+   /**
+    * Create an anonymous sender.
+    *
+    * @return a newly created sender that is ready for use.
+    *
+    * @throws Exception if an error occurs while creating the sender.
+    */
+   public AmqpSender createSender() throws Exception {
+      return createSender(null, false);
+   }
+
+   /**
     * Create a sender instance using the given address
     *
     * @param address the address to which the sender will produce its messages.
@@ -101,9 +138,21 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     * @throws Exception if an error occurs while creating the receiver.
     */
    public AmqpSender createSender(Target target) throws Exception {
+      return createSender(target, getNextSenderId());
+   }
+
+   /**
+    * Create a sender instance using the given Target
+    *
+    * @param target the caller created and configured Traget used to create the sender link.
+    * @param senderId the sender ID to assign to the newly created Sender.
+    * @return a newly created sender that is ready for use.
+    * @throws Exception if an error occurs while creating the receiver.
+    */
+   public AmqpSender createSender(Target target, String senderId) throws Exception {
       checkClosed();
 
-      final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
+      final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId);
       final ClientFuture request = new ClientFuture();
 
       connection.getScheduler().execute(new Runnable() {
@@ -222,7 +271,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
       checkClosed();
 
       final ClientFuture request = new ClientFuture();
-      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
+      final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
 
       connection.getScheduler().execute(new Runnable() {
 
@@ -465,4 +514,4 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
          throw new IllegalStateException("Session is already closed");
       }
    }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
index e0c6b6c..86a35a2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
@@ -26,14 +26,17 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.Detach;
 import org.apache.qpid.proton.engine.Receiver;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -90,6 +93,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
       connection.setContainerId(getContainerID());
       connection.connect();
 
+      connection.setReceivedFrameInspector(new AmqpFrameValidator() {
+
+         @Override
+         public void inspectDetach(Detach detach, Binary encoded) {
+            if (detach.getClosed()) {
+               markAsInvalid("Remote should have detached but closed instead.");
+            }
+         }
+      });
+
+      connection.setSentFrameInspector(new AmqpFrameValidator() {
+
+         @Override
+         public void inspectDetach(Detach detach, Binary encoded) {
+            if (detach.getClosed()) {
+               markAsInvalid("Client should have detached but closed instead.");
+            }
+         }
+      });
+
       AmqpSession session = connection.createSession();
       AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
 
@@ -99,6 +122,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
 
       assertEquals(getTopicName(), lookupSubscription());
 
+      connection.getSentFrameInspector().assertValid();
+      connection.getReceivedFrameInspector().assertValid();
+
       connection.close();
    }