You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/11 19:08:13 UTC
[5/9] activemq-artemis git commit: ARTEMIS-1898 Proper fix on
AtomicRunnables and avoiding leaks
ARTEMIS-1898 Proper fix on AtomicRunnables and avoiding leaks
GlobalDiskFullTest was broken before this fix.
Basically when using multiple addresses over a session you would miss flow credits on all your producers except to the first one
that ran out of credit.
(cherry picked from commit fceb9ea7182a801ba91e6d0db0ca01c730a1b6c9)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dddff16b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dddff16b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dddff16b
Branch: refs/heads/2.6.x
Commit: dddff16bfbdefa5ff7a6462e58ad92fec7187f4b
Parents: 5e415a8
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Oct 10 18:09:28 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Oct 11 15:02:17 2018 -0400
----------------------------------------------------------------------
.../artemis/utils/runnables/AtomicRunnable.java | 18 +++++-
.../amqp/broker/AMQPSessionCallback.java | 47 ++-------------
.../proton/ProtonServerReceiverContext.java | 61 ++++++++++++++------
.../amqp/broker/AMQPSessionCallbackTest.java | 13 +++--
4 files changed, 72 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dddff16b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
index f1f53ce..9ec5e8f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
@@ -22,9 +22,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public abstract class AtomicRunnable implements Runnable {
- public static Runnable checkAtomic(Runnable run) {
+ public static AtomicRunnable checkAtomic(Runnable run) {
if (run instanceof AtomicRunnable) {
- return run;
+ return (AtomicRunnable)run;
} else {
return new AtomicRunnableWithDelegate(run);
}
@@ -35,6 +35,20 @@ public abstract class AtomicRunnable implements Runnable {
private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran");
+ public AtomicRunnable reset() {
+ RAN_UPDATE.set(this, 0);
+ return this;
+ }
+
+ public AtomicRunnable setRan() {
+ RAN_UPDATE.set(this, 1);
+ return this;
+ }
+
+ public boolean isRun() {
+ return RAN_UPDATE.get(this) == 1;
+ }
+
@Override
public void run() {
if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dddff16b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 7fef3db..3d8ae5a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -109,8 +109,6 @@ public class AMQPSessionCallback implements SessionCallback {
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
- private CreditRunnable creditRunnable;
-
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@@ -577,49 +575,17 @@ public class AMQPSessionCallback implements SessionCallback {
});
}
- public void offerProducerCredit(final SimpleString address,
- final int credits,
- final int threshold,
- final Receiver receiver) {
+ /** Will execute a Runnable on an Address when there's space in memory*/
+ public void flow(final SimpleString address,
+ Runnable runnable) {
try {
- /*
- * The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the
- * runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this
- * may cause a memory leak, one is enough.
- * */
- if (creditRunnable != null && !creditRunnable.isRun())
- return;
PagingManager pagingManager = manager.getServer().getPagingManager();
- creditRunnable = new CreditRunnable() {
- boolean isRun = false;
- @Override
- public boolean isRun() {
- return isRun;
- }
-
- @Override
- public void run() {
- connection.lock();
- try {
- if (receiver.getCredit() <= threshold) {
- int topUp = credits - receiver.getCredit();
- if (topUp > 0) {
- receiver.flow(topUp);
- }
- }
- } finally {
- isRun = true;
- connection.unlock();
- }
- connection.flush();
- }
- };
if (address == null) {
- pagingManager.checkMemory(creditRunnable);
+ pagingManager.checkMemory(runnable);
} else {
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
- store.checkMemory(creditRunnable);
+ store.checkMemory(runnable);
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -791,7 +757,4 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
- interface CreditRunnable extends Runnable {
- boolean isRun();
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dddff16b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 0f0e9d5..0758714 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@@ -60,6 +61,35 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
protected final AMQPSessionCallback sessionSPI;
+ /** We create this AtomicRunnable with setRan.
+ * This is because we always reuse the same instance.
+ * In case the creditRunnable was run, we reset and send it over.
+ * We set it as ran as the first one should always go through */
+ protected final AtomicRunnable creditRunnable;
+
+
+ /** This Credit Runnable may be used in Mock tests to simulate the credit semantic here */
+ public static AtomicRunnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) {
+ return new AtomicRunnable() {
+ @Override
+ public void atomicRun() {
+ connection.lock();
+ try {
+ if (receiver.getCredit() <= threshold) {
+ int topUp = refill - receiver.getCredit();
+ if (topUp > 0) {
+ receiver.flow(topUp);
+ }
+ }
+ } finally {
+ connection.unlock();
+ }
+ connection.flush();
+ }
+ };
+ }
+
+
/*
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
@@ -68,7 +98,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
// Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
private final int minCreditRefresh;
- private TerminusExpiryPolicy expiryPolicy;
public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
@@ -80,11 +109,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
this.sessionSPI = sessionSPI;
this.amqpCredits = connection.getAmqpCredits();
this.minCreditRefresh = connection.getAmqpLowCredits();
+ this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
}
@Override
public void onFlow(int credits, boolean drain) {
- flow(Math.min(credits, amqpCredits), amqpCredits);
+ flow();
}
@Override
@@ -116,7 +146,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
- expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
target.setAddress(address.toString());
} else {
// the target will have an address unless the remote is requesting an anonymous
@@ -182,7 +211,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
}
}
- flow(amqpCredits, minCreditRefresh);
+ flow();
}
public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
@@ -245,7 +274,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);
- flow(amqpCredits, minCreditRefresh);
+ flow();
} catch (Exception e) {
log.warn(e.getMessage(), e);
Rejected rejected = new Rejected();
@@ -262,7 +291,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
delivery.disposition(rejected);
delivery.settle();
- flow(amqpCredits, minCreditRefresh);
+ flow();
}
}
@@ -285,20 +314,18 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
close(false);
}
- public void flow(int credits, int threshold) {
+ public void flow() {
+ if (!creditRunnable.isRun()) {
+ return; // nothing to be done as the previous one did not run yet
+ }
+
+ creditRunnable.reset();
+
// Use the SessionSPI to allocate producer credits, or default, always allocate credit.
if (sessionSPI != null) {
- if (receiver.getCredit() <= threshold) {
- sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
- }
+ sessionSPI.flow(address, creditRunnable);
} else {
- connection.lock();
- try {
- receiver.flow(credits);
- } finally {
- connection.unlock();
- }
- connection.flush();
+ creditRunnable.run();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dddff16b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
index e86e960..30814a9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Rule;
@@ -74,7 +75,7 @@ public class AMQPSessionCallbackTest {
// Credit is above threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
- session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+ session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingManager).checkMemory(argument.capture());
@@ -105,7 +106,7 @@ public class AMQPSessionCallbackTest {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
- session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+ session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingManager).checkMemory(argument.capture());
@@ -137,7 +138,7 @@ public class AMQPSessionCallbackTest {
// Credit is above threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
- session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+ session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
@@ -169,7 +170,7 @@ public class AMQPSessionCallbackTest {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
- session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
+ session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
@@ -200,7 +201,7 @@ public class AMQPSessionCallbackTest {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
- session.offerProducerCredit(null, 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
+ session.flow(null, ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingManager).checkMemory(argument.capture());
@@ -232,7 +233,7 @@ public class AMQPSessionCallbackTest {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
- session.offerProducerCredit(new SimpleString("test"), 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
+ session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());