You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/03/27 23:48:30 UTC

[07/12] asterixdb git commit: [NO ISSUE][REPL] Ignore LSNs of Partially Replicated Indexes

[NO ISSUE][REPL] Ignore LSNs of Partially Replicated Indexes

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- When determining the low watermark, ignore the LSNs of replicated
  indexes that have no checkpoints.
- Prevent log deletion when reading an index's min LSN fails unexpectedly.
- Ensure only one replica is synchronized at a time to prevent
  possible merge operations from deleting files being synchronized
  to another replica concurrently.
- Ensure index metadata files are replicated first to allow
  replicas to find any existing files in case of re-synchronization.
- Ensure the replication channel is closed on replication failures.

Change-Id: I9ca08da29bdd8fc4406f2df7e6eb32601caf9388
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2534
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f3784bb3
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f3784bb3
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f3784bb3

Branch: refs/heads/master
Commit: f3784bb3e5c5a62ee244adab80bb70b1b811f255
Parents: f7c7059
Author: Murtadha Hubail <mh...@apache.org>
Authored: Tue Mar 27 21:28:14 2018 +0300
Committer: Michael Blow <mb...@apache.org>
Committed: Tue Mar 27 13:53:37 2018 -0700

----------------------------------------------------------------------
 .../apache/asterix/app/nc/RecoveryManager.java   | 15 +++++++++++++--
 .../apache/asterix/app/nc/ReplicaManager.java    |  6 ++++++
 .../asterix/common/storage/IReplicaManager.java  |  8 ++++++++
 .../replication/api/PartitionReplica.java        | 19 +++++++++++++------
 .../sync/ReplicaFilesSynchronizer.java           | 12 ++++++++++++
 .../replication/sync/ReplicaSynchronizer.java    | 13 ++++++++-----
 6 files changed, 60 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4b14a9c..d4e652d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -50,6 +50,7 @@ import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -93,6 +94,7 @@ import org.apache.logging.log4j.Logger;
 public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
     public static final boolean IS_DEBUG_MODE = false;
+    private static final long SMALLEST_POSSIBLE_LSN = 0;
     private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger();
     private final ITransactionSubsystem txnSubsystem;
     private final LogManager logMgr;
@@ -499,8 +501,18 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                 return dsResource.getPartition() == partition;
             }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
             for (DatasetResourceReference indexRef : partitionResources) {
-                long remoteIndexMaxLSN = idxCheckpointMgrProvider.get(indexRef).getLowWatermark();
-                minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+                try {
+                    final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef);
+                    // ignore replicated indexes that have no checkpoints yet (e.g. partially replicated ones)
+                    if (idxCheckpointMgr.getCheckpointCount() > 0) {
+                        long remoteIndexLowWatermark = idxCheckpointMgr.getLowWatermark();
+                        minRemoteLSN = Math.min(minRemoteLSN, remoteIndexLowWatermark);
+                    }
+                } catch (Exception e) {
+                    LOGGER.warn("Failed to get min LSN of resource {}", indexRef, e);
+                    // returning the smallest LSN ensures no logs are deleted after an unexpected failure
+                    return SMALLEST_POSSIBLE_LSN;
+                }
             }
         }
         return minRemoteLSN;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index c821c56..5c5ce93 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -57,6 +57,8 @@ public class ReplicaManager implements IReplicaManager {
      * current replicas
      */
     private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
+    /** monitor handed out so that only one replica is synchronized at a time */
+    private final Object syncLock = new Object();
 
     public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
         this.appCtx = appCtx;
@@ -126,6 +128,11 @@ public class ReplicaManager implements IReplicaManager {
         partitions.remove(partition);
     }
 
+    @Override
+    public Object getReplicaSyncLock() {
+        return syncLock;
+    }
+
     private void closePartitionResources(int partition) throws HyracksDataException {
         final PersistentLocalResourceRepository resourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index b2deb1e..1b8ec53 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -71,4 +71,12 @@ public interface IReplicaManager {
      * @throws HyracksDataException
      */
     void release(int partition) throws HyracksDataException;
+
+    /**
+     * Gets the lock that callers hold while synchronizing a replica, so that at most one
+     * replica is synchronized at a time by this {@link IReplicaManager}.
+     *
+     * @return the object to synchronize on while synchronizing a replica
+     */
+    Object getReplicaSyncLock();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index bfac451..5c324b1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -109,13 +109,16 @@ public class PartitionReplica implements IPartitionReplica {
 
     public synchronized void close() {
         try {
-            if (sc != null && sc.isOpen()) {
-                ReplicationProtocol.sendGoodbye(sc);
-                sc.close();
-                sc = null;
+            if (sc != null) {
+                if (sc.isOpen()) {
+                    // a goodbye can only be written to an open channel; skip it to avoid a spurious warning
+                    sendGoodBye();
+                }
+                NetworkUtil.closeQuietly(sc);
             }
-        } catch (IOException e) {
-            LOGGER.warn("Failed to close channel", e);
+        } finally {
+            // drop the reference even if closing fails
+            sc = null;
         }
     }
 
@@ -166,4 +169,12 @@ public class PartitionReplica implements IPartitionReplica {
         LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status);
         this.status = status;
     }
+
+    private void sendGoodBye() {
+        try {
+            ReplicationProtocol.sendGoodbye(sc);
+        } catch (IOException e) {
+            LOGGER.warn("Failed to send good bye to {}", this, e);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index fae6ed6..0d97a7a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -18,9 +18,12 @@
  */
 package org.apache.asterix.replication.sync;
 
+import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -40,6 +43,15 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourc
  */
 public class ReplicaFilesSynchronizer {
 
+    /** Orders index metadata files before all other files, then by file name. */
+    private static final Comparator<String> REPLICATED_FILES_COMPARATOR = (file, anotherFile) -> {
+        final boolean fileIsMetadata = file.endsWith(METADATA_FILE_NAME);
+        final boolean anotherIsMetadata = anotherFile.endsWith(METADATA_FILE_NAME);
+        // reversed Boolean.compare puts metadata (true) first and keeps the ordering antisymmetric
+        final int metadataOrder = Boolean.compare(anotherIsMetadata, fileIsMetadata);
+        return metadataOrder != 0 ? metadataOrder : file.compareTo(anotherFile);
+    };
+
     private final PartitionReplica replica;
     private final INcApplicationContext appCtx;
 
@@ -79,6 +91,8 @@ public class ReplicaFilesSynchronizer {
 
     private void replicateMissingFiles(List<String> files) {
         final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
+        // sort files to ensure index metadata files are replicated first
+        files.sort(REPLICATED_FILES_COMPARATOR);
         files.forEach(sync::replicate);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f3784bb3/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 9f397d2..ef85977 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -22,9 +22,9 @@ import java.io.IOException;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.replication.messaging.ReplicationProtocol;
-import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
 import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
 
 /**
  * Performs the steps required to ensure any newly added replica
@@ -41,9 +41,12 @@ public class ReplicaSynchronizer {
     }
 
     public void sync() throws IOException {
-        syncFiles();
-        checkpointReplicaIndexes();
-        appCtx.getReplicationManager().register(replica);
+        // hold the replica manager's sync lock so that only one replica is synchronized at a time
+        synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
+            syncFiles();
+            checkpointReplicaIndexes();
+            appCtx.getReplicationManager().register(replica);
+        }
     }
 
     private void syncFiles() throws IOException {