You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:33 UTC
[04/34] activemq-artemis git commit: ARTEMIS-820 AMQP: Add frame
inspection capability to the test client.
ARTEMIS-820 AMQP: Add frame inspection capability to the test client.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/490bd31c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/490bd31c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/490bd31c
Branch: refs/heads/ARTEMIS-780
Commit: 490bd31c4b07df1e3d8b9636afdab9e9f89e81d6
Parents: 1ac69fd
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Oct 25 17:15:43 2016 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Oct 25 14:15:28 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/client/AmqpConnection.java | 31 ++++-
.../amqp/client/AmqpFrameValidator.java | 103 ++++++++++++++++
.../amqp/client/AmqpProtocolTracer.java | 116 +++++++++++++++++++
.../transport/amqp/client/AmqpSession.java | 57 ++++++++-
.../amqp/AmqpDurableReceiverTest.java | 26 +++++
5 files changed, 328 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 53fb9f5..01c60bc 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -50,6 +50,7 @@ import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +88,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private List<Symbol> offeredCapabilities = Collections.emptyList();
private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
+ private volatile AmqpFrameValidator sentFrameInspector;
+ private volatile AmqpFrameValidator receivedFrameInspector;
private AmqpConnectionListener listener;
private SaslAuthenticator authenticator;
private String mechanismRestriction;
@@ -100,6 +103,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
+ private boolean trace;
public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
String username,
@@ -155,6 +159,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
sasl.client();
}
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
+ ((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this));
open(future);
pumpToProtonTransport(future);
@@ -439,6 +444,30 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
return mechanismRestriction;
}
+ public boolean isTraceFrames() {
+ return trace;
+ }
+
+ public void setTraceFrames(boolean trace) {
+ this.trace = trace;
+ }
+
+ public AmqpFrameValidator getSentFrameInspector() {
+ return sentFrameInspector;
+ }
+
+ public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) {
+ this.sentFrameInspector = amqpFrameInspector;
+ }
+
+ public AmqpFrameValidator getReceivedFrameInspector() {
+ return receivedFrameInspector;
+ }
+
+ public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) {
+ this.receivedFrameInspector = amqpFrameInspector;
+ }
+
//----- Internal getters used from the child AmqpResource classes --------//
ScheduledExecutorService getScheduler() {
@@ -706,4 +735,4 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
public String toString() {
return "AmqpConnection { " + connectionId + " }";
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
new file mode 100644
index 0000000..4796110
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.transport.Attach;
+import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.End;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+
+/**
+ * Abstract base for a validation hook that is used in tests to check
+ * the state of a remote resource after a variety of lifecycle events.
+ */
+public class AmqpFrameValidator {
+
+ private boolean valid = true;
+ private String errorMessage;
+
+ public void inspectOpen(Open open, Binary encoded) {
+
+ }
+
+ public void inspectBegin(Begin begin, Binary encoded) {
+
+ }
+
+ public void inspectAttach(Attach attach, Binary encoded) {
+
+ }
+
+ public void inspectFlow(Flow flow, Binary encoded) {
+
+ }
+
+ public void inspectTransfer(Transfer transfer, Binary encoded) {
+
+ }
+
+ public void inspectDisposition(Disposition disposition, Binary encoded) {
+
+ }
+
+ public void inspectDetach(Detach detach, Binary encoded) {
+
+ }
+
+ public void inspectEnd(End end, Binary encoded) {
+
+ }
+
+ public void inspectClose(Close close, Binary encoded) {
+
+ }
+
+ public boolean isValid() {
+ return valid;
+ }
+
+ protected void setValid(boolean valid) {
+ this.valid = valid;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ protected void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ protected void markAsInvalid(String errorMessage) {
+ if (valid) {
+ setValid(false);
+ setErrorMessage(errorMessage);
+ }
+ }
+
+ public void assertValid() {
+ if (!isValid()) {
+ throw new AssertionError(errorMessage);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
new file mode 100644
index 0000000..68fcd85
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.transport.Attach;
+import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.End;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracer used to spy on AMQP traffic
+ */
+public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler<AmqpFrameValidator> {
+
+ private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES");
+
+ private final AmqpConnection connection;
+
+ public AmqpProtocolTracer(AmqpConnection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public void receivedFrame(TransportFrame transportFrame) {
+ if (connection.isTraceFrames()) {
+ TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody());
+ }
+
+ AmqpFrameValidator inspector = connection.getReceivedFrameInspector();
+ if (inspector != null) {
+ transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
+ }
+ }
+
+ @Override
+ public void sentFrame(TransportFrame transportFrame) {
+ if (connection.isTraceFrames()) {
+ TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody());
+ }
+
+ AmqpFrameValidator inspector = connection.getSentFrameInspector();
+ if (inspector != null) {
+ transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
+ }
+ }
+
+ @Override
+ public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) {
+ context.inspectOpen(open, payload);
+ }
+
+ @Override
+ public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) {
+ context.inspectBegin(begin, payload);
+ }
+
+ @Override
+ public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) {
+ context.inspectAttach(attach, payload);
+ }
+
+ @Override
+ public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) {
+ context.inspectFlow(flow, payload);
+ }
+
+ @Override
+ public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) {
+ context.inspectTransfer(transfer, payload);
+ }
+
+ @Override
+ public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) {
+ context.inspectDisposition(disposition, payload);
+ }
+
+ @Override
+ public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) {
+ context.inspectDetach(detach, payload);
+ }
+
+ @Override
+ public void handleEnd(End end, Binary payload, AmqpFrameValidator context) {
+ context.inspectEnd(end, payload);
+ }
+
+ @Override
+ public void handleClose(Close close, Binary payload, AmqpFrameValidator context) {
+ context.inspectClose(close, payload);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 936d4ef..fc3fdf7 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,9 @@
*/
package org.apache.activemq.transport.amqp.client;
+import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
@@ -38,6 +40,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
private final AmqpConnection connection;
private final String sessionId;
private final AmqpTransactionContext txContext;
+ private final AtomicBoolean closed = new AtomicBoolean();
/**
* Create a new session instance.
@@ -52,6 +55,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
}
/**
+ * Close the receiver, a closed receiver will throw exceptions if any further send
+ * calls are made.
+ *
+ * @throws IOException if an error occurs while closing the receiver.
+ */
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ final ClientFuture request = new ClientFuture();
+ getScheduler().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ checkClosed();
+ close(request);
+ pumpToProtonTransport(request);
+ }
+ });
+
+ request.sync();
+ }
+ }
+
+ /**
+ * Create an anonymous sender.
+ *
+ * @return a newly created sender that is ready for use.
+ *
+ * @throws Exception if an error occurs while creating the sender.
+ */
+ public AmqpSender createSender() throws Exception {
+ return createSender(null, false);
+ }
+
+ /**
* Create a sender instance using the given address
*
* @param address the address to which the sender will produce its messages.
@@ -101,9 +138,21 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpSender createSender(Target target) throws Exception {
+ return createSender(target, getNextSenderId());
+ }
+
+ /**
+ * Create a sender instance using the given Target
+ *
+ * @param target the caller created and configured Traget used to create the sender link.
+ * @param senderId the sender ID to assign to the newly created Sender.
+ * @return a newly created sender that is ready for use.
+ * @throws Exception if an error occurs while creating the receiver.
+ */
+ public AmqpSender createSender(Target target, String senderId) throws Exception {
checkClosed();
- final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
+ final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
@@ -222,7 +271,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
checkClosed();
final ClientFuture request = new ClientFuture();
- final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
+ final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId());
connection.getScheduler().execute(new Runnable() {
@@ -465,4 +514,4 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
throw new IllegalStateException("Session is already closed");
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
index e0c6b6c..86a35a2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
@@ -26,14 +26,17 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test;
import org.slf4j.Logger;
@@ -90,6 +93,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
connection.setContainerId(getContainerID());
connection.connect();
+ connection.setReceivedFrameInspector(new AmqpFrameValidator() {
+
+ @Override
+ public void inspectDetach(Detach detach, Binary encoded) {
+ if (detach.getClosed()) {
+ markAsInvalid("Remote should have detached but closed instead.");
+ }
+ }
+ });
+
+ connection.setSentFrameInspector(new AmqpFrameValidator() {
+
+ @Override
+ public void inspectDetach(Detach detach, Binary encoded) {
+ if (detach.getClosed()) {
+ markAsInvalid("Client should have detached but closed instead.");
+ }
+ }
+ });
+
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
@@ -99,6 +122,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
assertEquals(getTopicName(), lookupSubscription());
+ connection.getSentFrameInspector().assertValid();
+ connection.getReceivedFrameInspector().assertValid();
+
connection.close();
}