You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/05/11 11:35:18 UTC

[1/2] activemq-artemis git commit: ARTEMIS-46 Adds AMQP Drain Support

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 0230a4026 -> 0e0dec664


ARTEMIS-46 Adds AMQP Drain Support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54752a9c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54752a9c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54752a9c

Branch: refs/heads/master
Commit: 54752a9cedcceef9715345730eae4766be2c8458
Parents: 0230a40
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue May 10 14:55:20 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed May 11 12:04:58 2016 +0100

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  | 29 +++++++-
 artemis-protocols/artemis-proton-plug/pom.xml   | 17 +++++
 .../proton/plug/AMQPClientReceiverContext.java  |  6 ++
 .../org/proton/plug/AMQPSessionCallback.java    |  2 +-
 .../plug/context/AbstractConnectionContext.java |  3 +-
 .../context/AbstractProtonContextSender.java    |  2 +-
 .../context/AbstractProtonReceiverContext.java  | 16 +++++
 .../plug/context/ProtonDeliveryHandler.java     |  2 +-
 .../plug/context/ProtonTransactionHandler.java  |  2 +-
 .../client/ProtonClientReceiverContext.java     |  3 +-
 .../server/ProtonServerReceiverContext.java     |  2 +-
 .../server/ProtonServerSenderContext.java       |  6 +-
 .../java/org/proton/plug/test/ProtonTest.java   | 12 ++--
 .../test/minimalserver/MinimalSessionSPI.java   |  2 +-
 .../core/server/impl/ServerConsumerImpl.java    | 26 ++++---
 tests/integration-tests/pom.xml                 |  6 ++
 .../tests/integration/proton/ProtonTest.java    | 75 ++++++++++++++++++--
 17 files changed, 176 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 421a382..2350f9d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -17,11 +17,13 @@
 package org.apache.activemq.artemis.core.protocol.proton.plug;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -70,6 +72,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
 
    private final Executor closeExecutor;
 
+   private final AtomicBoolean draining = new AtomicBoolean(false);
+
    public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI,
                                            ProtonProtocolManager manager,
                                            AMQPConnectionContext connection,
@@ -88,9 +92,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    }
 
    @Override
-   public void onFlowConsumer(Object consumer, int credits) {
-      // We have our own flow control on AMQP, so we set activemq's flow control to 0
-      ((ServerConsumer) consumer).receiveCredits(-1);
+   public void onFlowConsumer(Object consumer, int credits, final boolean drain) {
+      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
+      if (drain) {
+         // If the draining is already running, then don't do anything
+         if (draining.compareAndSet(false, true)) {
+            final ProtonPlugSender plugSender = (ProtonPlugSender) serverConsumer.getProtocolContext();
+            serverConsumer.forceDelivery(1, new Runnable() {
+               @Override
+               public void run() {
+                  try {
+                     plugSender.getSender().drained();
+                  }
+                  finally {
+                     draining.set(false);
+                  }
+               }
+            });
+         }
+      }
+      else {
+         serverConsumer.receiveCredits(-1);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/pom.xml b/artemis-protocols/artemis-proton-plug/pom.xml
index b23e08f..c667e45 100644
--- a/artemis-protocols/artemis-proton-plug/pom.xml
+++ b/artemis-protocols/artemis-proton-plug/pom.xml
@@ -110,6 +110,23 @@
 
    </dependencies>
 
+   <!-- We use the proton plug test classes in some of the Artemis Integration tests -->
+   <build>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <version>2.6</version>
+            <executions>
+               <execution>
+                  <goals>
+                     <goal>test-jar</goal>
+                  </goals>
+               </execution>
+            </executions>
+         </plugin>
+      </plugins>
+   </build>
 
    <packaging>bundle</packaging>
 </project>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java
index 26d539e..514ee19 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java
@@ -25,4 +25,10 @@ public interface AMQPClientReceiverContext {
    ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception;
 
    void flow(int credits);
+
+   void drain(int i);
+
+   int drained();
+
+   boolean isDraining();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
index cce8e0c..0c0dbe0 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -32,7 +32,7 @@ public interface AMQPSessionCallback {
 
    void start();
 
-   void onFlowConsumer(Object consumer, int credits);
+   void onFlowConsumer(Object consumer, int credits, boolean drain);
 
    Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
index 93442de..262dc2a 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -70,6 +70,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
       connectionCallback.setConnection(this);
       this.handler =   ProtonHandler.Factory.create(dispatchExecutor);
       Transport transport = handler.getTransport();
+      transport.setEmitFlowEventOnSend(false);
       if (idleTimeout > 0) {
          transport.setIdleTimeout(idleTimeout);
       }
@@ -256,7 +257,7 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
 
       @Override
       public void onFlow(Link link) throws Exception {
-         ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit());
+         ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
index 7a4d295..6b209b8 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
@@ -51,7 +51,7 @@ public abstract class AbstractProtonContextSender extends ProtonInitializable im
    }
 
    @Override
-   public void onFlow(int credits) {
+   public void onFlow(int credits, boolean drain) {
       this.creditsSemaphore.setCredits(credits);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
index 8481853..4286140 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
@@ -69,4 +69,20 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
       }
       connection.flush();
    }
+
+
+   public void drain(int credits) {
+      synchronized (connection.getLock()) {
+         receiver.drain(credits);
+      }
+      connection.flush();
+   }
+
+   public int drained() {
+      return receiver.drained();
+   }
+
+   public boolean isDraining() {
+      return receiver.draining();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
index 128ea65..ad7ff4f 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
@@ -25,7 +25,7 @@ import org.proton.plug.exceptions.ActiveMQAMQPException;
  */
 public interface ProtonDeliveryHandler {
 
-   void onFlow(int currentCredits);
+   void onFlow(int currentCredits, boolean drain);
 
    void onMessage(Delivery delivery) throws ActiveMQAMQPException;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
index 6a9ad6a..1b32b32 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -111,7 +111,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
    }
 
    @Override
-   public void onFlow(int credits) {
+   public void onFlow(int credits, boolean drain) {
 
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
index ca8dc98..884af60 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
@@ -46,7 +46,7 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i
    }
 
    @Override
-   public void onFlow(int credits) {
+   public void onFlow(int credits, boolean drain) {
    }
 
    LinkedBlockingDeque<MessageImpl> queues = new LinkedBlockingDeque<>();
@@ -83,4 +83,5 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i
    public ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception {
       return queues.poll(time, unit);
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
index 0406919..d0f798a 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
@@ -46,7 +46,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
    }
 
    @Override
-   public void onFlow(int credits) {
+   public void onFlow(int credits, boolean drain) {
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index dfc69df..ae1caa4 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -65,9 +65,9 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
    }
 
    @Override
-   public void onFlow(int currentCredits) {
-      super.onFlow(currentCredits);
-      sessionSPI.onFlowConsumer(brokerConsumer, currentCredits);
+   public void onFlow(int currentCredits, boolean drain) {
+      super.onFlow(currentCredits, drain);
+      sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
    }
 
    /*

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
index acbb697..4c3aaf4 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
@@ -36,19 +36,19 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.proton.plug.AMQPClientConnectionContext;
 import org.proton.plug.AMQPClientSenderContext;
 import org.proton.plug.AMQPClientSessionContext;
 import org.proton.plug.sasl.ClientSASLPlain;
 import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
 import org.proton.plug.test.minimalserver.DumbServer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.proton.plug.util.ByteUtil;
 
 /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
index c702957..3578926 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -71,7 +71,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
    }
 
    @Override
-   public void onFlowConsumer(Object consumer, int credits) {
+   public void onFlowConsumer(Object consumer, int credits, boolean drain) {
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 859b57d..0224c7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -510,7 +510,21 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * there are no other messages to be delivered.
     */
    @Override
-   public synchronized void forceDelivery(final long sequence) {
+   public void forceDelivery(final long sequence) {
+      forceDelivery(sequence, new Runnable() {
+         @Override
+         public void run() {
+            ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50);
+
+            forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+            forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+            callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
+         }
+      });
+   }
+
+   public synchronized void forceDelivery(final long sequence, final Runnable r) {
       promptDelivery();
 
       // JBPAPP-6030 - Using the executor to avoid distributed dead locks
@@ -527,17 +541,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                      messageQueue.getExecutor().execute(new Runnable() {
                         @Override
                         public void run() {
-                           forceDelivery(sequence);
+                           forceDelivery(sequence, r);
                         }
                      });
                   }
                   else {
-                     ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateID(), 50);
-
-                     forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
-                     forcedDeliveryMessage.setAddress(messageQueue.getName());
-
-                     callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
+                     r.run();
                   }
                }
             }
@@ -546,7 +555,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
             }
          }
       });
-
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index f0e1d14..38ac4c2 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -124,6 +124,12 @@
       </dependency>
       <dependency>
          <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-proton-plug</artifactId>
+         <version>${project.version}</version>
+         <type>test-jar</type>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-stomp-protocol</artifactId>
          <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54752a9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index efd5a85..9534681 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -40,6 +40,7 @@ import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -50,12 +51,18 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.proton.message.ProtonJMessage;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.proton.plug.AMQPClientConnectionContext;
+import org.proton.plug.AMQPClientReceiverContext;
+import org.proton.plug.AMQPClientSessionContext;
+import org.proton.plug.test.Constants;
+import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
 
 @RunWith(Parameterized.class)
 public class ProtonTest extends ActiveMQTestBase {
@@ -214,10 +221,8 @@ public class ProtonTest extends ActiveMQTestBase {
    /*
    // Uncomment testLoopBrowser to validate the hunging on the test
    @Test
-   public void testLoopBrowser() throws Throwable
-   {
-      for (int i = 0 ; i < 1000; i++)
-      {
+   public void testLoopBrowser() throws Throwable {
+      for (int i = 0 ; i < 1000; i++) {
          System.out.println("#test " + i);
          testBrowser();
          tearDown();
@@ -230,7 +235,7 @@ public class ProtonTest extends ActiveMQTestBase {
     *
     * @throws Throwable
     */
-   // @Test TODO: re-enable this when we can get a version free of QPID-4901 bug
+   //@Test // TODO: re-enable this when we can get a version free of QPID-4901 bug
    public void testBrowser() throws Throwable {
 
       boolean success = false;
@@ -272,7 +277,7 @@ public class ProtonTest extends ActiveMQTestBase {
                connection.close();
                Assert.assertEquals(getMessageCount(q), numMessages);
             }
-         }, 1000);
+         }, 5000);
 
          if (success) {
             break;
@@ -290,6 +295,64 @@ public class ProtonTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testReceiveImmediate() throws Exception {
+      testReceiveImmediate(1000, 1000);
+   }
+
+   @Test
+   public void testReceiveImmediateMoreCredits() throws Exception {
+      testReceiveImmediate(1000, 100);
+   }
+
+   @Test
+   public void testReceiveImmediateMoreMessages() throws Exception {
+      testReceiveImmediate(100, 1000);
+   }
+
+   public void testReceiveImmediate(int noCredits, int noMessages) throws Exception {
+
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = createQueue(address);
+      MessageProducer p = session.createProducer(queue);
+
+      TextMessage message = session.createTextMessage();
+      message.setText("Message temporary");
+      for (int i = 0; i < noMessages; i++) {
+         message.setText("msg:" + i);
+         p.send(message);
+      }
+
+      SimpleAMQPConnector connector = new SimpleAMQPConnector();
+      connector.start();
+      AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
+
+      clientConnection.clientOpen(null);
+
+      AMQPClientSessionContext csession = clientConnection.createClientSession();
+      AMQPClientReceiverContext receiver = csession.createReceiver(address);
+      receiver.drain(noCredits);
+
+      int expectedNumberMessages = noCredits > noMessages ? noMessages : noCredits;
+      for (int i = 0; i < expectedNumberMessages; i++) {
+         ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.SECONDS);
+         Assert.assertNotNull(protonJMessage);
+      }
+      ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.MILLISECONDS);
+      Assert.assertNull(protonJMessage);
+
+      assertFalse(receiver.isDraining());
+      if (noCredits >= noMessages) {
+         assertEquals(noCredits - noMessages, receiver.drained());
+      }
+      else {
+         assertEquals(0, receiver.drained());
+      }
+   }
+
+
+   @Test
    public void testConnection() throws Exception {
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 


[2/2] activemq-artemis git commit: This closes #514 ARTEMIS-46 Adds AMQP Drain Support

Posted by an...@apache.org.
This closes #514 ARTEMIS-46 Adds AMQP Drain Support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0e0dec66
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0e0dec66
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0e0dec66

Branch: refs/heads/master
Commit: 0e0dec66492877b91ed38cfa2f9d4d15bdf501a5
Parents: 0230a40 54752a9
Author: Andy Taylor <an...@gmail.com>
Authored: Wed May 11 12:09:53 2016 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Wed May 11 12:09:53 2016 +0100

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  | 29 +++++++-
 artemis-protocols/artemis-proton-plug/pom.xml   | 17 +++++
 .../proton/plug/AMQPClientReceiverContext.java  |  6 ++
 .../org/proton/plug/AMQPSessionCallback.java    |  2 +-
 .../plug/context/AbstractConnectionContext.java |  3 +-
 .../context/AbstractProtonContextSender.java    |  2 +-
 .../context/AbstractProtonReceiverContext.java  | 16 +++++
 .../plug/context/ProtonDeliveryHandler.java     |  2 +-
 .../plug/context/ProtonTransactionHandler.java  |  2 +-
 .../client/ProtonClientReceiverContext.java     |  3 +-
 .../server/ProtonServerReceiverContext.java     |  2 +-
 .../server/ProtonServerSenderContext.java       |  6 +-
 .../java/org/proton/plug/test/ProtonTest.java   | 12 ++--
 .../test/minimalserver/MinimalSessionSPI.java   |  2 +-
 .../core/server/impl/ServerConsumerImpl.java    | 26 ++++---
 tests/integration-tests/pom.xml                 |  6 ++
 .../tests/integration/proton/ProtonTest.java    | 75 ++++++++++++++++++--
 17 files changed, 176 insertions(+), 35 deletions(-)
----------------------------------------------------------------------