You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/05/21 03:35:24 UTC
[4/5] asterixdb git commit: [NO ISSUE][STO] Misc Storage Fixes and
Improvements
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
new file mode 100644
index 0000000..a0ed26e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.datagen.TupleGenerator;
+import org.apache.hyracks.storage.am.common.datagen.IFieldValueGenerator;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class CheckpointInSecondaryIndexTest {
+ static final int REPREAT_TEST_COUNT = 1;
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[REPREAT_TEST_COUNT][0]);
+ }
+
+ private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+ private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+ new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+ private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+ { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+ private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+ private static final ARecordType META_TYPE = null;
+ private static final GenerationFunction[] META_GEN_FUNCTION = null;
+ private static final boolean[] UNIQUE_META_FIELDS = null;
+ private static final int[] KEY_INDEXES = { 0 };
+ private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+ private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+ private static final int RECORDS_PER_COMPONENT = 500;
+ private static final int DATASET_ID = 101;
+ private static final String DATAVERSE_NAME = "TestDV";
+ private static final String DATASET_NAME = "TestDS";
+ private static final String INDEX_NAME = "TestIdx";
+ private static final String DATA_TYPE_NAME = "DUMMY";
+ private static final String NODE_GROUP_NAME = "DEFAULT";
+ private static final IndexType INDEX_TYPE = IndexType.BTREE;
+ private static final IFieldValueGenerator[] SECONDARY_INDEX_VALUE_GENERATOR =
+ { new AInt64ValueGenerator(), new AInt32ValueGenerator() };
+ private static final List<List<String>> INDEX_FIELD_NAMES =
+ Arrays.asList(Arrays.asList(RECORD_TYPE.getFieldNames()[1]));
+ private static final List<Integer> INDEX_FIELD_INDICATORS = Arrays.asList(Index.RECORD_INDICATOR);
+ private static final List<IAType> INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64);
+ private static final StorageComponentProvider storageManager = new StorageComponentProvider();
+ private static TestNodeController nc;
+ private static NCAppRuntimeContext ncAppCtx;
+ private static IDatasetLifecycleManager dsLifecycleMgr;
+ private static Dataset dataset;
+ private static Index secondaryIndex;
+ private static ITransactionContext txnCtx;
+ private static TestLsmBtree primaryLsmBtree;
+ private static TestLsmBtree secondaryLsmBtree;
+ private static PrimaryIndexInfo primaryIndexInfo;
+ private static IHyracksTaskContext taskCtx;
+ private static IIndexDataflowHelper primaryIndexDataflowHelper;
+ private static IIndexDataflowHelper secondaryIndexDataflowHelper;
+ private static LSMInsertDeleteOperatorNodePushable insertOp;
+ private static LSMIndexBulkLoadOperatorNodePushable indexLoadOp;
+ private static IHyracksTaskContext loadTaskCtx;
+ private static SecondaryIndexInfo secondaryIndexInfo;
+ private static Actor actor;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+ + File.separator + "resources" + File.separator + "cc-multipart.conf";
+ nc = new TestNodeController(configPath, false);
+ nc.init();
+ ncAppCtx = nc.getAppRuntimeContext();
+ dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+ }
+
    /**
     * One-time shutdown: stops the test node controller, then removes its on-disk
     * instance files so later test classes start clean.
     */
    @AfterClass
    public static void tearDown() throws Exception {
        System.out.println("TearDown");
        nc.deInit();
        TestHelper.deleteExistingInstanceFiles();
    }
+
    /**
     * Per-test setup: builds a fresh dataset + primary index, starts an entity-level
     * transaction, wires an insert pipeline that deliberately skips the secondary
     * index, and starts the single-threaded {@link Actor} that performs all index
     * operations in order. Leaves an INSERT_OPEN request queued so the pipeline is
     * ready before the test body runs.
     */
    @Before
    public void createIndex() throws Exception {
        List<List<String>> partitioningKeys = new ArrayList<>();
        partitioningKeys.add(Collections.singletonList("key"));
        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null,
                        PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null),
                null, DatasetType.INTERNAL, DATASET_ID, 0);
        secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, INDEX_TYPE, INDEX_FIELD_NAMES,
                INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, false, 0);
        // reset shared static state left over from the previous test
        taskCtx = null;
        primaryIndexDataflowHelper = null;
        secondaryIndexDataflowHelper = null;
        primaryLsmBtree = null;
        insertOp = null;
        JobId jobId = nc.newJobId();
        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
        actor = null;
        taskCtx = nc.createTestContext(jobId, 0, false);
        primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager,
                KEY_INDEXES, KEY_INDICATORS_LIST, 0);
        IndexDataflowHelperFactory iHelperFactory =
                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
        primaryIndexDataflowHelper = iHelperFactory.create(taskCtx.getJobletContext().getServiceContext(), 0);
        // open once to materialize the index instance, then close; lifecycle is managed per test
        primaryIndexDataflowHelper.open();
        primaryLsmBtree = (TestLsmBtree) primaryIndexDataflowHelper.getIndexInstance();
        primaryIndexDataflowHelper.close();
        // This pipeline skips the secondary index
        insertOp = nc.getInsertPipeline(taskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                KEY_INDICATORS_LIST, storageManager, null).getLeft();
        actor = new Actor("player");
        // allow all operations
        StorageTestUtils.allowAllOps(primaryLsmBtree);
        actor.add(new Request(Request.Action.INSERT_OPEN));
    }
+
    /**
     * Per-test teardown: drains the actor queue via an INSERT_CLOSE barrier, commits
     * the transaction, releases the index dataflow helpers, then stops the actor thread.
     */
    @After
    public void destroyIndex() throws Exception {
        Request close = new Request(Request.Action.INSERT_CLOSE);
        actor.add(close);
        // wait until the insert pipeline has actually closed before committing
        close.await();
        nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
        // the secondary helper exists only for tests that called createSecondaryIndex()
        if (secondaryIndexDataflowHelper != null) {
            secondaryIndexDataflowHelper.destroy();
        }
        primaryIndexDataflowHelper.destroy();
        actor.stop();
    }
+
    /**
     * Flush with records only in the primary: the secondary stays component-free, but
     * its checkpoint must still advance to the primary's last flushed component id.
     */
    @Test
    public void testCheckpointUpdatedWhenSecondaryIsEmpty() throws Exception {
        try {
            // create secondary
            createSecondaryIndex();
            actor.add(new Request(Request.Action.INSERT_PATCH));
            ensureDone(actor);
            // search now and ensure partition 0 has all the records
            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
            // and that secondary index is empty
            Assert.assertTrue(secondaryLsmBtree.isCurrentMutableComponentEmpty());
            // flush
            actor.add(new Request(Request.Action.FLUSH_DATASET));
            ensureDone(actor);
            // ensure primary has a component
            Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());

            // ensure secondary doesn't have a component
            Assert.assertEquals(0, secondaryLsmBtree.getDiskComponents().size());
            // ensure that current memory component index match
            Assert.assertEquals(secondaryLsmBtree.getCurrentMemoryComponentIndex(),
                    primaryLsmBtree.getCurrentMemoryComponentIndex());
            // ensure both checkpoint files has the same component id as the last flushed component id
            ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
            LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
            long min = id.getMinId();
            // primary ref
            // reflective access: fileManager is private in AbstractLSMIndex with no public accessor
            Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager");
            fileManagerField.setAccessible(true);
            ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
            final ResourceReference primaryRef = ResourceReference
                    .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
            // secondary ref
            ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
            final ResourceReference secondaryRef = ResourceReference.of(
                    secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
            IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
            Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
            Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
        } catch (Throwable e) {
            // print before rethrow so the failure is visible even if swallowed upstream
            e.printStackTrace();
            throw e;
        }
    }
+
+ private void createSecondaryIndex()
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ SecondaryIndexInfo secondaryIndexInfo =
+ nc.createSecondaryIndex(primaryIndexInfo, secondaryIndex, storageManager, 0);
+ IndexDataflowHelperFactory iHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), secondaryIndexInfo.getFileSplitProvider());
+ secondaryIndexDataflowHelper = iHelperFactory.create(taskCtx.getJobletContext().getServiceContext(), 0);
+ secondaryIndexDataflowHelper.open();
+ secondaryLsmBtree = (TestLsmBtree) secondaryIndexDataflowHelper.getIndexInstance();
+ secondaryIndexDataflowHelper.close();
+ }
+
    /**
     * Bulk-load the secondary after the primary has exactly one flushed component;
     * the primary checkpoint must be unchanged and the secondary checkpoint must end
     * up at that component's id.
     */
    @Test
    public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsSingleComponent() throws Exception {
        try {
            // insert a batch of records (the secondary does not exist yet)
            actor.add(new Request(Request.Action.INSERT_PATCH));
            ensureDone(actor);
            // search now and ensure partition 0 has all the records
            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
            // flush
            actor.add(new Request(Request.Action.FLUSH_DATASET));
            ensureDone(actor);
            // ensure primary has a component
            Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
            // ensure both checkpoint files has the same component id as the last flushed component id
            ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
            LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
            long min = id.getMinId();
            // primary ref
            // reflective access: fileManager is private in AbstractLSMIndex with no public accessor
            Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager");
            fileManagerField.setAccessible(true);
            ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
            final ResourceReference primaryRef = ResourceReference
                    .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
            IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
            createSecondaryIndex();
            JobId jobId = nc.newJobId();
            loadTaskCtx = nc.createTestContext(jobId, 0, false);
            Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
                    nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                            KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
            indexLoadOp = infoAndOp.getRight();
            secondaryIndexInfo = infoAndOp.getLeft();
            // bulk load one batch into the secondary
            actor.add(new Request(Request.Action.LOAD_OPEN));
            actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
            actor.add(new Request(Request.Action.LOAD_CLOSE));
            ensureDone(actor);
            // primary checkpoint must be unchanged; secondary must have caught up to it
            latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
            ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
            final ResourceReference secondaryRef = ResourceReference.of(
                    secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
            IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
            Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        }
    }
+
    /**
     * Bulk-load the secondary after the primary has two flushed components; both
     * checkpoints must track the id of the most recently flushed component.
     */
    @Test
    public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsTwoComponents() throws Exception {
        try {
            // insert a batch of records (the secondary does not exist yet)
            actor.add(new Request(Request.Action.INSERT_PATCH));
            ensureDone(actor);
            // search now and ensure partition 0 has all the records
            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
            // flush
            actor.add(new Request(Request.Action.FLUSH_DATASET));
            ensureDone(actor);
            // ensure primary has a component
            Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
            // ensure both checkpoint files has the same component id as the last flushed component id
            ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
            LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
            long min = id.getMinId();
            // primary ref
            // reflective access: fileManager is private in AbstractLSMIndex with no public accessor
            Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager");
            fileManagerField.setAccessible(true);
            ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
            final ResourceReference primaryRef = ResourceReference
                    .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
            IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
            // second batch + flush -> two disk components
            actor.add(new Request(Request.Action.INSERT_PATCH));
            ensureDone(actor);
            actor.add(new Request(Request.Action.FLUSH_DATASET));
            ensureDone(actor);
            Assert.assertEquals(2, primaryLsmBtree.getDiskComponents().size());
            // ensure both checkpoint files has the same component id as the last flushed component id
            primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
            id = (LSMComponentId) primaryDiskComponent.getId();
            // NOTE(review): 'min' now holds getMaxId() of component 0 — presumably
            // getDiskComponents() lists the newest component first; confirm ordering.
            min = id.getMaxId();
            createSecondaryIndex();
            JobId jobId = nc.newJobId();
            loadTaskCtx = nc.createTestContext(jobId, 0, false);
            Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
                    nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                            KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
            indexLoadOp = infoAndOp.getRight();
            secondaryIndexInfo = infoAndOp.getLeft();
            // bulk load one batch into the secondary
            actor.add(new Request(Request.Action.LOAD_OPEN));
            actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
            actor.add(new Request(Request.Action.LOAD_CLOSE));
            ensureDone(actor);
            latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
            ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
            final ResourceReference secondaryRef = ResourceReference.of(
                    secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
            IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
            Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        }
    }
+
    /**
     * Empty bulk load into the secondary while the primary is also empty; the
     * secondary checkpoint must match the primary's last component id.
     */
    @Test
    public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsEmpty() throws Exception {
        try {
            // ensure primary has no component
            Assert.assertEquals(0, primaryLsmBtree.getDiskComponents().size());
            // primary ref
            // reflective access: fileManager is private in AbstractLSMIndex with no public accessor
            Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager");
            fileManagerField.setAccessible(true);
            ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
            final ResourceReference primaryRef = ResourceReference
                    .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
            // NOTE(review): this initial value is overwritten below before it is used
            IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            createSecondaryIndex();
            JobId jobId = nc.newJobId();
            loadTaskCtx = nc.createTestContext(jobId, 0, false);
            Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
                    nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                            KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
            indexLoadOp = infoAndOp.getRight();
            secondaryIndexInfo = infoAndOp.getLeft();
            // open and close the loader without pushing any tuples (empty bulk load)
            actor.add(new Request(Request.Action.LOAD_OPEN));
            actor.add(new Request(Request.Action.LOAD_CLOSE));
            ensureDone(actor);
            latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
            final ResourceReference secondaryRef = ResourceReference.of(
                    secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
            IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
            Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(),
                    latestPrimaryCheckpoint.getLastComponentId());
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        }
    }
+
    /**
     * Empty bulk load into the secondary after the primary has one flushed component;
     * the secondary checkpoint must still advance to that component's id.
     */
    @Test
    public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsNotEmpty() throws Exception {
        try {
            // insert a batch of records (the secondary does not exist yet)
            actor.add(new Request(Request.Action.INSERT_PATCH));
            ensureDone(actor);
            // search now and ensure partition 0 has all the records
            StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
            // flush
            actor.add(new Request(Request.Action.FLUSH_DATASET));
            ensureDone(actor);
            // ensure primary has a component
            Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
            // ensure both checkpoint files has the same component id as the last flushed component id
            ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
            LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
            long min = id.getMinId();
            // primary ref
            // reflective access: fileManager is private in AbstractLSMIndex with no public accessor
            Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager");
            fileManagerField.setAccessible(true);
            ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
            final ResourceReference primaryRef = ResourceReference
                    .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
            IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
            createSecondaryIndex();
            JobId jobId = nc.newJobId();
            loadTaskCtx = nc.createTestContext(jobId, 0, false);
            Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
                    nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                            KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
            indexLoadOp = infoAndOp.getRight();
            secondaryIndexInfo = infoAndOp.getLeft();
            // open and close the loader without pushing any tuples (empty bulk load)
            actor.add(new Request(Request.Action.LOAD_OPEN));
            actor.add(new Request(Request.Action.LOAD_CLOSE));
            ensureDone(actor);
            latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
            Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
            ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
            final ResourceReference secondaryRef = ResourceReference.of(
                    secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
            IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
            IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
            Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
        } catch (Throwable e) {
            e.printStackTrace();
            throw e;
        }
    }
+
+ protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+ return ncAppCtx.getIndexCheckpointManagerProvider();
+ }
+
+ private void ensureDone(Actor actor) throws InterruptedException {
+ Request req = new Request(Request.Action.DUMMY);
+ actor.add(req);
+ req.await();
+ }
+
    /**
     * A queued unit of work for the {@link Actor}; completion is signalled to waiting
     * test threads via a guarded wait on the request's monitor.
     */
    private static class Request {
        enum Action {
            DUMMY,
            INSERT_OPEN,
            LOAD_OPEN,
            INSERT_PATCH,
            INDEX_LOAD_PATCH,
            FLUSH_DATASET,
            INSERT_CLOSE,
            LOAD_CLOSE,
        }

        private final Action action;
        // written under the monitor in complete(); volatile for cheap reads
        private volatile boolean done;

        public Request(Action action) {
            this.action = action;
            done = false;
        }

        // Marks the request finished and wakes every thread blocked in await().
        synchronized void complete() {
            done = true;
            notifyAll();
        }

        // Blocks until complete() is called; the loop guards against spurious wakeups.
        synchronized void await() throws InterruptedException {
            while (!done) {
                wait();
            }
        }
    }
+
    /**
     * Single-threaded executor of {@link Request}s; serializes all insert/load/flush
     * operations so tests can enqueue work and synchronize on barriers deterministically.
     */
    public class Actor extends SingleThreadEventProcessor<Request> {
        // generates deterministic (key, value) records for INSERT_PATCH batches
        private final RecordTupleGenerator primaryInsertTupleGenerator;
        private final FrameTupleAppender tupleAppender;

        public Actor(String name) throws HyracksDataException {
            super(name);
            primaryInsertTupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
            tupleAppender = new FrameTupleAppender(new VSizeFrame(taskCtx));
        }

        @Override
        protected void handle(Request req) throws Exception {
            try {
                switch (req.action) {
                    case FLUSH_DATASET:
                        // push any buffered tuples first so they land in the flushed component
                        if (tupleAppender.getTupleCount() > 0) {
                            tupleAppender.write(insertOp, true);
                        }
                        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
                        break;
                    case INSERT_CLOSE:
                        insertOp.close();
                        break;
                    case INSERT_OPEN:
                        insertOp.open();
                        break;
                    case LOAD_OPEN:
                        indexLoadOp.open();
                        break;
                    case LOAD_CLOSE:
                        indexLoadOp.close();
                        break;
                    case INSERT_PATCH:
                        // insert one component's worth of records through the primary pipeline
                        for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
                            ITupleReference tuple = primaryInsertTupleGenerator.next();
                            DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
                        }
                        if (tupleAppender.getTupleCount() > 0) {
                            tupleAppender.write(insertOp, true);
                        }
                        StorageTestUtils.waitForOperations(primaryLsmBtree);
                        break;
                    case INDEX_LOAD_PATCH:
                        // bulk-load one batch of generated tuples directly into the secondary
                        TupleGenerator secondaryLoadTupleGenerator =
                                new TupleGenerator(SECONDARY_INDEX_VALUE_GENERATOR, secondaryIndexInfo.getSerdes(), 0);
                        FrameTupleAppender secondaryTupleAppender = new FrameTupleAppender(new VSizeFrame(loadTaskCtx));
                        for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
                            ITupleReference tuple = secondaryLoadTupleGenerator.next();
                            DataflowUtils.addTupleToFrame(secondaryTupleAppender, tuple, indexLoadOp);
                        }
                        if (secondaryTupleAppender.getTupleCount() > 0) {
                            secondaryTupleAppender.write(indexLoadOp, true);
                        }
                        break;
                    default:
                        break;
                }
            } catch (Throwable th) {
                th.printStackTrace();
                throw th;
            } finally {
                // always complete so barrier waiters are never blocked forever, even on failure
                req.complete();
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index a33bda1..017c59f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -27,7 +27,7 @@ import java.util.function.Predicate;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
@@ -122,7 +122,7 @@ public class ComponentRollbackTest {
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -190,7 +190,7 @@ public class ComponentRollbackTest {
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -272,7 +272,7 @@ public class ComponentRollbackTest {
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearSearchCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -356,7 +356,7 @@ public class ComponentRollbackTest {
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -411,7 +411,7 @@ public class ComponentRollbackTest {
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearMergeCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -474,7 +474,7 @@ public class ComponentRollbackTest {
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -533,7 +533,7 @@ public class ComponentRollbackTest {
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -594,7 +594,7 @@ public class ComponentRollbackTest {
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearMergeCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -664,7 +664,7 @@ public class ComponentRollbackTest {
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearMergeCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
index eb16cf4..b618727 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
@@ -22,7 +22,7 @@ import java.io.File;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
@@ -118,7 +118,7 @@ public class IoCallbackFailureTest {
throws Exception {
NCAppRuntimeContext ncAppCtx = nc.getAppRuntimeContext();
IDatasetLifecycleManager dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
boolean failed = false;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index e4623fd..79e6368 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Semaphore;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -92,7 +92,7 @@ public class LSMFlushRecoveryTest {
private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers;
private static ITransactionContext txnCtx;
private static LSMInsertDeleteOperatorNodePushable[] insertOps;
- private static TupleGenerator tupleGenerator;
+ private static RecordTupleGenerator tupleGenerator;
private static final int NUM_PARTITIONS = 2;
private static final int PARTITION_0 = 0;
@@ -478,6 +478,8 @@ public class LSMFlushRecoveryTest {
ILSMMemoryComponent primaryMemComponent = primaryIndexes[partitionIndex].getCurrentMemoryComponent();
ILSMMemoryComponent secondaryMemComponent = secondaryIndexes[partitionIndex].getCurrentMemoryComponent();
Assert.assertEquals(primaryMemComponent.getId(), secondaryMemComponent.getId());
+ Assert.assertEquals(primaryIndexes[partitionIndex].getCurrentMemoryComponentIndex(),
+ secondaryIndexes[partitionIndex].getCurrentMemoryComponentIndex());
}
List<ILSMDiskComponent> primaryDiskComponents = primaryIndexes[partitionIndex].getDiskComponents();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 2121327..e2c99b0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -26,10 +26,9 @@ import java.util.List;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.ITransactionContext;
@@ -37,10 +36,7 @@ import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
@@ -113,22 +109,20 @@ public class LogMarkerTest {
StorageComponentProvider storageManager = new StorageComponentProvider();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
- NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
- partitioningKeys, null, null, null, false, null),
- null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
- storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
+ PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE,
+ META_TYPE, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
JobId jobId = nc.newJobId();
IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, true);
ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
- LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
- RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp =
+ nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ RecordTupleGenerator tupleGenerator =
+ new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
VSizeFrame marker = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
@@ -178,9 +172,9 @@ public class LogMarkerTest {
nc.newJobId();
TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
Collections.emptyList(), Collections.emptyList(), false);
- IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE,
- META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST,
- storageManager);
+ IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, StorageTestUtils.DATASET, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, storageManager);
emptyTupleOp.open();
emptyTupleOp.close();
Assert.assertEquals(NUM_OF_RECORDS, countOp.getCount());
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 1795c93..a7225a1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -619,14 +619,14 @@ public class MultiPartitionLSMIndexTest {
public class Actor extends SingleThreadEventProcessor<Request> {
private final int partition;
- private final TupleGenerator tupleGenerator;
+ private final RecordTupleGenerator tupleGenerator;
private final VSizeFrame frame;
private final FrameTupleAppender tupleAppender;
public Actor(String name, int partition) throws HyracksDataException {
super(name);
this.partition = partition;
- tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
frame = new VSizeFrame(taskCtxs[partition]);
tupleAppender = new FrameTupleAppender(frame);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 72026a2..61c1fb2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -26,8 +26,8 @@ import java.util.List;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -161,8 +161,8 @@ public class SearchCursorComponentSwitchTest {
// except search
lsmBtree.clearSearchCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+ KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
Searcher firstSearcher = null;
@@ -207,8 +207,8 @@ public class SearchCursorComponentSwitchTest {
// except search
lsmBtree.clearSearchCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+ KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
Searcher firstSearcher = null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index d08fc72..589e8b2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -33,9 +33,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.context.DatasetInfo;
@@ -100,7 +100,7 @@ public class StorageTestUtils {
private StorageTestUtils() {
}
- static void allowAllOps(TestLsmBtree lsmBtree) {
+ public static void allowAllOps(TestLsmBtree lsmBtree) {
lsmBtree.clearModifyCallbacks();
lsmBtree.clearFlushCallbacks();
lsmBtree.clearSearchCallbacks();
@@ -118,6 +118,12 @@ public class StorageTestUtils {
KEY_INDICATORS_LIST, partition);
}
+ public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, Dataset dataset, int partition)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, STORAGE_MANAGER, KEY_INDEXES,
+ KEY_INDICATORS_LIST, partition);
+ }
+
public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
return getInsertPipeline(nc, ctx, null);
@@ -131,13 +137,27 @@ public class StorageTestUtils {
}
public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+ Dataset dataset, Index secondaryIndex, IndexOperation op)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, op).getLeft();
+ }
+
+ public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
Index secondaryIndex) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
}
- public static TupleGenerator getTupleGenerator() {
- return new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION,
+ public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+ Dataset dataset, Index secondaryIndex)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
+ }
+
+ public static RecordTupleGenerator getTupleGenerator() {
+ return new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION,
UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
}
@@ -146,6 +166,11 @@ public class StorageTestUtils {
searchAndAssertCount(nc, partition, DATASET, STORAGE_MANAGER, numOfRecords);
}
+ public static void searchAndAssertCount(TestNodeController nc, Dataset dataset, int partition, int numOfRecords)
+ throws HyracksDataException, AlgebricksException {
+ searchAndAssertCount(nc, partition, dataset, STORAGE_MANAGER, numOfRecords);
+ }
+
public static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
StorageComponentProvider storageManager, int numOfRecords)
throws HyracksDataException, AlgebricksException {
@@ -182,6 +207,11 @@ public class StorageTestUtils {
flushPartition(dslLifecycleMgr, lsmBtree, DATASET, async);
}
+ public static void flushPartition(IDatasetLifecycleManager dslLifecycleMgr, Dataset dataset, TestLsmBtree lsmBtree,
+ boolean async) throws Exception {
+ flushPartition(dslLifecycleMgr, lsmBtree, dataset, async);
+ }
+
public static void flushPartition(IDatasetLifecycleManager dslLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
boolean async) throws Exception {
waitForOperations(lsmBtree);
@@ -211,6 +241,11 @@ public class StorageTestUtils {
flush(dsLifecycleMgr, lsmBtree, DATASET, async);
}
+ public static void flush(IDatasetLifecycleManager dsLifecycleMgr, Dataset dataset, TestLsmBtree lsmBtree,
+ boolean async) throws Exception {
+ flush(dsLifecycleMgr, lsmBtree, dataset, async);
+ }
+
public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
boolean async) throws Exception {
waitForOperations(lsmBtree);
@@ -240,6 +275,11 @@ public class StorageTestUtils {
this(nc, partition, DATASET, STORAGE_MANAGER, lsmBtree, numOfRecords);
}
+ public Searcher(TestNodeController nc, Dataset dataset, int partition, TestLsmBtree lsmBtree,
+ int numOfRecords) {
+ this(nc, partition, dataset, STORAGE_MANAGER, lsmBtree, numOfRecords);
+ }
+
public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
TestLsmBtree lsmBtree, int numOfRecords) {
lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index 20875a3..bcf68b5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -21,6 +21,7 @@ package org.apache.asterix.test.dataflow;
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -28,9 +29,14 @@ import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
+import org.apache.asterix.transaction.management.runtime.CommitRuntime;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
@@ -48,6 +54,19 @@ public class TestDataset extends Dataset {
}
@Override
+ public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider,
+ int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException {
+ return new IPushRuntimeFactory() {
+ @Override
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new IPushRuntime[] { new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()),
+ getDatasetId(), primaryKeyFieldPermutation, true,
+ ctx.getTaskAttemptId().getTaskId().getPartition(), true) };
+ }
+ };
+ }
+
+ @Override
public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Index index, ARecordType recordType,
ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties)
throws AlgebricksException {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
index 7a3e475..bee2f8d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
@@ -23,7 +23,7 @@ import java.util.concurrent.Semaphore;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
@@ -68,7 +68,7 @@ public class TransactionAbortTest {
private static IHyracksTaskContext abortCtx;
private static ITransactionContext abortTxnCtx;
private static LSMInsertDeleteOperatorNodePushable abortOp;
- private static TupleGenerator tupleGenerator;
+ private static RecordTupleGenerator tupleGenerator;
@Rule
public TestRule watcher = new TestMethodTracer();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 418282e..3634bf1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.test.logging;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -26,10 +30,9 @@ import java.util.Collections;
import java.util.List;
import org.apache.asterix.app.bootstrap.TestNodeController;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.nc.RecoveryManager;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.Checkpoint;
@@ -43,14 +46,12 @@ import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.dataflow.StorageTestUtils;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
@@ -60,17 +61,12 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
-
public class CheckpointingTest {
private static final String TEST_CONFIG_FILE_NAME = "cc-small-txn-log-partition.conf";
@@ -116,23 +112,21 @@ public class CheckpointingTest {
nc.init();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
- NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
- partitioningKeys, null, null, null, false, null),
- null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
- KEY_INDICATOR_LIST, 0);
+ nc.createPrimaryIndex(StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager,
+ KEY_INDEXES, KEY_INDICATOR_LIST, 0);
JobId jobId = nc.newJobId();
IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, false);
ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
// Prepare insert operation
- LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
- RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp =
+ nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ RecordTupleGenerator tupleGenerator =
+ new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
@@ -197,8 +191,9 @@ public class CheckpointingTest {
nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
// Prepare insert operation
- LSMInsertDeleteOperatorNodePushable insertOp2 = nc.getInsertPipeline(ctx2, dataset, KEY_TYPES,
- RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp2 =
+ nc.getInsertPipeline(ctx2, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
insertOp2.open();
VSizeFrame frame2 = new VSizeFrame(ctx2);
FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2);
@@ -220,6 +215,7 @@ public class CheckpointingTest {
}
}
Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+ @Override
public void uncaughtException(Thread th, Throwable ex) {
threadException = true;
exception = ex;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index c7ae2df..62c882d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -25,7 +25,6 @@ import java.util.List;
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -107,6 +106,10 @@ public class ExecutionTestUtil {
tearDown(cleanup, integrationUtil, true);
}
+ public static void tearDown(boolean cleanup, boolean stopHdfs) throws Exception {
+ tearDown(cleanup, integrationUtil, stopHdfs);
+ }
+
public static void tearDown(boolean cleanup, AsterixHyracksIntegrationUtil integrationUtil, boolean stopHdfs)
throws Exception {
// validateBufferCacheState(); <-- Commented out until bug is fixed -->
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 9eb6259..b6581ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IReplicaManager;
+import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -65,7 +66,8 @@ public interface INcApplicationContext extends IApplicationContext {
IResourceIdFactory getResourceIdFactory();
- void initialize(boolean initialRun) throws IOException, AlgebricksException;
+ void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun)
+ throws IOException, AlgebricksException;
void setShuttingdown(boolean b);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index f4d764a..6e2e320 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -55,7 +55,7 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
this.setRegistered(false);
this.setMemoryAllocated(false);
this.logManager = logManager;
- waitLog.setLogType(LogType.WAIT);
+ waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
waitLog.computeAndSetLogSize();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index e5d18cf..50a4bef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -23,9 +23,7 @@ import static org.apache.asterix.common.metadata.MetadataIndexImmutablePropertie
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,7 +32,7 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.IDatasetMemoryManager;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -42,19 +40,16 @@ import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
@@ -70,7 +65,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
private final ILocalResourceRepository resourceRepository;
private final IDatasetMemoryManager memoryManager;
private final ILogManager logManager;
- private final LogRecord logRecord;
+ private final LogRecord waitLog;
private final int numPartitions;
private volatile boolean stopped = false;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
@@ -84,7 +79,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
this.memoryManager = memoryManager;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
this.numPartitions = numPartitions;
- logRecord = new LogRecord();
+ waitLog = new LogRecord();
+ waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
+ waitLog.computeAndSetLogSize();
}
@Override
@@ -371,7 +368,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
@Override
public synchronized void flushAllDatasets() throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
- flushDatasetOpenIndexes(dsr, false);
+ if (dsr.getDatasetInfo().isOpen()) {
+ flushDatasetOpenIndexes(dsr, false);
+ }
}
}
@@ -423,77 +422,48 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
*/
private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException {
DatasetInfo dsInfo = dsr.getDatasetInfo();
+ if (!dsInfo.isOpen()) {
+ throw new IllegalStateException("flushDatasetOpenIndexes is called on a dataset that is closed");
+ }
if (dsInfo.isExternal()) {
// no memory components for external dataset
return;
}
+ // ensure all in-flight flushes gets scheduled
+ logManager.log(waitLog);
for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
// flush each partition one by one
if (primaryOpTracker.getNumActiveOperations() > 0) {
throw new IllegalStateException(
"flushDatasetOpenIndexes is called on a dataset with currently active operations");
}
- int partition = primaryOpTracker.getPartition();
- Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
- ILSMIndex flushIndex = null;
- for (ILSMIndex lsmIndex : indexes) {
- if (!lsmIndex.isCurrentMutableComponentEmpty()) {
- flushIndex = lsmIndex;
- break;
- }
- }
- if (flushIndex == null) {
- // all open indexes are empty, nothing to flush
- continue;
- }
- LSMComponentId componentId = (LSMComponentId) flushIndex.getCurrentMemoryComponent().getId();
- ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition);
- idGenerator.refresh();
- if (dsInfo.isDurable()) {
- synchronized (logRecord) {
- TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), partition,
- componentId.getMinId(), componentId.getMaxId(), null);
- try {
- logManager.log(logRecord);
- } catch (ACIDException e) {
- throw new HyracksDataException("could not write flush log while closing dataset", e);
- }
-
- try {
- //notification will come from LogBuffer class (notifyFlushTerminator)
- logRecord.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
- }
- }
- }
- long flushLsn = logRecord.getLSN();
- ILSMComponentId nextComponentId = idGenerator.getId();
- Map<String, Object> flushMap = new HashMap<>();
- flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
- flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
- for (ILSMIndex index : indexes) {
- ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.getOpContext().setParameters(flushMap);
- accessor.scheduleFlush();
- }
- if (!asyncFlush) {
- // Wait for the above flush op.
- dsInfo.waitForIO();
+ primaryOpTracker.setFlushOnExit(true);
+ primaryOpTracker.flushIfNeeded();
+ }
+ // ensure requested flushes were scheduled
+ logManager.log(waitLog);
+ if (!asyncFlush) {
+ List<FlushOperation> flushes = new ArrayList<>();
+ for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+ flushes.addAll(primaryOpTracker.getScheduledFlushes());
}
+ LSMIndexUtil.waitFor(flushes);
}
}
private void closeDataset(DatasetResource dsr) throws HyracksDataException {
// First wait for any ongoing IO operations
DatasetInfo dsInfo = dsr.getDatasetInfo();
- dsInfo.waitForIO();
try {
flushDatasetOpenIndexes(dsr, false);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
+ // wait for merges that were scheduled due to the above flush
+ // ideally, we shouldn't need this since merges should still work.
+ // They don't need a special memory budget but there is a problem
+ // for some merge policies that need to access dataset info (correlated prefix)
+ dsInfo.waitForIO();
for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
closeIndex(iInfo);
}
@@ -505,7 +475,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
public synchronized void closeAllDatasets() throws HyracksDataException {
ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
for (DatasetResource dsr : openDatasets) {
- closeDataset(dsr);
+ if (dsr.isOpen()) {
+ closeDataset(dsr);
+ }
}
}
@@ -612,7 +584,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
@Override
public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
- if (replicationStrategy.isMatch(dsr.getDatasetID())) {
+ if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
flushDatasetOpenIndexes(dsr, false);
}
}