You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/09/24 14:32:19 UTC

[6/7] asterixdb git commit: [NO ISSUE][REPL] Ensure Valid Component ID is Initialized On Replica Sync

[NO ISSUE][REPL] Ensure Valid Component ID is Initialized On Replica Sync

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Currently, the first time a replica is synchronized from master,
  the valid component id on each replicated index's initial checkpoint
  will be the initial value of a component id (-1). This value is
  fixed when the replica receives a flushed component from
  the index. However, if the master fails before any component is
  flushed to a replica and that replica is promoted to master, it
  will start from an invalid component id. This change ensures that
  the initial checkpoint of replicated indexes is initialized to
  the maximum component id that appears on master. This will ensure
  that if the replica is promoted, it will at least start from
  a component that wasn't previously used on master.
- Replace the component id validation assertion with an IllegalStateException.

Change-Id: I85395ad823a630725c4cab4bead1c61546dc61ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2973
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/bd728afe
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/bd728afe
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/bd728afe

Branch: refs/heads/master
Commit: bd728afe9be8c245e1de24936f8084ba3f537d05
Parents: f7d634c
Author: Murtadha Hubail <mh...@apache.org>
Authored: Thu Sep 20 20:56:01 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Thu Sep 20 12:30:12 2018 -0700

----------------------------------------------------------------------
 .../asterix/app/nc/IndexCheckpointManager.java  |  5 +++--
 .../common/storage/IIndexCheckpointManager.java |  3 ++-
 .../asterix/common/storage/IndexCheckpoint.java |  5 ++---
 .../CheckpointPartitionIndexesTask.java         | 10 +++++++---
 .../messaging/ReplicateFileTask.java            |  4 +++-
 .../replication/sync/ReplicaSynchronizer.java   | 12 +++++++++++-
 .../PersistentLocalResourceRepository.java      | 20 ++++++++++++++++++--
 .../am/lsm/common/impls/AbstractLSMIndex.java   | 10 +++++-----
 8 files changed, 51 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 7b08bad..420585a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -55,7 +55,8 @@ public class IndexCheckpointManager implements IIndexCheckpointManager {
     }
 
     @Override
-    public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException {
+    public synchronized void init(long validComponentSequence, long lsn, long validComponentId)
+            throws HyracksDataException {
         List<IndexCheckpoint> checkpoints;
         try {
             checkpoints = getCheckpoints();
@@ -66,7 +67,7 @@ public class IndexCheckpointManager implements IIndexCheckpointManager {
             LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
             delete();
         }
-        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn);
+        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn, validComponentId);
         persist(firstCheckpoint);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index dd9ede5..2f0eddf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -29,9 +29,10 @@ public interface IIndexCheckpointManager {
      *
      * @param validComponentSequence
      * @param lsn
+     * @param validComponentId
      * @throws HyracksDataException
      */
-    void init(long validComponentSequence, long lsn) throws HyracksDataException;
+    void init(long validComponentSequence, long lsn, long validComponentId) throws HyracksDataException;
 
     /**
      * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 9654473..cb34600 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -42,12 +41,12 @@ public class IndexCheckpoint {
     private long lastComponentId;
     private Map<Long, Long> masterNodeFlushMap;
 
-    public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark) {
+    public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId) {
         IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
         firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
         firstCheckpoint.lowWatermark = lowWatermark;
         firstCheckpoint.validComponentSequence = lastComponentSequence;
-        firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
+        firstCheckpoint.lastComponentId = validComponentId;
         firstCheckpoint.masterNodeFlushMap = new HashMap<>();
         return firstCheckpoint;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 448613b..e778cce 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -45,9 +45,11 @@ import org.apache.hyracks.storage.common.LocalResource;
 public class CheckpointPartitionIndexesTask implements IReplicaTask {
 
     private final int partition;
+    private final long maxComponentId;
 
-    public CheckpointPartitionIndexesTask(int partition) {
+    public CheckpointPartitionIndexesTask(int partition, long maxComponentId) {
         this.partition = partition;
+        this.maxComponentId = maxComponentId;
     }
 
     @Override
@@ -75,7 +77,7 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask {
                 maxComponentSequence =
                         Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
             }
-            indexCheckpointManager.init(maxComponentSequence, currentLSN);
+            indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId);
         }
         ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
     }
@@ -90,6 +92,7 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask {
         try {
             DataOutputStream dos = new DataOutputStream(out);
             dos.writeInt(partition);
+            dos.writeLong(maxComponentId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -98,7 +101,8 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask {
     public static CheckpointPartitionIndexesTask create(DataInput input) throws HyracksDataException {
         try {
             int partition = input.readInt();
-            return new CheckpointPartitionIndexesTask(partition);
+            long maxComponentId = input.readLong();
+            return new CheckpointPartitionIndexesTask(partition, maxComponentId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index f53d448..ae36c13 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -40,6 +40,7 @@ import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -98,7 +99,8 @@ public class ReplicateFileTask implements IReplicaTask {
         final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
         final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         indexCheckpointManager.delete();
-        indexCheckpointManager.init(Long.MIN_VALUE, currentLSN);
+        indexCheckpointManager.init(Long.MIN_VALUE, currentLSN,
+                LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
         LOGGER.info(() -> "Checkpoint index: " + indexRef);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index ef85977..09f1205 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -25,6 +25,8 @@ import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Performs the steps required to ensure any newly added replica
@@ -60,9 +62,17 @@ public class ReplicaSynchronizer {
     }
 
     private void checkpointReplicaIndexes() throws IOException {
+        final int partition = replica.getIdentifier().getPartition();
         CheckpointPartitionIndexesTask task =
-                new CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition());
+                new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition));
         ReplicationProtocol.sendTo(replica, task);
         ReplicationProtocol.waitForAck(replica);
     }
+
+    private long getPartitionMaxComponentId(int partition) throws HyracksDataException {
+        final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        return localResourceRepository.getReplicatedIndexesMaxComponentId(partition, replStrategy);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index c0da095..8f870c0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -66,8 +66,8 @@ import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.util.ExitUtil;
@@ -196,7 +196,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
             final Path path = Paths.get(resourceFile.getAbsolutePath());
             Files.write(path, bytes);
-            indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0);
+            indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0,
+                    LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
             deleteResourceFileMask(resourceFile);
         } catch (Exception e) {
             cleanup(resourceFile);
@@ -393,6 +394,21 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         return partitionReplicatedFiles;
     }
 
+    public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
+            throws HyracksDataException {
+        long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
+        final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+        for (LocalResource lr : partitionResources.values()) {
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                final IIndexCheckpointManager indexCheckpointManager =
+                        indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
+                maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId());
+            }
+        }
+        return maxComponentId;
+    }
+
     private List<String> getIndexFiles(File indexDir) {
         final List<String> indexFiles = new ArrayList<>();
         if (indexDir.isDirectory()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bd728afe/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 3d928a4..9199fbb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -577,7 +577,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         if (c != EmptyComponent.INSTANCE) {
             diskComponents.add(0, c);
         }
-        assert checkComponentIds();
+        validateComponentIds();
     }
 
     @Override
@@ -588,7 +588,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         if (newComponent != EmptyComponent.INSTANCE) {
             diskComponents.add(swapIndex, newComponent);
         }
-        assert checkComponentIds();
+        validateComponentIds();
     }
 
     /**
@@ -597,16 +597,16 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
      *
      * @throws HyracksDataException
      */
-    private boolean checkComponentIds() throws HyracksDataException {
+    private void validateComponentIds() throws HyracksDataException {
         for (int i = 0; i < diskComponents.size() - 1; i++) {
             ILSMComponentId id1 = diskComponents.get(i).getId();
             ILSMComponentId id2 = diskComponents.get(i + 1).getId();
             IdCompareResult cmp = id1.compareTo(id2);
             if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) {
-                return false;
+                throw new IllegalStateException(
+                        "found non-decreasing component ids (" + id1 + " -> " + id2 + ") on index " + this);
             }
         }
-        return true;
     }
 
     @Override