You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/05/04 08:24:16 UTC

[GitHub] [activemq-artemis] michaelpearce-gain commented on a change in pull request #3555: ARTEMIS-2716 Implements pluggable Quorum Vote

michaelpearce-gain commented on a change in pull request #3555:
URL: https://github.com/apache/activemq-artemis/pull/3555#discussion_r625596297



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LiveNodeLocator;
+import org.apache.activemq.artemis.core.server.NodeManager;;
+import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
+import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolicy;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.core.server.impl.ReplicationObserver.ReplicationFailure;
+
+/**
+ * This activation can be used by a primary while trying to fail back, i.e. {@code failback == true}, or
+ * by a natural-born backup, i.e. {@code failback == false}.
+ */
+public final class ReplicationBackupActivation extends Activation {
+
+   private static final Logger LOGGER = Logger.getLogger(ReplicationBackupActivation.class);
+
+   private final ReplicationBackupPolicy policy;
+   private final ActiveMQServerImpl activeMQServer;
+   private final boolean failback;
+   // This field is != null iff this node is a primary during a fail-back, i.e. acting as a backup in order to become live again.
+   private final String expectedNodeID;
+   @GuardedBy("this")
+   private boolean closed;
+   private final DistributedPrimitiveManager distributedManager;
+   // Used for monitoring purposes
+   private volatile ReplicationObserver replicationObserver;
+   // Used for testing purposes
+   private volatile ReplicationEndpoint replicationEndpoint;
+
+   /**
+    * Creates a backup activation bound to the given server.
+    * <p>
+    * When {@code failback} is {@code true} the server must already expose a non-empty NodeID, which is
+    * recorded as the expected NodeID of this activation; otherwise no expected NodeID is recorded.
+    *
+    * @param activeMQServer     the server driven by this activation
+    * @param failback           {@code true} if this activation is used to fail back
+    * @param distributedManager the distributed primitive manager used by this activation
+    * @param policy             the replication backup policy read by this activation
+    * @throws IllegalStateException if {@code failback} is {@code true} and the server NodeID is
+    *                               {@code null} or empty
+    */
+   public ReplicationBackupActivation(final ActiveMQServerImpl activeMQServer,
+                                      final boolean failback,
+                                      final DistributedPrimitiveManager distributedManager,
+                                      final ReplicationBackupPolicy policy) {
+      // Resolve the NodeID to be expected (if any) before touching any field.
+      String nodeIdToExpect = null;
+      if (failback) {
+         final SimpleString currentNodeID = activeMQServer.getNodeID();
+         final boolean missingNodeID = currentNodeID == null || currentNodeID.isEmpty();
+         if (missingNodeID) {
+            throw new IllegalStateException("A failback activation must be biased around a specific NodeID");
+         }
+         nodeIdToExpect = currentNodeID.toString();
+      }
+      this.activeMQServer = activeMQServer;
+      this.failback = failback;
+      this.expectedNodeID = nodeIdToExpect;
+      this.distributedManager = distributedManager;
+      this.policy = policy;
+      this.replicationObserver = null;
+      this.replicationEndpoint = null;
+   }
+
+   /**
+    * Runs this activation: starts the distributed manager (best effort), re-creates the node manager,
+    * moves the server data, starts the server as a backup and then replicates until
+    * {@link #replicateAndFailover(ClusterController)} either returns a live lock (in which case the
+    * server is started as live) or returns {@code null} (in which case this method just returns).
+    * <p>
+    * Returns immediately if this activation is already closed.
+    */
+   @Override
+   public void run() {
+      synchronized (this) {
+         if (closed) {
+            return;
+         }
+      }
+      try {
+         try {
+            // best effort to add an additional "witness" node in case of quorum using distributed consensus
+            distributedManager.start(policy.getQuorumVoteWait(), TimeUnit.SECONDS);
+         } catch (ExecutionException startFailure) {
+            // deliberately not fatal: only traced at debug level
+            LOGGER.debug("Failed to start distributed primitive manager", startFailure);
+         }
+         // Stop the previous node manager and create a new one with NodeManager::replicatedBackup == true:
+         // NodeManager::start skip setup lock file with NodeID, until NodeManager::stopBackup is called.
+         activeMQServer.resetNodeManager();
+         activeMQServer.getNodeManager().stop();
+         // A primary need to preserve NodeID across runs
+         activeMQServer.moveServerData(policy.getMaxSavedReplicatedJournalsSize(), failback);
+         activeMQServer.getNodeManager().start();
+         final boolean initialised = activeMQServer.initialisePart1(false);
+         if (!initialised) {
+            return;
+         }
+         // re-check: this activation may have been closed while initialising
+         synchronized (this) {
+            if (closed) {
+               return;
+            }
+         }
+         final ClusterController controller = activeMQServer.getClusterManager().getClusterController();
+         controller.awaitConnectionToReplicationCluster();
+         activeMQServer.getBackupManager().start();
+         ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(),
+                                                         activeMQServer.getNodeManager().getNodeId());
+         activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
+         final DistributedLock acquiredLock = replicateAndFailover(controller);
+         if (acquiredLock != null) {
+            startAsLive(acquiredLock);
+         }
+      } catch (Exception failure) {
+         final boolean expectedWhileStopping = failure instanceof InterruptedException || failure instanceof IllegalStateException;
+         if (expectedWhileStopping && !activeMQServer.isStarted()) {
+            // do not log these errors if the server is being stopped.
+            return;
+         }
+         ActiveMQServerLogger.LOGGER.initializationError(failure);
+      }
+   }
+
+   /**
+    * Turns this backup into a live server while holding the monitor of the server.
+    * <p>
+    * If the server is not started anymore the given lock is closed and nothing else happens.
+    * Otherwise a {@link ReplicationPrimaryActivation} is installed as the server activation and
+    * registered as a listener of the lock, and the activation is completed only if the lock is
+    * still held by the caller.
+    *
+    * @param liveLock the live lock acquired by this node
+    * @throws ActiveMQIllegalStateException if the lock state cannot be checked or the lock is no
+    *                                       longer held by the caller
+    * @throws Exception                     if any of the server start-up steps fails
+    */
+   private void startAsLive(final DistributedLock liveLock) throws Exception {
+      policy.getLivePolicy().setBackupPolicy(policy);
+      activeMQServer.setHAPolicy(policy.getLivePolicy());
+
+      synchronized (activeMQServer) {
+         if (!activeMQServer.isStarted()) {
+            // the server has been stopped in the meantime: give the lock back
+            liveLock.close();
+            return;
+         }
+         ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
+         // stopBackup is going to write the NodeID previously set on the NodeManager,
+         // because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true.
+         activeMQServer.getNodeManager().stopBackup();
+         activeMQServer.getStorageManager().start();
+         activeMQServer.getBackupManager().activated();
+         // IMPORTANT:
+         // we're setting this activation JUST because it would allow the server to use its
+         // getActivationChannelHandler to handle replication
+         final ReplicationPrimaryActivation liveActivation =
+            new ReplicationPrimaryActivation(activeMQServer, distributedManager, policy.getLivePolicy());
+         liveLock.addListener(liveActivation);
+         activeMQServer.setActivation(liveActivation);
+         activeMQServer.initialisePart2(false);
+         // verify the lock is still ours after the second initialisation phase
+         final boolean lockStillHeld;
+         try {
+            lockStillHeld = liveLock.isHeldByCaller();
+         } catch (UnavailableStateException unavailable) {
+            LOGGER.warn(unavailable);
+            throw new ActiveMQIllegalStateException("This server cannot check its role as a live: activation is failed");
+         }
+         if (!lockStillHeld) {
+            throw new ActiveMQIllegalStateException("This server is not live anymore: activation is failed");
+         }
+         if (activeMQServer.getIdentity() == null) {
+            ActiveMQServerLogger.LOGGER.serverIsLive();
+         } else {
+            ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
+         }
+         activeMQServer.completeActivation(true);
+      }
+   }
+
+   /**
+    * Picks the live node locator matching the configuration of this activation:
+    * <ul>
+    *    <li>a locator bound to {@code expectedNodeID} when one has been recorded,</li>
+    *    <li>otherwise a locator accepting any live when the policy has no group name,</li>
+    *    <li>otherwise a locator bound to the group name of the policy.</li>
+    * </ul>
+    *
+    * @param registrationListener the listener passed to the created locator
+    * @return a new locator, never {@code null}
+    */
+   private LiveNodeLocator createLiveNodeLocator(final LiveNodeLocator.BackupRegistrationListener registrationListener) {
+      if (expectedNodeID != null) {
+         // an expected NodeID is recorded only for fail-back activations
+         assert failback;
+         return new NamedLiveNodeIdLocatorForReplication(expectedNodeID, registrationListener, policy.getRetryReplicationWait());
+      }
+      if (policy.getGroupName() == null) {
+         return new AnyLiveNodeLocatorForReplication(registrationListener, activeMQServer, policy.getRetryReplicationWait());
+      }
+      return new NamedLiveNodeLocatorForReplication(policy.getGroupName(), registrationListener, policy.getRetryReplicationWait());
+   }
+
+   /**
+    * Keeps replicating a live until a replication failure requires a decision.
+    * <p>
+    * Each iteration calls {@code replicateLive}: a {@code null} result is retried after sleeping for the
+    * retry interval of the cluster controller, {@code AlreadyReplicating} is retried immediately, and
+    * every other failure ends the loop.
+    *
+    * @param clusterController the controller used to replicate and to read the retry interval
+    * @return the live lock acquired after a (voluntary or not) fail-over, or {@code null} if this
+    *         activation is closed, the server is not started, the observer is closed or a server
+    *         restart has been requested
+    * @throws ActiveMQException    declared by the replication step
+    * @throws InterruptedException if interrupted while sleeping or while acquiring the live lock
+    */
+   private DistributedLock replicateAndFailover(final ClusterController clusterController) throws ActiveMQException, InterruptedException {
+      for (;;) {
+         synchronized (this) {
+            if (closed) {
+               return null;
+            }
+         }
+         final ReplicationFailure outcome = replicateLive(clusterController);
+         if (outcome == null) {
+            // no failure has been returned: pause before the next replication attempt
+            Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
+            continue;
+         }
+         if (!activeMQServer.isStarted()) {
+            return null;
+         }
+         LOGGER.debugf("ReplicationFailure = %s", outcome);
+         switch (outcome) {
+            case AlreadyReplicating:
+               // can just retry here, data should be clean
+               continue;
+            case ClosedObserver:
+               return null;
+            case RegistrationError:
+               asyncRestartServer(activeMQServer, false);
+               return null;
+            case BackupNotInSync:
+            case WrongNodeId:
+               asyncRestartServer(activeMQServer, true);
+               return null;
+            case VoluntaryFailOver:
+            case NonVoluntaryFailover: {
+               final DistributedLock acquired = tryAcquireLiveLock();
+               if (acquired == null) {
+                  // the live lock has not been acquired: restart the server
+                  if (!failback) {
+                     ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
+                  }
+                  asyncRestartServer(activeMQServer, true);
+               }
+               return acquired;
+            }
+            default:
+               throw new AssertionError("Unsupported failure " + outcome);
+         }
+      }
+   }
+
+   private DistributedLock tryAcquireLiveLock() throws InterruptedException {
+      assert activeMQServer.getNodeManager().getNodeId() != null;
+      final String liveID = activeMQServer.getNodeManager().getNodeId().toString();
+      final int voteRetries = policy.getVoteRetries();
+      final long maxAttempts = voteRetries >= 0 ? (voteRetries + 1) : -1;

Review comment:
       So:
   
   -1 normally means "unlimited" to me, i.e. never stop retrying.
   
   0 means don't do any retries, i.e. fail/error on the first error and do not re-attempt.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org