You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/10/04 05:53:13 UTC
[4/4] asterixdb git commit: [NO ISSUE][STO] Component Deletes Through
flushes and merges
[NO ISSUE][STO] Component Deletes Through flushes and merges
- user model changes: no
- storage format changes: no
- interface changes: yes
- moved component validation from the index
(ILSMIndex and all of its implementations)
to the component
(ILSMDiskComponent and all of its implementations)
details:
- This change enables component-level deletes.
Change-Id: I178656207bfa1d15e6ae5ff2403a16df33940773
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2017
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b2e50b7d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b2e50b7d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b2e50b7d
Branch: refs/heads/master
Commit: b2e50b7d1dc233f7c34af24083f3f3fded7f9d05
Parents: 9f04f97
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Tue Oct 3 10:10:54 2017 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Tue Oct 3 22:52:35 2017 -0700
----------------------------------------------------------------------
asterixdb/asterix-app/pom.xml | 17 +-
.../apache/asterix/app/nc/RecoveryManager.java | 2 +-
.../app/bootstrap/TestNodeController.java | 120 +--
.../test/dataflow/ComponentRollbackTest.java | 889 +++++++++++++++++++
.../asterix/test/dataflow/LogMarkerTest.java | 19 +-
.../asterix/test/dataflow/TestDataset.java | 67 ++
.../TestLsmBTreeResourceFactoryProvider.java | 171 ++++
.../TestLsmBtreeIoOpCallbackFactory.java | 125 +++
.../context/CorrelatedPrefixMergePolicy.java | 4 +-
.../AbstractLSMIOOperationCallback.java | 15 +-
.../PrimaryIndexLogMarkerCallback.java | 30 +-
.../CorrelatedPrefixMergePolicyTest.java | 2 +-
.../LSMSecondaryIndexBulkLoadNodePushable.java | 4 +-
asterixdb/pom.xml | 6 +
.../hyracks/api/exceptions/ErrorCode.java | 4 +
.../src/main/resources/errormsg/en.properties | 4 +
.../am/common/ophelpers/IndexOperation.java | 4 +-
.../am/lsm/btree/impls/ExternalBTree.java | 13 +-
.../lsm/btree/impls/ExternalBTreeWithBuddy.java | 31 +-
.../storage/am/lsm/btree/impls/LSMBTree.java | 42 +-
.../lsm/btree/impls/LSMBTreeDiskComponent.java | 11 +
.../lsm/btree/impls/LSMBTreeFlushOperation.java | 8 +-
.../lsm/btree/impls/LSMBTreeMergeOperation.java | 10 +-
.../impls/LSMBTreeWithBuddyDiskComponent.java | 8 +
.../impls/LSMBTreeWithBuddyMergeOperation.java | 12 +-
.../am/lsm/common/api/ILSMDiskComponent.java | 11 +
.../storage/am/lsm/common/api/ILSMHarness.java | 11 +
.../am/lsm/common/api/ILSMIOOperation.java | 9 +-
.../storage/am/lsm/common/api/ILSMIndex.java | 11 +-
.../am/lsm/common/api/ILSMIndexAccessor.java | 15 +
.../lsm/common/impls/AbstractIoOperation.java | 1 +
.../common/impls/AbstractLSMDiskComponent.java | 6 +-
.../am/lsm/common/impls/AbstractLSMIndex.java | 84 +-
.../lsm/common/impls/ConstantMergePolicy.java | 4 +-
.../am/lsm/common/impls/EmptyComponent.java | 85 ++
.../impls/EmptyDiskComponentMetadata.java | 51 ++
.../lsm/common/impls/ExternalIndexHarness.java | 4 +-
.../am/lsm/common/impls/FlushOperation.java | 10 +-
.../am/lsm/common/impls/LSMComponentState.java | 27 -
.../storage/am/lsm/common/impls/LSMHarness.java | 110 ++-
.../lsm/common/impls/LSMTreeIndexAccessor.java | 13 +-
.../am/lsm/common/impls/MergeOperation.java | 7 +-
.../am/lsm/common/impls/PrefixMergePolicy.java | 6 +-
.../am/lsm/common/impls/TracedIOOperation.java | 8 +-
.../am/lsm/common/util/ComponentUtils.java | 178 ++++
.../am/lsm/common/util/IOOperationUtils.java | 43 +
.../lsm/common/utils/ComponentMetadataUtil.java | 155 ----
.../invertedindex/impls/LSMInvertedIndex.java | 45 +-
.../impls/LSMInvertedIndexAccessor.java | 12 +
.../impls/LSMInvertedIndexDiskComponent.java | 15 +
.../impls/LSMInvertedIndexFlushOperation.java | 7 +-
.../impls/LSMInvertedIndexMergeOperation.java | 11 +-
.../am/lsm/rtree/impls/AbstractLSMRTree.java | 5 -
.../am/lsm/rtree/impls/ExternalRTree.java | 16 +-
.../storage/am/lsm/rtree/impls/LSMRTree.java | 32 +-
.../lsm/rtree/impls/LSMRTreeDiskComponent.java | 12 +
.../lsm/rtree/impls/LSMRTreeFlushOperation.java | 8 +-
.../lsm/rtree/impls/LSMRTreeMergeOperation.java | 11 +-
.../impls/LSMRTreeWithAntiMatterTuples.java | 26 +-
.../hyracks-storage-am-lsm-btree-test/pom.xml | 21 +
.../am/lsm/btree/LSMBTreeFileManagerTest.java | 6 +-
.../btree/LSMBTreeFilterMergeTestDriver.java | 6 +-
.../am/lsm/btree/LSMBTreeMergeTestDriver.java | 2 +-
.../btree/LSMBTreeScanDiskComponentsTest.java | 2 +-
...TreeUpdateInPlaceScanDiskComponentsTest.java | 4 +-
.../am/lsm/btree/impl/ITestOpCallback.java | 25 +
.../storage/am/lsm/btree/impl/TestLsmBtree.java | 260 ++++++
.../btree/impl/TestLsmBtreeLocalResource.java | 70 ++
.../impl/TestLsmBtreeLocalResourceFactory.java | 61 ++
.../btree/impl/TestLsmBtreeSearchCursor.java | 51 ++
.../am/lsm/btree/impl/TestLsmBtreeUtil.java | 105 +++
.../btree/multithread/LSMBTreeTestWorker.java | 2 +-
.../lsm/common/test/PrefixMergePolicyTest.java | 2 +-
.../LSMInvertedIndexMergeTest.java | 2 +-
.../PartitionedLSMInvertedIndexMergeTest.java | 2 +-
.../multithread/LSMInvertedIndexTestWorker.java | 2 +-
.../am/lsm/rtree/LSMRTreeMergeTestDriver.java | 2 +-
.../rtree/multithread/LSMRTreeTestWorker.java | 2 +-
.../LSMRTreeWithAntiMatterTuplesTestWorker.java | 2 +-
79 files changed, 2690 insertions(+), 606 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 489e5de..717b950 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -16,7 +16,8 @@
! specific language governing permissions and limitations
! under the License.
!-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>apache-asterixdb</artifactId>
@@ -169,7 +170,7 @@
</goals>
<configuration>
<licenses combine.children="append">
- <license implementation="org.apache.rat.analysis.license.MITLicense"/>
+ <license implementation="org.apache.rat.analysis.license.MITLicense" />
</licenses>
<excludes combine.children="append">
<exclude>src/test/resources/**/results_parser_sqlpp/**</exclude>
@@ -192,7 +193,7 @@
<configuration>
<reportFile>${project.build.directory}/webqueryui-rat.txt</reportFile>
<licenses combine.children="append">
- <license implementation="org.apache.rat.analysis.license.MITLicense"/>
+ <license implementation="org.apache.rat.analysis.license.MITLicense" />
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>MIT</licenseFamilyCategory>
<licenseFamilyName>JQuery</licenseFamilyName>
@@ -210,7 +211,7 @@
</license>
</licenses>
<licenseFamilies combine.children="append">
- <licenseFamily implementation="org.apache.rat.license.MITLicenseFamily"/>
+ <licenseFamily implementation="org.apache.rat.license.MITLicenseFamily" />
<licenseFamily implementation="org.apache.rat.license.SimpleLicenseFamily">
<familyName>JQuery</familyName>
</licenseFamily>
@@ -567,5 +568,11 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-btree-test</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 10af9ff..04e6313 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -362,7 +362,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
try {
maxDiskLastLsn =
((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
- .getComponentLSN(lsmIndex.getImmutableComponents());
+ .getComponentLSN(lsmIndex.getDiskComponents());
} catch (HyracksDataException e) {
datasetLifecycleManager.close(localResource.getPath());
throw e;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 53a4f23..104f80b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -39,14 +39,12 @@ import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.ARecordType;
@@ -68,7 +66,6 @@ import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -83,7 +80,6 @@ import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
-import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -93,11 +89,10 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.util.file.FileUtil;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
public class TestNodeController {
protected static final Logger LOGGER = Logger.getLogger(TestNodeController.class.getName());
@@ -144,12 +139,7 @@ public class TestNodeController {
}
jobletCtx = Mockito.mock(IHyracksJobletContext.class);
Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
- Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() {
- @Override
- public JobId answer(InvocationOnMock invocation) throws Throwable {
- return jobId;
- }
- });
+ Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
}
public void deInit() throws Exception {
@@ -167,8 +157,7 @@ public class TestNodeController {
int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException {
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
- storageComponentProvider);
+ mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
IndexOperation op = IndexOperation.INSERT;
IModificationOperationCallbackFactory modOpCallbackFactory =
new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
@@ -176,7 +165,7 @@ public class TestNodeController {
ResourceType.LSM_BTREE);
IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
- storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+ storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
LSMInsertDeleteOperatorNodePushable insertOp = new LSMInsertDeleteOperatorNodePushable(ctx, PARTITION,
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), op,
@@ -196,10 +185,9 @@ public class TestNodeController {
IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
JobSpecification spec = new JobSpecification();
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
- storageComponentProvider);
+ mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
- storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+ storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc,
null, null, true, true, indexDataflowHelperFactory, false, false, null,
NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
@@ -236,13 +224,12 @@ public class TestNodeController {
}
}
- public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
+ public PrimaryIndexInfo createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
int[] filterFields, IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
- storageComponentProvider);
+ mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(),
MetadataUtil.PENDING_NO_OP);
MetadataProvider mdProvider = new MetadataProvider(
@@ -252,34 +239,17 @@ public class TestNodeController {
recordType, metaType, mergePolicyFactory, mergePolicyProperties);
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
- primaryIndexInfo.fileSplitProvider, resourceFactory, !dataset.isTemp());
+ primaryIndexInfo.getFileSplitProvider(), resourceFactory, !dataset.isTemp());
IHyracksTaskContext ctx = createTestContext(false);
IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, 0);
indexBuilder.build();
} finally {
mdProvider.getLocks().unlock();
}
+ return primaryIndexInfo;
}
- private int[] createPrimaryIndexBloomFilterFields(int length) {
- int[] primaryIndexBloomFilterKeyFields = new int[length];
- for (int j = 0; j < length; ++j) {
- primaryIndexBloomFilterKeyFields[j] = j;
- }
- return primaryIndexBloomFilterKeyFields;
- }
-
- private IBinaryComparatorFactory[] createPrimaryIndexComparatorFactories(IAType[] primaryKeyTypes) {
- IBinaryComparatorFactory[] primaryIndexComparatorFactories =
- new IBinaryComparatorFactory[primaryKeyTypes.length];
- for (int j = 0; j < primaryKeyTypes.length; ++j) {
- primaryIndexComparatorFactories[j] =
- BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(primaryKeyTypes[j], true);
- }
- return primaryIndexComparatorFactories;
- }
-
- private ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
+ public static ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) {
int i = 0;
ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields];
@@ -294,7 +264,7 @@ public class TestNodeController {
return primaryIndexSerdes;
}
- private ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
+ public static ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
ARecordType recordType, ARecordType metaType) {
ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields];
int i = 0;
@@ -336,55 +306,34 @@ public class TestNodeController {
return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager();
}
- @SuppressWarnings("unused")
- private class PrimaryIndexInfo {
- private Dataset dataset;
+ public static class PrimaryIndexInfo {
private IAType[] primaryKeyTypes;
private ARecordType recordType;
private ARecordType metaType;
private ILSMMergePolicyFactory mergePolicyFactory;
private Map<String, String> mergePolicyProperties;
- private int[] filterFields;
private int primaryIndexNumOfTupleFields;
- private IBinaryComparatorFactory[] primaryIndexComparatorFactories;
private ITypeTraits[] primaryIndexTypeTraits;
private ISerializerDeserializer<?>[] primaryIndexSerdes;
- private int[] primaryIndexBloomFilterKeyFields;
- private ITypeTraits[] filterTypeTraits;
- private IBinaryComparatorFactory[] filterCmpFactories;
- private int[] btreeFields;
private ConstantFileSplitProvider fileSplitProvider;
private RecordDescriptor rDesc;
private int[] primaryIndexInsertFieldsPermutations;
private int[] primaryKeyIndexes;
- private List<List<String>> keyFieldNames;
- private List<Integer> keyFieldSourceIndicators;
- private List<IAType> keyFieldTypes;
private Index index;
- private IStorageComponentProvider storageComponentProvider;
public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
- int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
- IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
- this.storageComponentProvider = storageComponentProvider;
- this.dataset = dataset;
+ int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
+ throws AlgebricksException {
this.primaryKeyTypes = primaryKeyTypes;
this.recordType = recordType;
this.metaType = metaType;
this.mergePolicyFactory = mergePolicyFactory;
this.mergePolicyProperties = mergePolicyProperties;
- this.filterFields = filterFields;
this.primaryKeyIndexes = primaryKeyIndexes;
primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1));
primaryIndexTypeTraits =
createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
- primaryIndexComparatorFactories = createPrimaryIndexComparatorFactories(primaryKeyTypes);
- primaryIndexBloomFilterKeyFields = createPrimaryIndexBloomFilterFields(primaryKeyTypes.length);
- filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recordType);
- filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(dataset, recordType,
- NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider());
- btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
primaryIndexSerdes =
createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
@@ -392,23 +341,22 @@ public class TestNodeController {
for (int i = 0; i < primaryIndexNumOfTupleFields; i++) {
primaryIndexInsertFieldsPermutations[i] = i;
}
- keyFieldSourceIndicators = primaryKeyIndicators;
- keyFieldNames = new ArrayList<>();
- keyFieldTypes = Arrays.asList(primaryKeyTypes);
- for (int i = 0; i < keyFieldSourceIndicators.size(); i++) {
- Integer indicator = keyFieldSourceIndicators.get(i);
+ List<List<String>> keyFieldNames = new ArrayList<>();
+ List<IAType> keyFieldTypes = Arrays.asList(primaryKeyTypes);
+ for (int i = 0; i < primaryKeyIndicators.size(); i++) {
+ Integer indicator = primaryKeyIndicators.get(i);
String[] fieldNames =
indicator == Index.RECORD_INDICATOR ? recordType.getFieldNames() : metaType.getFieldNames();
keyFieldNames.add(Arrays.asList(fieldNames[primaryKeyIndexes[i]]));
}
index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
- IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, false, true,
+ IndexType.BTREE, keyFieldNames, primaryKeyIndicators, keyFieldTypes, false, false, true,
MetadataUtil.PENDING_NO_OP);
List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
- FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(
- ((ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext())
- .getClusterStateManager(),
- dataset, index.getIndexName(), nodes);
+ CcApplicationContext appCtx =
+ (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+ FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset,
+ index.getIndexName(), nodes);
fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
}
@@ -436,6 +384,10 @@ public class TestNodeController {
.thenReturn(searcgRecDesc);
return rDescProvider;
}
+
+ public ConstantFileSplitProvider getFileSplitProvider() {
+ return fileSplitProvider;
+ }
}
public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) {
@@ -450,18 +402,12 @@ public class TestNodeController {
public IndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(PrimaryIndexInfo primaryIndexInfo,
IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
- primaryIndexInfo.fileSplitProvider);
+ primaryIndexInfo.getFileSplitProvider());
}
- public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
- ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
- Map<String, String> mergePolicyProperties, int[] filterFields,
- IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
- List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
- PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
- storageComponentProvider);
- return getPrimaryIndexDataflowHelperFactory(primaryIndexInfo, storageComponentProvider)
- .create(createTestContext(true).getJobletContext().getServiceContext(), PARTITION);
+ public IStorageManager getStorageManager() {
+ CcApplicationContext appCtx =
+ (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+ return appCtx.getStorageManager();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
new file mode 100644
index 0000000..76bec8c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -0,0 +1,889 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Predicate;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ComponentRollbackTest {
+
+ private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+ private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+ new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+ private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+ { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+ private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+ private static final ARecordType META_TYPE = null;
+ private static final GenerationFunction[] META_GEN_FUNCTION = null;
+ private static final boolean[] UNIQUE_META_FIELDS = null;
+ private static final int[] KEY_INDEXES = { 0 };
+ private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+ private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+ private static final int TOTAL_NUM_OF_RECORDS = 10000;
+ private static final int RECORDS_PER_COMPONENT = 1000;
+ private static final int DATASET_ID = 101;
+ private static final String DATAVERSE_NAME = "TestDV";
+ private static final String DATASET_NAME = "TestDS";
+ private static final String DATA_TYPE_NAME = "DUMMY";
+ private static final String NODE_GROUP_NAME = "DEFAULT";
+ private static final Predicate<ILSMComponent> memoryComponentsPredicate = c -> c instanceof ILSMMemoryComponent;
+ private static final StorageComponentProvider storageManager = new StorageComponentProvider();
+ private static TestNodeController nc;
+ private static TestLsmBtree lsmBtree;
+ private static NCAppRuntimeContext ncAppCtx;
+ private static IDatasetLifecycleManager dsLifecycleMgr;
+ private static Dataset dataset;
+ private static IHyracksTaskContext ctx;
+ private static IIndexDataflowHelper indexDataflowHelper;
+ private static ITransactionContext txnCtx;
+ private static LSMInsertDeleteOperatorNodePushable insertOp;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ nc = new TestNodeController(null, false);
+ nc.init();
+ ncAppCtx = nc.getAppRuntimeContext();
+ dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ System.out.println("TearDown");
+ nc.deInit();
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @Before
+ public void createIndex() throws Exception {
+ List<List<String>> partitioningKeys = new ArrayList<>();
+ partitioningKeys.add(Collections.singletonList("key"));
+ dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+ NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ partitioningKeys, null, null, null, false, null, false),
+ null, DatasetType.INTERNAL, DATASET_ID, 0);
+ PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+ new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+ IndexDataflowHelperFactory iHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+ ctx = nc.createTestContext(false);
+ indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
+ indexDataflowHelper.open();
+ lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+ indexDataflowHelper.close();
+ nc.newJobId();
+ txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+ null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+ }
+
+ @After
+ public void destroyIndex() throws Exception {
+ indexDataflowHelper.destroy();
+ }
+
+ private void allowAllOps(TestLsmBtree lsmBtree) {
+ lsmBtree.addModifyCallback(sem -> sem.release());
+ lsmBtree.addFlushCallback(sem -> sem.release());
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.addMergeCallback(sem -> sem.release());
+ }
+
+ @Test
+ public void testRollbackWhileNoOp() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ ILSMIndexAccessor lsmAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // rollback a memory component
+ lsmAccessor.deleteComponents(memoryComponentsPredicate);
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ // rollback the last disk component
+ lsmAccessor = lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+ lsmAccessor.deleteComponents(pred);
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackThenInsert() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ ILSMIndexAccessor lsmAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // rollback a memory component
+ lsmAccessor.deleteComponents(memoryComponentsPredicate);
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+
+ // insert again
+ nc.newJobId();
+ txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+ null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+ insertOp.open();
+ for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // rollback the last disk component
+ lsmAccessor = lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+ lsmAccessor.deleteComponents(pred);
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileSearch() {
+ try {
+ // allow all operations but search
+ allowAllOps(lsmBtree);
+ lsmBtree.clearSearchCallbacks();
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait until firstSearcher enters the components
+ firstSearcher.waitUntilEntered();
+ // now that the searcher has entered, we will rollback
+ ILSMIndexAccessor lsmAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // rollback a memory component
+ lsmAccessor.deleteComponents(
+ c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
+ // now that the rollback has completed, we will unblock the search
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ // search now and ensure the rolled-back memory component's records are gone
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ // rollback the last disk component
+ // re-block searches
+ lsmBtree.clearSearchCallbacks();
+ Searcher secondSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree,
+ TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ // wait until secondSearcher enters the components
+ secondSearcher.waitUntilEntered();
+ lsmAccessor = lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+ lsmAccessor.deleteComponents(pred);
+ // now that the rollback has completed, we will unblock the search
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(secondSearcher.result());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileFlush() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // disable flushes
+ lsmBtree.clearFlushCallbacks();
+ Flusher firstFlusher = new Flusher(lsmBtree);
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+ firstFlusher.waitUntilCount(1);
+ // now that we entered, we will rollback. This will not proceed since it is waiting for the flush to complete
+ Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
+ // the rollback is still waiting for the flush, so a search must still see all records
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ //unblock the flush
+ lsmBtree.allowFlush(1);
+ // ensure rollback completed
+ rollerback.complete();
+ // ensure current mem component is not modified
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ // search now and ensure the rollback was a no-op since it waited for the ongoing flush
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileMerge() {
+ try {
+ // allow all operations but merge
+ allowAllOps(lsmBtree);
+ lsmBtree.clearMergeCallbacks();
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // Now, we will start a merge
+ Merger merger = new Merger(lsmBtree);
+ ILSMIndexAccessor mergeAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // select the components to merge... the last three
+ int numMergedComponents = 3;
+ List<ILSMDiskComponent> mergedComponents = new ArrayList<>();
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ for (int i = 0; i < numMergedComponents; i++) {
+ mergedComponents.add(diskComponents.get(i));
+ }
+ mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+ merger.waitUntilCount(1);
+ // now that we entered, we will rollback
+ Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+ // rollback is now waiting for the merge to complete
+ // we will search
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ //unblock the merge
+ lsmBtree.allowMerge(1);
+ // ensure rollback completes
+ rollerback.complete();
+ // ensure current mem component is not modified
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ // search now and ensure that we rolled back the merged component
+ searchAndAssertCount(nc, ctx, dataset, storageManager,
+ TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileFlushAndSearchFlushExistsFirst() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // disable flushes
+ // disable searches
+ lsmBtree.clearFlushCallbacks();
+ lsmBtree.clearSearchCallbacks();
+ Flusher firstFlusher = new Flusher(lsmBtree);
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+ firstFlusher.waitUntilCount(1);
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait until firstSearcher enters the components
+ firstSearcher.waitUntilEntered();
+ // now that the searcher has entered, we will rollback a memory component
+ Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
+ //unblock the flush
+ lsmBtree.allowFlush(1);
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ // wait for the rollback to complete, then ensure current mem component is not modified
+ rollerback.complete();
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ // search now and ensure the rollback was no op since it waits for ongoing flushes
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileFlushAndSearchSearchExistsFirst() {
+ try {
+ // allow all operations
+ allowAllOps(lsmBtree);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // disable flushes
+ // disable searches
+ lsmBtree.clearFlushCallbacks();
+ Flusher firstFlusher = new Flusher(lsmBtree);
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+ firstFlusher.waitUntilCount(1);
+ lsmBtree.clearSearchCallbacks();
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait until firstSearcher enters the components
+ firstSearcher.waitUntilEntered();
+ // now that the searcher has entered, we will rollback
+ Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
+ // The rollback will be waiting for the flush to complete
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ //unblock the flush
+ lsmBtree.allowFlush(1);
+ // wait for the rollback to complete, then ensure current mem component is not modified
+ rollerback.complete();
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ // search now and ensure the rollback was a no-op since it waited for the ongoing flush
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileMergeAndSearchMergeExitsFirst() {
+ try {
+ // allow all operations except merge
+ allowAllOps(lsmBtree);
+ lsmBtree.clearMergeCallbacks();
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // flush every 1000 records
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // Now, we will start a merge
+ Merger merger = new Merger(lsmBtree);
+ ILSMIndexAccessor mergeAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ // select the components to merge... the last three
+ int numMergedComponents = 3;
+ List<ILSMDiskComponent> mergedComponents = new ArrayList<>();
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ for (int i = 0; i < numMergedComponents; i++) {
+ mergedComponents.add(diskComponents.get(i));
+ }
+ mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+ merger.waitUntilCount(1);
+ // we will block search
+ lsmBtree.clearSearchCallbacks();
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait until firstSearcher enters the components
+ firstSearcher.waitUntilEntered();
+ // now that the searcher has entered, we will rollback
+ Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+ // the rollback is waiting for all flushes and merges to complete before it proceeds
+ // unblock the merge
+ lsmBtree.allowMerge(1);
+ // unblock the search
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ rollerback.complete();
+ // now that the rollback has completed, we will search
+ searchAndAssertCount(nc, ctx, dataset, storageManager,
+ TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+ // ensure current mem component is not modified
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRollbackWhileMergeAndSearchSearchExitsFirst() {
+ try {
+ /*
+ * Scenario: a merge is scheduled and held back, a search enters the index and is held back, and a
+ * rollback (component delete) is started on another thread. The search is released first and must
+ * finish; the merge is released afterwards, and only then is the rollback waited for.
+ *
+ * Setup: allow all operations except merge.
+ */
+ allowAllOps(lsmBtree);
+ lsmBtree.clearMergeCallbacks();
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ // at every RECORDS_PER_COMPONENT boundary (unless j is the last record): write out buffered tuples, then flush
+ if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+ // get all components and check the state before the rollback: 9 disk components, a modified memory component, all records visible
+ List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+ List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+ Assert.assertEquals(9, diskComponents.size());
+ Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // Now, we will start a merge (Merger registers a merge callback that counts its invocations)
+ Merger merger = new Merger(lsmBtree);
+ ILSMIndexAccessor mergeAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ /*
+ * Select the components to merge: the first numMergedComponents entries of the disk component list
+ * (presumably the most recent ones; TODO confirm the list ordering). The LSN read from entry 0 is
+ * the threshold handed to the rollback predicate further down.
+ */
+ List<ILSMDiskComponent> mergedComponents = new ArrayList<>();
+ long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+ int numMergedComponents = 3;
+ for (int i = 0; i < numMergedComponents; i++) {
+ mergedComponents.add(diskComponents.get(i));
+ }
+ mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+ merger.waitUntilCount(1); // wait for the merge callback to be invoked
+ // we will block search (the releasing callback is added back when the search is unblocked below)
+ lsmBtree.clearSearchCallbacks();
+ Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ // wait until firstSearcher has entered the components
+ firstSearcher.waitUntilEntered();
+ // now that the search has entered, we will start the rollback (it runs on its own thread)
+ Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+ // unblock the search
+ lsmBtree.addSearchCallback(sem -> sem.release());
+ lsmBtree.allowSearch(1);
+ Assert.assertTrue(firstSearcher.result());
+ // even though rollback has been called, it is still waiting for the merge to complete, so all records are still visible
+ searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ // unblock the merge, then wait for the rollback to finish
+ lsmBtree.allowMerge(1);
+ rollerBack.complete();
+ searchAndAssertCount(nc, ctx, dataset, storageManager,
+ TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+ // ensure current mem component is not modified
+ Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+ } catch (Throwable e) { // any failure, including ones rethrown by result() and complete(), fails the test
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private class Rollerback {
+ private Thread task;
+ private Exception failure;
+
+ public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) {
+ // now that we enetered, we will rollback
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ ILSMIndexAccessor lsmAccessor =
+ lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ try {
+ lsmAccessor.deleteComponents(predicate);
+ } catch (HyracksDataException e) {
+ failure = e;
+ }
+ }
+ };
+ task = new Thread(runnable);
+ task.start();
+ }
+
+ void complete() throws Exception {
+ task.join();
+ if (failure != null) {
+ throw failure;
+ }
+ }
+ }
+
+ /**
+  * Runs {@code searchAndAssertCount} on a background thread and lets the test wait until the search
+  * callback registered on the index has been invoked.
+  */
+ private class Searcher {
+     private final Future<Boolean> task;
+     private volatile boolean entered = false;
+
+     /**
+      * Registers the "entered" callback on the index and submits the search right away.
+      *
+      * @param numOfRecords
+      *            the tuple count the background search asserts
+      */
+     public Searcher(TestNodeController nc, IHyracksTaskContext ctx, Dataset dataset,
+             StorageComponentProvider storageManager, TestLsmBtree lsmBtree, int numOfRecords) {
+         lsmBtree.addSearchCallback(sem -> {
+             synchronized (Searcher.this) {
+                 entered = true;
+                 Searcher.this.notifyAll();
+             }
+         });
+         Callable<Boolean> search = () -> {
+             searchAndAssertCount(nc, ctx, dataset, storageManager, numOfRecords);
+             return true;
+         };
+         ExecutorService executor = Executors.newSingleThreadExecutor();
+         task = executor.submit(search);
+         // The already submitted search still runs to completion; shutting down here only makes sure the
+         // worker thread ends afterwards instead of idling for the rest of the test run.
+         executor.shutdown();
+     }
+
+     /**
+      * Waits for the background search to finish.
+      *
+      * @return {@code true} when the search and its count assertion completed normally
+      * @throws Exception
+      *             if the search failed or the wait was interrupted
+      */
+     boolean result() throws Exception {
+         return task.get();
+     }
+
+     /**
+      * Blocks until the search callback registered by the constructor has been invoked.
+      */
+     synchronized void waitUntilEntered() throws InterruptedException {
+         while (!entered) {
+             this.wait();
+         }
+     }
+ }
+
+ /**
+  * Counts invocations of the merge callback it registers on the index and lets a test wait for them.
+  */
+ private class Merger {
+     // Only incremented, and only while holding this object's monitor.
+     private volatile int count = 0;
+
+     public Merger(TestLsmBtree lsmBtree) {
+         lsmBtree.addMergeCallback(sem -> {
+             synchronized (Merger.this) {
+                 count++;
+                 Merger.this.notifyAll();
+             }
+         });
+     }
+
+     /**
+      * Blocks until the merge callback has been invoked at least {@code count} times.
+      *
+      * @param count
+      *            the number of callback invocations to wait for
+      */
+     synchronized void waitUntilCount(int count) throws InterruptedException {
+         // "<" instead of "!=": the counter never decreases, so an exact-match test would wait forever
+         // once the counter had already moved past the requested value.
+         while (this.count < count) {
+             this.wait();
+         }
+     }
+ }
+
+ /**
+  * Counts invocations of the flush callback it registers on the index and lets a test wait for them.
+  */
+ private class Flusher {
+     // Only incremented, and only while holding this object's monitor.
+     private volatile int count = 0;
+
+     public Flusher(TestLsmBtree lsmBtree) {
+         lsmBtree.addFlushCallback(sem -> {
+             synchronized (Flusher.this) {
+                 count++;
+                 Flusher.this.notifyAll();
+             }
+         });
+     }
+
+     /**
+      * Blocks until the flush callback has been invoked at least {@code count} times.
+      *
+      * @param count
+      *            the number of callback invocations to wait for
+      */
+     synchronized void waitUntilCount(int count) throws InterruptedException {
+         // "<" instead of "!=": the counter never decreases, so an exact-match test would wait forever
+         // once the counter had already moved past the requested value.
+         while (this.count < count) {
+             this.wait();
+         }
+     }
+ }
+
+ private class DiskComponentLsnPredicate implements Predicate<ILSMComponent> {
+ private final long lsn;
+
+ public DiskComponentLsnPredicate(long lsn) {
+ this.lsn = lsn;
+ }
+
+ @Override
+ public boolean test(ILSMComponent c) {
+ try {
+ return c instanceof ILSMMemoryComponent
+ || (c instanceof ILSMDiskComponent && AbstractLSMIOOperationCallback
+ .getTreeIndexLSN(((ILSMDiskComponent) c).getMetadata()) >= lsn);
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }
+
+ /**
+  * Requests a new job id, builds a full-scan pipeline over the dataset that ends in a tuple counter,
+  * opens and closes the pipeline, and asserts the number of tuples the counter saw.
+  *
+  * @param numOfRecords
+  *            the expected tuple count
+  */
+ private void searchAndAssertCount(TestNodeController nc, IHyracksTaskContext ctx, Dataset dataset,
+         StorageComponentProvider storageManager, int numOfRecords)
+         throws HyracksDataException, AlgebricksException {
+     nc.newJobId();
+     RecordDescriptor searchOutputDesc = nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE);
+     TestTupleCounterFrameWriter tupleCounter =
+             create(searchOutputDesc, Collections.emptyList(), Collections.emptyList(), false);
+     IPushRuntime fullScan = nc.getFullScanPipeline(tupleCounter, ctx, dataset, KEY_TYPES, RECORD_TYPE,
+             META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST,
+             storageManager);
+     // Presumably the scan pushes its tuples into the counter while the pipeline is opened - confirm.
+     fullScan.open();
+     fullScan.close();
+     Assert.assertEquals(numOfRecords, tupleCounter.getCount());
+ }
+
+ /**
+  * Builds a {@link TestTupleCounterFrameWriter} with one {@code CountAnswer} per frame writer operation
+  * (open, next frame, flush, fail, close), each obtained from {@code FrameWriterTestUtils.createAnswer}.
+  *
+  * @param recordDescriptor
+  *            passed through to the writer
+  * @param exceptionThrowingOperations
+  *            passed to every {@code createAnswer} call
+  * @param errorThrowingOperations
+  *            passed to every {@code createAnswer} call
+  * @param deepCopyInputFrames
+  *            passed through to the writer
+  * @return the newly created writer
+  */
+ public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+         Collection<FrameWriterOperation> exceptionThrowingOperations,
+         Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+     // Arguments are evaluated left to right, so the answers are still created in the order
+     // Open, NextFrame, Flush, Fail, Close.
+     return new TestTupleCounterFrameWriter(recordDescriptor,
+             FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open, exceptionThrowingOperations,
+                     errorThrowingOperations),
+             FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame, exceptionThrowingOperations,
+                     errorThrowingOperations),
+             FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush, exceptionThrowingOperations,
+                     errorThrowingOperations),
+             FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail, exceptionThrowingOperations,
+                     errorThrowingOperations),
+             FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close, exceptionThrowingOperations,
+                     errorThrowingOperations),
+             deepCopyInputFrames);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 1e3960f..a0e3aa9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
import org.apache.asterix.app.data.gen.TupleGenerator;
import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
@@ -58,9 +59,10 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -113,8 +115,8 @@ public class LogMarkerTest {
partitioningKeys, null, null, null, false, null, false),
null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
- null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+ PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+ new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
IHyracksTaskContext ctx = nc.createTestContext(true);
nc.newJobId();
ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
@@ -147,13 +149,14 @@ public class LogMarkerTest {
}
insertOp.close();
nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
- IIndexDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES, RECORD_TYPE,
- META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES,
- KEY_INDICATORS_LIST);
+ IndexDataflowHelperFactory iHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), indexInfo.getFileSplitProvider());
+ IIndexDataflowHelper dataflowHelper =
+ iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
dataflowHelper.open();
LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();
LongPointable longPointable = LongPointable.FACTORY.createPointable();
- ComponentMetadataUtil.get(btree, ComponentMetadataUtil.MARKER_LSN_KEY, longPointable);
+ ComponentUtils.get(btree, ComponentUtils.MARKER_LSN_KEY, longPointable);
long lsn = longPointable.getLong();
int numOfMarkers = 0;
LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false);
@@ -204,4 +207,4 @@ public class LogMarkerTest {
return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
closeAnswer, deepCopyInputFrames);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
new file mode 100644
index 0000000..893b428
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.metadata.IDatasetDetails;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.common.IResourceFactory;
+
+/**
+ * A {@link Dataset} whose index resources are built by {@link TestLsmBTreeResourceFactoryProvider} and
+ * whose I/O operation callbacks come from {@link TestLsmBtreeIoOpCallbackFactory}.
+ */
+public class TestDataset extends Dataset {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Passes every argument, unchanged and in the same order, to the {@link Dataset} constructor.
+     */
+    public TestDataset(String dataverseName, String datasetName, String recordTypeDataverseName, String recordTypeName,
+            String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties,
+            IDatasetDetails datasetDetails, Map<String, String> hints, DatasetType datasetType, int datasetId,
+            int pendingOp) {
+        super(dataverseName, datasetName, recordTypeDataverseName, recordTypeName, nodeGroupName, compactionPolicy,
+                compactionPolicyProperties, datasetDetails, hints, datasetType, datasetId, pendingOp);
+    }
+
+    /**
+     * Computes the filter type traits and filter comparator factories for this dataset, asks the test
+     * resource factory provider for a resource factory, and wraps that factory in a
+     * {@link DatasetLocalResourceFactory} carrying this dataset's id.
+     */
+    @Override
+    public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Index index, ARecordType recordType,
+            ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties)
+            throws AlgebricksException {
+        ITypeTraits[] filterTraits = DatasetUtil.computeFilterTypeTraits(this, recordType);
+        IBinaryComparatorFactory[] filterComparators = DatasetUtil.computeFilterBinaryComparatorFactories(this,
+                recordType, mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
+        IResourceFactory delegate = TestLsmBTreeResourceFactoryProvider.INSTANCE.getResourceFactory(mdProvider,
+                this, index, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTraits,
+                filterComparators);
+        return new DatasetLocalResourceFactory(getDatasetId(), delegate);
+    }
+
+    /**
+     * Returns the shared {@link TestLsmBtreeIoOpCallbackFactory} instance, whatever the index.
+     */
+    @Override
+    public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException {
+        return TestLsmBtreeIoOpCallbackFactory.INSTANCE;
+    }
+}