You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/10 22:39:26 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1898 Proper fix on AtomicRunnables and avoiding leaks

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 2b3819bc1 -> a65374fe2


ARTEMIS-1898 Proper fix on AtomicRunnables and avoiding leaks

GlobalDiskFullTest was broken before this fix.
Basically when using multiple addresses over a session you would miss flow credits on all your producers except to the first one
that ran out of credit.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fceb9ea7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fceb9ea7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fceb9ea7

Branch: refs/heads/master
Commit: fceb9ea7182a801ba91e6d0db0ca01c730a1b6c9
Parents: 2b3819b
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Oct 10 18:09:28 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 10 18:32:58 2018 -0400

----------------------------------------------------------------------
 .../artemis/utils/runnables/AtomicRunnable.java | 18 +++++-
 .../amqp/broker/AMQPSessionCallback.java        | 47 ++-------------
 .../proton/ProtonServerReceiverContext.java     | 61 ++++++++++++++------
 .../amqp/broker/AMQPSessionCallbackTest.java    | 13 +++--
 4 files changed, 72 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fceb9ea7/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
index f1f53ce..9ec5e8f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
@@ -22,9 +22,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 public abstract class AtomicRunnable implements Runnable {
 
 
-   public static Runnable checkAtomic(Runnable run) {
+   public static AtomicRunnable checkAtomic(Runnable run) {
       if (run instanceof AtomicRunnable) {
-         return run;
+         return (AtomicRunnable)run;
       } else {
          return new AtomicRunnableWithDelegate(run);
       }
@@ -35,6 +35,20 @@ public abstract class AtomicRunnable implements Runnable {
    private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
       AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran");
 
+   public AtomicRunnable reset() {
+      RAN_UPDATE.set(this, 0);
+      return this;
+   }
+
+   public AtomicRunnable setRan() {
+      RAN_UPDATE.set(this, 1);
+      return this;
+   }
+
+   public boolean isRun() {
+      return RAN_UPDATE.get(this) == 1;
+   }
+
    @Override
    public void run() {
       if (RAN_UPDATE.compareAndSet(this, 0, 1)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fceb9ea7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 7fef3db..3d8ae5a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -109,8 +109,6 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
 
-   private CreditRunnable creditRunnable;
-
    public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
                               ProtonProtocolManager manager,
                               AMQPConnectionContext connection,
@@ -577,49 +575,17 @@ public class AMQPSessionCallback implements SessionCallback {
       });
    }
 
-   public void offerProducerCredit(final SimpleString address,
-                                   final int credits,
-                                   final int threshold,
-                                   final Receiver receiver) {
+   /** Will execute a Runnable on an Address when there's space in memory*/
+   public void flow(final SimpleString address,
+                    Runnable runnable) {
       try {
-         /*
-         * The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the
-         * runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this
-         * may cause a memory leak, one is enough.
-         * */
-         if (creditRunnable != null && !creditRunnable.isRun())
-            return;
          PagingManager pagingManager = manager.getServer().getPagingManager();
-         creditRunnable = new CreditRunnable() {
-            boolean isRun = false;
-            @Override
-            public boolean isRun() {
-               return isRun;
-            }
-
-            @Override
-            public void run() {
-               connection.lock();
-               try {
-                  if (receiver.getCredit() <= threshold) {
-                     int topUp = credits - receiver.getCredit();
-                     if (topUp > 0) {
-                        receiver.flow(topUp);
-                     }
-                  }
-               } finally {
-                  isRun = true;
-                  connection.unlock();
-               }
-               connection.flush();
-            }
-         };
 
          if (address == null) {
-            pagingManager.checkMemory(creditRunnable);
+            pagingManager.checkMemory(runnable);
          } else {
             final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
-            store.checkMemory(creditRunnable);
+            store.checkMemory(runnable);
          }
       } catch (Exception e) {
          throw new RuntimeException(e);
@@ -791,7 +757,4 @@ public class AMQPSessionCallback implements SessionCallback {
       }
 
    }
-   interface CreditRunnable extends Runnable {
-      boolean isRun();
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fceb9ea7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 0f0e9d5..0758714 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
 import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@@ -60,6 +61,35 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    protected final AMQPSessionCallback sessionSPI;
 
+   /** We create this AtomicRunnable with setRan.
+    *  This is because we always reuse the same instance.
+    *  In case the creditRunnable was run, we reset and send it over.
+    *  We set it as ran as the first one should always go through */
+   protected final AtomicRunnable creditRunnable;
+
+
+   /** This Credit Runnable may be used in Mock tests to simulate the credit semantic here */
+   public static AtomicRunnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) {
+      return new AtomicRunnable() {
+         @Override
+         public void atomicRun() {
+            connection.lock();
+            try {
+               if (receiver.getCredit() <= threshold) {
+                  int topUp = refill - receiver.getCredit();
+                  if (topUp > 0) {
+                     receiver.flow(topUp);
+                  }
+               }
+            } finally {
+               connection.unlock();
+            }
+            connection.flush();
+         }
+      };
+   }
+
+
    /*
     The maximum number of credits we will allocate to clients.
     This number is also used by the broker when refresh client credits.
@@ -68,7 +98,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
    // Used by the broker to decide when to refresh clients credit.  This is not used when client requests credit.
    private final int minCreditRefresh;
-   private TerminusExpiryPolicy expiryPolicy;
 
    public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
                                       AMQPConnectionContext connection,
@@ -80,11 +109,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       this.sessionSPI = sessionSPI;
       this.amqpCredits = connection.getAmqpCredits();
       this.minCreditRefresh = connection.getAmqpLowCredits();
+      this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
    }
 
    @Override
    public void onFlow(int credits, boolean drain) {
-      flow(Math.min(credits, amqpCredits), amqpCredits);
+      flow();
    }
 
    @Override
@@ -116,7 +146,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             } catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
-            expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
             target.setAddress(address.toString());
          } else {
             // the target will have an address unless the remote is requesting an anonymous
@@ -182,7 +211,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             }
          }
       }
-      flow(amqpCredits, minCreditRefresh);
+      flow();
    }
 
    public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
@@ -245,7 +274,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
          sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);
 
-         flow(amqpCredits, minCreditRefresh);
+         flow();
       } catch (Exception e) {
          log.warn(e.getMessage(), e);
          Rejected rejected = new Rejected();
@@ -262,7 +291,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
          delivery.disposition(rejected);
          delivery.settle();
-         flow(amqpCredits, minCreditRefresh);
+         flow();
       }
    }
 
@@ -285,20 +314,18 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       close(false);
    }
 
-   public void flow(int credits, int threshold) {
+   public void flow() {
+      if (!creditRunnable.isRun()) {
+         return; // nothing to be done as the previous one did not run yet
+      }
+
+      creditRunnable.reset();
+
       // Use the SessionSPI to allocate producer credits, or default, always allocate credit.
       if (sessionSPI != null) {
-         if (receiver.getCredit() <= threshold) {
-            sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
-         }
+         sessionSPI.flow(address, creditRunnable);
       } else {
-         connection.lock();
-         try {
-            receiver.flow(credits);
-         } finally {
-            connection.unlock();
-         }
-         connection.flush();
+         creditRunnable.run();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fceb9ea7/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
index e86e960..30814a9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.qpid.proton.engine.Receiver;
 import org.junit.Rule;
@@ -74,7 +75,7 @@ public class AMQPSessionCallbackTest {
       // Credit is above threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
 
-      session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+      session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
 
       // Run the credit refill code.
       Mockito.verify(pagingManager).checkMemory(argument.capture());
@@ -105,7 +106,7 @@ public class AMQPSessionCallbackTest {
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
 
-      session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+      session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
 
       // Run the credit refill code.
       Mockito.verify(pagingManager).checkMemory(argument.capture());
@@ -137,7 +138,7 @@ public class AMQPSessionCallbackTest {
       // Credit is above threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
 
-      session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+      session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
 
       // Run the credit refill code.
       Mockito.verify(pagingStore).checkMemory(argument.capture());
@@ -169,7 +170,7 @@ public class AMQPSessionCallbackTest {
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
 
-      session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+      session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
 
       // Run the credit refill code.
       Mockito.verify(pagingStore).checkMemory(argument.capture());
@@ -200,7 +201,7 @@ public class AMQPSessionCallbackTest {
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
 
-      session.offerProducerCredit(null, 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
+      session.flow(null, ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
 
       // Run the credit refill code.
       Mockito.verify(pagingManager).checkMemory(argument.capture());
@@ -232,7 +233,7 @@ public class AMQPSessionCallbackTest {
       // Credit is at threshold
       Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
 
-      session.offerProducerCredit(new SimpleString("test"), 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
+      session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
 
       // Run the credit refill code.
       Mockito.verify(pagingStore).checkMemory(argument.capture());


[2/2] activemq-artemis git commit: This closes #2364

Posted by cl...@apache.org.
This closes #2364


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a65374fe
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a65374fe
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a65374fe

Branch: refs/heads/master
Commit: a65374fe28dc6a520c5eb1f08ba5ede567c06e3f
Parents: 2b3819b fceb9ea
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Oct 10 18:39:18 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 10 18:39:18 2018 -0400

----------------------------------------------------------------------
 .../artemis/utils/runnables/AtomicRunnable.java | 18 +++++-
 .../amqp/broker/AMQPSessionCallback.java        | 47 ++-------------
 .../proton/ProtonServerReceiverContext.java     | 61 ++++++++++++++------
 .../amqp/broker/AMQPSessionCallbackTest.java    | 13 +++--
 4 files changed, 72 insertions(+), 67 deletions(-)
----------------------------------------------------------------------