You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2021/06/30 00:11:34 UTC

[asterixdb] 14/23: [NO ISSUE][STO] Close datasets of flushed indexes after recovery

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit acff469e4363dde3c0aafe3c388f62aad4aa87a7
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Wed May 12 16:41:02 2021 +0300

    [NO ISSUE][STO] Close datasets of flushed indexes after recovery
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - After redoing a flush log record on any index, close that index's
      dataset so that any cached state changed during recovery (e.g. the
      component id generator) is cleared and reinitialized.
    - Fix the total number of records inserted by LSMFlushRecoveryTest.
    - Update LSMFlushRecoveryTest to check for duplicate component ids.
    
    Change-Id: I29072f475cc7b4d7d6efde415be0329fc568443e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11423
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Michael Blow <mb...@apache.org>
    (cherry picked from commit 0e7e4bdb2c514fe7469a775c2ffc3d9e78e6a317)
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12024
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
---
 .../org/apache/asterix/app/nc/RecoveryManager.java   | 20 +++++++++++++-------
 .../asterix/test/dataflow/LSMFlushRecoveryTest.java  | 12 ++++++++++--
 .../asterix/common/api/IDatasetLifecycleManager.java |  9 +++++++++
 .../common/context/DatasetLifecycleManager.java      | 11 +++++++++++
 .../common/transactions/IRecoveryManager.java        | 11 -----------
 5 files changed, 43 insertions(+), 20 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 1461ef4..718e4b3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -173,15 +173,14 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         deleteRecoveryTemporaryFiles();
 
         //get active partitions on this node
-        replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN);
+        replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true);
     }
 
-    @Override
-    public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
-            throws IOException, ACIDException {
+    public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN,
+            boolean closeOnFlushRedo) throws IOException, ACIDException {
         try {
             Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN);
-            startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet);
+            startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet, closeOnFlushRedo);
         } finally {
             logReader.close();
             deleteRecoveryTemporaryFiles();
@@ -277,7 +276,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     }
 
     private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader,
-            long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException, ACIDException {
+            long lowWaterMarkLSN, Set<Long> winnerTxnSet, boolean closeOnFlushRedo) throws IOException, ACIDException {
         int redoCount = 0;
         long txnId = 0;
 
@@ -299,6 +298,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
 
         ILogRecord logRecord = null;
+        Set<Integer> flushRedoDatasets = new HashSet<>();
         try {
             logReader.setPosition(lowWaterMarkLSN);
             logRecord = logReader.next();
@@ -409,6 +409,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                                 && !index.isCurrentMutableComponentEmpty()) {
                                             // schedule flush
                                             redoFlush(index, logRecord);
+                                            flushRedoDatasets.add(datasetId);
                                             redoCount++;
                                         } else {
                                             // TODO: update checkpoint file?
@@ -441,6 +442,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             for (long r : resourceIdList) {
                 datasetLifecycleManager.close(resourcesMap.get(r).getPath());
             }
+            if (closeOnFlushRedo) {
+                // close datasets of indexes to ensure any cached state that might've been changed by recovery is cleared
+                // e.g. when redoing a flush, the component id generator needs to be reinitialized
+                datasetLifecycleManager.closeDatasets(flushRedoDatasets);
+            }
         }
     }
 
@@ -525,7 +531,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             if (minLSN < readableSmallestLSN) {
                 minLSN = readableSmallestLSN;
             }
-            replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN);
+            replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN, false);
             if (flush) {
                 appCtx.getDatasetLifecycleManager().flushAllDatasets();
             }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index 03ca1f0..dd0bd9a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -23,7 +23,9 @@ import java.lang.reflect.Field;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Semaphore;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
@@ -62,6 +64,7 @@ import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -146,8 +149,7 @@ public class LSMFlushRecoveryTest {
         checkComponentIds();
         // insert more records
         createInsertOps();
-        insertRecords(PARTITION_0, StorageTestUtils.RECORDS_PER_COMPONENT, StorageTestUtils.RECORDS_PER_COMPONENT,
-                true);
+        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
 
         dsInfo.waitForIO();
         checkComponentIds();
@@ -486,8 +488,14 @@ public class LSMFlushRecoveryTest {
         List<ILSMDiskComponent> secondaryDiskComponents = secondaryIndexes[partitionIndex].getDiskComponents();
 
         Assert.assertEquals(primaryDiskComponents.size(), secondaryDiskComponents.size());
+        Set<ILSMComponentId> uniqueIds = new HashSet<>();
         for (int i = 0; i < primaryDiskComponents.size(); i++) {
             Assert.assertEquals(primaryDiskComponents.get(i).getId(), secondaryDiskComponents.get(i).getId());
+            ILSMComponentId id = primaryDiskComponents.get(i).getId();
+            boolean added = uniqueIds.add(id);
+            if (!added) {
+                throw new IllegalStateException("found duplicate component ids: " + id);
+            }
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 10eddb1..0aad368 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.api;
 
 import java.util.List;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import org.apache.asterix.common.context.DatasetInfo;
@@ -112,6 +113,14 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
     List<IVirtualBufferCache> getVirtualBufferCaches(int datasetId, int ioDeviceNum);
 
     /**
+     * Attempts to close the datasets in {@code datasetsToClose}
+     *
+     * @param datasetsToClose
+     * @throws HyracksDataException
+     */
+    void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException;
+
+    /**
      * Flushes then closes all open datasets
      */
     void closeAllDatasets() throws HyracksDataException;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index d396d9b..fdf7822 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 
@@ -474,6 +475,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
+    public synchronized void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
+                closeDataset(dsr);
+            }
+        }
+    }
+
+    @Override
     public synchronized void closeAllDatasets() throws HyracksDataException {
         ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
         for (DatasetResource dsr : openDatasets) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index bfe7963..8a5f34e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -85,17 +85,6 @@ public interface IRecoveryManager {
     long getLocalMinFirstLSN() throws HyracksDataException;
 
     /**
-     * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
-     *
-     * @param partitions
-     * @param lowWaterMarkLSN
-     * @throws IOException
-     * @throws ACIDException
-     */
-    void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
-            throws IOException, ACIDException;
-
-    /**
      * Creates a temporary file to be used during recovery
      *
      * @param txnId