You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2021/07/24 10:29:50 UTC

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #3646: ARTEMIS-3340 layered over ARTEMIS-2716 - activation sequence tracking to protect the journal

franz1981 commented on a change in pull request #3646:
URL: https://github.com/apache/activemq-artemis/pull/3646#discussion_r675981524



##########
File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.remoting.server.RemotingService;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.MutableLong;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.core.server.ActiveMQServer.SERVER_STATE.STARTED;
+import static org.apache.activemq.artemis.core.server.impl.ClusterTopologySearch.searchActiveLiveNodeId;
+
+/**
+ * This is going to be {@link #run()} just by natural born primary, at the first start.
+ * Both during a failover or a failback, {@link #run()} isn't going to be used, but only {@link #getActivationChannelHandler(Channel, Acceptor)}.
+ */
+public class ReplicationPrimaryActivation extends LiveActivation implements DistributedLock.UnavailableLockListener {
+
+   private static final Logger LOGGER = Logger.getLogger(ReplicationPrimaryActivation.class);
+   private static final long DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS = 20_000;
+   private static final long BLOCKING_CALLS_TIMEOUT_MILLIS = 5_000;
+
+   private final ReplicationPrimaryPolicy policy;
+
+   private final ActiveMQServerImpl activeMQServer;
+
+   @GuardedBy("replicationLock")
+   private ReplicationManager replicationManager;
+
+   private final Object replicationLock;
+
+   private final DistributedPrimitiveManager distributedManager;
+
+   private volatile boolean stoppingServer;
+
+   public ReplicationPrimaryActivation(final ActiveMQServerImpl activeMQServer,
+                                       final DistributedPrimitiveManager distributedManager,
+                                       final ReplicationPrimaryPolicy policy) {
+      this.activeMQServer = activeMQServer;
+      this.policy = policy;
+      this.replicationLock = new Object();
+      this.distributedManager = distributedManager;
+   }
+
+   /**
+    * used for testing purposes.
+    */
+   public DistributedPrimitiveManager getDistributedManager() {
+      return distributedManager;
+   }
+
+   @Override
+   public void freezeConnections(RemotingService remotingService) {
+      final ReplicationManager replicationManager = getReplicationManager();
+
+      if (remotingService != null && replicationManager != null) {
+         remotingService.freeze(null, replicationManager.getBackupTransportConnection());
+      } else if (remotingService != null) {
+         remotingService.freeze(null, null);
+      }
+   }
+
+   @Override
+   public void run() {
+      try {
+
+         // we have a common nodeId that we can share and coordinate with between peers
+         if (policy.getCoordinationId() != null) {
+            LOGGER.infof("Applying shared peer NodeID=%s to enable coordinated live activation", policy.getCoordinationId());
+
+            // REVISIT: this is quite clunky, also in backup activation, we just need new nodeID persisted!
+            activeMQServer.resetNodeManager();
+            activeMQServer.getNodeManager().start();
+            activeMQServer.getNodeManager().setNodeID(policy.getCoordinationId());
+            activeMQServer.getNodeManager().stopBackup();
+         }
+         final String nodeId = activeMQServer.getNodeManager().readNodeId().toString();
+
+         final DistributedLock liveLock = searchLiveOrAcquireLiveLock(nodeId, BLOCKING_CALLS_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+         if (liveLock == null) {
+            return;
+         }
+
+         ReplicationBackupActivation.ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER);
+
+         activeMQServer.initialisePart1(false);
+
+         activeMQServer.initialisePart2(false);
+
+         // must be registered before checking the caller
+         liveLock.addListener(this);
+
+         // This control is placed here because initialisePart2 is going to load the journal that
+         // could pause the JVM for enough time to lose lock ownership
+         if (!liveLock.isHeldByCaller()) {
+            throw new IllegalStateException("This broker isn't live anymore, probably due to application pauses eg GC, OS etc: failing now");
+         }
+
+         activeMQServer.completeActivation(true);
+
+         if (activeMQServer.getIdentity() != null) {
+            ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
+         } else {
+            ActiveMQServerLogger.LOGGER.serverIsLive();
+         }
+      } catch (Exception e) {
+         // async stop it, we don't need to await this to complete
+         distributedManager.stop();
+         ActiveMQServerLogger.LOGGER.initializationError(e);
+         activeMQServer.callActivationFailureListeners(e);
+      }
+   }
+
+   private DistributedLock searchLiveOrAcquireLiveLock(final String nodeId,
+                                                       final long blockingCallTimeout,
+                                                       final TimeUnit unit) throws ActiveMQException, InterruptedException {
+      if (policy.isCheckForLiveServer()) {
+         LOGGER.infof("Searching for a live server matching NodeID = %s", nodeId);
+         if (searchActiveLiveNodeId(policy.getClusterName(), nodeId, blockingCallTimeout, unit, activeMQServer.getConfiguration())) {
+            LOGGER.infof("Found a live server with  NodeID = %s: restarting as backup", nodeId);
+            activeMQServer.setHAPolicy(policy.getBackupPolicy());
+            return null;
+         }
+      }
+      startDistributedPrimitiveManager();
+      return acquireDistributeLock(getDistributeLock(nodeId), blockingCallTimeout, unit);
+   }
+
+   private void startDistributedPrimitiveManager() throws InterruptedException, ActiveMQException {
+      LOGGER.infof("Trying to reach the majority of quorum nodes in %d ms.", DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS);
+      try {
+         if (distributedManager.start(DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+            return;
+         }
+      } catch (InterruptedException ie) {
+         throw ie;
+      } catch (Throwable t) {
+         LOGGER.debug(t);
+      }
+      assert !distributedManager.isStarted();
+      throw new ActiveMQException("Cannot reach the majority of quorum nodes");
+   }
+
+   private DistributedLock getDistributeLock(final String nodeId) throws InterruptedException, ActiveMQException {
+      try {
+         return distributedManager.getDistributedLock(nodeId);
+      } catch (Throwable t) {
+         try {
+            distributedManager.stop();
+         } catch (Throwable ignore) {
+            // don't care
+         }
+         if (t instanceof InterruptedException) {
+            throw (InterruptedException) t;
+         }
+         throw new ActiveMQException("Cannot obtain a live lock instance");
+      }
+   }
+
+   private DistributedLock acquireDistributeLock(final DistributedLock liveLock,
+                                                 final long acquireLockTimeout,
+                                                 final TimeUnit unit) throws InterruptedException, ActiveMQException {
+      try {
+         if (liveLock.tryLock(acquireLockTimeout, unit)) {
+            return liveLock;
+         }
+      } catch (UnavailableStateException e) {
+         LOGGER.debug(e);
+      }
+      try {
+         distributedManager.stop();
+      } catch (Throwable ignore) {
+         // don't care
+      }
+      throw new ActiveMQException("Failed to become live");
+   }
+
+   @Override
+   public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor acceptorUsed) {
+      if (stoppingServer) {
+         return null;
+      }
+      return packet -> {
+         if (packet.getType() == PacketImpl.BACKUP_REGISTRATION) {
+            onBackupRegistration(channel, acceptorUsed, (BackupRegistrationMessage) packet);
+         }
+      };
+   }
+
+   private void onBackupRegistration(final Channel channel,
+                                     final Acceptor acceptorUsed,
+                                     final BackupRegistrationMessage msg) {
+      try {
+         startAsyncReplication(channel.getConnection(), acceptorUsed.getClusterConnection(), msg.getConnector(), msg.isFailBackRequest());
+      } catch (ActiveMQAlreadyReplicatingException are) {
+         channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING));
+      } catch (ActiveMQException e) {
+         LOGGER.debug("Failed to process backup registration packet", e);
+         channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION));
+      }
+   }
+
+   private void startAsyncReplication(final CoreRemotingConnection remotingConnection,
+                                      final ClusterConnection clusterConnection,
+                                      final TransportConfiguration backupTransport,
+                                      final boolean isFailBackRequest) throws ActiveMQException {
+      synchronized (replicationLock) {
+         if (replicationManager != null) {
+            throw new ActiveMQAlreadyReplicatingException();
+         }
+         if (!activeMQServer.isStarted()) {
+            throw new ActiveMQIllegalStateException();
+         }
+         final ReplicationFailureListener listener = new ReplicationFailureListener();
+         remotingConnection.addCloseListener(listener);
+         remotingConnection.addFailureListener(listener);
+         final ReplicationManager replicationManager = new ReplicationManager(activeMQServer, remotingConnection, clusterConnection.getCallTimeout(), policy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory());
+         this.replicationManager = replicationManager;
+         replicationManager.start();
+         final Thread replicatingThread = new Thread(() -> replicate(replicationManager, clusterConnection, isFailBackRequest, backupTransport));
+         replicatingThread.setName("async-replication-thread");
+         replicatingThread.start();
+      }
+   }
+
+   private void replicate(final ReplicationManager replicationManager,
+                          final ClusterConnection clusterConnection,
+                          final boolean isFailBackRequest,
+                          final TransportConfiguration backupTransport) {
+      try {
+         final String nodeID = activeMQServer.getNodeID().toString();
+         activeMQServer.getStorageManager().startReplication(replicationManager, activeMQServer.getPagingManager(), nodeID, isFailBackRequest && policy.isAllowAutoFailBack(), policy.getInitialReplicationSyncTimeout());
+
+         clusterConnection.nodeAnnounced(System.currentTimeMillis(), nodeID, policy.getGroupName(), policy.getScaleDownGroupName(), new Pair<>(null, backupTransport), true);
+
+         if (isFailBackRequest && policy.isAllowAutoFailBack()) {
+            awaitBackupAnnouncementOnFailbackRequest(clusterConnection);
+         }
+      } catch (Exception e) {
+         if (activeMQServer.getState() == STARTED) {
+            /*
+             * The reasoning here is that the exception was either caused by (1) the
+             * (interaction with) the backup, or (2) by an IO Error at the storage. If (1), we
+             * can swallow the exception and ignore the replication request. If (2) the live
+             * will crash shortly.
+             */
+            ActiveMQServerLogger.LOGGER.errorStartingReplication(e);
+         }
+         try {
+            ActiveMQServerImpl.stopComponent(replicationManager);
+         } catch (Exception amqe) {
+            ActiveMQServerLogger.LOGGER.errorStoppingReplication(amqe);
+         } finally {
+            synchronized (replicationLock) {
+               this.replicationManager = null;
+            }
+         }
+      }
+   }
+
+   /**
+    * This is handling awaiting backup announcement before trying to failover.
+    * This broker is a backup broker, acting as a live and ready to restart as a backup
+    */
+   private void awaitBackupAnnouncementOnFailbackRequest(ClusterConnection clusterConnection) throws Exception {
+      final String nodeID = activeMQServer.getNodeID().toString();
+      final BackupTopologyListener topologyListener = new BackupTopologyListener(nodeID, clusterConnection.getConnector());
+      clusterConnection.addClusterTopologyListener(topologyListener);
+      try {
+         if (topologyListener.waitForBackup()) {
+            restartAsBackupAfterFailback();
+         } else {
+            ActiveMQServerLogger.LOGGER.failbackMissedBackupAnnouncement();
+         }
+      } finally {
+         clusterConnection.removeClusterTopologyListener(topologyListener);
+      }
+   }
+
+   /**
+    * If {@link #asyncStopServer()} happens before this call, the restart just won't happen.
+    * If {@link #asyncStopServer()} happens after this call, will make the server to stop right after being restarted.
+    */
+   private void restartAsBackupAfterFailback() throws Exception {
+      if (stoppingServer) {
+         return;
+      }
+      synchronized (this) {
+         if (stoppingServer) {
+            return;
+         }
+         final String coordinatedLockAndNodeId = activeMQServer.getNodeManager().getNodeId().toString();
+         final long inSyncReplicaActivation = activeMQServer.getNodeManager().getNodeActivationSequence();
+         DistributedLock existingLiveLock = distributedManager.getDistributedLock(coordinatedLockAndNodeId);
+         // give up our role as 'live' asap, will also happen on server.fail
+         existingLiveLock.close();
+
+         activeMQServer.fail(true);
+
+         // need to restart
+         distributedManager.start();
+         final MutableLong coordinatedActivationSequence = distributedManager.getMutableLong(coordinatedLockAndNodeId);
+         // wait for the live to activate and run un replicated with a sequence > inSyncReplicaActivation
+         // this read can be dirty b/c we are just looking for an increment, we don't care about the actual value
+         final long done = System.currentTimeMillis() + policy.getBackupPolicy().getVoteRetryWait();
+         do {
+            final long coordinatedValue = coordinatedActivationSequence.get();
+            if (coordinatedValue > inSyncReplicaActivation) {
+               // all good, activation has gone ahead
+               LOGGER.infof("Detected expected sequential server activation after failback, with NodeID = %s: and sequence: %d", coordinatedLockAndNodeId, coordinatedValue);
+               break;
+            }
+            try {
+               TimeUnit.MILLISECONDS.sleep(100);
+            } catch (InterruptedException ignored) {
+            }
+         }
+         while (done < System.currentTimeMillis());
+
+         if (coordinatedActivationSequence.get() == inSyncReplicaActivation) {
+            LOGGER.warnf("Timed out waiting for failback server activation with NodeID = %s: and sequence > %d: after %dms",

Review comment:
       what happen if the other primary hasn't been able to failback?
   
   If this broker is going to restart as backup it will skip checking any activation sequence and will rotate the data (while preserving the NodeID), is it correct?
   In the worst case scenario the other primary has died and there is no live around AND this broker will rotate the most up to date data, that seems a bad thing. Let's speak on monday about it: maybe makes sense to NOT skip checking the activation sequence on server restart during a failback, in short...
   ```java
            if (policy.isTryFailback()) {
               // we are replicating to overwrite our data, transient backup state while trying to be the primary
            } else {
               // we may be a valid insync_replica (backup) if our activation sequence is largest for a nodeId
               // verify that before removing data..
               final DistributedLock liveLockWithInSyncReplica = checkForInSyncReplica();
   ```
   can become
   ```java
               // we may be a valid insync_replica (backup) if our activation sequence is largest for a nodeId
               // verify that before removing data..
               final DistributedLock liveLockWithInSyncReplica = checkForInSyncReplica();
   ```
   because `checkForInSyncReplica` won't give up on becoming backup if the data is the most up to date.
   wdyt?
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org