You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2022/01/26 21:13:14 UTC

[asterixdb] branch master updated: [NO ISSUE][STO] Fix search when switching from memory to disk component

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cced3e  [NO ISSUE][STO] Fix search when switching from memory to disk component
     new f2b2ac1  Merge branch 'gerrit/neo'
4cced3e is described below

commit 4cced3e30c31d8330e69f0a10f5b0a46ee13d7bf
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Mon Jan 24 20:40:17 2022 -0800

    [NO ISSUE][STO] Fix search when switching from memory to disk component
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - When searching the index and switching from the memory components
      to the disk components, keep the state of the queue and of the
      cursors on the switched-to disk components the same as it was on
      the memory components. If a cursor produced the outputElement, do
      not push that cursor's next element into the queue, since the queue
      should not contain an element from this cursor. Restart the search
      operation at the elements the cursors were positioned at, and
      consume those elements, since they were already consumed before the
      switch.
    
    - Add a test case.
    
    Change-Id: I647641f6044c1edf1477049be1c5d1b697f404c1
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14885
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../asterix/app/bootstrap/TestNodeController.java  | 36 +++++----
 .../dataflow/SearchCursorComponentSwitchTest.java  | 87 +++++++++++++++++++++-
 .../lsm/btree/impls/LSMBTreeRangeSearchCursor.java | 35 +++++++--
 .../common/impls/ComponentReplacementContext.java  |  2 +-
 .../am/lsm/common/impls/LSMIndexSearchCursor.java  |  3 +-
 5 files changed, 138 insertions(+), 25 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f40628fb..0b80881 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -52,6 +52,7 @@ import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
@@ -66,6 +67,8 @@ import org.apache.asterix.transaction.management.runtime.CommitRuntime;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
@@ -813,10 +816,10 @@ public class TestNodeController {
         RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
                 filterFields == null ? 0 : filterFields.length, recordType, metaType);
         // fix pk fields
-        int diff = upsertOutRecDesc.getFieldCount() - primaryIndexInfo.rDesc.getFieldCount();
+        int start = 1 + (dataset.hasMetaPart() ? 2 : 1) + (filterFields == null ? 0 : filterFields.length);
         int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
         for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
-            pkFieldsInCommitOp[i] = diff + i;
+            pkFieldsInCommitOp[i] = start++;
         }
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
                 true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
@@ -827,19 +830,26 @@ public class TestNodeController {
 
     private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields,
             ARecordType itemType, ARecordType metaItemType) throws Exception {
-        ITypeTraits[] outputTypeTraits =
-                new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-        ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
-                + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+        // 1 boolean field at the beginning to indicate whether the operation was upsert or delete
+        int numOutFields = 1 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields + inputRecordDesc.getFieldCount();
+        ITypeTraits[] outputTypeTraits = new ITypeTraits[numOutFields];
+        ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[numOutFields];
 
-        // add the previous record first
+        ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
+        ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
         int f = 0;
-        outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+        // add the upsert indicator boolean field
+        outputSerDes[f] = serdeProvider.getSerializerDeserializer(BuiltinType.AINT8);
+        outputTypeTraits[f] = typeTraitProvider.getTypeTrait(BuiltinType.AINT8);
+        f++;
+        // add the previous record
+        outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType);
+        outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType);
         f++;
         // add the previous meta second
         if (dataset.hasMetaPart()) {
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
-            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+            outputSerDes[f] = serdeProvider.getSerializerDeserializer(metaItemType);
+            outputTypeTraits[f] = typeTraitProvider.getTypeTrait(metaItemType);
             f++;
         }
         // add the previous filter third
@@ -854,10 +864,8 @@ public class TestNodeController {
                 }
             }
             fieldIdx = i;
-            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
-                    .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
-                    .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+            outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
             f++;
         }
         for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 24379a3..c5292d3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.test.dataflow;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -45,6 +44,7 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -53,18 +53,28 @@ import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -84,7 +94,7 @@ public class SearchCursorComponentSwitchTest {
     private static final boolean[] UNIQUE_META_FIELDS = null;
     private static final int[] KEY_INDEXES = { 0 };
     private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
-    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    private static final List<Integer> KEY_INDICATORS_LIST = List.of(Index.RECORD_INDICATOR);
     private static final int TOTAL_NUM_OF_RECORDS = 2000;
     private static final int RECORDS_PER_COMPONENT = 1000;
     private static final int DATASET_ID = 101;
@@ -102,6 +112,7 @@ public class SearchCursorComponentSwitchTest {
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
     private static LSMPrimaryInsertOperatorNodePushable insertOp;
+    private static LSMPrimaryUpsertOperatorNodePushable upsertOp;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -143,6 +154,8 @@ public class SearchCursorComponentSwitchTest {
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                 KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
+        upsertOp = nc.getUpsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, storageManager, null, false).getLeft();
     }
 
     @After
@@ -202,6 +215,63 @@ public class SearchCursorComponentSwitchTest {
     }
 
     @Test
+    public void testCursorSwitchSucceedWithNoDuplicates() {
+        try {
+            StorageTestUtils.allowAllOps(lsmBtree);
+            lsmBtree.clearSearchCallbacks();
+            RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+                    KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            int totalNumRecords = LSMIndexSearchCursor.SWITCH_COMPONENT_CYCLE + 2;
+            ITupleReference[] upsertTuples = new ITupleReference[totalNumRecords];
+            for (int j = 0; j < totalNumRecords; j++) {
+                ITupleReference tuple = tupleGenerator.next();
+                upsertTuples[j] = TupleUtils.copyTuple(tuple);
+            }
+
+            // upsert and flush the tuples to create a disk component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, true);
+            // upsert but don't flush the tuples to create a memory component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, false);
+
+            // do the search operation
+            ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(lsmBtree.getHarness(),
+                    lsmBtree.createOpContext(NoOpIndexAccessParameters.INSTANCE), LSMBTreeSearchCursor::new);
+            IIndexCursor searchCursor = accessor.createSearchCursor(false);
+            MultiComparator lowKeySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
+            MultiComparator highKeySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
+            RangePredicate rangePredicate =
+                    new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp, null, null);
+
+            accessor.search(searchCursor, rangePredicate);
+
+            int count = 0;
+            while (searchCursor.hasNext()) {
+                searchCursor.next();
+                count++;
+                // flush the memory component to disk so that we make the switch to it when we hit the switch cycle
+                if (count == 1) {
+                    StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+                }
+            }
+
+            Throwable failure = ResourceReleaseUtils.close(searchCursor, null);
+            failure = CleanupUtils.destroy(failure, searchCursor, accessor);
+            Assert.assertEquals("Records count not matching", totalNumRecords, count);
+            if (failure != null) {
+                Assert.fail(failure.getMessage());
+            }
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
     public void testCursorSwitchFails() {
         try {
             // allow all operations
@@ -268,4 +338,17 @@ public class SearchCursorComponentSwitchTest {
         emptyTupleOp.close();
         Assert.assertEquals(numOfRecords, countOp.getCount());
     }
+
+    private void upsert(FrameTupleAppender tupleAppender, int totalNumRecords, ITupleReference[] upsertTuples,
+            boolean flush) throws Exception {
+        upsertOp.open();
+        for (int j = 0; j < totalNumRecords; j++) {
+            DataflowUtils.addTupleToFrame(tupleAppender, upsertTuples[j], upsertOp);
+        }
+        tupleAppender.write(upsertOp, true);
+        if (flush) {
+            StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+        }
+        upsertOp.close();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 46d279f..2c5fb50 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -236,10 +236,15 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
                     rangeCursors[i].close();
                     btreeAccessors[i].reset(btree, iap);
                     btreeAccessors[i].search(rangeCursors[i], reusablePred);
-                    pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
+                    // consume the element that we restarted the search at since before the switch it was consumed
+                    if (rangeCursors[i].hasNext()) {
+                        rangeCursors[i].next();
+                        switchedElements[i].reset(rangeCursors[i].getTuple());
+                    }
                 }
             }
             switchRequest[i] = false;
+            switchedElements[i] = null;
             // any failed switch makes further switches pointless
             switchPossible = switchPossible && operationalComponents.get(i).getType() == LSMComponentType.DISK;
         }
@@ -264,14 +269,18 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
                 if (replaceFrom < 0) {
                     replaceFrom = i;
                 }
-                // we return the outputElement to the priority queue if it came from this component
+
+                PriorityQueueElement element;
                 if (outputElement != null && outputElement.getCursorIndex() == i) {
-                    pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
-                    needPushElementIntoQueue = false;
-                    outputElement = null;
-                    canCallProceed = true;
+                    // there should be no element from this cursor in the queue since the element was polled
+                    if (findElement(outputPriorityQueue, i) != null) {
+                        throw new IllegalStateException("found element in the queue from the cursor of output element");
+                    }
+                    element = outputElement;
+                } else {
+                    element = findElement(outputPriorityQueue, i);
                 }
-                PriorityQueueElement element = remove(outputPriorityQueue, i);
+
                 // if this cursor is still active (has an element)
                 // then we copy the search key to restart the operation after
                 // replacing the component
@@ -331,6 +340,18 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
         return null;
     }
 
+    private PriorityQueueElement findElement(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int cursorIndex) {
+        // Scans the PQ for the component's element
+        Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
+        while (it.hasNext()) {
+            PriorityQueueElement e = it.next();
+            if (e.getCursorIndex() == cursorIndex) {
+                return e;
+            }
+        }
+        return null;
+    }
+
     @Override
     public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
         LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
index 70f6d9e..5b0363a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
@@ -193,7 +193,7 @@ public class ComponentReplacementContext implements ILSMIndexOperationContext {
             for (int i = 0; i < count; i++) {
                 ILSMComponent removed = ctx.getComponentHolder().remove(swapIndexes[i]);
                 if (removed.getType() == LSMComponentType.MEMORY) {
-                    LOGGER.info("Removed a memory component from the search operation");
+                    LOGGER.debug("Removed memory component {} from the search operation", removed);
                 } else {
                     throw new IllegalStateException("Disk components can't be removed from the search operation");
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index b6f6e26..27875c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -40,7 +40,7 @@ import org.apache.hyracks.storage.common.IIndexCursorStats;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
-    protected static final int SWITCH_COMPONENT_CYCLE = 100;
+    public static final int SWITCH_COMPONENT_CYCLE = 100;
     protected final ILSMIndexOperationContext opCtx;
     protected final boolean returnDeletedTuples;
     protected PriorityQueueElement outputElement;
@@ -119,6 +119,7 @@ public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implement
         needPushElementIntoQueue = false;
         for (int i = 0; i < switchRequest.length; i++) {
             switchRequest[i] = false;
+            switchedElements[i] = null;
         }
         try {
             if (outputPriorityQueue != null) {