You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/10/24 19:39:26 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6479
Repository: activemq
Updated Branches:
refs/heads/master 566075309 -> 24a79413c
https://issues.apache.org/jira/browse/AMQ-6479
Allow a unit test to inspect AMQP frames as part of the test.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/24a79413
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/24a79413
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/24a79413
Branch: refs/heads/master
Commit: 24a79413c66013818795360cf3ecc4489d1e4d5f
Parents: 5660753
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Oct 24 15:39:01 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Oct 24 15:39:13 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/client/AmqpConnection.java | 39 +++----
.../amqp/client/AmqpFrameValidator.java | 103 ++++++++++++++++
.../amqp/client/AmqpProtocolTracer.java | 116 +++++++++++++++++++
.../transport/amqp/client/AmqpSession.java | 36 +++++-
.../amqp/interop/AmqpDurableReceiverTest.java | 26 +++++
5 files changed, 298 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 3328044..c6c994d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -49,9 +49,7 @@ import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
-import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.apache.qpid.proton.framing.TransportFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +60,6 @@ import io.netty.util.ReferenceCountUtil;
public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
- private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".FRAMES");
private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
@@ -92,6 +89,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private List<Symbol> offeredCapabilities = Collections.emptyList();
private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
+ private volatile AmqpFrameValidator sentFrameInspector;
+ private volatile AmqpFrameValidator receivedFrameInspector;
private AmqpConnectionListener listener;
private SaslAuthenticator authenticator;
private String mechanismRestriction;
@@ -159,7 +158,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
sasl.client();
}
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
- updateTracer();
+ ((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this));
open(future);
pumpToProtonTransport(future);
@@ -454,6 +453,22 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.trace = trace;
}
+ public AmqpFrameValidator getSentFrameInspector() {
+ return sentFrameInspector;
+ }
+
+ public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) {
+ this.sentFrameInspector = amqpFrameInspector;
+ }
+
+ public AmqpFrameValidator getReceivedFrameInspector() {
+ return receivedFrameInspector;
+ }
+
+ public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) {
+ this.receivedFrameInspector = amqpFrameInspector;
+ }
+
//----- Internal getters used from the child AmqpResource classes --------//
ScheduledExecutorService getScheduler() {
@@ -718,22 +733,6 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
return containerId;
}
- private void updateTracer() {
- if (isTraceFrames()) {
- ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
- @Override
- public void receivedFrame(TransportFrame transportFrame) {
- TRACE_FRAMES.trace("{} | RECV: {}", getRemoteURI(), transportFrame.getBody());
- }
-
- @Override
- public void sentFrame(TransportFrame transportFrame) {
- TRACE_FRAMES.trace("{} | SENT: {}", this, transportFrame.getBody());
- }
- });
- }
- }
-
@Override
public String toString() {
return "AmqpConnection { " + connectionId + " }";
http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
new file mode 100644
index 0000000..7588572
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.transport.Attach;
+import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.End;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+
+/**
+ * Base class for test hooks that inspect the AMQP frames sent or received on an AmqpConnection.
+ * Override the inspectXxx methods of interest and call markAsInvalid when a frame is wrong.
+ */
+public class AmqpFrameValidator {
+    // Volatile: presumably written on the connection's I/O thread and read by the test thread - confirm.
+    private volatile boolean valid = true;
+    private volatile String errorMessage;
+
+    public void inspectOpen(Open open, Binary encoded) {
+        // No-op by default; override to validate Open frames.
+    }
+
+    public void inspectBegin(Begin begin, Binary encoded) {
+        // No-op by default; override to validate Begin frames.
+    }
+
+    public void inspectAttach(Attach attach, Binary encoded) {
+        // No-op by default; override to validate Attach frames.
+    }
+
+    public void inspectFlow(Flow flow, Binary encoded) {
+        // No-op by default; override to validate Flow frames.
+    }
+
+    public void inspectTransfer(Transfer transfer, Binary encoded) {
+        // No-op by default; override to validate Transfer frames.
+    }
+
+    public void inspectDisposition(Disposition disposition, Binary encoded) {
+        // No-op by default; override to validate Disposition frames.
+    }
+
+    public void inspectDetach(Detach detach, Binary encoded) {
+        // No-op by default; override to validate Detach frames.
+    }
+
+    public void inspectEnd(End end, Binary encoded) {
+        // No-op by default; override to validate End frames.
+    }
+
+    public void inspectClose(Close close, Binary encoded) {
+        // No-op by default; override to validate Close frames.
+    }
+    // Returns true until markAsInvalid (or setValid(false)) has been called.
+    public boolean isValid() {
+        return valid;
+    }
+
+    protected void setValid(boolean valid) {
+        this.valid = valid;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    protected void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+    // Records only the first failure; the message is stored before the flag flips so a reader that sees invalid also sees it.
+    protected void markAsInvalid(String errorMessage) {
+        if (valid) {
+            setErrorMessage(errorMessage);
+            setValid(false);
+        }
+    }
+    // Throws an AssertionError carrying the recorded message if any inspection marked this validator invalid.
+    public void assertValid() {
+        if (!isValid()) {
+            throw new AssertionError(errorMessage);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
new file mode 100644
index 0000000..efda4d2
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.transport.Attach;
+import org.apache.qpid.proton.amqp.transport.Begin;
+import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.End;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler;
+import org.apache.qpid.proton.amqp.transport.Open;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracer used to spy on AMQP traffic: logs each frame when frame tracing is enabled and passes its body to the connection's sent/received AmqpFrameValidator, if one is set.
+ */
+public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler<AmqpFrameValidator> {
+ // Logger named after this class's package with a ".FRAMES" suffix.
+ private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES");
+ // Connection that supplies the trace flag, the remote URI and the current frame inspectors.
+ private final AmqpConnection connection;
+
+ public AmqpProtocolTracer(AmqpConnection connection) {
+ this.connection = connection;
+ }
+ // Logs the incoming frame when frame tracing is enabled, then offers it to the received-frame inspector.
+ @Override
+ public void receivedFrame(TransportFrame transportFrame) {
+ if (connection.isTraceFrames()) {
+ TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody());
+ }
+ // The inspector is fetched on every frame; invoke(...) is expected to call back the matching handleXxx method below.
+ AmqpFrameValidator inspector = connection.getReceivedFrameInspector();
+ if (inspector != null) {
+ transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
+ }
+ }
+ // Same flow as receivedFrame, but for outgoing frames and the sent-frame inspector.
+ @Override
+ public void sentFrame(TransportFrame transportFrame) {
+ if (connection.isTraceFrames()) {
+ TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody());
+ }
+
+ AmqpFrameValidator inspector = connection.getSentFrameInspector();
+ if (inspector != null) {
+ transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector);
+ }
+ }
+ // FrameBodyHandler callbacks: each passes the typed frame body and its payload to the validator supplied as context.
+ @Override
+ public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) {
+ context.inspectOpen(open, payload);
+ }
+
+ @Override
+ public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) {
+ context.inspectBegin(begin, payload);
+ }
+
+ @Override
+ public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) {
+ context.inspectAttach(attach, payload);
+ }
+
+ @Override
+ public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) {
+ context.inspectFlow(flow, payload);
+ }
+
+ @Override
+ public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) {
+ context.inspectTransfer(transfer, payload);
+ }
+
+ @Override
+ public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) {
+ context.inspectDisposition(disposition, payload);
+ }
+
+ @Override
+ public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) {
+ context.inspectDetach(detach, payload);
+ }
+
+ @Override
+ public void handleEnd(End end, Binary payload, AmqpFrameValidator context) {
+ context.inspectEnd(end, payload);
+ }
+
+ @Override
+ public void handleClose(Close close, Binary payload, AmqpFrameValidator context) {
+ context.inspectClose(close, payload);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 7cb745c..b239dae 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -143,16 +143,32 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* Create a sender instance using the given Target
*
* @param target
- * the caller created and configured Traget used to create the sender link.
+ * the caller created and configured Target used to create the sender link.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpSender createSender(Target target) throws Exception {
+ return createSender(target, getNextSenderId());
+ }
+
+ /**
+ * Create a sender instance using the given Target
+ *
+ * @param target
+ * the caller created and configured Target used to create the sender link.
+ * @param senderId
+ * the sender ID to assign to the newly created Sender.
+ *
+ * @return a newly created sender that is ready for use.
+ *
+ * @throws Exception if an error occurs while creating the sender.
+ */
+ public AmqpSender createSender(Target target, String senderId) throws Exception {
checkClosed();
- final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId());
+ final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
@@ -274,6 +290,22 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createReceiver(Source source) throws Exception {
+ return createReceiver(source, getNextReceiverId());
+ }
+
+ /**
+ * Create a receiver instance using the given Source
+ *
+ * @param source
+ * the caller created and configured Source used to create the receiver link.
+ * @param receiverId
+ * the ID value to assign to the newly created receiver
+ *
+ * @return a newly created receiver that is ready for use.
+ *
+ * @throws Exception if an error occurs while creating the receiver.
+ */
+ public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();
http://git-wip-us.apache.org/repos/asf/activemq/blob/24a79413/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
index 3db3301..c5636db 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
@@ -29,12 +29,15 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpFrameValidator;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test;
@@ -81,6 +84,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
connection.setContainerId(getTestName());
connection.connect();
+ connection.setReceivedFrameInspector(new AmqpFrameValidator() {
+
+ @Override
+ public void inspectDetach(Detach detach, Binary encoded) {
+ if (detach.getClosed()) {
+ markAsInvalid("Remote should have detached but closed instead.");
+ }
+ }
+ });
+
+ connection.setSentFrameInspector(new AmqpFrameValidator() {
+
+ @Override
+ public void inspectDetach(Detach detach, Binary encoded) {
+ if (detach.getClosed()) {
+ markAsInvalid("Client should have detached but closed instead.");
+ }
+ }
+ });
+
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
@@ -94,6 +117,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
assertEquals(0, brokerView.getDurableTopicSubscribers().length);
assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
+ connection.getSentFrameInspector().assertValid();
+ connection.getReceivedFrameInspector().assertValid();
+
connection.close();
}