You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/17 14:58:41 UTC
[2/4] activemq-artemis git commit: ARTEMIS-1776 Asynchronous Flow
control on the bridge
ARTEMIS-1776 Asynchronous Flow control on the bridge
Cherry-picked from 70bdfe760393a9d7d17ec175ea68ce83819fe83c
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2c73bd04
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2c73bd04
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2c73bd04
Branch: refs/heads/1.x
Commit: 2c73bd0462b3818d49ce7c4557e1b48587707cb1
Parents: 1e6e1f7
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 28 12:40:06 2018 -0400
Committer: JiriOndrusek <jo...@redhat.com>
Committed: Tue Apr 10 16:44:07 2018 +0200
----------------------------------------------------------------------
.../impl/AbstractProducerCreditsImpl.java | 167 ++++++++++++++
.../impl/AsynchronousProducerCreditsImpl.java | 76 +++++++
.../impl/ClientProducerCreditManager.java | 4 +
.../impl/ClientProducerCreditManagerImpl.java | 24 +-
.../core/client/impl/ClientProducerCredits.java | 3 +
.../client/impl/ClientProducerCreditsImpl.java | 167 ++++----------
.../client/impl/ClientProducerFlowCallback.java | 24 ++
.../core/impl/ActiveMQSessionContext.java | 4 +-
.../spi/core/remoting/SessionContext.java | 4 +-
.../core/server/ActiveMQServerLogger.java | 222 +++++++++++++++++++
.../core/server/cluster/impl/BridgeImpl.java | 26 ++-
.../integration/cluster/bridge/BridgeTest.java | 2 +-
12 files changed, 591 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
new file mode 100644
index 0000000..25b2d97
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+
+public abstract class AbstractProducerCreditsImpl implements ClientProducerCredits {
+
+ protected int pendingCredits;
+
+ private final int windowSize;
+
+ protected volatile boolean closed;
+
+ protected boolean blocked;
+
+ protected final SimpleString address;
+
+ private final ClientSessionInternal session;
+
+ protected int arriving;
+
+ private int refCount;
+
+ protected boolean serverRespondedWithFail;
+
+ protected SessionContext sessionContext;
+
+ public AbstractProducerCreditsImpl(final ClientSessionInternal session,
+ final SimpleString address,
+ final int windowSize) {
+ this.session = session;
+
+ this.address = address;
+
+ this.windowSize = windowSize / 2;
+ }
+
+ @Override
+ public SimpleString getAddress() {
+ return address;
+ }
+
+ @Override
+ public void init(SessionContext sessionContext) {
+ // We initial request twice as many credits as we request in subsequent requests
+ // This allows the producer to keep sending as more arrive, minimising pauses
+ checkCredits(windowSize);
+
+ this.sessionContext = sessionContext;
+
+ this.sessionContext.linkFlowControl(address, this);
+ }
+
+ @Override
+ public void acquireCredits(final int credits) throws ActiveMQException {
+ checkCredits(credits);
+
+ actualAcquire(credits);
+
+ afterAcquired(credits);
+
+ }
+
+ protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
+ // check to see if the blocking mode is FAIL on the server
+ synchronized (this) {
+ pendingCredits -= credits;
+ }
+ }
+
+ protected abstract void actualAcquire(int credits);
+
+ @Override
+ public boolean isBlocked() {
+ return blocked;
+ }
+
+ @Override
+ public void receiveFailCredits(final int credits) {
+ serverRespondedWithFail = true;
+ // receive credits like normal to keep the sender from blocking
+ receiveCredits(credits);
+ }
+
+
+ @Override
+ public void receiveCredits(final int credits) {
+ synchronized (this) {
+ arriving -= credits;
+ }
+ }
+
+
+ @Override
+ public synchronized void reset() {
+ // Any pendingCredits credits from before failover won't arrive, so we re-initialise
+
+ int beforeFailure = pendingCredits;
+
+ pendingCredits = 0;
+ arriving = 0;
+
+ // If we are waiting for more credits than what's configured, then we need to use what we tried before
+ // otherwise the client may starve as the credit will never arrive
+ checkCredits(Math.max(windowSize * 2, beforeFailure));
+ }
+
+ @Override
+ public void close() {
+ // Closing a producer that is blocking should make it return
+ closed = true;
+ }
+
+ @Override
+ public synchronized void incrementRefCount() {
+ refCount++;
+ }
+
+ @Override
+ public synchronized int decrementRefCount() {
+ return --refCount;
+ }
+
+ public abstract int getBalance();
+
+ protected void checkCredits(final int credits) {
+ int needed = Math.max(credits, windowSize);
+
+ int toRequest = -1;
+
+ synchronized (this) {
+ if (getBalance() + arriving < needed) {
+ toRequest = needed - arriving;
+
+ pendingCredits += toRequest;
+ arriving += toRequest;
+ }
+ }
+
+ if (toRequest != -1) {
+ requestCredits(toRequest);
+ }
+ }
+
+ private void requestCredits(final int credits) {
+ session.sendProducerCreditsMessage(credits, address);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
new file mode 100644
index 0000000..a49122f
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl {
+
+ int balance;
+
+ final ClientProducerFlowCallback callback;
+
+ public AsynchronousProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize,
+ ClientProducerFlowCallback callback) {
+ super(session, address, windowSize);
+ balance = windowSize;
+ this.callback = callback;
+ }
+
+ @Override
+ protected synchronized void actualAcquire(int credits) {
+ synchronized (this) {
+ balance -= credits;
+ if (balance <= 0) {
+ callback.onCreditsFlow(true, this);
+ }
+ }
+
+ }
+
+ @Override
+ public int getBalance() {
+ return balance;
+ }
+
+ @Override
+ public void receiveCredits(int credits) {
+ synchronized (this) {
+ super.receiveCredits(credits);
+ balance += credits;
+ callback.onCreditsFlow(balance <= 0, this);
+ }
+
+ }
+
+
+ @Override
+ public void receiveFailCredits(final int credits) {
+ super.receiveFailCredits(credits);
+ callback.onCreditsFail(this);
+ }
+
+ @Override
+ public void releaseOutstanding() {
+ synchronized (this) {
+ balance = 0;
+ callback.onCreditsFlow(true, this);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
index 3c10e1a..bb65f72 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
@@ -36,4 +36,8 @@ public interface ClientProducerCreditManager {
int creditsMapSize();
int unReferencedCreditsSize();
+
+ /** This will determine the flow control as asynchronous,
+ * no actual block should happen instead a callback will be sent whenever blockages change */
+ void setCallback(ClientProducerFlowCallback callback);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
index 80b255f..cd2db9c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -35,12 +35,22 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
private int windowSize;
+ private ClientProducerFlowCallback callback;
+
public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {
this.session = session;
this.windowSize = windowSize;
}
+
+ /** This will determine the flow control as asynchronous,
+ * no actual block should happen instead a callback will be sent whenever blockages change */
+ @Override
+ public void setCallback(ClientProducerFlowCallback callback) {
+ this.callback = callback;
+ }
+
@Override
public synchronized ClientProducerCredits getCredits(final SimpleString address,
final boolean anon,
@@ -56,7 +66,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
if (credits == null) {
// Doesn't need to be fair since session is single threaded
- credits = new ClientProducerCreditsImpl(session, address, windowSize);
+ credits = build(address);
needInit = true;
producerCredits.put(address, credits);
@@ -83,6 +93,14 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
}
}
+ private ClientProducerCredits build(SimpleString address) {
+ if (callback != null) {
+ return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback);
+ } else {
+ return new ClientProducerCreditsImpl(session, address, windowSize);
+ }
+ }
+
@Override
public synchronized void returnCredits(final SimpleString address) {
ClientProducerCredits credits = producerCredits.get(address);
@@ -210,6 +228,10 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
public void releaseOutstanding() {
}
+ @Override
+ public SimpleString getAddress() {
+ return SimpleString.toSimpleString("");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
index a97df92..321fda5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
public interface ClientProducerCredits {
@@ -40,4 +41,6 @@ public interface ClientProducerCredits {
int decrementRefCount();
void releaseOutstanding();
+
+ SimpleString getAddress();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
index 75543fd..41a08c9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
@@ -19,96 +19,33 @@ package org.apache.activemq.artemis.core.client.impl;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
-public class ClientProducerCreditsImpl implements ClientProducerCredits {
+public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl {
- private final Semaphore semaphore;
-
- private final int windowSize;
-
- private volatile boolean closed;
-
- private boolean blocked;
-
- private final SimpleString address;
-
- private final ClientSessionInternal session;
-
- private int pendingCredits;
-
- private int arriving;
-
- private int refCount;
-
- private boolean serverRespondedWithFail;
- private SessionContext sessionContext;
-
- public ClientProducerCreditsImpl(final ClientSessionInternal session,
- final SimpleString address,
- final int windowSize) {
- this.session = session;
+ private final Semaphore semaphore;
- this.address = address;
+ public ClientProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize) {
+ super(session, address, windowSize);
- this.windowSize = windowSize / 2;
// Doesn't need to be fair since session is single threaded
-
semaphore = new Semaphore(0, false);
- }
- @Override
- public void init(SessionContext sessionContext) {
- // We initial request twice as many credits as we request in subsequent requests
- // This allows the producer to keep sending as more arrive, minimising pauses
- checkCredits(windowSize);
-
- this.sessionContext = sessionContext;
-
- this.sessionContext.linkFlowControl(address, this);
}
- @Override
- public void acquireCredits(final int credits) throws ActiveMQException {
- checkCredits(credits);
-
- boolean tryAcquire;
-
- synchronized (this) {
- tryAcquire = semaphore.tryAcquire(credits);
- }
-
- if (!tryAcquire) {
- if (!closed) {
- this.blocked = true;
- try {
- while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
- // I'm using string concatenation here in case address is null
- // better getting a "null" string than a NPE
- ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
- }
- } catch (InterruptedException interrupted) {
- Thread.currentThread().interrupt();
- throw new ActiveMQInterruptedException(interrupted);
- } finally {
- this.blocked = false;
- }
- }
- }
-
- synchronized (this) {
- pendingCredits -= credits;
- }
+ @Override
+ protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
// check to see if the blocking mode is FAIL on the server
synchronized (this) {
+ super.afterAcquired(credits);
+
if (serverRespondedWithFail) {
serverRespondedWithFail = false;
@@ -123,29 +60,30 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
}
@Override
- public boolean isBlocked() {
- return blocked;
- }
-
- public int getBalance() {
- return semaphore.availablePermits();
- }
+ protected void actualAcquire(int credits) {
- @Override
- public void receiveCredits(final int credits) {
+ boolean tryAcquire;
synchronized (this) {
- arriving -= credits;
+ tryAcquire = semaphore.tryAcquire(credits);
}
- semaphore.release(credits);
+ if (!tryAcquire && !closed) {
+ this.blocked = true;
+ try {
+ while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
+ // I'm using string concatenation here in case address is null
+ // better getting a "null" string than a NPE
+ ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
+ }
+ } catch (InterruptedException interrupted) {
+ Thread.currentThread().interrupt();
+ throw new ActiveMQInterruptedException(interrupted);
+ } finally {
+ this.blocked = false;
+ }
+ }
}
- @Override
- public void receiveFailCredits(final int credits) {
- serverRespondedWithFail = true;
- // receive credits like normal to keep the sender from blocking
- receiveCredits(credits);
- }
@Override
public synchronized void reset() {
@@ -153,59 +91,38 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
semaphore.drainPermits();
- int beforeFailure = pendingCredits;
-
- pendingCredits = 0;
- arriving = 0;
-
- // If we are waiting for more credits than what's configured, then we need to use what we tried before
- // otherwise the client may starve as the credit will never arrive
- checkCredits(Math.max(windowSize * 2, beforeFailure));
+ super.reset();
}
+
@Override
public void close() {
- // Closing a producer that is blocking should make it return
- closed = true;
+ super.close();
+ // Closing a producer that is blocking should make it return
semaphore.release(Integer.MAX_VALUE / 2);
}
@Override
- public synchronized void incrementRefCount() {
- refCount++;
- }
+ public void receiveCredits(final int credits) {
+ synchronized (this) {
+ super.receiveCredits(credits);
+ }
- @Override
- public synchronized int decrementRefCount() {
- return --refCount;
+ semaphore.release(credits);
}
+
@Override
public synchronized void releaseOutstanding() {
semaphore.drainPermits();
}
- private void checkCredits(final int credits) {
- int needed = Math.max(credits, windowSize);
-
- int toRequest = -1;
-
- synchronized (this) {
- if (semaphore.availablePermits() + arriving < needed) {
- toRequest = needed - arriving;
-
- pendingCredits += toRequest;
- arriving += toRequest;
- }
- }
-
- if (toRequest != -1) {
- requestCredits(toRequest);
- }
+ @Override
+ public int getBalance() {
+ return semaphore.availablePermits();
}
- private void requestCredits(final int credits) {
- session.sendProducerCreditsMessage(credits, address);
- }
+
}
+
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
new file mode 100644
index 0000000..1fc289d
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+public interface ClientProducerFlowCallback {
+ void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits);
+
+ void onCreditsFail(ClientProducerCredits credits);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index d2bcc96..721f717 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
-import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -225,7 +225,7 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
- public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) {
+ public void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits) {
// nothing to be done here... Flow control here is done on the core side
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 1f15cc6..7c31597 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -30,7 +30,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
-import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -277,7 +277,7 @@ public abstract class SessionContext {
public abstract void cleanup();
- public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits);
+ public abstract void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits);
public abstract boolean isWritable(ReadyListener callback);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 5ecaee6..e728d33 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -31,6 +31,7 @@ package org.apache.activemq.artemis.core.server;
* so an INFO message would be 101000 to 101999
*/
+import javax.naming.NamingException;
import javax.transaction.xa.Xid;
import java.io.File;
import java.net.SocketAddress;
@@ -1291,6 +1292,227 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222217, value = "Cannot find connector-ref {0}. The cluster-connection {1} will not be deployed.", format = Message.Format.MESSAGE_FORMAT)
void connectorRefNotFound(String connectorRef, String clusterConnection);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222218, value = "Server disconnecting: {0}", format = Message.Format.MESSAGE_FORMAT)
+ void disconnectCritical(String reason, @Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222219, value = "File {0} does not exist",
+ format = Message.Format.MESSAGE_FORMAT)
+ void fileDoesNotExist(String path);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222220, value = " Error while cleaning paging on queue {0}", format = Message.Format.MESSAGE_FORMAT)
+ void errorCleaningPagingOnQueue(@Cause Exception e, String queue);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222221, value = "Error while cleaning page, during the commit", format = Message.Format.MESSAGE_FORMAT)
+ void errorCleaningPagingDuringCommit(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222222, value = "Error while deleting page-complete-record", format = Message.Format.MESSAGE_FORMAT)
+ void errorDeletingPageCompleteRecord(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222223, value = "Failed to calculate message memory estimate", format = Message.Format.MESSAGE_FORMAT)
+ void errorCalculateMessageMemoryEstimate(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222224, value = "Failed to calculate scheduled delivery time", format = Message.Format.MESSAGE_FORMAT)
+ void errorCalculateScheduledDeliveryTime(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222225, value = "Sending unexpected exception to the client", format = Message.Format.MESSAGE_FORMAT)
+ void sendingUnexpectedExceptionToClient(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222226, value = "Connection configuration is null for connectorName {0}", format = Message.Format.MESSAGE_FORMAT)
+ void connectionConfigurationIsNull(String connectorName);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222227, value = "Failed to process an event", format = Message.Format.MESSAGE_FORMAT)
+ void failedToProcessEvent(@Cause NamingException e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222228, value = "Missing replication token on queue", format = Message.Format.MESSAGE_FORMAT)
+ void missingReplicationTokenOnQueue();
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222229, value = "Failed to perform rollback", format = Message.Format.MESSAGE_FORMAT)
+ void failedToPerformRollback(@Cause IllegalStateException e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222230, value = "Failed to send notification", format = Message.Format.MESSAGE_FORMAT)
+ void failedToSendNotification(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222231, value = "Failed to flush outstanding data from the connection", format = Message.Format.MESSAGE_FORMAT)
+ void failedToFlushOutstandingDataFromTheConnection(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222232, value = "Unable to acquire lock", format = Message.Format.MESSAGE_FORMAT)
+ void unableToAcquireLock(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222233, value = "Unable to destroy connection with session metadata", format = Message.Format.MESSAGE_FORMAT)
+ void unableDestroyConnectionWithSessionMetadata(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222234, value = "Unable to deactivate a callback", format = Message.Format.MESSAGE_FORMAT)
+ void unableToDeactiveCallback(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222235, value = "Unable to inject a monitor", format = Message.Format.MESSAGE_FORMAT)
+ void unableToInjectMonitor(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222236, value = "Unable to flush deliveries", format = Message.Format.MESSAGE_FORMAT)
+ void unableToFlushDeliveries(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222237, value = "Unable to flush deliveries", format = Message.Format.MESSAGE_FORMAT)
+ void unableToCancelRedistributor(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222238, value = "Unable to commit transaction", format = Message.Format.MESSAGE_FORMAT)
+ void unableToCommitTransaction(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222239, value = "Unable to delete Queue status", format = Message.Format.MESSAGE_FORMAT)
+ void unableToDeleteQueueStatus(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222240, value = "Unable to pause a Queue", format = Message.Format.MESSAGE_FORMAT)
+ void unableToPauseQueue(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222241, value = "Unable to resume a Queue", format = Message.Format.MESSAGE_FORMAT)
+ void unableToResumeQueue(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222242, value = "Unable to obtain message priority, using default ", format = Message.Format.MESSAGE_FORMAT)
+ void unableToGetMessagePriority(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222243, value = "Unable to extract GroupID from message", format = Message.Format.MESSAGE_FORMAT)
+ void unableToExtractGroupID(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222244, value = "Unable to check if message expired", format = Message.Format.MESSAGE_FORMAT)
+ void unableToCheckIfMessageExpired(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222245, value = "Unable to perform post acknowledge", format = Message.Format.MESSAGE_FORMAT)
+ void unableToPerformPostAcknowledge(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222246, value = "Unable to rollback on close", format = Message.Format.MESSAGE_FORMAT)
+ void unableToRollbackOnClose(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222247, value = "Unable to close consumer", format = Message.Format.MESSAGE_FORMAT)
+ void unableToCloseConsumer(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222248, value = "Unable to remove consumer", format = Message.Format.MESSAGE_FORMAT)
+ void unableToRemoveConsumer(@Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222249, value = "Unable to rollback on TX timed out", format = Message.Format.MESSAGE_FORMAT)
+ void unableToRollbackOnTxTimedOut(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222250, value = "Unable to delete heuristic completion from storage manager", format = Message.Format.MESSAGE_FORMAT)
+ void unableToDeleteHeuristicCompletion(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222251, value = "Unable to start replication", format = Message.Format.MESSAGE_FORMAT)
+ void unableToStartReplication(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222252, value = "Unable to calculate file size", format = Message.Format.MESSAGE_FORMAT)
+ void unableToCalculateFileSize(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222253, value = "Error while syncing data on largeMessageInSync:: {0}", format = Message.Format.MESSAGE_FORMAT)
+ void errorWhileSyncingData(String target, @Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222254, value = "Invalid record type {0}", format = Message.Format.MESSAGE_FORMAT)
+ void invalidRecordType(byte type, @Cause Throwable e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222255, value = "Unable to calculate file store usage", format = Message.Format.MESSAGE_FORMAT)
+ void unableToCalculateFileStoreUsage(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222256, value = "Failed to unregister acceptors", format = Message.Format.MESSAGE_FORMAT)
+ void failedToUnregisterAcceptors(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222257, value = "Failed to decrement message reference count", format = Message.Format.MESSAGE_FORMAT)
+ void failedToDecrementMessageReferenceCount(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222258, value = "Error on deleting queue {0}", format = Message.Format.MESSAGE_FORMAT)
+ void errorOnDeletingQueue(String queueName, @Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222259, value = "Failed to flush the executor", format = Message.Format.MESSAGE_FORMAT)
+ void failedToFlushExecutor(@Cause InterruptedException e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222260, value = "Failed to perform rollback", format = Message.Format.MESSAGE_FORMAT)
+ void failedToRollback(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222261, value = "Failed to activate a backup", format = Message.Format.MESSAGE_FORMAT)
+ void failedToActivateBackup(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222262, value = "Failed to stop cluster manager", format = Message.Format.MESSAGE_FORMAT)
+ void failedToStopClusterManager(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222263, value = "Failed to stop cluster connection", format = Message.Format.MESSAGE_FORMAT)
+ void failedToStopClusterConnection(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222264, value = "Failed to process message reference after rollback", format = Message.Format.MESSAGE_FORMAT)
+ void failedToProcessMessageReferenceAfterRollback(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222265, value = "Failed to finish delivery, unable to lock delivery", format = Message.Format.MESSAGE_FORMAT)
+ void failedToFinishDelivery(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222266, value = "Failed to send request to the node", format = Message.Format.MESSAGE_FORMAT)
+ void failedToSendRequestToNode(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222267, value = "Failed to disconnect bindings", format = Message.Format.MESSAGE_FORMAT)
+ void failedToDisconnectBindings(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222268, value = "Failed to remove a record", format = Message.Format.MESSAGE_FORMAT)
+ void failedToRemoveRecord(@Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222269, value = "Please use a fixed value for \"journal-pool-files\". Default changed per https://issues.apache.org/jira/browse/ARTEMIS-1628", format = Message.Format.MESSAGE_FORMAT)
+ void useFixedValueOnJournalPoolFiles();
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222270, value = "Unable to create management notification address: {0}", format = Message.Format.MESSAGE_FORMAT)
+ void unableToCreateManagementNotificationAddress(SimpleString addressName, @Cause Exception e);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 22272, value = "Message ack in prepared tx for queue {0} which does not exist. This ack will be ignored.", format = Message.Format.MESSAGE_FORMAT)
+ void journalMessageAckMissingQueueInPreparedTX(Long queueID);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 22273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT)
+ void bridgeAddressFull(String addressName, String bridgeName);
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 94e28e6..dbbb4ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
@@ -71,7 +73,7 @@ import org.jboss.logging.Logger;
* A Core BridgeImpl
*/
-public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener {
+public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback {
// Constants -----------------------------------------------------
private static final Logger logger = Logger.getLogger(BridgeImpl.class);
@@ -122,6 +124,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final long maxRetryInterval;
+ private boolean blockedOnFlowControl;
+
/**
* Used when there's a scheduled reconnection
*/
@@ -255,6 +259,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
@Override
+ public void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits) {
+ this.blockedOnFlowControl = blocked;
+ if (!blocked) {
+ queue.deliverAsync();
+ }
+ }
+
+ @Override
+ public void onCreditsFail(ClientProducerCredits producerCredits) {
+ ActiveMQServerLogger.LOGGER.bridgeAddressFull("" + producerCredits.getAddress(), "" + this.getName());
+ disconnect();
+ }
+
+ @Override
public synchronized void start() throws Exception {
if (started) {
return;
@@ -536,6 +554,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
queue.deliverAsync();
}
+
@Override
public HandleStatus handle(final MessageReference ref) throws Exception {
if (filter != null && !filter.match(ref.getMessage())) {
@@ -550,6 +569,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return HandleStatus.BUSY;
}
+ if (blockedOnFlowControl) {
+ return HandleStatus.BUSY;
+ }
+
if (deliveringLargeMessage) {
return HandleStatus.BUSY;
}
@@ -868,6 +891,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
// Session is pre-acknowledge
session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
+ session.getProducerCreditManager().setCallback(this);
sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 59d35db..03ceb1a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -1077,7 +1077,7 @@ public class BridgeTest extends ActiveMQTestBase {
ArrayList<String> staticConnectors = new ArrayList<>();
staticConnectors.add(server1tc.getName());
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors);
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors).setProducerWindowSize(1);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);