You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2018/03/29 12:59:35 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1776 Asynchronous Flow control on the bridge
Repository: activemq-artemis
Updated Branches:
refs/heads/master 8f9299793 -> 4dbd03e53
ARTEMIS-1776 Asynchronous Flow control on the bridge
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/70bdfe76
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/70bdfe76
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/70bdfe76
Branch: refs/heads/master
Commit: 70bdfe760393a9d7d17ec175ea68ce83819fe83c
Parents: 8f92997
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 28 12:40:06 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 28 19:09:20 2018 -0400
----------------------------------------------------------------------
.../impl/AbstractProducerCreditsImpl.java | 167 +++++++++++++++++++
.../impl/AsynchronousProducerCreditsImpl.java | 76 +++++++++
.../impl/ClientProducerCreditManager.java | 4 +
.../impl/ClientProducerCreditManagerImpl.java | 24 ++-
.../core/client/impl/ClientProducerCredits.java | 3 +
.../client/impl/ClientProducerCreditsImpl.java | 167 +++++--------------
.../client/impl/ClientProducerFlowCallback.java | 24 +++
.../core/impl/ActiveMQSessionContext.java | 4 +-
.../spi/core/remoting/SessionContext.java | 4 +-
.../core/server/ActiveMQServerLogger.java | 4 +
.../core/server/cluster/impl/BridgeImpl.java | 26 ++-
.../integration/cluster/bridge/BridgeTest.java | 2 +-
12 files changed, 373 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
new file mode 100644
index 0000000..25b2d97
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+
+public abstract class AbstractProducerCreditsImpl implements ClientProducerCredits {
+
+ protected int pendingCredits;
+
+ private final int windowSize;
+
+ protected volatile boolean closed;
+
+ protected boolean blocked;
+
+ protected final SimpleString address;
+
+ private final ClientSessionInternal session;
+
+ protected int arriving;
+
+ private int refCount;
+
+ protected boolean serverRespondedWithFail;
+
+ protected SessionContext sessionContext;
+
+ public AbstractProducerCreditsImpl(final ClientSessionInternal session,
+ final SimpleString address,
+ final int windowSize) {
+ this.session = session;
+
+ this.address = address;
+
+ this.windowSize = windowSize / 2;
+ }
+
+ @Override
+ public SimpleString getAddress() {
+ return address;
+ }
+
+ @Override
+ public void init(SessionContext sessionContext) {
+ // We initially request twice as many credits as we request in subsequent requests
+ // This allows the producer to keep sending as more arrive, minimising pauses
+ checkCredits(windowSize);
+
+ this.sessionContext = sessionContext;
+
+ this.sessionContext.linkFlowControl(address, this);
+ }
+
+ @Override
+ public void acquireCredits(final int credits) throws ActiveMQException {
+ checkCredits(credits);
+
+ actualAcquire(credits);
+
+ afterAcquired(credits);
+
+ }
+
+ protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
+ // the credits just acquired are no longer pending
+ synchronized (this) {
+ pendingCredits -= credits;
+ }
+ }
+
+ protected abstract void actualAcquire(int credits);
+
+ @Override
+ public boolean isBlocked() {
+ return blocked;
+ }
+
+ @Override
+ public void receiveFailCredits(final int credits) {
+ serverRespondedWithFail = true;
+ // receive credits like normal to keep the sender from blocking
+ receiveCredits(credits);
+ }
+
+
+ @Override
+ public void receiveCredits(final int credits) {
+ synchronized (this) {
+ arriving -= credits;
+ }
+ }
+
+
+ @Override
+ public synchronized void reset() {
+ // Any pendingCredits credits from before failover won't arrive, so we re-initialise
+
+ int beforeFailure = pendingCredits;
+
+ pendingCredits = 0;
+ arriving = 0;
+
+ // If we are waiting for more credits than what's configured, then we need to use what we tried before
+ // otherwise the client may starve as the credit will never arrive
+ checkCredits(Math.max(windowSize * 2, beforeFailure));
+ }
+
+ @Override
+ public void close() {
+ // Closing a producer that is blocking should make it return
+ closed = true;
+ }
+
+ @Override
+ public synchronized void incrementRefCount() {
+ refCount++;
+ }
+
+ @Override
+ public synchronized int decrementRefCount() {
+ return --refCount;
+ }
+
+ public abstract int getBalance();
+
+ protected void checkCredits(final int credits) {
+ int needed = Math.max(credits, windowSize);
+
+ int toRequest = -1;
+
+ synchronized (this) {
+ if (getBalance() + arriving < needed) {
+ toRequest = needed - arriving;
+
+ pendingCredits += toRequest;
+ arriving += toRequest;
+ }
+ }
+
+ if (toRequest != -1) {
+ requestCredits(toRequest);
+ }
+ }
+
+ private void requestCredits(final int credits) {
+ session.sendProducerCreditsMessage(credits, address);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
new file mode 100644
index 0000000..a49122f
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl {
+
+ int balance;
+
+ final ClientProducerFlowCallback callback;
+
+ public AsynchronousProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize,
+ ClientProducerFlowCallback callback) {
+ super(session, address, windowSize);
+ balance = windowSize;
+ this.callback = callback;
+ }
+
+ @Override
+ protected synchronized void actualAcquire(int credits) {
+ synchronized (this) {
+ balance -= credits;
+ if (balance <= 0) {
+ callback.onCreditsFlow(true, this);
+ }
+ }
+
+ }
+
+ @Override
+ public int getBalance() {
+ return balance;
+ }
+
+ @Override
+ public void receiveCredits(int credits) {
+ synchronized (this) {
+ super.receiveCredits(credits);
+ balance += credits;
+ callback.onCreditsFlow(balance <= 0, this);
+ }
+
+ }
+
+
+ @Override
+ public void receiveFailCredits(final int credits) {
+ super.receiveFailCredits(credits);
+ callback.onCreditsFail(this);
+ }
+
+ @Override
+ public void releaseOutstanding() {
+ synchronized (this) {
+ balance = 0;
+ callback.onCreditsFlow(true, this);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
index 3c10e1a..bb65f72 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
@@ -36,4 +36,8 @@ public interface ClientProducerCreditManager {
int creditsMapSize();
int unReferencedCreditsSize();
+
+ /** This makes the flow control asynchronous:
+ * no actual blocking should happen; instead, the callback is notified whenever the blocked state changes */
+ void setCallback(ClientProducerFlowCallback callback);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
index 80b255f..cd2db9c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -35,12 +35,22 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
private int windowSize;
+ private ClientProducerFlowCallback callback;
+
public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {
this.session = session;
this.windowSize = windowSize;
}
+
+ /** This makes the flow control asynchronous:
+ * no actual blocking should happen; instead, the callback is notified whenever the blocked state changes */
+ @Override
+ public void setCallback(ClientProducerFlowCallback callback) {
+ this.callback = callback;
+ }
+
@Override
public synchronized ClientProducerCredits getCredits(final SimpleString address,
final boolean anon,
@@ -56,7 +66,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
if (credits == null) {
// Doesn't need to be fair since session is single threaded
- credits = new ClientProducerCreditsImpl(session, address, windowSize);
+ credits = build(address);
needInit = true;
producerCredits.put(address, credits);
@@ -83,6 +93,14 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
}
}
+ private ClientProducerCredits build(SimpleString address) {
+ if (callback != null) {
+ return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback);
+ } else {
+ return new ClientProducerCreditsImpl(session, address, windowSize);
+ }
+ }
+
@Override
public synchronized void returnCredits(final SimpleString address) {
ClientProducerCredits credits = producerCredits.get(address);
@@ -210,6 +228,10 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
public void releaseOutstanding() {
}
+ @Override
+ public SimpleString getAddress() {
+ return SimpleString.toSimpleString("");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
index a97df92..321fda5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
public interface ClientProducerCredits {
@@ -40,4 +41,6 @@ public interface ClientProducerCredits {
int decrementRefCount();
void releaseOutstanding();
+
+ SimpleString getAddress();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
index 75543fd..41a08c9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
@@ -19,96 +19,33 @@ package org.apache.activemq.artemis.core.client.impl;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
-public class ClientProducerCreditsImpl implements ClientProducerCredits {
+public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl {
- private final Semaphore semaphore;
-
- private final int windowSize;
-
- private volatile boolean closed;
-
- private boolean blocked;
-
- private final SimpleString address;
-
- private final ClientSessionInternal session;
-
- private int pendingCredits;
-
- private int arriving;
-
- private int refCount;
-
- private boolean serverRespondedWithFail;
- private SessionContext sessionContext;
-
- public ClientProducerCreditsImpl(final ClientSessionInternal session,
- final SimpleString address,
- final int windowSize) {
- this.session = session;
+ private final Semaphore semaphore;
- this.address = address;
+ public ClientProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize) {
+ super(session, address, windowSize);
- this.windowSize = windowSize / 2;
// Doesn't need to be fair since session is single threaded
-
semaphore = new Semaphore(0, false);
- }
- @Override
- public void init(SessionContext sessionContext) {
- // We initial request twice as many credits as we request in subsequent requests
- // This allows the producer to keep sending as more arrive, minimising pauses
- checkCredits(windowSize);
-
- this.sessionContext = sessionContext;
-
- this.sessionContext.linkFlowControl(address, this);
}
- @Override
- public void acquireCredits(final int credits) throws ActiveMQException {
- checkCredits(credits);
-
- boolean tryAcquire;
-
- synchronized (this) {
- tryAcquire = semaphore.tryAcquire(credits);
- }
-
- if (!tryAcquire) {
- if (!closed) {
- this.blocked = true;
- try {
- while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
- // I'm using string concatenation here in case address is null
- // better getting a "null" string than a NPE
- ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
- }
- } catch (InterruptedException interrupted) {
- Thread.currentThread().interrupt();
- throw new ActiveMQInterruptedException(interrupted);
- } finally {
- this.blocked = false;
- }
- }
- }
-
- synchronized (this) {
- pendingCredits -= credits;
- }
+ @Override
+ protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
// check to see if the blocking mode is FAIL on the server
synchronized (this) {
+ super.afterAcquired(credits);
+
if (serverRespondedWithFail) {
serverRespondedWithFail = false;
@@ -123,29 +60,30 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
}
@Override
- public boolean isBlocked() {
- return blocked;
- }
-
- public int getBalance() {
- return semaphore.availablePermits();
- }
+ protected void actualAcquire(int credits) {
- @Override
- public void receiveCredits(final int credits) {
+ boolean tryAcquire;
synchronized (this) {
- arriving -= credits;
+ tryAcquire = semaphore.tryAcquire(credits);
}
- semaphore.release(credits);
+ if (!tryAcquire && !closed) {
+ this.blocked = true;
+ try {
+ while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
+ // I'm using string concatenation here in case address is null
+ // better getting a "null" string than a NPE
+ ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
+ }
+ } catch (InterruptedException interrupted) {
+ Thread.currentThread().interrupt();
+ throw new ActiveMQInterruptedException(interrupted);
+ } finally {
+ this.blocked = false;
+ }
+ }
}
- @Override
- public void receiveFailCredits(final int credits) {
- serverRespondedWithFail = true;
- // receive credits like normal to keep the sender from blocking
- receiveCredits(credits);
- }
@Override
public synchronized void reset() {
@@ -153,59 +91,38 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
semaphore.drainPermits();
- int beforeFailure = pendingCredits;
-
- pendingCredits = 0;
- arriving = 0;
-
- // If we are waiting for more credits than what's configured, then we need to use what we tried before
- // otherwise the client may starve as the credit will never arrive
- checkCredits(Math.max(windowSize * 2, beforeFailure));
+ super.reset();
}
+
@Override
public void close() {
- // Closing a producer that is blocking should make it return
- closed = true;
+ super.close();
+ // Closing a producer that is blocking should make it return
semaphore.release(Integer.MAX_VALUE / 2);
}
@Override
- public synchronized void incrementRefCount() {
- refCount++;
- }
+ public void receiveCredits(final int credits) {
+ synchronized (this) {
+ super.receiveCredits(credits);
+ }
- @Override
- public synchronized int decrementRefCount() {
- return --refCount;
+ semaphore.release(credits);
}
+
@Override
public synchronized void releaseOutstanding() {
semaphore.drainPermits();
}
- private void checkCredits(final int credits) {
- int needed = Math.max(credits, windowSize);
-
- int toRequest = -1;
-
- synchronized (this) {
- if (semaphore.availablePermits() + arriving < needed) {
- toRequest = needed - arriving;
-
- pendingCredits += toRequest;
- arriving += toRequest;
- }
- }
-
- if (toRequest != -1) {
- requestCredits(toRequest);
- }
+ @Override
+ public int getBalance() {
+ return semaphore.availablePermits();
}
- private void requestCredits(final int credits) {
- session.sendProducerCreditsMessage(credits, address);
- }
+
}
+
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
new file mode 100644
index 0000000..1fc289d
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+public interface ClientProducerFlowCallback {
+ void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits);
+
+ void onCreditsFail(ClientProducerCredits credits);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 6037925..f4033ec 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -47,7 +47,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
-import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -234,7 +234,7 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
- public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) {
+ public void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits) {
// nothing to be done here... Flow control here is done on the core side
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 058e606..6571335 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -34,7 +34,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
-import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.IDGenerator;
@@ -342,7 +342,7 @@ public abstract class SessionContext {
public abstract void cleanup();
- public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits);
+ public abstract void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits);
public abstract boolean isWritable(ReadyListener callback);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 353403f..c3f0a7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1585,6 +1585,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 22272, value = "Message ack in prepared tx for queue {0} which does not exist. This ack will be ignored.", format = Message.Format.MESSAGE_FORMAT)
void journalMessageAckMissingQueueInPreparedTX(Long queueID);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 22273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT)
+ void bridgeAddressFull(String addressName, String bridgeName);
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 9815d37..01596fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
@@ -70,7 +72,7 @@ import org.jboss.logging.Logger;
* A Core BridgeImpl
*/
-public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener {
+public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback {
// Constants -----------------------------------------------------
private static final Logger logger = Logger.getLogger(BridgeImpl.class);
@@ -119,6 +121,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final long maxRetryInterval;
+ private boolean blockedOnFlowControl;
+
/**
* Used when there's a scheduled reconnection
*/
@@ -254,6 +258,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
@Override
+ public void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits) {
+ this.blockedOnFlowControl = blocked;
+ if (!blocked) {
+ queue.deliverAsync();
+ }
+ }
+
+ @Override
+ public void onCreditsFail(ClientProducerCredits producerCredits) {
+ ActiveMQServerLogger.LOGGER.bridgeAddressFull("" + producerCredits.getAddress(), "" + this.getName());
+ disconnect();
+ }
+
+ @Override
public long sequentialID() {
return sequentialID;
}
@@ -541,6 +559,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
queue.deliverAsync();
}
+
@Override
public HandleStatus handle(final MessageReference ref) throws Exception {
if (filter != null && !filter.match(ref.getMessage())) {
@@ -555,6 +574,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return HandleStatus.BUSY;
}
+ if (blockedOnFlowControl) {
+ return HandleStatus.BUSY;
+ }
+
if (deliveringLargeMessage) {
return HandleStatus.BUSY;
}
@@ -876,6 +899,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
// Session is pre-acknowledge
session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
+ session.getProducerCreditManager().setCallback(this);
sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 219aa15..ae60a61 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -1187,7 +1187,7 @@ public class BridgeTest extends ActiveMQTestBase {
ArrayList<String> staticConnectors = new ArrayList<>();
staticConnectors.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors);
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors).setProducerWindowSize(1);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
[2/2] activemq-artemis git commit: This closes #1979
Posted by ni...@apache.org.
This closes #1979
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4dbd03e5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4dbd03e5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4dbd03e5
Branch: refs/heads/master
Commit: 4dbd03e53b1358c94ba139b9275da6cc55bb5d60
Parents: 8f92997 70bdfe7
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Mar 29 14:55:25 2018 +0200
Committer: Francesco Nigro <ni...@gmail.com>
Committed: Thu Mar 29 14:55:25 2018 +0200
----------------------------------------------------------------------
.../impl/AbstractProducerCreditsImpl.java | 167 +++++++++++++++++++
.../impl/AsynchronousProducerCreditsImpl.java | 76 +++++++++
.../impl/ClientProducerCreditManager.java | 4 +
.../impl/ClientProducerCreditManagerImpl.java | 24 ++-
.../core/client/impl/ClientProducerCredits.java | 3 +
.../client/impl/ClientProducerCreditsImpl.java | 167 +++++--------------
.../client/impl/ClientProducerFlowCallback.java | 24 +++
.../core/impl/ActiveMQSessionContext.java | 4 +-
.../spi/core/remoting/SessionContext.java | 4 +-
.../core/server/ActiveMQServerLogger.java | 4 +
.../core/server/cluster/impl/BridgeImpl.java | 26 ++-
.../integration/cluster/bridge/BridgeTest.java | 2 +-
12 files changed, 373 insertions(+), 132 deletions(-)
----------------------------------------------------------------------