You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/10/26 19:29:30 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6485

Repository: activemq
Updated Branches:
  refs/heads/master b34336cc0 -> cec3245a9


https://issues.apache.org/jira/browse/AMQ-6485

Allow for inspection of Deliveries on Receivers and Delivery updates on
Senders.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cec3245a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cec3245a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cec3245a

Branch: refs/heads/master
Commit: cec3245a9fd50ad1850bdf57d3935d42886cefe2
Parents: b34336c
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 26 15:17:24 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Oct 26 15:17:24 2016 -0400

----------------------------------------------------------------------
 .../amqp/client/AmqpAbstractResource.java       |  1 -
 .../transport/amqp/client/AmqpReceiver.java     | 12 +++++++++-
 .../transport/amqp/client/AmqpSender.java       | 12 +++++++++-
 .../transport/amqp/client/AmqpValidator.java    |  9 ++++++++
 .../transport/amqp/interop/AmqpSenderTest.java  | 24 +++++++++++++++++++-
 5 files changed, 54 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cec3245a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
index e17a0c9..d62178a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -297,7 +297,6 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
         return new IOException("Open failed unexpectedly.");
     }
 
-    // TODO - Fina a more generic way to do this.
     protected abstract void doOpenInspection();
     protected abstract void doClosedInspection();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/cec3245a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index d2b859a..fbe36ff 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -856,6 +856,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     private void processDelivery(Delivery incoming) throws Exception {
+        doDeliveryInspection(incoming);
+
         Message message = null;
         try {
             message = decodeIncomingMessage(incoming);
@@ -878,6 +880,14 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
         }
     }
 
+    private void doDeliveryInspection(Delivery delivery) {
+        try {
+            getStateInspector().inspectDelivery(getReceiver(), delivery);
+        } catch (Throwable error) {
+            getStateInspector().markAsInvalid(error.getMessage());
+        }
+    }
+
     @Override
     public void processFlowUpdates(AmqpConnection connection) throws IOException {
         if (pullRequest != null || stopRequest != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cec3245a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index dd3a371..3850ec7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -322,6 +322,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
         }
     }
 
+    protected void doDeliveryUpdateInspection(Delivery delivery) {
+        try {
+            getStateInspector().inspectDeliveryUpdate(getSender(), delivery);
+        } catch (Throwable error) {
+            getStateInspector().markAsInvalid(error.getMessage());
+        }
+    }
+
     @Override
     protected Exception getOpenAbortException() {
         // Verify the attach response contained a non-null target
@@ -408,6 +416,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
                 continue;
             }
 
+            doDeliveryUpdateInspection(delivery);
+
             Outcome outcome = null;
             if (state instanceof TransactionalState) {
                 LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);

http://git-wip-us.apache.org/repos/asf/activemq/blob/cec3245a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
index c8a20ac..a8fd04a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
@@ -71,6 +72,14 @@ public class AmqpValidator {
 
     }
 
+    public void inspectDelivery(Receiver receiver, Delivery delivery) {
+
+    }
+
+    public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
+
+    }
+
     public boolean isValid() {
         return errorMessage.get() != null;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cec3245a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
index 66e1efc..3c3b75d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
@@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -28,7 +31,10 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.activemq.util.Wait;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
 import org.junit.Test;
 
 /**
@@ -123,10 +129,23 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
     public void testUnsettledSender() throws Exception {
         final int MSG_COUNT = 1000;
 
+        final CountDownLatch settled = new CountDownLatch(MSG_COUNT);
+
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = trackConnection(client.connect());
-        AmqpSession session = connection.createSession();
 
+        connection.setStateInspector(new AmqpValidator() {
+
+            @Override
+            public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
+                if (delivery.remotelySettled()) {
+                    LOG.trace("Remote settled message for sender: {}", sender.getName());
+                    settled.countDown();
+                }
+            }
+        });
+
+        AmqpSession session = connection.createSession();
         AmqpSender sender = session.createSender("topic://" + getTestName(), false);
 
         for (int i = 1; i <= MSG_COUNT; ++i) {
@@ -149,6 +168,9 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
         }));
 
         sender.close();
+
+        assertTrue("Remote should have settled all deliveries", settled.await(5, TimeUnit.MINUTES));
+
         connection.close();
     }