You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/13 13:19:17 UTC

[1/2] activemq-artemis git commit: ARTEMIS-2075 - allow Extra backups to try to replicate more than once

Repository: activemq-artemis
Updated Branches:
  refs/heads/master ec24ee456 -> bf0eede93


ARTEMIS-2075 - allow Extra backups to try to replicate more than once

https://issues.apache.org/jira/browse/ARTEMIS-2075


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3b34127b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3b34127b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3b34127b

Branch: refs/heads/master
Commit: 3b34127bb3ce6720ff794551a2877ba9794f0960
Parents: ec24ee4
Author: andytaylor <an...@gmail.com>
Authored: Wed Sep 5 08:41:06 2018 +0100
Committer: andytaylor <an...@gmail.com>
Committed: Thu Sep 13 10:02:45 2018 +0100

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  6 +++
 .../artemis/core/config/ConfigurationUtils.java |  4 +-
 .../config/ha/ReplicaPolicyConfiguration.java   | 10 ++++
 .../ha/ReplicatedPolicyConfiguration.java       | 10 ++++
 .../deployers/impl/FileConfigurationParser.java |  4 ++
 .../core/server/cluster/ha/ReplicaPolicy.java   | 15 +++++-
 .../server/cluster/ha/ReplicatedPolicy.java     | 13 ++++-
 .../impl/AnyLiveNodeLocatorForReplication.java  |  9 +++-
 .../NamedLiveNodeLocatorForReplication.java     | 13 +++--
 .../impl/SharedNothingBackupActivation.java     |  2 +-
 .../resources/schema/artemis-configuration.xsd  | 14 +++++
 .../config/impl/HAPolicyConfigurationTest.java  |  2 +
 .../test/resources/replica-hapolicy-config.xml  |  1 +
 .../resources/replicated-hapolicy-config.xml    |  1 +
 .../failover/ReplicatedFailoverTest.java        | 54 ++++++++++++++++++++
 15 files changed, 147 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 45397f3..bd7ce51 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -504,6 +504,8 @@ public final class ActiveMQDefaultConfiguration {
    //how long we wait for vote result, 30 secs
    private static int DEFAULT_QUORUM_VOTE_WAIT = 30;
 
+   private static long DEFAULT_RETRY_REPLICATION_WAIT = 2000;
+
    public static int DEFAULT_QUORUM_SIZE = -1;
 
    public static final boolean DEFAULT_ANALYZE_CRITICAL = true;
@@ -1384,4 +1386,8 @@ public final class ActiveMQDefaultConfiguration {
    public static int getDefaultQuorumVoteWait() {
       return DEFAULT_QUORUM_VOTE_WAIT;
    }
+
+   public static long getDefaultRetryReplicationWait() {
+      return DEFAULT_RETRY_REPLICATION_WAIT;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
index 6d4661b..a314947 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
@@ -73,11 +73,11 @@ public final class ConfigurationUtils {
          }
          case REPLICATED: {
             ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
-            return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait());
+            return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait());
          }
          case REPLICA: {
             ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
-            return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait());
+            return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait());
          }
          case SHARED_STORE_MASTER: {
             SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java
index e02bb56..4bf24a7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java
@@ -49,6 +49,8 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
 
    private int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait();
 
+   private long retryReplicationWait = ActiveMQDefaultConfiguration.getDefaultRetryReplicationWait();
+
    public ReplicaPolicyConfiguration() {
    }
 
@@ -170,4 +172,12 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
    public void setQuorumVoteWait(int quorumVoteWait) {
       this.quorumVoteWait = quorumVoteWait;
    }
+
+   public long getRetryReplicationWait() {
+      return retryReplicationWait;
+   }
+
+   public void setRetryReplicationWait(long retryReplicationWait) {
+      this.retryReplicationWait = retryReplicationWait;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java
index 3acad77..162f095 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java
@@ -39,6 +39,8 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
 
    private  int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait();
 
+   private Long retryReplicationWait = ActiveMQDefaultConfiguration.getDefaultRetryReplicationWait();
+
    public ReplicatedPolicyConfiguration() {
    }
 
@@ -129,4 +131,12 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
       this.quorumVoteWait = quorumVoteWait;
       return this;
    }
+
+   public void setRetryReplicationWait(Long retryReplicationWait) {
+      this.retryReplicationWait = retryReplicationWait;
+   }
+
+   public Long getRetryReplicationWait() {
+      return retryReplicationWait;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index b1b2c0e..9bc292b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1336,6 +1336,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
 
+      configuration.setRetryReplicationWait(getLong(policyNode, "retry-replication-wait", configuration.getRetryReplicationWait(), Validators.GT_ZERO));
+
       configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
 
       return configuration;
@@ -1367,6 +1369,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
 
+      configuration.setRetryReplicationWait(getLong(policyNode, "retry-replication-wait", configuration.getRetryReplicationWait(), Validators.GT_ZERO));
+
       configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
 
       return configuration;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
index 7a4b06b..36e65f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
@@ -59,6 +59,8 @@ public class ReplicaPolicy extends BackupPolicy {
 
    private final int quorumVoteWait;
 
+   private long retryReplicationWait;
+
    public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck, int quorumVoteWait) {
       this.networkHealthCheck = networkHealthCheck;
       this.quorumVoteWait = quorumVoteWait;
@@ -84,7 +86,8 @@ public class ReplicaPolicy extends BackupPolicy {
                         int quorumSize,
                         int voteRetries,
                         long voteRetryWait,
-                        int quorumVoteWait) {
+                        int quorumVoteWait,
+                        long retryReplicationWait) {
       this.clusterName = clusterName;
       this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
       this.groupName = groupName;
@@ -94,10 +97,12 @@ public class ReplicaPolicy extends BackupPolicy {
       this.quorumSize = quorumSize;
       this.voteRetries = voteRetries;
       this.voteRetryWait = voteRetryWait;
+      this.retryReplicationWait = retryReplicationWait;
       this.scaleDownPolicy = scaleDownPolicy;
       this.networkHealthCheck = networkHealthCheck;
       this.voteOnReplicationFailure = voteOnReplicationFailure;
       this.quorumVoteWait = quorumVoteWait;
+      this.retryReplicationWait = retryReplicationWait;
    }
 
    public ReplicaPolicy(String clusterName,
@@ -247,4 +252,12 @@ public class ReplicaPolicy extends BackupPolicy {
    public int getQuorumVoteWait() {
       return quorumVoteWait;
    }
+
+   public long getRetryReplicationWait() {
+      return retryReplicationWait;
+   }
+
+   public void setretryReplicationWait(long retryReplicationWait) {
+      this.retryReplicationWait = retryReplicationWait;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
index e170429..99b98fe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
@@ -54,6 +54,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
 
    private long voteRetryWait;
 
+   private long retryReplicationWait;
+
    /*
    * this are only used as the policy when the server is started as a live after a failover
    * */
@@ -78,7 +80,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
                            int quorumSize,
                            int voteRetries,
                            long voteRetryWait,
-                           int quorumVoteWait) {
+                           int quorumVoteWait,
+                           long retryReplicationWait) {
       this.checkForLiveServer = checkForLiveServer;
       this.groupName = groupName;
       this.clusterName = clusterName;
@@ -89,6 +92,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
       this.voteRetries = voteRetries;
       this.voteRetryWait = voteRetryWait;
       this.quorumVoteWait = quorumVoteWait;
+      this.retryReplicationWait = retryReplicationWait;
    }
 
    public ReplicatedPolicy(boolean checkForLiveServer,
@@ -159,6 +163,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
          replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure);
          replicaPolicy.setVoteRetries(voteRetries);
          replicaPolicy.setVoteRetryWait(voteRetryWait);
+         replicaPolicy.setretryReplicationWait(retryReplicationWait);
          if (clusterName != null && clusterName.length() > 0) {
             replicaPolicy.setClusterName(clusterName);
          }
@@ -241,4 +246,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
    public int getQuorumVoteWait() {
       return quorumVoteWait;
    }
-}
\ No newline at end of file
+
+   public long getRetryReplicationWait() {
+      return retryReplicationWait;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
index a446b2e..015339a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.impl;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -40,14 +41,16 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final ActiveMQServerImpl server;
+   private final long retryReplicationWait;
    Map<String, Pair<TransportConfiguration, TransportConfiguration>> untriedConnectors = new HashMap<>();
    Map<String, Pair<TransportConfiguration, TransportConfiguration>> triedConnectors = new HashMap<>();
 
    private String nodeID;
 
-   public AnyLiveNodeLocatorForReplication(SharedNothingBackupQuorum backupQuorum, ActiveMQServerImpl server) {
+   public AnyLiveNodeLocatorForReplication(SharedNothingBackupQuorum backupQuorum, ActiveMQServerImpl server, long retryReplicationWait) {
       super(backupQuorum);
       this.server = server;
+      this.retryReplicationWait = retryReplicationWait;
    }
 
    @Override
@@ -66,7 +69,9 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
                   ConcurrentUtil.await(condition, timeout);
                } else {
                   while (untriedConnectors.isEmpty()) {
-                     condition.await();
+                     condition.await(retryReplicationWait, TimeUnit.MILLISECONDS);
+                     untriedConnectors.putAll(triedConnectors);
+                     triedConnectors.clear();
                   }
                }
             } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
index a3c50fb..624808d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -40,13 +42,16 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final String backupGroupName;
+   private final long retryReplicationWait;
    private Queue<Pair<TransportConfiguration, TransportConfiguration>> liveConfigurations = new LinkedList<>();
+   private ArrayList<Pair<TransportConfiguration, TransportConfiguration>> triedConfigurations = new ArrayList<>();
 
    private String nodeID;
 
-   public NamedLiveNodeLocatorForReplication(String backupGroupName, SharedNothingBackupQuorum quorumManager) {
+   public NamedLiveNodeLocatorForReplication(String backupGroupName, SharedNothingBackupQuorum quorumManager, long retryReplicationWait) {
       super(quorumManager);
       this.backupGroupName = backupGroupName;
+      this.retryReplicationWait = retryReplicationWait;
    }
 
    @Override
@@ -64,7 +69,9 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
                   ConcurrentUtil.await(condition, timeout);
                } else {
                   while (liveConfigurations.size() == 0) {
-                     condition.await();
+                     condition.await(retryReplicationWait, TimeUnit.MILLISECONDS);
+                     liveConfigurations.addAll(triedConfigurations);
+                     triedConfigurations.clear();
                   }
                }
             } catch (InterruptedException e) {
@@ -112,7 +119,7 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
    public void notifyRegistrationFailed(boolean alreadyReplicating) {
       try {
          lock.lock();
-         liveConfigurations.poll();
+         triedConfigurations.add(liveConfigurations.poll());
          super.notifyRegistrationFailed(alreadyReplicating);
       } finally {
          lock.unlock();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 863923d..d1f0a05 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -142,7 +142,7 @@ public final class SharedNothingBackupActivation extends Activation {
             TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT);
             nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getLive(), member.getBackup()));
          } else {
-            nodeLocator = replicaPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForReplication(backupQuorum, activeMQServer) : new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum);
+            nodeLocator = replicaPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForReplication(backupQuorum, activeMQServer, replicaPolicy.getRetryReplicationWait()) : new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum, replicaPolicy.getRetryReplicationWait());
          }
          ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
          clusterController.addClusterTopologyListenerForReplication(nodeLocator);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 04e2931..685dec4 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2277,6 +2277,13 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
+         <xsd:element name="retry-replication-wait" type="xsd:long" default="2000" minOccurs="0" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  If we start as a replica how long to wait (in milliseconds) before trying to replicate again after failing to find a replica
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
       </xsd:all>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
@@ -2380,6 +2387,13 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
+         <xsd:element name="retry-replication-wait" type="xsd:long" default="2000" minOccurs="0" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  How long to wait (in milliseconds) before trying to replicate again after failing to find a replica
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
          <xsd:element name="quorum-vote-wait" type="xsd:integer" default="30" minOccurs="0" maxOccurs="1">
             <xsd:annotation>
                <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java
index 2a23613..fd9b523 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java
@@ -139,6 +139,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
          assertTrue(replicatedPolicy.isCheckForLiveServer());
          assertEquals(replicatedPolicy.getClusterName(), "abcdefg");
          assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876);
+         assertEquals(replicatedPolicy.getRetryReplicationWait(), 12345);
       } finally {
          server.stop();
       }
@@ -161,6 +162,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
          assertFalse(replicaPolicy.isRestartBackup());
          assertTrue(replicaPolicy.isAllowFailback());
          assertEquals(replicaPolicy.getInitialReplicationSyncTimeout(), 9876);
+         assertEquals(replicaPolicy.getRetryReplicationWait(), 12345);
          ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
          assertNotNull(scaleDownPolicy);
          assertEquals(scaleDownPolicy.getGroupName(), "boo!");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/test/resources/replica-hapolicy-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/replica-hapolicy-config.xml b/artemis-server/src/test/resources/replica-hapolicy-config.xml
index 3b2c3ba..2ed3724 100644
--- a/artemis-server/src/test/resources/replica-hapolicy-config.xml
+++ b/artemis-server/src/test/resources/replica-hapolicy-config.xml
@@ -31,6 +31,7 @@
                <restart-backup>false</restart-backup>
                <allow-failback>true</allow-failback>
                <initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
+               <retry-replication-wait>12345</retry-replication-wait>
                <scale-down>
                   <!--a grouping of servers that can be scaled down to-->
                   <group-name>boo!</group-name>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/artemis-server/src/test/resources/replicated-hapolicy-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/replicated-hapolicy-config.xml b/artemis-server/src/test/resources/replicated-hapolicy-config.xml
index fb2a602..2227479 100644
--- a/artemis-server/src/test/resources/replicated-hapolicy-config.xml
+++ b/artemis-server/src/test/resources/replicated-hapolicy-config.xml
@@ -27,6 +27,7 @@
                <check-for-live-server>true</check-for-live-server>
                <cluster-name>abcdefg</cluster-name>
                <initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
+               <retry-replication-wait>12345</retry-replication-wait>
             </master>
          </replication>
       </ha-policy>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b34127b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java
index 01a3ba6..c6ee8b4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java
@@ -25,8 +25,10 @@ import java.util.concurrent.TimeUnit;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
 import com.sun.net.httpserver.HttpServer;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.component.WebServerComponent;
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -35,6 +37,7 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
 import org.apache.activemq.artemis.dto.AppDTO;
 import org.apache.activemq.artemis.dto.WebServerDTO;
 import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -45,11 +48,13 @@ import org.junit.runner.Description;
 public class ReplicatedFailoverTest extends FailoverTest {
 
    boolean isReplicatedFailbackTest = false;
+   boolean isExtraBackupGroupNameReplicates = false;
    @Rule
    public TestRule watcher = new TestWatcher() {
       @Override
       protected void starting(Description description) {
          isReplicatedFailbackTest = description.getMethodName().equals("testReplicatedFailback") || description.getMethodName().equals("testLoop");
+         isExtraBackupGroupNameReplicates = description.getMethodName().equals("testExtraBackupGroupNameReplicates");
       }
 
    };
@@ -73,6 +78,49 @@ public class ReplicatedFailoverTest extends FailoverTest {
       Wait.waitFor(server::isReplicaSync);
    }
 
+   @Test
+   public void testExtraBackupReplicates() throws Exception {
+      Configuration secondBackupConfig = backupConfig.copy();
+      TransportConfiguration tc = secondBackupConfig.getAcceptorConfigurations().iterator().next();
+      TestableServer secondBackupServer = createTestableServer(secondBackupConfig);
+      tc.getParams().put("serverId", "2");
+      secondBackupConfig.setBindingsDirectory(getBindingsDir(1, true)).setJournalDirectory(getJournalDir(1, true)).setPagingDirectory(getPageDir(1, true)).setLargeMessagesDirectory(getLargeMessagesDir(1, true)).setSecurityEnabled(false);
+
+      waitForRemoteBackupSynchronization(backupServer.getServer());
+
+      secondBackupServer.start();
+      Thread.sleep(5000);
+      backupServer.stop();
+      waitForSync(secondBackupServer.getServer());
+      waitForRemoteBackupSynchronization(secondBackupServer.getServer());
+
+   }
+
+   @Test
+   public void testExtraBackupGroupNameReplicates() throws Exception {
+      ReplicaPolicyConfiguration backupReplicaPolicyConfiguration = (ReplicaPolicyConfiguration) backupServer.getServer().getConfiguration().getHAPolicyConfiguration();
+      backupReplicaPolicyConfiguration.setGroupName("foo");
+
+      ReplicatedPolicyConfiguration replicatedPolicyConfiguration = (ReplicatedPolicyConfiguration) liveServer.getServer().getConfiguration().getHAPolicyConfiguration();
+      replicatedPolicyConfiguration.setGroupName("foo");
+
+      Configuration secondBackupConfig = backupConfig.copy();
+      TransportConfiguration tc = secondBackupConfig.getAcceptorConfigurations().iterator().next();
+      TestableServer secondBackupServer = createTestableServer(secondBackupConfig);
+      tc.getParams().put("serverId", "2");
+      secondBackupConfig.setBindingsDirectory(getBindingsDir(1, true)).setJournalDirectory(getJournalDir(1, true)).setPagingDirectory(getPageDir(1, true)).setLargeMessagesDirectory(getLargeMessagesDir(1, true)).setSecurityEnabled(false);
+      ReplicaPolicyConfiguration replicaPolicyConfiguration = (ReplicaPolicyConfiguration) secondBackupConfig.getHAPolicyConfiguration();
+      replicaPolicyConfiguration.setGroupName("foo");
+      waitForRemoteBackupSynchronization(backupServer.getServer());
+
+      secondBackupServer.start();
+      Thread.sleep(5000);
+      backupServer.stop();
+      waitForSync(secondBackupServer.getServer());
+      waitForRemoteBackupSynchronization(secondBackupServer.getServer());
+
+   }
+
    @Test(timeout = 120000)
    /*
    * default maxSavedReplicatedJournalsSize is 2, this means the backup will fall back to replicated only twice, after this
@@ -213,6 +261,12 @@ public class ReplicatedFailoverTest extends FailoverTest {
       } else {
          super.setupHAPolicyConfiguration();
       }
+
+      if (isExtraBackupGroupNameReplicates) {
+         ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setGroupName("foo");
+         ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setGroupName("foo");
+
+      }
    }
 
    @Override


[2/2] activemq-artemis git commit: This closes #2293

Posted by cl...@apache.org.
This closes #2293


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bf0eede9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bf0eede9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bf0eede9

Branch: refs/heads/master
Commit: bf0eede933365c6862922dfe3445794babd3b197
Parents: ec24ee4 3b34127
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 13 09:19:08 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 13 09:19:08 2018 -0400

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  6 +++
 .../artemis/core/config/ConfigurationUtils.java |  4 +-
 .../config/ha/ReplicaPolicyConfiguration.java   | 10 ++++
 .../ha/ReplicatedPolicyConfiguration.java       | 10 ++++
 .../deployers/impl/FileConfigurationParser.java |  4 ++
 .../core/server/cluster/ha/ReplicaPolicy.java   | 15 +++++-
 .../server/cluster/ha/ReplicatedPolicy.java     | 13 ++++-
 .../impl/AnyLiveNodeLocatorForReplication.java  |  9 +++-
 .../NamedLiveNodeLocatorForReplication.java     | 13 +++--
 .../impl/SharedNothingBackupActivation.java     |  2 +-
 .../resources/schema/artemis-configuration.xsd  | 14 +++++
 .../config/impl/HAPolicyConfigurationTest.java  |  2 +
 .../test/resources/replica-hapolicy-config.xml  |  1 +
 .../resources/replicated-hapolicy-config.xml    |  1 +
 .../failover/ReplicatedFailoverTest.java        | 54 ++++++++++++++++++++
 15 files changed, 147 insertions(+), 11 deletions(-)
----------------------------------------------------------------------