You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2021/09/15 20:39:36 UTC

[asterixdb] branch master updated: [NO ISSUE][REP] Sync replicas for different partitions concurrently

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 16e138d  [NO ISSUE][REP] Sync replicas for different partitions concurrently
16e138d is described below

commit 16e138d15dbe70c644980cc890db7c724d127b40
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Wed Sep 15 03:57:28 2021 +0300

    [NO ISSUE][REP] Sync replicas for different partitions concurrently
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
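    Details:
    - Use a separate sync lock per partition (instead of a single node-wide
      replica sync lock) so that replicas of different partitions can be
      synchronized concurrently.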
    
    Change-Id: I6693fbca51d8e13f9740941f797f63fae7b1d2d0
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13244
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../org/apache/asterix/app/nc/ReplicaManager.java  | 30 +++++++++++++---------
 .../message/RegistrationTasksResponseMessage.java  |  4 ++-
 .../asterix/common/storage/IReplicaManager.java    |  6 +++--
 .../asterix/replication/api/PartitionReplica.java  | 13 +++++++++-
 .../replication/sync/ReplicaSynchronizer.java      |  3 ++-
 .../service/recovery/CheckpointManager.java        | 14 ++++++----
 6 files changed, 48 insertions(+), 22 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index f6de92d..7c4b59c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -54,17 +54,18 @@ public class ReplicaManager implements IReplicaManager {
     /**
      * the partitions to which the current node is master
      */
-    private final Set<Integer> partitions = new HashSet<>();
+    private final Map<Integer, Object> partitions = new HashMap<>();
     /**
      * current replicas
      */
     private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
-    private final Object replicaSyncLock = new Object();
     private final Set<Integer> nodeOwnedPartitions = new HashSet<>();
 
     public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
         this.appCtx = appCtx;
-        this.partitions.addAll(partitions);
+        for (Integer partition : partitions) {
+            this.partitions.put(partition, new Object());
+        }
         setNodeOwnedPartitions(appCtx);
     }
 
@@ -77,7 +78,7 @@ public class ReplicaManager implements IReplicaManager {
             LOGGER.warn("Ignoring request to add replica. Node is not ACTIVE yet. Current status: {}", nodeStatus);
             return;
         }
-        if (!partitions.contains(id.getPartition())) {
+        if (!partitions.containsKey(id.getPartition())) {
             throw new IllegalStateException(
                     "This node is not the current master of partition(" + id.getPartition() + ")");
         }
@@ -96,7 +97,6 @@ public class ReplicaManager implements IReplicaManager {
         }
         PartitionReplica replica = replicas.remove(id);
         appCtx.getReplicationManager().unregister(replica);
-
     }
 
     @Override
@@ -112,18 +112,20 @@ public class ReplicaManager implements IReplicaManager {
 
     @Override
     public synchronized Set<Integer> getPartitions() {
-        return Collections.unmodifiableSet(partitions);
+        return Collections.unmodifiableSet(partitions.keySet());
     }
 
     @Override
     public synchronized void setActivePartitions(Set<Integer> activePartitions) {
         partitions.clear();
-        partitions.addAll(activePartitions);
+        for (Integer partition : activePartitions) {
+            partitions.put(partition, new Object());
+        }
     }
 
     @Override
     public synchronized void promote(int partition) throws HyracksDataException {
-        if (partitions.contains(partition)) {
+        if (partitions.containsKey(partition)) {
             return;
         }
         LOGGER.warn("promoting partition {}", partition);
@@ -132,12 +134,12 @@ public class ReplicaManager implements IReplicaManager {
         localResourceRepository.cleanup(partition);
         final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
         recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
-        partitions.add(partition);
+        partitions.put(partition, new Object());
     }
 
     @Override
     public synchronized void release(int partition) throws HyracksDataException {
-        if (!partitions.contains(partition)) {
+        if (!partitions.containsKey(partition)) {
             return;
         }
         closePartitionResources(partition);
@@ -149,8 +151,12 @@ public class ReplicaManager implements IReplicaManager {
     }
 
     @Override
-    public Object getReplicaSyncLock() {
-        return replicaSyncLock;
+    public synchronized Object getPartitionSyncLock(int partition) {
+        Object syncLock = partitions.get(partition);
+        if (syncLock == null) {
+            throw new IllegalStateException("partition " + partition + " is not active on this node");
+        }
+        return syncLock;
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index b3546cc..f0a4a7c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -73,7 +74,8 @@ public class RegistrationTasksResponseMessage extends CcIdentifiedMessage
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
-            Set<Integer> nodeActivePartitions = appCtx.getReplicaManager().getPartitions();
+            // wrap the returned partitions in a hash set to make it serializable
+            Set<Integer> nodeActivePartitions = new HashSet<>(appCtx.getReplicaManager().getPartitions());
             NCLifecycleTaskReportMessage result =
                     new NCLifecycleTaskReportMessage(nodeId, success, localCounter, nodeActivePartitions);
             result.setException(exception);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 88d3113..a4d56ce 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -80,12 +80,14 @@ public interface IReplicaManager {
     void release(int partition) throws HyracksDataException;
 
     /**
-     * A lock that can be used to ensure a single replica is being synchronized at a time
+     * A lock that can be used to ensure a single partition replica is being synchronized at a time
      * by this {@link IReplicaManager}
      *
+     * @param partition partition
+     *
      * @return the synchronization lock
      */
-    Object getReplicaSyncLock();
+    Object getPartitionSyncLock(int partition);
 
     /**
      * Gets the partition replicas matching {@code id}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 3b10700..c49bb7b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -24,6 +24,8 @@ import static org.apache.asterix.common.replication.IPartitionReplica.PartitionR
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -57,6 +59,7 @@ public class PartitionReplica implements IPartitionReplica {
     private ByteBuffer reusbaleBuf;
     private PartitionReplicaStatus status = DISCONNECTED;
     private ISocketChannel sc;
+    private Future<?> syncFuture;
 
     public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) {
         this.id = id;
@@ -87,7 +90,8 @@ public class PartitionReplica implements IPartitionReplica {
             return;
         }
         setStatus(CATCHING_UP);
-        appCtx.getThreadExecutor().execute(() -> {
+        ExecutorService threadExecutor = (ExecutorService) appCtx.getThreadExecutor();
+        syncFuture = threadExecutor.submit(() -> {
             try {
                 new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery);
                 setStatus(IN_SYNC);
@@ -100,6 +104,13 @@ public class PartitionReplica implements IPartitionReplica {
         });
     }
 
+    public synchronized void abort() {
+        if (syncFuture != null) {
+            syncFuture.cancel(true);
+        }
+        syncFuture = null;
+    }
+
     public synchronized ISocketChannel getChannel() {
         try {
             if (!NetworkingUtil.isHealthy(sc)) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 05e2e75..2434686 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -44,7 +44,8 @@ public class ReplicaSynchronizer {
     }
 
     public void sync(boolean register, boolean deltaRecovery) throws IOException {
-        synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
+        Object partitionLock = appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition());
+        synchronized (partitionLock) {
             final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
             try {
                 // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 6582670..f09248f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -46,7 +46,7 @@ public class CheckpointManager extends AbstractCheckpointManager {
     private static final long NO_SECURED_LSN = -1L;
     private final long datasetCheckpointIntervalNanos;
     private final Map<TxnId, Long> securedLSNs;
-    private boolean suspended = false;
+    private int suspendCount = 0;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
         super(txnSubsystem, checkpointProperties);
@@ -84,7 +84,7 @@ public class CheckpointManager extends AbstractCheckpointManager {
         }
         final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
         boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
-        if (!checkpointSucceeded && !suspended) {
+        if (!checkpointSucceeded && !isSuspended()) {
             // Flush datasets with indexes behind target checkpoint LSN
             final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
             dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN));
@@ -109,21 +109,25 @@ public class CheckpointManager extends AbstractCheckpointManager {
 
     @Override
     public synchronized void checkpointIdleDatasets() throws HyracksDataException {
-        if (suspended) {
+        if (isSuspended()) {
             return;
         }
         final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
         dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate());
     }
 
+    private synchronized boolean isSuspended() {
+        return suspendCount != 0;
+    }
+
     @Override
     public synchronized void suspend() {
-        suspended = true;
+        suspendCount++;
     }
 
     @Override
     public synchronized void resume() {
-        suspended = false;
+        suspendCount--;
     }
 
     private synchronized long getMinSecuredLSN() {