You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/17 14:58:41 UTC

[2/4] activemq-artemis git commit: ARTEMIS-1776 Asynchronous Flow control on the bridge

ARTEMIS-1776 Asynchronous Flow control on the bridge

Cherry-picked from 70bdfe760393a9d7d17ec175ea68ce83819fe83c


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2c73bd04
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2c73bd04
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2c73bd04

Branch: refs/heads/1.x
Commit: 2c73bd0462b3818d49ce7c4557e1b48587707cb1
Parents: 1e6e1f7
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 28 12:40:06 2018 -0400
Committer: JiriOndrusek <jo...@redhat.com>
Committed: Tue Apr 10 16:44:07 2018 +0200

----------------------------------------------------------------------
 .../impl/AbstractProducerCreditsImpl.java       | 167 ++++++++++++++
 .../impl/AsynchronousProducerCreditsImpl.java   |  76 +++++++
 .../impl/ClientProducerCreditManager.java       |   4 +
 .../impl/ClientProducerCreditManagerImpl.java   |  24 +-
 .../core/client/impl/ClientProducerCredits.java |   3 +
 .../client/impl/ClientProducerCreditsImpl.java  | 167 ++++----------
 .../client/impl/ClientProducerFlowCallback.java |  24 ++
 .../core/impl/ActiveMQSessionContext.java       |   4 +-
 .../spi/core/remoting/SessionContext.java       |   4 +-
 .../core/server/ActiveMQServerLogger.java       | 222 +++++++++++++++++++
 .../core/server/cluster/impl/BridgeImpl.java    |  26 ++-
 .../integration/cluster/bridge/BridgeTest.java  |   2 +-
 12 files changed, 591 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
new file mode 100644
index 0000000..25b2d97
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+
+public abstract class AbstractProducerCreditsImpl implements ClientProducerCredits {
+
+   protected int pendingCredits;
+
+   private final int windowSize;
+
+   protected volatile boolean closed;
+
+   protected boolean blocked;
+
+   protected final SimpleString address;
+
+   private final ClientSessionInternal session;
+
+   protected int arriving;
+
+   private int refCount;
+
+   protected boolean serverRespondedWithFail;
+
+   protected SessionContext sessionContext;
+
+   public AbstractProducerCreditsImpl(final ClientSessionInternal session,
+                                      final SimpleString address,
+                                      final int windowSize) {
+      this.session = session;
+
+      this.address = address;
+
+      this.windowSize = windowSize / 2;
+   }
+
+   @Override
+   public SimpleString getAddress() {
+      return address;
+   }
+
+   @Override
+   public void init(SessionContext sessionContext) {
+      // We initially request twice as many credits as we request in subsequent requests
+      // This allows the producer to keep sending as more arrive, minimising pauses
+      checkCredits(windowSize);
+      // NOTE(review): windowSize was already halved in the constructor - confirm the "twice" above still holds
+      this.sessionContext = sessionContext;
+
+      this.sessionContext.linkFlowControl(address, this);
+   }
+
+   @Override
+   public void acquireCredits(final int credits) throws ActiveMQException {
+      checkCredits(credits);
+
+      actualAcquire(credits);
+
+      afterAcquired(credits);
+
+   }
+
+   protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
+      // base bookkeeping only: the credits just acquired are no longer pending
+      synchronized (this) {
+         pendingCredits -= credits;
+      }
+   }
+
+   protected abstract void actualAcquire(int credits);
+
+   @Override
+   public boolean isBlocked() {
+      return blocked;
+   }
+
+   @Override
+   public void receiveFailCredits(final int credits) {
+      serverRespondedWithFail = true;
+      // receive credits like normal to keep the sender from blocking
+      receiveCredits(credits);
+   }
+
+
+   @Override
+   public void receiveCredits(final int credits) {
+      synchronized (this) {
+         arriving -= credits;
+      }
+   }
+
+
+   @Override
+   public synchronized void reset() {
+      // Any pendingCredits credits from before failover won't arrive, so we re-initialise
+
+      int beforeFailure = pendingCredits;
+
+      pendingCredits = 0;
+      arriving = 0;
+
+      // If we are waiting for more credits than what's configured, then we need to use what we tried before
+      // otherwise the client may starve as the credit will never arrive
+      checkCredits(Math.max(windowSize * 2, beforeFailure));
+   }
+
+   @Override
+   public void close() {
+      // Closing a producer that is blocking should make it return
+      closed = true;
+   }
+
+   @Override
+   public synchronized void incrementRefCount() {
+      refCount++;
+   }
+
+   @Override
+   public synchronized int decrementRefCount() {
+      return --refCount;
+   }
+
+   public abstract int getBalance();
+
+   protected void checkCredits(final int credits) {
+      int needed = Math.max(credits, windowSize);
+      // -1 means "nothing to request"; it is only overwritten when balance plus arriving falls short
+      int toRequest = -1;
+
+      synchronized (this) {
+         if (getBalance() + arriving < needed) {
+            toRequest = needed - arriving;
+            // NOTE(review): if this ever evaluates to exactly -1 it is mistaken for the sentinel - confirm it cannot
+            pendingCredits += toRequest;
+            arriving += toRequest;
+         }
+      }
+      // The request is sent outside the synchronized block above
+      if (toRequest != -1) {
+         requestCredits(toRequest);
+      }
+   }
+
+   private void requestCredits(final int credits) {
+      session.sendProducerCreditsMessage(credits, address);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
new file mode 100644
index 0000000..a49122f
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl {
+
+   int balance;
+
+   final ClientProducerFlowCallback callback;
+
+   public AsynchronousProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize,
+                                          ClientProducerFlowCallback callback) {
+      super(session, address, windowSize);
+      balance = windowSize;
+      this.callback = callback;
+   }
+
+   @Override
+   protected synchronized void actualAcquire(int credits) {
+      // The synchronized modifier already holds this object's monitor, so no nested synchronized (this) is needed
+      balance -= credits;
+      if (balance <= 0) {
+         // Nothing left to spend: report this producer as blocked instead of blocking the caller
+         callback.onCreditsFlow(true, this);
+      }
+
+   }
+
+   @Override
+   public int getBalance() {
+      return balance;
+   }
+
+   @Override
+   public void receiveCredits(int credits) {
+      synchronized (this) {
+         super.receiveCredits(credits);
+         balance += credits;
+         callback.onCreditsFlow(balance <= 0, this);
+      }
+
+   }
+
+
+   @Override
+   public void receiveFailCredits(final int credits) {
+      super.receiveFailCredits(credits);
+      callback.onCreditsFail(this);
+   }
+
+   @Override
+   public void releaseOutstanding() {
+      synchronized (this) {
+         balance = 0;
+         callback.onCreditsFlow(true, this);
+      }
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
index 3c10e1a..bb65f72 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
@@ -36,4 +36,8 @@ public interface ClientProducerCreditManager {
    int creditsMapSize();
 
    int unReferencedCreditsSize();
+
+   /** Setting a callback makes the flow control asynchronous:
+    *  no actual blocking should happen; instead the callback is notified whenever the blocked state changes.  */
+   void setCallback(ClientProducerFlowCallback callback);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
index 80b255f..cd2db9c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -35,12 +35,22 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
 
    private int windowSize;
 
+   private ClientProducerFlowCallback callback;
+
    public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {
       this.session = session;
 
       this.windowSize = windowSize;
    }
 
+
+   /** Setting a callback makes the flow control asynchronous:
+    *  no actual blocking should happen; instead the callback is notified whenever the blocked state changes.  */
+   @Override
+   public void setCallback(ClientProducerFlowCallback callback) {
+      this.callback = callback;
+   }
+
    @Override
    public synchronized ClientProducerCredits getCredits(final SimpleString address,
                                                         final boolean anon,
@@ -56,7 +66,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
 
             if (credits == null) {
                // Doesn't need to be fair since session is single threaded
-               credits = new ClientProducerCreditsImpl(session, address, windowSize);
+               credits = build(address);
                needInit = true;
 
                producerCredits.put(address, credits);
@@ -83,6 +93,14 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
       }
    }
 
+   private ClientProducerCredits build(SimpleString address) {
+      if (callback != null) {
+         return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback);
+      } else {
+         return new ClientProducerCreditsImpl(session, address, windowSize);
+      }
+   }
+
    @Override
    public synchronized void returnCredits(final SimpleString address) {
       ClientProducerCredits credits = producerCredits.get(address);
@@ -210,6 +228,10 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
       public void releaseOutstanding() {
       }
 
+      @Override
+      public SimpleString getAddress() {
+         return SimpleString.toSimpleString("");
+      }
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
index a97df92..321fda5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.client.impl;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 
 public interface ClientProducerCredits {
@@ -40,4 +41,6 @@ public interface ClientProducerCredits {
    int decrementRefCount();
 
    void releaseOutstanding();
+
+   SimpleString getAddress();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
index 75543fd..41a08c9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
@@ -19,96 +19,33 @@ package org.apache.activemq.artemis.core.client.impl;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 
-public class ClientProducerCreditsImpl implements ClientProducerCredits {
+public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl {
 
-   private final Semaphore semaphore;
-
-   private final int windowSize;
-
-   private volatile boolean closed;
-
-   private boolean blocked;
-
-   private final SimpleString address;
-
-   private final ClientSessionInternal session;
-
-   private int pendingCredits;
-
-   private int arriving;
-
-   private int refCount;
-
-   private boolean serverRespondedWithFail;
 
-   private SessionContext sessionContext;
-
-   public ClientProducerCreditsImpl(final ClientSessionInternal session,
-                                    final SimpleString address,
-                                    final int windowSize) {
-      this.session = session;
+   private final Semaphore semaphore;
 
-      this.address = address;
+   public ClientProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize) {
+      super(session, address, windowSize);
 
-      this.windowSize = windowSize / 2;
 
       // Doesn't need to be fair since session is single threaded
-
       semaphore = new Semaphore(0, false);
-   }
 
-   @Override
-   public void init(SessionContext sessionContext) {
-      // We initial request twice as many credits as we request in subsequent requests
-      // This allows the producer to keep sending as more arrive, minimising pauses
-      checkCredits(windowSize);
-
-      this.sessionContext = sessionContext;
-
-      this.sessionContext.linkFlowControl(address, this);
    }
 
-   @Override
-   public void acquireCredits(final int credits) throws ActiveMQException {
-      checkCredits(credits);
-
-      boolean tryAcquire;
-
-      synchronized (this) {
-         tryAcquire = semaphore.tryAcquire(credits);
-      }
-
-      if (!tryAcquire) {
-         if (!closed) {
-            this.blocked = true;
-            try {
-               while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
-                  // I'm using string concatenation here in case address is null
-                  // better getting a "null" string than a NPE
-                  ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
-               }
-            } catch (InterruptedException interrupted) {
-               Thread.currentThread().interrupt();
-               throw new ActiveMQInterruptedException(interrupted);
-            } finally {
-               this.blocked = false;
-            }
-         }
-      }
-
-      synchronized (this) {
-         pendingCredits -= credits;
-      }
 
+   @Override
+   protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
       // check to see if the blocking mode is FAIL on the server
       synchronized (this) {
+         super.afterAcquired(credits);
+
          if (serverRespondedWithFail) {
             serverRespondedWithFail = false;
 
@@ -123,29 +60,30 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
    }
 
    @Override
-   public boolean isBlocked() {
-      return blocked;
-   }
-
-   public int getBalance() {
-      return semaphore.availablePermits();
-   }
+   protected void actualAcquire(int credits) {
 
-   @Override
-   public void receiveCredits(final int credits) {
+      boolean tryAcquire;
       synchronized (this) {
-         arriving -= credits;
+         tryAcquire = semaphore.tryAcquire(credits);
       }
 
-      semaphore.release(credits);
+      if (!tryAcquire && !closed) {
+         this.blocked = true;
+         try {
+            while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
+               // I'm using string concatenation here in case address is null
+               // better getting a "null" string than a NPE
+               ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
+            }
+         } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new ActiveMQInterruptedException(interrupted);
+         } finally {
+            this.blocked = false;
+         }
+      }
    }
 
-   @Override
-   public void receiveFailCredits(final int credits) {
-      serverRespondedWithFail = true;
-      // receive credits like normal to keep the sender from blocking
-      receiveCredits(credits);
-   }
 
    @Override
    public synchronized void reset() {
@@ -153,59 +91,38 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
 
       semaphore.drainPermits();
 
-      int beforeFailure = pendingCredits;
-
-      pendingCredits = 0;
-      arriving = 0;
-
-      // If we are waiting for more credits than what's configured, then we need to use what we tried before
-      // otherwise the client may starve as the credit will never arrive
-      checkCredits(Math.max(windowSize * 2, beforeFailure));
+      super.reset();
    }
 
+
    @Override
    public void close() {
-      // Closing a producer that is blocking should make it return
-      closed = true;
+      super.close();
 
+      // Closing a producer that is blocking should make it return
       semaphore.release(Integer.MAX_VALUE / 2);
    }
 
    @Override
-   public synchronized void incrementRefCount() {
-      refCount++;
-   }
+   public void receiveCredits(final int credits) {
+      synchronized (this) {
+         super.receiveCredits(credits);
+      }
 
-   @Override
-   public synchronized int decrementRefCount() {
-      return --refCount;
+      semaphore.release(credits);
    }
 
+
    @Override
    public synchronized void releaseOutstanding() {
       semaphore.drainPermits();
    }
 
-   private void checkCredits(final int credits) {
-      int needed = Math.max(credits, windowSize);
-
-      int toRequest = -1;
-
-      synchronized (this) {
-         if (semaphore.availablePermits() + arriving < needed) {
-            toRequest = needed - arriving;
-
-            pendingCredits += toRequest;
-            arriving += toRequest;
-         }
-      }
-
-      if (toRequest != -1) {
-         requestCredits(toRequest);
-      }
+   @Override
+   public int getBalance() {
+      return semaphore.availablePermits();
    }
 
-   private void requestCredits(final int credits) {
-      session.sendProducerCreditsMessage(credits, address);
-   }
+
 }
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
new file mode 100644
index 0000000..1fc289d
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+public interface ClientProducerFlowCallback {
+   void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits);
+
+   void onCreditsFail(ClientProducerCredits credits);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index d2bcc96..721f717 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
-import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -225,7 +225,7 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
-   public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) {
+   public void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits) {
       // nothing to be done here... Flow control here is done on the core side
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 1f15cc6..7c31597 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -30,7 +30,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
-import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -277,7 +277,7 @@ public abstract class SessionContext {
 
    public abstract void cleanup();
 
-   public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits);
+   public abstract void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits);
 
    public abstract boolean isWritable(ReadyListener callback);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 5ecaee6..e728d33 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -31,6 +31,7 @@ package org.apache.activemq.artemis.core.server;
  * so an INFO message would be 101000 to 101999
  */
 
+import javax.naming.NamingException;
 import javax.transaction.xa.Xid;
 import java.io.File;
 import java.net.SocketAddress;
@@ -1291,6 +1292,227 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @Message(id = 222217, value = "Cannot find connector-ref {0}. The cluster-connection {1} will not be deployed.", format = Message.Format.MESSAGE_FORMAT)
    void connectorRefNotFound(String connectorRef, String clusterConnection);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222218, value = "Server disconnecting: {0}", format = Message.Format.MESSAGE_FORMAT)
+   void disconnectCritical(String reason, @Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222219, value = "File {0} does not exist",
+           format = Message.Format.MESSAGE_FORMAT)
+   void fileDoesNotExist(String path);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222220, value = "   Error while cleaning paging on queue {0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorCleaningPagingOnQueue(@Cause Exception e, String queue);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222221, value = "Error while cleaning page, during the commit", format = Message.Format.MESSAGE_FORMAT)
+   void errorCleaningPagingDuringCommit(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222222, value = "Error while deleting page-complete-record", format = Message.Format.MESSAGE_FORMAT)
+   void errorDeletingPageCompleteRecord(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222223, value = "Failed to calculate message memory estimate", format = Message.Format.MESSAGE_FORMAT)
+   void errorCalculateMessageMemoryEstimate(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222224, value = "Failed to calculate scheduled delivery time", format = Message.Format.MESSAGE_FORMAT)
+   void errorCalculateScheduledDeliveryTime(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222225, value = "Sending unexpected exception to the client", format = Message.Format.MESSAGE_FORMAT)
+   void sendingUnexpectedExceptionToClient(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222226, value = "Connection configuration is null for connectorName {0}", format = Message.Format.MESSAGE_FORMAT)
+   void connectionConfigurationIsNull(String connectorName);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222227, value = "Failed to process an event", format = Message.Format.MESSAGE_FORMAT)
+   void failedToProcessEvent(@Cause NamingException e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222228, value = "Missing replication token on queue", format = Message.Format.MESSAGE_FORMAT)
+   void missingReplicationTokenOnQueue();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222229, value = "Failed to perform rollback", format = Message.Format.MESSAGE_FORMAT)
+   void failedToPerformRollback(@Cause IllegalStateException e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222230, value = "Failed to send notification", format = Message.Format.MESSAGE_FORMAT)
+   void failedToSendNotification(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222231, value = "Failed to flush outstanding data from the connection", format = Message.Format.MESSAGE_FORMAT)
+   void failedToFlushOutstandingDataFromTheConnection(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222232, value = "Unable to acquire lock", format = Message.Format.MESSAGE_FORMAT)
+   void unableToAcquireLock(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222233, value = "Unable to destroy connection with session metadata", format = Message.Format.MESSAGE_FORMAT)
+   void unableDestroyConnectionWithSessionMetadata(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222234, value = "Unable to deactivate a callback", format = Message.Format.MESSAGE_FORMAT)
+   void unableToDeactiveCallback(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222235, value = "Unable to inject a monitor", format = Message.Format.MESSAGE_FORMAT)
+   void unableToInjectMonitor(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222236, value = "Unable to flush deliveries", format = Message.Format.MESSAGE_FORMAT)
+   void unableToFlushDeliveries(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222237, value = "Unable to cancel redistributor", format = Message.Format.MESSAGE_FORMAT)
+   void unableToCancelRedistributor(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222238, value = "Unable to commit transaction", format = Message.Format.MESSAGE_FORMAT)
+   void unableToCommitTransaction(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222239, value = "Unable to delete Queue status", format = Message.Format.MESSAGE_FORMAT)
+   void unableToDeleteQueueStatus(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222240, value = "Unable to pause a Queue", format = Message.Format.MESSAGE_FORMAT)
+   void unableToPauseQueue(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222241, value = "Unable to resume a Queue", format = Message.Format.MESSAGE_FORMAT)
+   void unableToResumeQueue(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222242, value = "Unable to obtain message priority, using default ", format = Message.Format.MESSAGE_FORMAT)
+   void unableToGetMessagePriority(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222243, value = "Unable to extract GroupID from message", format = Message.Format.MESSAGE_FORMAT)
+   void unableToExtractGroupID(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222244, value = "Unable to check if message expired", format = Message.Format.MESSAGE_FORMAT)
+   void unableToCheckIfMessageExpired(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222245, value = "Unable to perform post acknowledge", format = Message.Format.MESSAGE_FORMAT)
+   void unableToPerformPostAcknowledge(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222246, value = "Unable to rollback on close", format = Message.Format.MESSAGE_FORMAT)
+   void unableToRollbackOnClose(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222247, value = "Unable to close consumer", format = Message.Format.MESSAGE_FORMAT)
+   void unableToCloseConsumer(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222248, value = "Unable to remove consumer", format = Message.Format.MESSAGE_FORMAT)
+   void unableToRemoveConsumer(@Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222249, value = "Unable to rollback on TX timed out", format = Message.Format.MESSAGE_FORMAT)
+   void unableToRollbackOnTxTimedOut(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222250, value = "Unable to delete heuristic completion from storage manager", format = Message.Format.MESSAGE_FORMAT)
+   void unableToDeleteHeuristicCompletion(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222251, value = "Unable to start replication", format = Message.Format.MESSAGE_FORMAT)
+   void unableToStartReplication(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222252, value = "Unable to calculate file size", format = Message.Format.MESSAGE_FORMAT)
+   void unableToCalculateFileSize(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222253, value = "Error while syncing data on largeMessageInSync:: {0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorWhileSyncingData(String target, @Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222254, value = "Invalid record type {0}", format = Message.Format.MESSAGE_FORMAT)
+   void invalidRecordType(byte type, @Cause Throwable e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222255, value = "Unable to calculate file store usage", format = Message.Format.MESSAGE_FORMAT)
+   void unableToCalculateFileStoreUsage(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222256, value = "Failed to unregister acceptors", format = Message.Format.MESSAGE_FORMAT)
+   void failedToUnregisterAcceptors(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222257, value = "Failed to decrement message reference count", format = Message.Format.MESSAGE_FORMAT)
+   void failedToDecrementMessageReferenceCount(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222258, value = "Error on deleting queue {0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorOnDeletingQueue(String queueName, @Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222259, value = "Failed to flush the executor", format = Message.Format.MESSAGE_FORMAT)
+   void failedToFlushExecutor(@Cause InterruptedException e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222260, value = "Failed to perform rollback", format = Message.Format.MESSAGE_FORMAT)
+   void failedToRollback(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222261, value = "Failed to activate a backup", format = Message.Format.MESSAGE_FORMAT)
+   void failedToActivateBackup(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222262, value = "Failed to stop cluster manager", format = Message.Format.MESSAGE_FORMAT)
+   void failedToStopClusterManager(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222263, value = "Failed to stop cluster connection", format = Message.Format.MESSAGE_FORMAT)
+   void failedToStopClusterConnection(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222264, value = "Failed to process message reference after rollback", format = Message.Format.MESSAGE_FORMAT)
+   void failedToProcessMessageReferenceAfterRollback(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222265, value = "Failed to finish delivery, unable to lock delivery", format = Message.Format.MESSAGE_FORMAT)
+   void failedToFinishDelivery(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222266, value = "Failed to send request to the node", format = Message.Format.MESSAGE_FORMAT)
+   void failedToSendRequestToNode(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222267, value = "Failed to disconnect bindings", format = Message.Format.MESSAGE_FORMAT)
+   void failedToDisconnectBindings(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222268, value = "Failed to remove a record", format = Message.Format.MESSAGE_FORMAT)
+   void failedToRemoveRecord(@Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222269, value = "Please use a fixed value for \"journal-pool-files\". Default changed per https://issues.apache.org/jira/browse/ARTEMIS-1628", format = Message.Format.MESSAGE_FORMAT)
+   void useFixedValueOnJournalPoolFiles();
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222270, value = "Unable to create management notification address: {0}", format = Message.Format.MESSAGE_FORMAT)
+   void unableToCreateManagementNotificationAddress(SimpleString addressName, @Cause Exception e);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222272, value = "Message ack in prepared tx for queue {0} which does not exist. This ack will be ignored.", format = Message.Format.MESSAGE_FORMAT)
+   void journalMessageAckMissingQueueInPreparedTX(Long queueID);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT)
+   void bridgeAddressFull(String addressName, String bridgeName);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 94e28e6..dbbb4ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
@@ -71,7 +73,7 @@ import org.jboss.logging.Logger;
  * A Core BridgeImpl
  */
 
-public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener {
+public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback {
    // Constants -----------------------------------------------------
 
    private static final Logger logger = Logger.getLogger(BridgeImpl.class);
@@ -122,6 +124,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private final long maxRetryInterval;
 
+   private boolean blockedOnFlowControl;
+
    /**
     * Used when there's a scheduled reconnection
     */
@@ -255,6 +259,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    }
 
    @Override
+   public void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits) {
+      blockedOnFlowControl = blocked; // handle() answers BUSY while this flag is set
+      if (blocked) {
+         return; // still blocked: nothing to deliver yet
+      }
+      queue.deliverAsync(); // no longer blocked: ask the queue to resume delivery
+   }
+
+   @Override
+   public void onCreditsFail(ClientProducerCredits producerCredits) {
+      ActiveMQServerLogger.LOGGER.bridgeAddressFull(String.valueOf(producerCredits.getAddress()), String.valueOf(getName())); // warn first, then drop the connection
+      disconnect();
+   }
+
+   @Override
    public synchronized void start() throws Exception {
       if (started) {
          return;
@@ -536,6 +554,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       queue.deliverAsync();
    }
 
+
    @Override
    public HandleStatus handle(final MessageReference ref) throws Exception {
       if (filter != null && !filter.match(ref.getMessage())) {
@@ -550,6 +569,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             return HandleStatus.BUSY;
          }
 
+         if (blockedOnFlowControl) {
+            return HandleStatus.BUSY;
+         }
+
          if (deliveringLargeMessage) {
             return HandleStatus.BUSY;
          }
@@ -868,6 +891,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                }
                // Session is pre-acknowledge
                session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
+               session.getProducerCreditManager().setCallback(this);
                sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
             }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2c73bd04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 59d35db..03ceb1a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -1077,7 +1077,7 @@ public class BridgeTest extends ActiveMQTestBase {
       ArrayList<String> staticConnectors = new ArrayList<>();
       staticConnectors.add(server1tc.getName());
 
-      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors);
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors).setProducerWindowSize(1);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
       bridgeConfigs.add(bridgeConfiguration);