You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2019/07/03 00:11:19 UTC

[asterixdb] 01/02: [NO ISSUE][REPL] Suspend Dataset Checkpointing on Replica Sync

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 0561d1072b6a217816b5c9564ccc0ae84a931a5c
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Fri Jun 28 04:39:14 2019 +0300

    [NO ISSUE][REPL] Suspend Dataset Checkpointing on Replica Sync
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Before synchronizing replicas, suspend dataset checkpointing to
      prevent new files from being generated by the async IO operations
      that checkpointing triggers.
    - Instead of sync'ing current files to replicas then scheduling a flush
      and sync'ing any newly generated files, just flush datasets before
      the initial sync then sync all the files in one go.
    
    Change-Id: I058fd48bc0fb89a1e16448ce516c3410bb4d681d
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3469
    Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Till Westmann <ti...@apache.org>
---
 .../common/transactions/ICheckpointManager.java     | 10 ++++++++++
 .../replication/sync/ReplicaSynchronizer.java       | 17 +++++++++++------
 .../service/recovery/CheckpointManager.java         | 21 ++++++++++++++++-----
 3 files changed, 37 insertions(+), 11 deletions(-)

diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index 36cea55..954e399 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -58,4 +58,14 @@ public interface ICheckpointManager extends ILifeCycleComponent {
      * @param id
      */
     void completed(TxnId id);
+
+    /**
+     * Suspends checkpointing datasets
+     */
+    void suspend();
+
+    /**
+     * Resumes checkpointing datasets
+     */
+    void resume();
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 0f0b5bd..123709b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
@@ -45,21 +46,25 @@ public class ReplicaSynchronizer {
     public void sync() throws IOException {
         final Object syncLock = appCtx.getReplicaManager().getReplicaSyncLock();
         synchronized (syncLock) {
-            syncFiles();
-            checkpointReplicaIndexes();
-            appCtx.getReplicationManager().register(replica);
+            final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
+            try {
+                // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
+                checkpointManager.suspend();
+                syncFiles();
+                checkpointReplicaIndexes();
+                appCtx.getReplicationManager().register(replica);
+            } finally {
+                checkpointManager.resume();
+            }
         }
     }
 
     private void syncFiles() throws IOException {
         final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
-        waitForReplicatedDatasetsIO();
-        fileSync.sync();
         // flush replicated dataset to generate disk component for any remaining in-memory components
         final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
         appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
         waitForReplicatedDatasetsIO();
-        // sync any newly generated files
         fileSync.sync();
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index ce523db..b85742e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.transaction.management.service.recovery;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.transactions.CheckpointProperties;
 import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -27,10 +31,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * An implementation of {@link ICheckpointManager} that defines the logic
  * of checkpoints.
@@ -40,6 +40,7 @@ public class CheckpointManager extends AbstractCheckpointManager {
     private static final Logger LOGGER = LogManager.getLogger();
     private static final long NO_SECURED_LSN = -1L;
     private final Map<TxnId, Long> securedLSNs;
+    private boolean suspended = false;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
         super(txnSubsystem, checkpointProperties);
@@ -76,7 +77,7 @@ public class CheckpointManager extends AbstractCheckpointManager {
         }
         final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
         boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
-        if (!checkpointSucceeded) {
+        if (!checkpointSucceeded && !suspended) {
             // Flush datasets with indexes behind target checkpoint LSN
             IDatasetLifecycleManager datasetLifecycleManager =
                     txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
@@ -100,6 +101,16 @@ public class CheckpointManager extends AbstractCheckpointManager {
         securedLSNs.remove(id);
     }
 
+    @Override
+    public synchronized void suspend() {
+        suspended = true;
+    }
+
+    @Override
+    public synchronized void resume() {
+        suspended = false;
+    }
+
     private synchronized long getMinSecuredLSN() {
         return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
     }