You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/06/27 14:14:50 UTC

[1/2] activemq-artemis git commit: ARTEMIS-569 fix bridge producerWindowSize

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 425fe8675 -> 7d69d913e


ARTEMIS-569 fix bridge producerWindowSize

Something bizarre happened with commit
8f52a622d0d883ca5e9f60ba7754ed51de38cc5c in April 2015. It reverted the
changes from both c1111cc156684b15938ab3f8e34df9f4b64f57c4 and
ada112a6a37dce8ddf48e2238904421b2ca8e0dc. This commit fixes that.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ce9ea176
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ce9ea176
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ce9ea176

Branch: refs/heads/master
Commit: ce9ea1760a473531f8175c9ca3c72f03fd8c617d
Parents: 425fe86
Author: jbertram <jb...@apache.org>
Authored: Fri Jun 24 16:50:25 2016 -0500
Committer: jbertram <jb...@apache.org>
Committed: Fri Jun 24 16:54:48 2016 -0500

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  10 +
 .../core/management/ActiveMQServerControl.java  |   1 +
 .../core/config/BridgeConfiguration.java        |  18 ++
 .../config/ClusterConnectionConfiguration.java  | 290 +++++++++++--------
 .../deployers/impl/FileConfigurationParser.java |  49 +++-
 .../impl/ActiveMQServerControlImpl.java         |  19 +-
 .../core/server/cluster/ClusterManager.java     |   7 +-
 .../cluster/impl/ClusterConnectionImpl.java     |  11 +-
 .../resources/schema/artemis-configuration.xsd  |  16 +
 .../core/config/impl/FileConfigurationTest.java |   4 +
 .../resources/ConfigurationTest-full-config.xml |   4 +
 .../integration/cluster/bridge/BridgeTest.java  | 195 +++++++++++++
 .../management/ActiveMQServerControlTest.java   |   1 +
 13 files changed, 492 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 8fe6e40..1239b0b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -301,6 +301,9 @@ public final class ActiveMQDefaultConfiguration {
    // Once the bridge has received this many bytes, it sends a confirmation
    private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576;
 
+   // Producer flow control
+   private static int DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE = -1;
+
    // Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors
    private static int DEFAULT_BRIDGE_CONNECT_SAME_NODE = 10;
 
@@ -841,6 +844,13 @@ public final class ActiveMQDefaultConfiguration {
    }
 
    /**
+    * Producer flow control
+    */
+   public static int getDefaultBridgeProducerWindowSize() {
+      return DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE;
+   }
+
+   /**
     * Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors
     */
    public static int getDefaultBridgeConnectSameNode() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 677bfb6..de9bc9f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -698,6 +698,7 @@ public interface ActiveMQServerControl {
                      @Parameter(name = "reconnectAttempts", desc = "Number of reconnection attempts") int reconnectAttempts,
                      @Parameter(name = "useDuplicateDetection", desc = "Use duplicate detection") boolean useDuplicateDetection,
                      @Parameter(name = "confirmationWindowSize", desc = "Confirmation window size") int confirmationWindowSize,
+                     @Parameter(name = "producerWindowSize", desc = "Producer window size") int producerWindowSize,
                      @Parameter(name = "clientFailureCheckPeriod", desc = "Period to check client failure") long clientFailureCheckPeriod,
                      @Parameter(name = "staticConnectorNames", desc = "comma separated list of connector names or name of discovery group if 'useDiscoveryGroup' is set to true") String connectorNames,
                      @Parameter(name = "useDiscoveryGroup", desc = "use discovery  group") boolean useDiscoveryGroup,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
index 0ce0ce8..f07fc17 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
@@ -56,6 +56,9 @@ public final class BridgeConfiguration implements Serializable {
 
    private int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
 
+   // disable flow control
+   private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
+
    private long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
 
    private String user = ActiveMQDefaultConfiguration.getDefaultClusterUser();
@@ -267,6 +270,18 @@ public final class BridgeConfiguration implements Serializable {
       return this;
    }
 
+   public int getProducerWindowSize() {
+      return producerWindowSize;
+   }
+
+   /**
+    * @param producerWindowSize the producerWindowSize to set
+    */
+   public BridgeConfiguration setProducerWindowSize(final int producerWindowSize) {
+      this.producerWindowSize = producerWindowSize;
+      return this;
+   }
+
    public long getClientFailureCheckPeriod() {
       return clientFailureCheckPeriod;
    }
@@ -340,6 +355,7 @@ public final class BridgeConfiguration implements Serializable {
       result = prime * result + (int) (callTimeout ^ (callTimeout >>> 32));
       result = prime * result + (int) (clientFailureCheckPeriod ^ (clientFailureCheckPeriod >>> 32));
       result = prime * result + confirmationWindowSize;
+      result = prime * result + producerWindowSize;
       result = prime * result + (int) (connectionTTL ^ (connectionTTL >>> 32));
       result = prime * result + ((discoveryGroupName == null) ? 0 : discoveryGroupName.hashCode());
       result = prime * result + ((filterString == null) ? 0 : filterString.hashCode());
@@ -378,6 +394,8 @@ public final class BridgeConfiguration implements Serializable {
          return false;
       if (confirmationWindowSize != other.confirmationWindowSize)
          return false;
+      if (producerWindowSize != other.producerWindowSize)
+         return false;
       if (connectionTTL != other.connectionTTL)
          return false;
       if (discoveryGroupName == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
index 3c715af..2fecc6f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
@@ -62,7 +62,8 @@ public final class ClusterConnectionConfiguration implements Serializable {
 
    private boolean duplicateDetection = ActiveMQDefaultConfiguration.isDefaultClusterDuplicateDetection();
 
-   private MessageLoadBalancingType messageLoadBalancingType = Enum.valueOf(MessageLoadBalancingType.class, ActiveMQDefaultConfiguration.getDefaultClusterMessageLoadBalancingType());
+   private MessageLoadBalancingType messageLoadBalancingType = Enum.valueOf(MessageLoadBalancingType.class, ActiveMQDefaultConfiguration
+      .getDefaultClusterMessageLoadBalancingType());
 
    private URISupport.CompositeData compositeMembers;
 
@@ -74,6 +75,8 @@ public final class ClusterConnectionConfiguration implements Serializable {
 
    private int confirmationWindowSize = ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize();
 
+   private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
+
    private boolean allowDirectConnectionsOnly = false;
 
    private int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
@@ -108,15 +111,15 @@ public final class ClusterConnectionConfiguration implements Serializable {
       return this;
    }
 
+   public URISupport.CompositeData getCompositeMembers() {
+      return compositeMembers;
+   }
+
    public ClusterConnectionConfiguration setCompositeMembers(URISupport.CompositeData members) {
       this.compositeMembers = members;
       return this;
    }
 
-   public URISupport.CompositeData getCompositeMembers() {
-      return compositeMembers;
-   }
-
    /**
     * @return the clientFailureCheckPeriod
     */
@@ -125,6 +128,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
    }
 
    /**
+    * @param clientFailureCheckPeriod the clientFailureCheckPeriod to set
+    */
+   public ClusterConnectionConfiguration setClientFailureCheckPeriod(long clientFailureCheckPeriod) {
+      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+      return this;
+   }
+
+   /**
     * @return the connectionTTL
     */
    public long getConnectionTTL() {
@@ -132,6 +143,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
    }
 
    /**
+    * @param connectionTTL the connectionTTL to set
+    */
+   public ClusterConnectionConfiguration setConnectionTTL(long connectionTTL) {
+      this.connectionTTL = connectionTTL;
+      return this;
+   }
+
+   /**
     * @return the retryIntervalMultiplier
     */
    public double getRetryIntervalMultiplier() {
@@ -139,6 +158,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
    }
 
    /**
+    * @param retryIntervalMultiplier the retryIntervalMultiplier to set
+    */
+   public ClusterConnectionConfiguration setRetryIntervalMultiplier(double retryIntervalMultiplier) {
+      this.retryIntervalMultiplier = retryIntervalMultiplier;
+      return this;
+   }
+
+   /**
     * @return the maxRetryInterval
     */
    public long getMaxRetryInterval() {
@@ -146,6 +173,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
    }
 
    /**
+    * @param maxRetryInterval the maxRetryInterval to set
+    */
+   public ClusterConnectionConfiguration setMaxRetryInterval(long maxRetryInterval) {
+      this.maxRetryInterval = maxRetryInterval;
+      return this;
+   }
+
+   /**
     * @return the initialConnectAttempts
     */
    public int getInitialConnectAttempts() {
@@ -153,20 +188,52 @@ public final class ClusterConnectionConfiguration implements Serializable {
    }
 
    /**
+    * @param initialConnectAttempts the initialConnectAttempts to set
+    */
+   public ClusterConnectionConfiguration setInitialConnectAttempts(int initialConnectAttempts) {
+      this.initialConnectAttempts = initialConnectAttempts;
+      return this;
+   }
+
+   /**
     * @return the reconnectAttempts
     */
    public int getReconnectAttempts() {
       return reconnectAttempts;
    }
 
+   /**
+    * @param reconnectAttempts the reconnectAttempts to set
+    */
+   public ClusterConnectionConfiguration setReconnectAttempts(int reconnectAttempts) {
+      this.reconnectAttempts = reconnectAttempts;
+      return this;
+   }
+
    public long getCallTimeout() {
       return callTimeout;
    }
 
+   /**
+    * @param callTimeout the callTimeout to set
+    */
+   public ClusterConnectionConfiguration setCallTimeout(long callTimeout) {
+      this.callTimeout = callTimeout;
+      return this;
+   }
+
    public long getCallFailoverTimeout() {
       return callFailoverTimeout;
    }
 
+   /**
+    * @param callFailoverTimeout the callFailoverTimeout to set
+    */
+   public ClusterConnectionConfiguration setCallFailoverTimeout(long callFailoverTimeout) {
+      this.callFailoverTimeout = callFailoverTimeout;
+      return this;
+   }
+
    public String getConnectorName() {
       return connectorName;
    }
@@ -180,10 +247,27 @@ public final class ClusterConnectionConfiguration implements Serializable {
       return duplicateDetection;
    }
 
+   /**
+    * @param duplicateDetection the duplicateDetection to set
+    */
+   public ClusterConnectionConfiguration setDuplicateDetection(boolean duplicateDetection) {
+      this.duplicateDetection = duplicateDetection;
+      return this;
+   }
+
    public MessageLoadBalancingType getMessageLoadBalancingType() {
       return messageLoadBalancingType;
    }
 
+   /**
+    * @param messageLoadBalancingType the messageLoadBalancingType to set
+    * @return this configuration, to allow call chaining
+    */
+   public ClusterConnectionConfiguration setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
+      this.messageLoadBalancingType = messageLoadBalancingType;
+      return this;
+   }
+
    public int getMaxHops() {
       return maxHops;
    }
@@ -202,6 +286,15 @@ public final class ClusterConnectionConfiguration implements Serializable {
       return this;
    }
 
+   public int getProducerWindowSize() {
+      return producerWindowSize;
+   }
+
+   public ClusterConnectionConfiguration setProducerindowSize(int producerWindowSize) {
+      this.producerWindowSize = producerWindowSize;
+      return this;
+   }
+
    public List<String> getStaticConnectors() {
       return staticConnectors;
    }
@@ -224,6 +317,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
       return retryInterval;
    }
 
+   /**
+    * @param retryInterval the retryInterval to set
+    */
+   public ClusterConnectionConfiguration setRetryInterval(long retryInterval) {
+      this.retryInterval = retryInterval;
+      return this;
+   }
+
    public boolean isAllowDirectConnectionsOnly() {
       return allowDirectConnectionsOnly;
    }
@@ -248,95 +349,6 @@ public final class ClusterConnectionConfiguration implements Serializable {
       return this;
    }
 
-   /**
-    * @param clientFailureCheckPeriod the clientFailureCheckPeriod to set
-    */
-   public ClusterConnectionConfiguration setClientFailureCheckPeriod(long clientFailureCheckPeriod) {
-      this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-      return this;
-   }
-
-   /**
-    * @param connectionTTL the connectionTTL to set
-    */
-   public ClusterConnectionConfiguration setConnectionTTL(long connectionTTL) {
-      this.connectionTTL = connectionTTL;
-      return this;
-   }
-
-   /**
-    * @param retryInterval the retryInterval to set
-    */
-   public ClusterConnectionConfiguration setRetryInterval(long retryInterval) {
-      this.retryInterval = retryInterval;
-      return this;
-   }
-
-   /**
-    * @param retryIntervalMultiplier the retryIntervalMultiplier to set
-    */
-   public ClusterConnectionConfiguration setRetryIntervalMultiplier(double retryIntervalMultiplier) {
-      this.retryIntervalMultiplier = retryIntervalMultiplier;
-      return this;
-   }
-
-   /**
-    * @param maxRetryInterval the maxRetryInterval to set
-    */
-   public ClusterConnectionConfiguration setMaxRetryInterval(long maxRetryInterval) {
-      this.maxRetryInterval = maxRetryInterval;
-      return this;
-   }
-
-   /**
-    * @param initialConnectAttempts the reconnectAttempts to set
-    */
-   public ClusterConnectionConfiguration setInitialConnectAttempts(int initialConnectAttempts) {
-      this.initialConnectAttempts = initialConnectAttempts;
-      return this;
-   }
-
-   /**
-    * @param reconnectAttempts the reconnectAttempts to set
-    */
-   public ClusterConnectionConfiguration setReconnectAttempts(int reconnectAttempts) {
-      this.reconnectAttempts = reconnectAttempts;
-      return this;
-   }
-
-   /**
-    * @param callTimeout the callTimeout to set
-    */
-   public ClusterConnectionConfiguration setCallTimeout(long callTimeout) {
-      this.callTimeout = callTimeout;
-      return this;
-   }
-
-   /**
-    * @param callFailoverTimeout the callTimeout to set
-    */
-   public ClusterConnectionConfiguration setCallFailoverTimeout(long callFailoverTimeout) {
-      this.callFailoverTimeout = callFailoverTimeout;
-      return this;
-   }
-
-   /**
-    * @param duplicateDetection the duplicateDetection to set
-    */
-   public ClusterConnectionConfiguration setDuplicateDetection(boolean duplicateDetection) {
-      this.duplicateDetection = duplicateDetection;
-      return this;
-   }
-
-   /**
-    * @param messageLoadBalancingType
-    * @return
-    */
-   public ClusterConnectionConfiguration setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
-      this.messageLoadBalancingType = messageLoadBalancingType;
-      return this;
-   }
-
    /*
    * returns the cluster update interval
    * */
@@ -457,77 +469,107 @@ public final class ClusterConnectionConfiguration implements Serializable {
 
    @Override
    public boolean equals(Object obj) {
-      if (this == obj)
+      if (this == obj) {
          return true;
-      if (obj == null)
+      }
+      if (obj == null) {
          return false;
-      if (getClass() != obj.getClass())
+      }
+      if (getClass() != obj.getClass()) {
          return false;
+      }
       ClusterConnectionConfiguration other = (ClusterConnectionConfiguration) obj;
       if (address == null) {
-         if (other.address != null)
+         if (other.address != null) {
             return false;
+         }
       }
-      else if (!address.equals(other.address))
+      else if (!address.equals(other.address)) {
          return false;
-      if (allowDirectConnectionsOnly != other.allowDirectConnectionsOnly)
+      }
+      if (allowDirectConnectionsOnly != other.allowDirectConnectionsOnly) {
          return false;
-      if (callFailoverTimeout != other.callFailoverTimeout)
+      }
+      if (callFailoverTimeout != other.callFailoverTimeout) {
          return false;
-      if (callTimeout != other.callTimeout)
+      }
+      if (callTimeout != other.callTimeout) {
          return false;
-      if (clientFailureCheckPeriod != other.clientFailureCheckPeriod)
+      }
+      if (clientFailureCheckPeriod != other.clientFailureCheckPeriod) {
          return false;
-      if (clusterNotificationAttempts != other.clusterNotificationAttempts)
+      }
+      if (clusterNotificationAttempts != other.clusterNotificationAttempts) {
          return false;
-      if (clusterNotificationInterval != other.clusterNotificationInterval)
+      }
+      if (clusterNotificationInterval != other.clusterNotificationInterval) {
          return false;
-      if (confirmationWindowSize != other.confirmationWindowSize)
+      }
+      if (confirmationWindowSize != other.confirmationWindowSize) {
          return false;
-      if (connectionTTL != other.connectionTTL)
+      }
+      if (connectionTTL != other.connectionTTL) {
          return false;
+      }
       if (connectorName == null) {
-         if (other.connectorName != null)
+         if (other.connectorName != null) {
             return false;
+         }
       }
-      else if (!connectorName.equals(other.connectorName))
+      else if (!connectorName.equals(other.connectorName)) {
          return false;
+      }
       if (discoveryGroupName == null) {
-         if (other.discoveryGroupName != null)
+         if (other.discoveryGroupName != null) {
             return false;
+         }
       }
-      else if (!discoveryGroupName.equals(other.discoveryGroupName))
+      else if (!discoveryGroupName.equals(other.discoveryGroupName)) {
          return false;
-      if (duplicateDetection != other.duplicateDetection)
+      }
+      if (duplicateDetection != other.duplicateDetection) {
          return false;
-      if (messageLoadBalancingType != other.messageLoadBalancingType)
+      }
+      if (messageLoadBalancingType != other.messageLoadBalancingType) {
          return false;
-      if (maxHops != other.maxHops)
+      }
+      if (maxHops != other.maxHops) {
          return false;
-      if (maxRetryInterval != other.maxRetryInterval)
+      }
+      if (maxRetryInterval != other.maxRetryInterval) {
          return false;
-      if (minLargeMessageSize != other.minLargeMessageSize)
+      }
+      if (minLargeMessageSize != other.minLargeMessageSize) {
          return false;
+      }
       if (name == null) {
-         if (other.name != null)
+         if (other.name != null) {
             return false;
+         }
       }
-      else if (!name.equals(other.name))
+      else if (!name.equals(other.name)) {
          return false;
-      if (initialConnectAttempts != other.initialConnectAttempts)
+      }
+      if (initialConnectAttempts != other.initialConnectAttempts) {
          return false;
-      if (reconnectAttempts != other.reconnectAttempts)
+      }
+      if (reconnectAttempts != other.reconnectAttempts) {
          return false;
-      if (retryInterval != other.retryInterval)
+      }
+      if (retryInterval != other.retryInterval) {
          return false;
-      if (Double.doubleToLongBits(retryIntervalMultiplier) != Double.doubleToLongBits(other.retryIntervalMultiplier))
+      }
+      if (Double.doubleToLongBits(retryIntervalMultiplier) != Double.doubleToLongBits(other.retryIntervalMultiplier)) {
          return false;
+      }
       if (staticConnectors == null) {
-         if (other.staticConnectors != null)
+         if (other.staticConnectors != null) {
             return false;
+         }
       }
-      else if (!staticConnectors.equals(other.staticConnectors))
+      else if (!staticConnectors.equals(other.staticConnectors)) {
          return false;
+      }
       return true;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index b6e1c7b..ca4bb52 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1314,6 +1314,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       int confirmationWindowSize = getInteger(e, "confirmation-window-size", ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(), Validators.GT_ZERO);
 
+      int producerWindowSize = getInteger(e, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(), Validators.MINUS_ONE_OR_GT_ZERO);
+
       long clusterNotificationInterval = getLong(e, "notification-interval", ActiveMQDefaultConfiguration.getDefaultClusterNotificationInterval(), Validators.GT_ZERO);
 
       int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO);
@@ -1343,7 +1345,28 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
          }
       }
 
-      ClusterConnectionConfiguration config = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setDuplicateDetection(duplicateDetection).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(confirmationWindowSize).setAllowDirectConnectionsOnly(allowDirectConnectionsOnly).setClusterNotificationInterval(clusterNotificationInterval).setClusterNotificationAttempts(clusterNotificationAttempts);
+      ClusterConnectionConfiguration config = new ClusterConnectionConfiguration()
+         .setName(name)
+         .setAddress(address)
+         .setConnectorName(connectorName)
+         .setMinLargeMessageSize(minLargeMessageSize)
+         .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+         .setConnectionTTL(connectionTTL)
+         .setRetryInterval(retryInterval)
+         .setRetryIntervalMultiplier(retryIntervalMultiplier)
+         .setMaxRetryInterval(maxRetryInterval)
+         .setInitialConnectAttempts(initialConnectAttempts)
+         .setReconnectAttempts(reconnectAttempts)
+         .setCallTimeout(callTimeout)
+         .setCallFailoverTimeout(callFailoverTimeout)
+         .setDuplicateDetection(duplicateDetection)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
+         .setMaxHops(maxHops)
+         .setConfirmationWindowSize(confirmationWindowSize)
+         .setProducerindowSize(producerWindowSize)
+         .setAllowDirectConnectionsOnly(allowDirectConnectionsOnly)
+         .setClusterNotificationInterval(clusterNotificationInterval)
+         .setClusterNotificationAttempts(clusterNotificationAttempts);
 
       if (discoveryGroupName == null) {
          config.setStaticConnectors(staticConnectorNames);
@@ -1377,6 +1400,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       // Default bridge conf
       int confirmationWindowSize = getInteger(brNode, "confirmation-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(), Validators.GT_ZERO);
 
+      int producerWindowSize = getInteger(brNode, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(), Validators.GT_ZERO);
+
       long retryInterval = getLong(brNode, "retry-interval", ActiveMQClient.DEFAULT_RETRY_INTERVAL, Validators.GT_ZERO);
 
       long clientFailureCheckPeriod = getLong(brNode, "check-period", ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, Validators.GT_ZERO);
@@ -1444,7 +1469,27 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
          }
       }
 
-      BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
+      BridgeConfiguration config = new BridgeConfiguration()
+         .setName(name)
+         .setQueueName(queueName)
+         .setForwardingAddress(forwardingAddress)
+         .setFilterString(filterString)
+         .setTransformerClassName(transformerClassName)
+         .setMinLargeMessageSize(minLargeMessageSize)
+         .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+         .setConnectionTTL(connectionTTL)
+         .setRetryInterval(retryInterval)
+         .setMaxRetryInterval(maxRetryInterval)
+         .setRetryIntervalMultiplier(retryIntervalMultiplier)
+         .setInitialConnectAttempts(initialConnectAttempts)
+         .setReconnectAttempts(reconnectAttempts)
+         .setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode)
+         .setUseDuplicateDetection(useDuplicateDetection)
+         .setConfirmationWindowSize(confirmationWindowSize)
+         .setProducerWindowSize(producerWindowSize)
+         .setHA(ha)
+         .setUser(user)
+         .setPassword(password);
 
       if (!staticConnectorNames.isEmpty()) {
          config.setStaticConnectors(staticConnectorNames);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 4c47e74..8202f14 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -1722,6 +1722,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                             final int reconnectAttempts,
                             final boolean useDuplicateDetection,
                             final int confirmationWindowSize,
+                            final int producerWindowSize,
                             final long clientFailureCheckPeriod,
                             final String staticConnectorsOrDiscoveryGroup,
                             boolean useDiscoveryGroup,
@@ -1733,7 +1734,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       clearIO();
 
       try {
-         BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
+         BridgeConfiguration config = new BridgeConfiguration()
+            .setName(name)
+            .setQueueName(queueName)
+            .setForwardingAddress(forwardingAddress)
+            .setFilterString(filterString)
+            .setTransformerClassName(transformerClassName)
+            .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+            .setRetryInterval(retryInterval)
+            .setRetryIntervalMultiplier(retryIntervalMultiplier)
+            .setInitialConnectAttempts(initialConnectAttempts)
+            .setReconnectAttempts(reconnectAttempts)
+            .setUseDuplicateDetection(useDuplicateDetection)
+            .setConfirmationWindowSize(confirmationWindowSize)
+            .setProducerWindowSize(producerWindowSize)
+            .setHA(ha)
+            .setUser(user)
+            .setPassword(password);
 
          if (useDiscoveryGroup) {
             config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 5f3c44b..ef39ba2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -465,8 +465,7 @@ public final class ClusterManager implements ActiveMQComponent {
       serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
       serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
       serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
-      //disable flow control
-      serverLocator.setProducerWindowSize(-1);
+      serverLocator.setProducerWindowSize(config.getProducerWindowSize());
 
       // This will be set to 30s unless it's changed from embedded / testing
       // there is no reason to exception the config for this timeout
@@ -615,7 +614,7 @@ public final class ClusterManager implements ActiveMQComponent {
                                                  dg);
          }
 
-         clusterConnection = new ClusterConnectionImpl(this, dg, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
+         clusterConnection = new ClusterConnectionImpl(this, dg, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
 
          clusterController.addClusterConnection(clusterConnection.getName(), dg, config);
       }
@@ -626,7 +625,7 @@ public final class ClusterManager implements ActiveMQComponent {
             logger.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
          }
 
-         clusterConnection = new ClusterConnectionImpl(this, tcConfigs, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
+         clusterConnection = new ClusterConnectionImpl(this, tcConfigs, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
 
          clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 82b1e4f..d009e79 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -119,6 +119,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
    private final int confirmationWindowSize;
 
+   private final int producerWindowSize;
+
    /**
     * Guard for the field {@link #records}. Note that the field is {@link ConcurrentHashMap},
     * however we need the guard to synchronize multiple step operations during topology updates.
@@ -184,6 +186,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
                                 final boolean useDuplicateDetection,
                                 final MessageLoadBalancingType messageLoadBalancingType,
                                 final int confirmationWindowSize,
+                                final int producerWindowSize,
                                 final ExecutorFactory executorFactory,
                                 final ActiveMQServer server,
                                 final PostOffice postOffice,
@@ -224,6 +227,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
       this.confirmationWindowSize = confirmationWindowSize;
 
+      this.producerWindowSize = producerWindowSize;
+
       this.executorFactory = executorFactory;
 
       this.clusterNotificationInterval = clusterNotificationInterval;
@@ -290,6 +295,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
                                 final boolean useDuplicateDetection,
                                 final MessageLoadBalancingType messageLoadBalancingType,
                                 final int confirmationWindowSize,
+                                final int producerWindowSize,
                                 final ExecutorFactory executorFactory,
                                 final ActiveMQServer server,
                                 final PostOffice postOffice,
@@ -336,6 +342,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
       this.confirmationWindowSize = confirmationWindowSize;
 
+      this.producerWindowSize = producerWindowSize;
+
       this.executorFactory = executorFactory;
 
       this.clusterNotificationInterval = clusterNotificationInterval;
@@ -601,8 +609,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
          serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
          serverLocator.setCallTimeout(callTimeout);
          serverLocator.setCallFailoverTimeout(callFailoverTimeout);
-         // No producer flow control on the bridges, as we don't want to lock the queues
-         serverLocator.setProducerWindowSize(-1);
+         serverLocator.setProducerWindowSize(producerWindowSize);
 
          if (retryInterval > 0) {
             this.serverLocator.setRetryInterval(retryInterval);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 93d2a9e..0d369ba 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1116,6 +1116,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" minOccurs="0" default="-1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Window size used for producer flow control; the default of -1 disables flow control
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="user" type="xsd:string" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
@@ -1341,6 +1349,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" minOccurs="0" default="-1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Window size used for producer flow control; the default of -1 disables flow control
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="call-failover-timeout" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 63962fb..f7db6e7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -215,6 +215,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
             Assert.assertEquals(true, bc.isUseDuplicateDetection());
             Assert.assertEquals("connector1", bc.getStaticConnectors().get(0));
             Assert.assertEquals(null, bc.getDiscoveryGroupName());
+            Assert.assertEquals(444, bc.getProducerWindowSize());
          }
          else {
             Assert.assertEquals("bridge2", bc.getName());
@@ -224,6 +225,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
             Assert.assertEquals(null, bc.getTransformerClassName());
             Assert.assertEquals(null, bc.getStaticConnectors());
             Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
+            Assert.assertEquals(555, bc.getProducerWindowSize());
          }
       }
 
@@ -256,6 +258,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
             Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0));
             Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1));
             Assert.assertEquals(null, ccc.getDiscoveryGroupName());
+            Assert.assertEquals(222, ccc.getProducerWindowSize());
          }
          else {
             Assert.assertEquals("cluster-connection2", ccc.getName());
@@ -268,6 +271,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
             Assert.assertEquals(2, ccc.getMaxHops());
             Assert.assertEquals(Collections.emptyList(), ccc.getStaticConnectors());
             Assert.assertEquals("dg1", ccc.getDiscoveryGroupName());
+            Assert.assertEquals(333, ccc.getProducerWindowSize());
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index ec6d6e9..9304745 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -142,6 +142,7 @@
             <reconnect-attempts>2</reconnect-attempts>
             <failover-on-server-shutdown>false</failover-on-server-shutdown>
             <use-duplicate-detection>true</use-duplicate-detection>
+            <producer-window-size>444</producer-window-size>
             <static-connectors>
                <connector-ref>connector1</connector-ref>
             </static-connectors>
@@ -149,6 +150,7 @@
          <bridge name="bridge2">
             <queue-name>queue2</queue-name>
             <forwarding-address>bridge-forwarding-address2</forwarding-address>
+            <producer-window-size>555</producer-window-size>
             <discovery-group-ref discovery-group-name="dg1"/>
          </bridge>
       </bridges>
@@ -180,6 +182,7 @@
             <use-duplicate-detection>true</use-duplicate-detection>
             <message-load-balancing>ON_DEMAND</message-load-balancing>
             <max-hops>1</max-hops>
+            <producer-window-size>222</producer-window-size>
             <call-failover-timeout>123</call-failover-timeout>
             <static-connectors>
                <connector-ref>connector1</connector-ref>
@@ -194,6 +197,7 @@
             <use-duplicate-detection>false</use-duplicate-detection>
             <message-load-balancing>STRICT</message-load-balancing>
             <max-hops>2</max-hops>
+            <producer-window-size>333</producer-window-size>
             <call-failover-timeout>456</call-failover-timeout>
             <discovery-group-ref discovery-group-name="dg1"/>
          </cluster-connection>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index afd7697..0da4561 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.bridge;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1621,6 +1627,195 @@ public class BridgeTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testBridgeWithVeryLargeMessage() throws Exception {
+      ActiveMQServer server0 = null;
+      ActiveMQServer server1 = null;
+
+      final int PAGE_MAX = 1024 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+      ServerLocator locator = null;
+
+      try {
+         Map<String, Object> server0Params = new HashMap<>();
+         server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+         Map<String, Object> server1Params = new HashMap<>();
+         addTargetParameters(server1Params);
+         server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "forwardAddress";
+         final String queueName1 = "queue1";
+
+         Map<String, TransportConfiguration> connectors = new HashMap<>();
+         TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+         connectors.put(server1tc.getName(), server1tc);
+
+         server0.getConfiguration().setConnectorConfigurations(connectors);
+
+         ArrayList<String> staticConnectors = new ArrayList<>();
+         staticConnectors.add(server1tc.getName());
+
+         int minLargeMessageSize = 1024 * 1024;
+
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
+            .setName("bridge1")
+            .setQueueName(queueName0)
+            .setForwardingAddress(forwardAddress)
+            .setRetryInterval(1000)
+            .setReconnectAttemptsOnSameNode(-1)
+            .setUseDuplicateDetection(false)
+            .setConfirmationWindowSize(1024)
+            .setStaticConnectors(staticConnectors)
+            .setMinLargeMessageSize(minLargeMessageSize)
+            .setProducerWindowSize(minLargeMessageSize / 2);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+         bridgeConfigs.add(bridgeConfiguration);
+         server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
+         queueConfigs0.add(queueConfig0);
+         server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration()
+            .setAddress(forwardAddress)
+            .setName(queueName1);
+         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
+         queueConfigs1.add(queueConfig1);
+         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+         server1.start();
+         server0.start();
+
+         locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+
+         ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session0 = sf0.createSession(false, true, true);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         //create a large message bigger than Integer.MAX_VALUE
+         final long largeMessageSize = Integer.MAX_VALUE + 1000L;
+
+         ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize);
+
+         producer0.send(largeMessage);
+
+         session0.commit();
+
+         //check target queue for large message arriving
+         ClientSession.QueueQuery query = session1.queueQuery(new SimpleString(queueName1));
+         long messageCount = query.getMessageCount();
+         int count = 0;
+         //wait for 300 sec max
+         while (messageCount == 0 && count < 300) {
+            count++;
+            Thread.sleep(1000);
+            query = session1.queueQuery(new SimpleString(queueName1));
+            messageCount = query.getMessageCount();
+         }
+
+         if (messageCount == 0) {
+            fail("large message didn't arrived after 5 min!");
+         }
+
+         //receive the message
+         ClientMessage message = consumer1.receive(5000);
+         message.acknowledge();
+
+         File outputFile = new File(getTemporaryDir(), "huge_message_received.dat");
+
+         System.out.println("-----message save to: " + outputFile.getAbsolutePath());
+         FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
+
+         BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
+
+         message.setOutputStream(bufferedOutput);
+
+         if (!message.waitOutputStreamCompletion(5 * 60 * 1000)) {
+            fail("message didn't get received to disk in 5 min. Is the machine slow?");
+         }
+         session1.commit();
+
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         session0.close();
+
+         session1.close();
+
+         sf0.close();
+
+         sf1.close();
+
+      }
+      finally {
+         if (locator != null) {
+            locator.close();
+         }
+         try {
+            server0.stop();
+         }
+         catch (Throwable ignored) {
+         }
+
+         try {
+            server1.stop();
+         }
+         catch (Throwable ignored) {
+         }
+      }
+
+      assertEquals(0, loadQueues(server0).size());
+   }
+
+   /**
+    * Builds a durable bytes message whose body is read from a file of
+    * {@code largeMessageSize} bytes in the temporary directory.
+    * <p>
+    * The backing file is produced by {@link #createFile(File, long)}. The input stream is
+    * handed to the message and is not closed here.
+    *
+    * @param session          the session used to create the message
+    * @param largeMessageSize the requested size of the backing file, in bytes
+    * @return the message with its body input stream set
+    * @throws Exception if the backing file cannot be created or opened
+    */
+   private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize) throws Exception {
+      final File source = new File(getTemporaryDir(), "huge_message_to_send.dat");
+      createFile(source, largeMessageSize);
+      System.out.println("File created at: " + source.getAbsolutePath());
+
+      final ClientMessage largeMessage = session.createMessage(ClientMessage.BYTES_TYPE, true);
+      // left open on purpose: presumably the client reads it when the message is sent - confirm
+      largeMessage.setBodyInputStream(new BufferedInputStream(new FileInputStream(source)));
+      return largeMessage;
+   }
+
+   /**
+    * Creates {@code file} and fills it with exactly {@code fileSize} zero bytes.
+    * <p>
+    * If the file already exists it is left untouched, whatever its current length.
+    *
+    * @param file     the file to create
+    * @param fileSize the number of bytes to write
+    * @throws IOException if the file cannot be created or written
+    */
+   private static void createFile(final File file, final long fileSize) throws IOException {
+      if (file.exists()) {
+         System.out.println("---file already there " + file.length());
+         return;
+      }
+      final byte[] outBuffer = new byte[1024 * 1024];
+      // try-with-resources flushes and closes the stream even when a write fails
+      try (BufferedOutputStream buffOut = new BufferedOutputStream(new FileOutputStream(file))) {
+         System.out.println(" --- creating file, size: " + fileSize);
+         for (long remaining = fileSize; remaining > 0; remaining -= outBuffer.length) {
+            // cap the last chunk so the file is not rounded up to a whole buffer
+            buffOut.write(outBuffer, 0, (int) Math.min(remaining, outBuffer.length));
+         }
+      }
+   }
+
+   @Test
    public void testNullForwardingAddress() throws Exception {
       Map<String, Object> server0Params = new HashMap<>();
       server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce9ea176/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index aa4d685..0e44bae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -667,6 +667,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
                                  null, // filterString
                                  ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.INITIAL_CONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, false, // duplicateDetection
                                  1, // confirmationWindowSize
+                                 -1, // producerWindowSize
                                  ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, connectorConfig.getName(), // liveConnector
                                  false, false, null, null);
 


[2/2] activemq-artemis git commit: This closes #600

Posted by cl...@apache.org.
This closes #600


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7d69d913
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7d69d913
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7d69d913

Branch: refs/heads/master
Commit: 7d69d913e495695e3466044fa5a279f816a94790
Parents: 425fe86 ce9ea17
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jun 27 10:10:11 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jun 27 10:10:11 2016 -0400

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  10 +
 .../core/management/ActiveMQServerControl.java  |   1 +
 .../core/config/BridgeConfiguration.java        |  18 ++
 .../config/ClusterConnectionConfiguration.java  | 290 +++++++++++--------
 .../deployers/impl/FileConfigurationParser.java |  49 +++-
 .../impl/ActiveMQServerControlImpl.java         |  19 +-
 .../core/server/cluster/ClusterManager.java     |   7 +-
 .../cluster/impl/ClusterConnectionImpl.java     |  11 +-
 .../resources/schema/artemis-configuration.xsd  |  16 +
 .../core/config/impl/FileConfigurationTest.java |   4 +
 .../resources/ConfigurationTest-full-config.xml |   4 +
 .../integration/cluster/bridge/BridgeTest.java  | 195 +++++++++++++
 .../management/ActiveMQServerControlTest.java   |   1 +
 13 files changed, 492 insertions(+), 133 deletions(-)
----------------------------------------------------------------------