You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ni...@apache.org on 2018/03/29 12:59:35 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1776 Asynchronous Flow control on the bridge

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 8f9299793 -> 4dbd03e53


ARTEMIS-1776 Asynchronous Flow control on the bridge


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/70bdfe76
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/70bdfe76
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/70bdfe76

Branch: refs/heads/master
Commit: 70bdfe760393a9d7d17ec175ea68ce83819fe83c
Parents: 8f92997
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 28 12:40:06 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Mar 28 19:09:20 2018 -0400

----------------------------------------------------------------------
 .../impl/AbstractProducerCreditsImpl.java       | 167 +++++++++++++++++++
 .../impl/AsynchronousProducerCreditsImpl.java   |  76 +++++++++
 .../impl/ClientProducerCreditManager.java       |   4 +
 .../impl/ClientProducerCreditManagerImpl.java   |  24 ++-
 .../core/client/impl/ClientProducerCredits.java |   3 +
 .../client/impl/ClientProducerCreditsImpl.java  | 167 +++++--------------
 .../client/impl/ClientProducerFlowCallback.java |  24 +++
 .../core/impl/ActiveMQSessionContext.java       |   4 +-
 .../spi/core/remoting/SessionContext.java       |   4 +-
 .../core/server/ActiveMQServerLogger.java       |   4 +
 .../core/server/cluster/impl/BridgeImpl.java    |  26 ++-
 .../integration/cluster/bridge/BridgeTest.java  |   2 +-
 12 files changed, 373 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
new file mode 100644
index 0000000..25b2d97
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+
+public abstract class AbstractProducerCreditsImpl implements ClientProducerCredits {
+
+   protected int pendingCredits;
+
+   private final int windowSize;
+
+   protected volatile boolean closed;
+
+   protected boolean blocked;
+
+   protected final SimpleString address;
+
+   private final ClientSessionInternal session;
+
+   protected int arriving;
+
+   private int refCount;
+
+   protected boolean serverRespondedWithFail;
+
+   protected SessionContext sessionContext;
+
+   public AbstractProducerCreditsImpl(final ClientSessionInternal session,
+                                      final SimpleString address,
+                                      final int windowSize) {
+      this.session = session;
+
+      this.address = address;
+
+      this.windowSize = windowSize / 2;
+   }
+
+   @Override
+   public SimpleString getAddress() {
+      return address;
+   }
+
+   @Override
+   public void init(SessionContext sessionContext) {
+      // We initially request twice as many credits as we request in subsequent requests.
+      // This allows the producer to keep sending as more arrive, minimising pauses.
+      checkCredits(windowSize);
+
+      this.sessionContext = sessionContext;
+
+      this.sessionContext.linkFlowControl(address, this);
+   }
+
+   @Override
+   public void acquireCredits(final int credits) throws ActiveMQException {
+      checkCredits(credits);
+
+      actualAcquire(credits);
+
+      afterAcquired(credits);
+
+   }
+
+   protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
+      // the credits have been acquired, so they no longer count as pending
+      synchronized (this) {
+         pendingCredits -= credits;
+      }
+   }
+
+   protected abstract void actualAcquire(int credits);
+
+   @Override
+   public boolean isBlocked() {
+      return blocked;
+   }
+
+   @Override
+   public void receiveFailCredits(final int credits) {
+      serverRespondedWithFail = true;
+      // receive credits like normal to keep the sender from blocking
+      receiveCredits(credits);
+   }
+
+
+   @Override
+   public void receiveCredits(final int credits) {
+      synchronized (this) {
+         arriving -= credits;
+      }
+   }
+
+
+   @Override
+   public synchronized void reset() {
+      // Any pending credits from before failover won't arrive, so we re-initialise
+
+      int beforeFailure = pendingCredits;
+
+      pendingCredits = 0;
+      arriving = 0;
+
+      // If we are waiting for more credits than what's configured, then we need to use what we tried before
+      // otherwise the client may starve as the credit will never arrive
+      checkCredits(Math.max(windowSize * 2, beforeFailure));
+   }
+
+   @Override
+   public void close() {
+      // Closing a producer that is blocking should make it return
+      closed = true;
+   }
+
+   @Override
+   public synchronized void incrementRefCount() {
+      refCount++;
+   }
+
+   @Override
+   public synchronized int decrementRefCount() {
+      return --refCount;
+   }
+
+   public abstract int getBalance();
+
+   protected void checkCredits(final int credits) {
+      int needed = Math.max(credits, windowSize);
+
+      int toRequest = -1;
+
+      synchronized (this) {
+         if (getBalance() + arriving < needed) {
+            toRequest = needed - arriving;
+
+            pendingCredits += toRequest;
+            arriving += toRequest;
+         }
+      }
+
+      if (toRequest != -1) {
+         requestCredits(toRequest);
+      }
+   }
+
+   private void requestCredits(final int credits) {
+      session.sendProducerCreditsMessage(credits, address);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
new file mode 100644
index 0000000..a49122f
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl {
+
+   int balance;
+
+   final ClientProducerFlowCallback callback;
+
+   public AsynchronousProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize,
+                                          ClientProducerFlowCallback callback) {
+      super(session, address, windowSize);
+      balance = windowSize;
+      this.callback = callback;
+   }
+
+   @Override
+   protected synchronized void actualAcquire(int credits) {
+      synchronized (this) {
+         balance -= credits;
+         if (balance <= 0) {
+            callback.onCreditsFlow(true, this);
+         }
+      }
+
+   }
+
+   @Override
+   public int getBalance() {
+      return balance;
+   }
+
+   @Override
+   public void receiveCredits(int credits) {
+      synchronized (this) {
+         super.receiveCredits(credits);
+         balance += credits;
+         callback.onCreditsFlow(balance <= 0, this);
+      }
+
+   }
+
+
+   @Override
+   public void receiveFailCredits(final int credits) {
+      super.receiveFailCredits(credits);
+      callback.onCreditsFail(this);
+   }
+
+   @Override
+   public void releaseOutstanding() {
+      synchronized (this) {
+         balance = 0;
+         callback.onCreditsFlow(true, this);
+      }
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
index 3c10e1a..bb65f72 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
@@ -36,4 +36,8 @@ public interface ClientProducerCreditManager {
    int creditsMapSize();
 
    int unReferencedCreditsSize();
+
+   /** Setting a callback switches the flow control to asynchronous mode:
+    *  no actual blocking should happen; instead the callback is notified whenever blockages change. */
+   void setCallback(ClientProducerFlowCallback callback);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
index 80b255f..cd2db9c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -35,12 +35,22 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
 
    private int windowSize;
 
+   private ClientProducerFlowCallback callback;
+
    public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {
       this.session = session;
 
       this.windowSize = windowSize;
    }
 
+
+   /** Setting a callback switches the flow control to asynchronous mode:
+    *  no actual blocking should happen; instead the callback is notified whenever blockages change. */
+   @Override
+   public void setCallback(ClientProducerFlowCallback callback) {
+      this.callback = callback;
+   }
+
    @Override
    public synchronized ClientProducerCredits getCredits(final SimpleString address,
                                                         final boolean anon,
@@ -56,7 +66,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
 
             if (credits == null) {
                // Doesn't need to be fair since session is single threaded
-               credits = new ClientProducerCreditsImpl(session, address, windowSize);
+               credits = build(address);
                needInit = true;
 
                producerCredits.put(address, credits);
@@ -83,6 +93,14 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
       }
    }
 
+   private ClientProducerCredits build(SimpleString address) {
+      if (callback != null) {
+         return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback);
+      } else {
+         return new ClientProducerCreditsImpl(session, address, windowSize);
+      }
+   }
+
    @Override
    public synchronized void returnCredits(final SimpleString address) {
       ClientProducerCredits credits = producerCredits.get(address);
@@ -210,6 +228,10 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
       public void releaseOutstanding() {
       }
 
+      @Override
+      public SimpleString getAddress() {
+         return SimpleString.toSimpleString("");
+      }
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
index a97df92..321fda5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.client.impl;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 
 public interface ClientProducerCredits {
@@ -40,4 +41,6 @@ public interface ClientProducerCredits {
    int decrementRefCount();
 
    void releaseOutstanding();
+
+   SimpleString getAddress();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
index 75543fd..41a08c9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
@@ -19,96 +19,33 @@ package org.apache.activemq.artemis.core.client.impl;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
-import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 
-public class ClientProducerCreditsImpl implements ClientProducerCredits {
+public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl {
 
-   private final Semaphore semaphore;
-
-   private final int windowSize;
-
-   private volatile boolean closed;
-
-   private boolean blocked;
-
-   private final SimpleString address;
-
-   private final ClientSessionInternal session;
-
-   private int pendingCredits;
-
-   private int arriving;
-
-   private int refCount;
-
-   private boolean serverRespondedWithFail;
 
-   private SessionContext sessionContext;
-
-   public ClientProducerCreditsImpl(final ClientSessionInternal session,
-                                    final SimpleString address,
-                                    final int windowSize) {
-      this.session = session;
+   private final Semaphore semaphore;
 
-      this.address = address;
+   public ClientProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize) {
+      super(session, address, windowSize);
 
-      this.windowSize = windowSize / 2;
 
       // Doesn't need to be fair since session is single threaded
-
       semaphore = new Semaphore(0, false);
-   }
 
-   @Override
-   public void init(SessionContext sessionContext) {
-      // We initial request twice as many credits as we request in subsequent requests
-      // This allows the producer to keep sending as more arrive, minimising pauses
-      checkCredits(windowSize);
-
-      this.sessionContext = sessionContext;
-
-      this.sessionContext.linkFlowControl(address, this);
    }
 
-   @Override
-   public void acquireCredits(final int credits) throws ActiveMQException {
-      checkCredits(credits);
-
-      boolean tryAcquire;
-
-      synchronized (this) {
-         tryAcquire = semaphore.tryAcquire(credits);
-      }
-
-      if (!tryAcquire) {
-         if (!closed) {
-            this.blocked = true;
-            try {
-               while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
-                  // I'm using string concatenation here in case address is null
-                  // better getting a "null" string than a NPE
-                  ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
-               }
-            } catch (InterruptedException interrupted) {
-               Thread.currentThread().interrupt();
-               throw new ActiveMQInterruptedException(interrupted);
-            } finally {
-               this.blocked = false;
-            }
-         }
-      }
-
-      synchronized (this) {
-         pendingCredits -= credits;
-      }
 
+   @Override
+   protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
       // check to see if the blocking mode is FAIL on the server
       synchronized (this) {
+         super.afterAcquired(credits);
+
          if (serverRespondedWithFail) {
             serverRespondedWithFail = false;
 
@@ -123,29 +60,30 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
    }
 
    @Override
-   public boolean isBlocked() {
-      return blocked;
-   }
-
-   public int getBalance() {
-      return semaphore.availablePermits();
-   }
+   protected void actualAcquire(int credits) {
 
-   @Override
-   public void receiveCredits(final int credits) {
+      boolean tryAcquire;
       synchronized (this) {
-         arriving -= credits;
+         tryAcquire = semaphore.tryAcquire(credits);
       }
 
-      semaphore.release(credits);
+      if (!tryAcquire && !closed) {
+         this.blocked = true;
+         try {
+            while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
+               // I'm using string concatenation here in case address is null
+               // better getting a "null" string than a NPE
+               ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
+            }
+         } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new ActiveMQInterruptedException(interrupted);
+         } finally {
+            this.blocked = false;
+         }
+      }
    }
 
-   @Override
-   public void receiveFailCredits(final int credits) {
-      serverRespondedWithFail = true;
-      // receive credits like normal to keep the sender from blocking
-      receiveCredits(credits);
-   }
 
    @Override
    public synchronized void reset() {
@@ -153,59 +91,38 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits {
 
       semaphore.drainPermits();
 
-      int beforeFailure = pendingCredits;
-
-      pendingCredits = 0;
-      arriving = 0;
-
-      // If we are waiting for more credits than what's configured, then we need to use what we tried before
-      // otherwise the client may starve as the credit will never arrive
-      checkCredits(Math.max(windowSize * 2, beforeFailure));
+      super.reset();
    }
 
+
    @Override
    public void close() {
-      // Closing a producer that is blocking should make it return
-      closed = true;
+      super.close();
 
+      // Closing a producer that is blocking should make it return
       semaphore.release(Integer.MAX_VALUE / 2);
    }
 
    @Override
-   public synchronized void incrementRefCount() {
-      refCount++;
-   }
+   public void receiveCredits(final int credits) {
+      synchronized (this) {
+         super.receiveCredits(credits);
+      }
 
-   @Override
-   public synchronized int decrementRefCount() {
-      return --refCount;
+      semaphore.release(credits);
    }
 
+
    @Override
    public synchronized void releaseOutstanding() {
       semaphore.drainPermits();
    }
 
-   private void checkCredits(final int credits) {
-      int needed = Math.max(credits, windowSize);
-
-      int toRequest = -1;
-
-      synchronized (this) {
-         if (semaphore.availablePermits() + arriving < needed) {
-            toRequest = needed - arriving;
-
-            pendingCredits += toRequest;
-            arriving += toRequest;
-         }
-      }
-
-      if (toRequest != -1) {
-         requestCredits(toRequest);
-      }
+   @Override
+   public int getBalance() {
+      return semaphore.availablePermits();
    }
 
-   private void requestCredits(final int credits) {
-      session.sendProducerCreditsMessage(credits, address);
-   }
+
 }
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
new file mode 100644
index 0000000..1fc289d
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerFlowCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.client.impl;
+
+public interface ClientProducerFlowCallback {
+   void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits);
+
+   void onCreditsFail(ClientProducerCredits credits);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 6037925..f4033ec 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -47,7 +47,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
-import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -234,7 +234,7 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
-   public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) {
+   public void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits) {
       // nothing to be done here... Flow control here is done on the core side
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 058e606..6571335 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -34,7 +34,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
-import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.IDGenerator;
@@ -342,7 +342,7 @@ public abstract class SessionContext {
 
    public abstract void cleanup();
 
-   public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits);
+   public abstract void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits);
 
    public abstract boolean isWritable(ReadyListener callback);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 353403f..c3f0a7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1585,6 +1585,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @Message(id = 22272, value = "Message ack in prepared tx for queue {0} which does not exist. This ack will be ignored.", format = Message.Format.MESSAGE_FORMAT)
    void journalMessageAckMissingQueueInPreparedTX(Long queueID);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 22273,  value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT)
+   void bridgeAddressFull(String addressName, String bridgeName);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 9815d37..01596fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
@@ -70,7 +72,7 @@ import org.jboss.logging.Logger;
  * A Core BridgeImpl
  */
 
-public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener {
+public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback {
    // Constants -----------------------------------------------------
 
    private static final Logger logger = Logger.getLogger(BridgeImpl.class);
@@ -119,6 +121,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private final long maxRetryInterval;
 
+   private boolean blockedOnFlowControl;
+
    /**
     * Used when there's a scheduled reconnection
     */
@@ -254,6 +258,20 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    }
 
    @Override
+   public void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits) {
+      this.blockedOnFlowControl = blocked;
+      if (!blocked) {
+         queue.deliverAsync();
+      }
+   }
+
+   @Override
+   public void onCreditsFail(ClientProducerCredits producerCredits) {
+      ActiveMQServerLogger.LOGGER.bridgeAddressFull("" + producerCredits.getAddress(), "" + this.getName());
+      disconnect();
+   }
+
+   @Override
    public long sequentialID() {
       return sequentialID;
    }
@@ -541,6 +559,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       queue.deliverAsync();
    }
 
+
    @Override
    public HandleStatus handle(final MessageReference ref) throws Exception {
       if (filter != null && !filter.match(ref.getMessage())) {
@@ -555,6 +574,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             return HandleStatus.BUSY;
          }
 
+         if (blockedOnFlowControl) {
+            return HandleStatus.BUSY;
+         }
+
          if (deliveringLargeMessage) {
             return HandleStatus.BUSY;
          }
@@ -876,6 +899,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                }
                // Session is pre-acknowledge
                session = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
+               session.getProducerCreditManager().setCallback(this);
                sessionConsumer = (ClientSessionInternal) csf.createSession(user, password, false, true, true, true, 1);
             }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/70bdfe76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 219aa15..ae60a61 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -1187,7 +1187,7 @@ public class BridgeTest extends ActiveMQTestBase {
       ArrayList<String> staticConnectors = new ArrayList<>();
       staticConnectors.add(server1tc.getName());
 
-      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors);
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(0).setStaticConnectors(staticConnectors).setProducerWindowSize(1);
 
       List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
       bridgeConfigs.add(bridgeConfiguration);


[2/2] activemq-artemis git commit: This closes #1979

Posted by ni...@apache.org.
This closes #1979


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4dbd03e5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4dbd03e5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4dbd03e5

Branch: refs/heads/master
Commit: 4dbd03e53b1358c94ba139b9275da6cc55bb5d60
Parents: 8f92997 70bdfe7
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Mar 29 14:55:25 2018 +0200
Committer: Francesco Nigro <ni...@gmail.com>
Committed: Thu Mar 29 14:55:25 2018 +0200

----------------------------------------------------------------------
 .../impl/AbstractProducerCreditsImpl.java       | 167 +++++++++++++++++++
 .../impl/AsynchronousProducerCreditsImpl.java   |  76 +++++++++
 .../impl/ClientProducerCreditManager.java       |   4 +
 .../impl/ClientProducerCreditManagerImpl.java   |  24 ++-
 .../core/client/impl/ClientProducerCredits.java |   3 +
 .../client/impl/ClientProducerCreditsImpl.java  | 167 +++++--------------
 .../client/impl/ClientProducerFlowCallback.java |  24 +++
 .../core/impl/ActiveMQSessionContext.java       |   4 +-
 .../spi/core/remoting/SessionContext.java       |   4 +-
 .../core/server/ActiveMQServerLogger.java       |   4 +
 .../core/server/cluster/impl/BridgeImpl.java    |  26 ++-
 .../integration/cluster/bridge/BridgeTest.java  |   2 +-
 12 files changed, 373 insertions(+), 132 deletions(-)
----------------------------------------------------------------------