You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by lu...@apache.org on 2017/11/10 16:58:17 UTC
[2/3] asterixdb git commit: [ASTERIXDB-2115] Add Component Ids to LSM
Indexes
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 2f94ad7..fb9901d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -87,6 +87,8 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -133,7 +135,6 @@ import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFa
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
@@ -595,9 +596,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
// bulkload?)
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
- new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory);
+ LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
+ fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
+ indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId());
return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
@@ -1001,8 +1002,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
+ op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null,
+ BulkLoadUsage.LOAD, dataset.getDatasetId());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
null, true, modificationCallbackFactory);
@@ -1135,8 +1137,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+ op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
+ BulkLoadUsage.LOAD, dataset.getDatasetId());
} else if (indexOp == IndexOperation.UPSERT) {
op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
filterFactory, modificationCallbackFactory, prevFieldPermutation);
@@ -1237,9 +1240,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
+ op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
- indexDataflowHelperFactory);
+ indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId());
} else if (indexOp == IndexOperation.UPSERT) {
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation);
@@ -1353,8 +1356,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory);
+ op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
+ null, BulkLoadUsage.LOAD, dataset.getDatasetId());
} else if (indexOp == IndexOperation.UPSERT) {
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
filterFactory, modificationCallbackFactory, prevFieldPermutation);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 3fec73b..e5f97f0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -30,6 +30,7 @@ import java.util.stream.IntStream;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveNotificationHandler;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
@@ -107,6 +108,7 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -505,15 +507,15 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
case BTREE:
return getDatasetType() == DatasetType.EXTERNAL
&& !index.getIndexName().equals(IndexingConstants.getFilesIndexName(getDatasetName()))
- ? LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
- : LSMBTreeIOOperationCallbackFactory.INSTANCE;
+ ? new LSMBTreeWithBuddyIOOperationCallbackFactory(getComponentIdGeneratorFactory())
+ : new LSMBTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory());
case RTREE:
- return LSMRTreeIOOperationCallbackFactory.INSTANCE;
+ return new LSMRTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory());
case LENGTH_PARTITIONED_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case SINGLE_PARTITION_WORD_INVIX:
- return LSMInvertedIndexIOOperationCallbackFactory.INSTANCE;
+ return new LSMInvertedIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory());
default:
throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
index.getIndexType().toString());
@@ -532,6 +534,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
: new SecondaryIndexOperationTrackerFactory(getDatasetId());
}
+    /**
+     * Creates the LSM component id generator factory for this dataset.
+     *
+     * @return a new {@link DatasetLSMComponentIdGeneratorFactory} constructed from this dataset's id
+     */
+    public ILSMComponentIdGeneratorFactory getComponentIdGeneratorFactory() {
+        final ILSMComponentIdGeneratorFactory idGeneratorFactory =
+                new DatasetLSMComponentIdGeneratorFactory(getDatasetId());
+        return idGeneratorFactory;
+    }
+
/**
* Get search callback factory for this dataset with the passed index and operation
*
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 5dac407..7701f65 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,8 +19,6 @@
package org.apache.asterix.metadata.utils;
-import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
-
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -51,6 +49,8 @@ import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -71,6 +71,8 @@ import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
@@ -124,8 +126,8 @@ public abstract class SecondaryIndexOperationsHelper {
this.index = index;
this.physOptConf = physOptConf;
this.metadataProvider = metadataProvider;
- this.itemType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
- dataset.getItemTypeName());
+ this.itemType =
+ (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
this.metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
Pair<ARecordType, ARecordType> enforcedTypes = getEnforcedType(index, itemType, metaType);
this.enforcedItemType = enforcedTypes.first;
@@ -341,11 +343,15 @@ public abstract class SecondaryIndexOperationsHelper {
return sortOp;
}
- protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
+ protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
throws AlgebricksException {
- TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
- secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory);
+ IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory(
+ metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
+
+ LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
+ secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
+ primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
secondaryPartitionConstraint);
return treeIndexBulkLoadOp;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
new file mode 100644
index 0000000..74590c7
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+
+/**
+ * Operator descriptor for bulk loading an LSM index.
+ * <p>
+ * It extends {@link TreeIndexBulkLoadOperatorDescriptor} with three extra pieces of state (a primary index
+ * helper factory, a {@link BulkLoadUsage} and a dataset id) and hands them, unchanged, to the
+ * {@link LSMIndexBulkLoadOperatorNodePushable} it creates.
+ */
+public class LSMIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The reason a bulk load is being performed; forwarded as-is to the runtime.
+     */
+    public enum BulkLoadUsage {
+        LOAD,
+        CREATE_INDEX
+    }
+
+    /** Factory for the primary index helper; callers may pass {@code null}. */
+    protected final IIndexDataflowHelperFactory primaryIndexHelperFactory;
+
+    /** Usage forwarded to the runtime. */
+    protected final BulkLoadUsage usage;
+
+    /** Id of the dataset the target index belongs to. */
+    protected final int datasetId;
+
+    /**
+     * Builds the descriptor.
+     * <p>
+     * The first eight arguments are forwarded to the {@link TreeIndexBulkLoadOperatorDescriptor} constructor;
+     * the remaining three are stored on this descriptor.
+     *
+     * @param primaryIndexHelperFactory factory for the primary index helper, may be {@code null}
+     * @param usage the reason for the bulk load
+     * @param datasetId id of the dataset being loaded
+     */
+    public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory,
+            IIndexDataflowHelperFactory primaryIndexHelperFactory, BulkLoadUsage usage, int datasetId) {
+        super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                indexHelperFactory);
+        this.datasetId = datasetId;
+        this.usage = usage;
+        this.primaryIndexHelperFactory = primaryIndexHelperFactory;
+    }
+
+    /**
+     * Creates the runtime for one partition, using the record descriptor of this activity's input 0.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        final RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+        return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory, primaryIndexHelperFactory, ctx, partition,
+                fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, inputRecDesc, usage,
+                datasetId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
new file mode 100644
index 0000000..2415556
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.util.List;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
+
+/**
+ * Bulk-load runtime for an LSM index that also records a component id in the metadata of the disk component
+ * being bulk loaded.
+ * <p>
+ * For {@link BulkLoadUsage#LOAD} the default component id is persisted. For any other usage the primary index
+ * is opened and, when it has at least one disk component, the union of the ids of its first and last disk
+ * components is persisted.
+ */
+public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
+    protected final BulkLoadUsage usage;
+
+    /** Helper for the primary index; {@code null} when no primary index helper factory was supplied. */
+    protected final IIndexDataflowHelper primaryIndexHelper;
+    protected final IDatasetLifecycleManager datasetManager;
+    protected final int datasetId;
+    protected final int partition;
+
+    /** Primary index instance; assigned by {@link #initializeBulkLoader()} on the non-LOAD path only. */
+    protected ILSMIndex primaryIndex;
+
+    /**
+     * Creates the runtime.
+     *
+     * @param indexDataflowHelperFactory factory for the helper of the index being bulk loaded
+     * @param primaryIndexDataflowHelperFactory factory for the primary index helper; may be {@code null}, in
+     *            which case only the {@link BulkLoadUsage#LOAD} path can be executed
+     * @param usage the reason for the bulk load
+     * @param datasetId id of the dataset being loaded
+     * @throws HyracksDataException if the parent runtime or the primary index helper cannot be created
+     */
+    public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory,
+            IIndexDataflowHelperFactory primaryIndexDataflowHelperFactory, IHyracksTaskContext ctx, int partition,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage usage, int datasetId)
+            throws HyracksDataException {
+        super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint,
+                checkIfEmptyIndex, recDesc);
+
+        if (primaryIndexDataflowHelperFactory != null) {
+            this.primaryIndexHelper =
+                    primaryIndexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        } else {
+            this.primaryIndexHelper = null;
+        }
+        this.usage = usage;
+        this.datasetId = datasetId;
+        this.partition = partition;
+        INcApplicationContext ncCtx =
+                (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+        datasetManager = ncCtx.getDatasetLifecycleManager();
+    }
+
+    /**
+     * Creates the bulk loader on the target index and persists a component id into the metadata of the disk
+     * component it is building.
+     *
+     * @throws HyracksDataException if the bulk loader cannot be created, the primary index cannot be opened, or
+     *             the usage requires a primary index but no primary index helper was supplied
+     */
+    @Override
+    protected void initializeBulkLoader() throws HyracksDataException {
+        ILSMIndex targetIndex = (ILSMIndex) index;
+        if (usage.equals(BulkLoadUsage.LOAD)) {
+            // for a loaded dataset, we use the default Id 0 which is guaranteed to be smaller
+            // than Ids of all memory components
+
+            // TODO handle component Id for datasets loaded multiple times
+            // TODO move this piece of code to io operation callback
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
+            LSMComponentIdUtils.persist(LSMComponentId.DEFAULT_COMPONENT_ID, diskComponent.getMetadata());
+        } else {
+            // The constructor accepts a null primary helper factory, so report the misconfiguration
+            // explicitly instead of failing with a bare NullPointerException below.
+            if (primaryIndexHelper == null) {
+                throw HyracksDataException.create(new IllegalStateException(
+                        "A primary index dataflow helper is required for bulk load usage " + usage));
+            }
+            primaryIndexHelper.open();
+            primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
+            List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            if (!primaryComponents.isEmpty()) {
+                // TODO move this piece of code to io operation callback
+                // Ideally, this should be done in io operation callback when a bulk load operation is finished
+                // However, currently we don't have an extensible callback mechanism to support this
+                ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(),
+                        primaryComponents.get(primaryComponents.size() - 1).getId());
+                ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
+                LSMComponentIdUtils.persist(bulkloadId, diskComponent.getMetadata());
+            }
+        }
+    }
+
+    /**
+     * Closes the parent runtime and then, if the primary index was obtained by
+     * {@link #initializeBulkLoader()}, closes the primary index helper even when the parent close fails.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            super.close();
+        } finally {
+            if (primaryIndex != null) {
+                primaryIndexHelper.close();
+            }
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index cee20ce..6d9ec47 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -19,24 +19,20 @@
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
/**
* This operator node is used to bulk load incoming tuples (scanned from the primary index)
@@ -56,12 +52,9 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
private ILSMIndex primaryIndex;
private ILSMIndex secondaryIndex;
- private ILSMDiskComponent component;
- private ILSMDiskComponentBulkLoader componentBulkLoader;
+ private LSMIndexDiskComponentBulkLoader componentBulkLoader;
private int currentComponentPos = -1;
- private ILSMDiskComponent[] diskComponents;
-
public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
IIndexDataflowHelperFactory primaryIndexHelperFactory,
IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
@@ -92,7 +85,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
super.open();
primaryIndexHelper.open();
primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
- diskComponents = new ILSMDiskComponent[primaryIndex.getDiskComponents().size()];
secondaryIndexHelper.open();
secondaryIndex = (ILSMIndex) secondaryIndexHelper.getIndexInstance();
@@ -107,8 +99,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
closeException = e;
}
- activateComponents();
-
try {
if (primaryIndexHelper != null) {
primaryIndexHelper.close();
@@ -184,24 +174,22 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
}
private void endCurrentComponent() throws HyracksDataException {
- if (component != null) {
- // set disk component id
-
+ if (componentBulkLoader != null) {
componentBulkLoader.end();
- diskComponents[currentComponentPos] = component;
-
componentBulkLoader = null;
- component = null;
}
}
private void loadNewComponent(int componentPos) throws HyracksDataException {
endCurrentComponent();
- component = secondaryIndex.createBulkLoadTarget();
int numTuples = getNumDeletedTuples(componentPos);
- componentBulkLoader = component.createBulkLoader(1.0f, false, numTuples, false, true, true);
-
+ ILSMDiskComponent primaryComponent = primaryIndex.getDiskComponents().get(componentPos);
+ componentBulkLoader =
+ (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples, false);
+ ILSMDiskComponent diskComponent = componentBulkLoader.getComponent();
+ // TODO move this piece of code to io operation callback
+ LSMComponentIdUtils.persist(primaryComponent.getId(), diskComponent.getMetadata());
}
private void addAntiMatterTuple(ITupleReference tuple) throws HyracksDataException {
@@ -220,30 +208,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
}
- private void activateComponents() throws HyracksDataException {
- List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
- for (int i = diskComponents.length - 1; i >= 0; i--) {
- // start from the oldest component to the newest component
- if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) {
- secondaryIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.FLUSH, null,
- diskComponents[i]);
-
- // setting component id has to be place between afterOperation and addBulkLoadedComponent,
- // since afterOperation would set a flush component id (but it's not invalid)
- // and addBulkLoadedComponent would finalize the component
- ILSMDiskComponentId primaryComponentId = primaryComponents.get(i).getComponentId();
- //set component id
- diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
- LongPointable.FACTORY.createPointable(primaryComponentId.getMinId()));
- diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
- LongPointable.FACTORY.createPointable(primaryComponentId.getMaxId()));
-
- ((AbstractLSMIndex) secondaryIndex).getLsmHarness().addBulkLoadedComponent(diskComponents[i]);
-
- }
- }
- }
-
private int getNumDeletedTuples(int componentPos) {
DeletedTupleCounter counter = (DeletedTupleCounter) ctx.getStateObject(partition);
return counter.get(componentPos);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 5fc07ad..095159b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -65,7 +65,7 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
index = indexHelper.getIndexInstance();
try {
writer.open();
- bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+ initializeBulkLoader();
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -116,4 +116,8 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
writer.fail();
}
}
+
+    /**
+     * Creates the bulk loader for the opened index and stores it in {@code bulkLoader}.
+     * <p>
+     * Declared {@code protected} so that subclasses can override how the bulk loader is set up.
+     *
+     * @throws HyracksDataException if the index fails to create the bulk loader
+     */
+    protected void initializeBulkLoader() throws HyracksDataException {
+        this.bulkLoader = this.index.createBulkLoader(this.fillFactor, this.verifyInput, this.numElementsHint,
+                this.checkIfEmptyIndex);
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
index 6083637..a859f68 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
@@ -57,6 +57,7 @@ public class ExternalBTreeLocalResource extends LSMBTreeLocalResource {
public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
+ ioOpCallbackFactory.initialize(serviceCtx);
return LSMBTreeUtil.createExternalBTree(ioManager, file, storageManager.getBufferCache(serviceCtx), typeTraits,
cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
index 04b63f9..9422253 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
@@ -60,6 +60,7 @@ public class ExternalBTreeWithBuddyLocalResource extends LSMBTreeLocalResource {
public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
+ ioOpCallbackFactory.initialize(serviceCtx);
return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, storageManager.getBufferCache(serviceCtx),
typeTraits, cmpFactories, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index dfa88da..1988736 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -70,6 +70,7 @@ public class LSMBTreeLocalResource extends LsmResource {
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+ ioOpCallbackFactory.initialize(serviceCtx);
//TODO: enable updateAwareness for secondary LSMBTree indexes
boolean updateAware = false;
return LSMBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index a8e707f..d759167 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -24,11 +24,10 @@ import java.util.Set;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent implements ILSMDiskComponent {
+public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent {
protected final BTree btree;
public LSMBTreeDiskComponent(AbstractLSMIndex lsmIndex, BTree btree, ILSMComponentFilter filter) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index a60f544..ab8e899 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -127,4 +127,11 @@ public interface ILSMComponent {
* @return index data structure that is the stored in the component
*/
IIndex getIndex();
+
+ /**
+ *
+ * @return id of the component
+ * @throws HyracksDataException
+ */
+ ILSMComponentId getId() throws HyracksDataException;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
new file mode 100644
index 0000000..5662862
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+/**
+ * Stores the id of the disk component, which is an interval (minId, maxId).
+ * It is generated by {@link ILSMComponentIdGenerator}
+ *
+ */
+public interface ILSMComponentId {
+ public enum IdCompareResult {
+ UNKNOWN,
+ LESS_THAN,
+ GREATER_THAN,
+ INTERSECT,
+ INCLUDE
+ }
+
+ /**
+ * @return whether the id is missing
+ */
+ boolean missing();
+
+ IdCompareResult compareTo(ILSMComponentId id);
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
new file mode 100644
index 0000000..5dd3061
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+/**
+ * This interface generates component Ids for LSM components (both memory and disk components).
+ */
+public interface ILSMComponentIdGenerator {
+
+ /**
+ * @return An Id for LSM component
+ */
+ public ILSMComponentId getId();
+
+ /**
+ * Refresh the component Id generator to generate the next Id.
+ * {@link #getId()} would always return the same Id before this method is called.
+ */
+ public void refresh();
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..c0f530b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+
+@FunctionalInterface
+public interface ILSMComponentIdGeneratorFactory extends Serializable {
+
+ ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx);
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 43c5482..bd2bb45 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -48,14 +48,6 @@ public interface ILSMDiskComponent extends ILSMComponent {
int getFileReferenceCount();
/**
- * Return the component Id of this disk component from its metadata
- *
- * @return
- * @throws HyracksDataException
- */
- ILSMDiskComponentId getComponentId() throws HyracksDataException;
-
- /**
* @return LsmIndex of the component
*/
AbstractLSMIndex getLsmIndex();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
deleted file mode 100644
index 5d38ace..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.common.api;
-
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
-
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * When a disk component is formed by the flush operation, its initial minId and maxId are the same, and
- * currently are set as the flush LSN.
- * When a disk component is formed by the merge operation, its [minId, maxId] is set as the union of
- * all ids of merged disk components.
- *
- * @author luochen
- *
- */
-public interface ILSMDiskComponentId {
-
- public static final long NOT_FOUND = -1;
-
- public static final MutableArrayValueReference COMPONENT_ID_MIN_KEY =
- new MutableArrayValueReference("Component_Id_Min".getBytes());
-
- public static final MutableArrayValueReference COMPONENT_ID_MAX_KEY =
- new MutableArrayValueReference("Component_Id_Max".getBytes());
-
- long getMinId();
-
- long getMaxId();
-
- default boolean notFound() {
- return getMinId() == NOT_FOUND || getMaxId() == NOT_FOUND;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
index b291f7c..a9dc50e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
@@ -20,7 +20,15 @@ package org.apache.hyracks.storage.am.lsm.common.api;
import java.io.Serializable;
-@FunctionalInterface
+import org.apache.hyracks.api.application.INCServiceContext;
+
public interface ILSMIOOperationCallbackFactory extends Serializable {
+ /**
+ * Initialize the callback factory with the given ncCtx
+ *
+ * @param ncCtx
+ */
+ void initialize(INCServiceContext ncCtx);
+
ILSMIOOperationCallback createIoOpCallback(ILSMIndex index);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index 13543e4..f892585 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -101,4 +101,12 @@ public interface ILSMMemoryComponent extends ILSMComponent {
* @return the size of the memory component
*/
long getSize();
+
+ /**
+ * Reset the component Id of the memory component after it's recycled
+ *
+ * @param newId
+ * @throws HyracksDataException
+ */
+ void resetId(ILSMComponentId newId) throws HyracksDataException;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index a0d1c23..b664102 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -18,19 +18,29 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.logging.Logger;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
import org.apache.hyracks.storage.common.MultiComparator;
public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent {
+ private static final Logger LOGGER = Logger.getLogger(AbstractLSMDiskComponent.class.getName());
+
private final DiskComponentMetadata metadata;
+ // A cached copy of the componentId stored in metadata.
+ // Since componentId is immutable, we do not want to read from metadata every time the componentId
+ // is requested.
+ private ILSMComponentId componentId;
+
public AbstractLSMDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager,
ILSMComponentFilter filter) {
super(lsmIndex, filter);
@@ -109,13 +119,23 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl
}
@Override
- public ILSMDiskComponentId getComponentId() throws HyracksDataException {
- long minID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
- ILSMDiskComponentId.NOT_FOUND);
- long maxID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
- ILSMDiskComponentId.NOT_FOUND);
- //TODO: do we need to throw an exception when ID is not found?
- return new LSMDiskComponentId(minID, maxID);
+ public ILSMComponentId getId() throws HyracksDataException {
+ if (componentId != null) {
+ return componentId;
+ }
+ synchronized (this) {
+ if (componentId == null) {
+ componentId = LSMComponentIdUtils.readFrom(metadata);
+ }
+ }
+ if (componentId.missing()) {
+ // For normal datasets, componentId shouldn't be missing; if it is, that indicates a bug.
+ // However, we cannot throw an exception here, in order to stay compatible with legacy datasets.
+ // In this case, the disk component would always get a garbage Id [-1, -1], which makes the
+ // component Id-based optimization useless but still correct.
+ LOGGER.warning("Component Id not found from disk component metadata");
+ }
+ return componentId;
}
/**
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 2b2fe0d..b0cc318 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -44,6 +44,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
@@ -438,6 +440,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
if (c != EmptyComponent.INSTANCE) {
diskComponents.add(0, c);
}
+ assert checkComponentIds();
}
@Override
@@ -448,6 +451,25 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
if (newComponent != EmptyComponent.INSTANCE) {
diskComponents.add(swapIndex, newComponent);
}
+ assert checkComponentIds();
+ }
+
+ /**
+ * A helper method to ensure disk components have proper Ids (non-decreasing)
+ * We may get rid of this method once component Id is stabilized
+ *
+ * @throws HyracksDataException
+ */
+ private boolean checkComponentIds() throws HyracksDataException {
+ for (int i = 0; i < diskComponents.size() - 1; i++) {
+ ILSMComponentId id1 = diskComponents.get(i).getId();
+ ILSMComponentId id2 = diskComponents.get(i + 1).getId();
+ IdCompareResult cmp = id1.compareTo(id2);
+ if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) {
+ return false;
+ }
+ }
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index b7c3350..0378aae 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -22,9 +22,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent implements ILSMMemoryComponent {
@@ -34,6 +37,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
private int writerCount;
private boolean requestedToBeActive;
private final MemoryComponentMetadata metadata;
+ private ILSMComponentId componentId;
public AbstractLSMMemoryComponent(AbstractLSMIndex lsmIndex, IVirtualBufferCache vbc, boolean isActive,
ILSMComponentFilter filter) {
@@ -247,6 +251,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
protected void doDeallocate() throws HyracksDataException {
getIndex().deactivate();
getIndex().destroy();
+ componentId = null;
}
@Override
@@ -259,4 +264,19 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
IBufferCache virtualBufferCache = getIndex().getBufferCache();
return virtualBufferCache.getPageBudget() * (long) virtualBufferCache.getPageSize();
}
+
+ @Override
+ public ILSMComponentId getId() {
+ return componentId;
+ }
+
+ @Override
+ public void resetId(ILSMComponentId componentId) throws HyracksDataException {
+ if (this.componentId != null && this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) {
+ throw new IllegalStateException(
+ "LSM memory component receives illegal id. Old id " + this.componentId + ", new id " + componentId);
+ }
+ this.componentId = componentId;
+ LSMComponentIdUtils.persist(this.componentId, metadata);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index f2751bf..e3ca9f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -25,8 +25,8 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndex;
@@ -83,8 +83,8 @@ public class EmptyComponent implements ILSMDiskComponent {
}
@Override
- public ILSMDiskComponentId getComponentId() throws HyracksDataException {
- return null;
+ public ILSMComponentId getId() {
+ return LSMComponentId.MISSING_COMPONENT_ID;
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
new file mode 100644
index 0000000..dd86f65
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+/**
+ * An {@link ILSMComponentId} expressed as a closed range {@code [minId, maxId]} of {@code long} values.
+ * <p>
+ * An id whose lower or upper bound equals {@link #NOT_FOUND} is reported as {@link #missing()} and
+ * always compares as {@code UNKNOWN}.
+ * <p>
+ * Instances are mutable through {@link #reset(long, long)}; since {@link #equals(Object)} and
+ * {@link #hashCode()} are derived from the bounds, an instance must not be reset while it is used as
+ * a key in a hash-based collection.
+ */
+public class LSMComponentId implements ILSMComponentId {
+
+    /** Bound value marking an id as missing. */
+    public static final long NOT_FOUND = -1;
+
+    // Used to handle legacy datasets which do not have the component id
+    public static final ILSMComponentId MISSING_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
+
+    // A default component id used for bulk-loaded components
+    public static final ILSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
+
+    private long minId;
+
+    private long maxId;
+
+    /**
+     * Creates an id covering {@code [minId, maxId]}.
+     *
+     * @param minId lower bound (inclusive); must not exceed {@code maxId}
+     * @param maxId upper bound (inclusive)
+     */
+    public LSMComponentId(long minId, long maxId) {
+        assert minId <= maxId;
+        this.minId = minId;
+        this.maxId = maxId;
+    }
+
+    /**
+     * Overwrites both bounds of this id in place.
+     * <p>
+     * NOTE(review): {@link #MISSING_COMPONENT_ID} and {@link #DEFAULT_COMPONENT_ID} are shared instances
+     * of this class; resetting either of them would change the constant for every user.
+     *
+     * @param minId new lower bound (inclusive); must not exceed {@code maxId}
+     * @param maxId new upper bound (inclusive)
+     */
+    public void reset(long minId, long maxId) {
+        // Enforce the same invariant the constructor checks, so compareTo() can rely on it.
+        assert minId <= maxId;
+        this.minId = minId;
+        this.maxId = maxId;
+    }
+
+    /** @return the lower bound (inclusive) of this id */
+    public long getMinId() {
+        return this.minId;
+    }
+
+    /** @return the upper bound (inclusive) of this id */
+    public long getMaxId() {
+        return this.maxId;
+    }
+
+    /** @return {@code true} if either bound equals {@link #NOT_FOUND} */
+    @Override
+    public boolean missing() {
+        return minId == NOT_FOUND || maxId == NOT_FOUND;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + minId + "," + maxId + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
+    }
+
+    /** Two ids are equal when both are {@code LSMComponentId}s with identical bounds. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof LSMComponentId)) {
+            return false;
+        }
+        LSMComponentId other = (LSMComponentId) obj;
+        return minId == other.minId && maxId == other.maxId;
+    }
+
+    /**
+     * Relates the range of this id to the range of {@code id}.
+     *
+     * @param id the id to compare against; may be {@code null}
+     * @return {@code UNKNOWN} if either id is missing, {@code id} is {@code null}, or {@code id} is not
+     *         an {@code LSMComponentId}; {@code GREATER_THAN} if this range lies entirely above the
+     *         other; {@code LESS_THAN} if it lies entirely below; {@code INCLUDE} if this range
+     *         contains the other; {@code INTERSECT} otherwise
+     */
+    @Override
+    public IdCompareResult compareTo(ILSMComponentId id) {
+        if (this.missing() || id == null || id.missing()) {
+            return IdCompareResult.UNKNOWN;
+        }
+        if (!(id instanceof LSMComponentId)) {
+            // The bounds of another implementation are not accessible here; report UNKNOWN instead of
+            // failing with a ClassCastException.
+            return IdCompareResult.UNKNOWN;
+        }
+        LSMComponentId componentId = (LSMComponentId) id;
+        if (this.getMinId() > componentId.getMaxId()) {
+            return IdCompareResult.GREATER_THAN;
+        } else if (this.getMaxId() < componentId.getMinId()) {
+            return IdCompareResult.LESS_THAN;
+        } else if (this.getMinId() <= componentId.getMinId() && this.getMaxId() >= componentId.getMaxId()) {
+            return IdCompareResult.INCLUDE;
+        } else {
+            return IdCompareResult.INTERSECT;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
new file mode 100644
index 0000000..e174153
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+
+/**
+ * A default implementation of {@link ILSMComponentIdGenerator}.
+ * <p>
+ * Every {@link #refresh()} installs a new {@link LSMComponentId} whose lower and upper bounds are both
+ * the current wall-clock time in milliseconds, forced to be strictly greater than the timestamp used
+ * by the previous refresh of this generator.
+ * <p>
+ * NOTE(review): this class performs no synchronization; presumably callers serialize access to
+ * {@link #refresh()} and {@link #getId()} - confirm.
+ */
+public class LSMComponentIdGenerator implements ILSMComponentIdGenerator {
+
+    /** Timestamp handed out by the most recent {@link #getCurrentTimestamp()} call; -1 before the first. */
+    protected long previousTimestamp = -1L;
+
+    private ILSMComponentId componentId;
+
+    /** Creates a generator that already holds an initial id. */
+    public LSMComponentIdGenerator() {
+        refresh();
+    }
+
+    /** Replaces the current id with a new one built from a fresh, strictly larger timestamp. */
+    @Override
+    public void refresh() {
+        long ts = getCurrentTimestamp();
+        componentId = new LSMComponentId(ts, ts);
+    }
+
+    /** @return the id installed by the most recent {@link #refresh()} */
+    @Override
+    public ILSMComponentId getId() {
+        return componentId;
+    }
+
+    /**
+     * Returns the current time in milliseconds, waiting if necessary until it is strictly greater than
+     * {@link #previousTimestamp}, and records the returned value as the new previous timestamp.
+     * <p>
+     * The wait is not interruptible: an interrupt received while waiting is remembered and the thread's
+     * interrupt status is restored before returning.
+     *
+     * @return a timestamp strictly greater than any previously returned by this generator
+     */
+    protected long getCurrentTimestamp() {
+        long timestamp = System.currentTimeMillis();
+        boolean interrupted = false;
+        try {
+            while (timestamp <= previousTimestamp) {
+                // make sure timestamp is strictly increasing
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                    // Re-interrupting here would make every following sleep() throw immediately and
+                    // turn this loop into a busy spin; defer restoring the flag until the loop ends.
+                    interrupted = true;
+                }
+                timestamp = System.currentTimeMillis();
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        previousTimestamp = timestamp;
+        return timestamp;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..c55ef19
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+
+/**
+ * A default implementation of {@link ILSMComponentIdGeneratorFactory} that hands out
+ * {@link LSMComponentIdGenerator} instances.
+ */
+public class LSMComponentIdGeneratorFactory implements ILSMComponentIdGeneratorFactory {
+
+    /**
+     * Creates a new {@link LSMComponentIdGenerator} on every call.
+     *
+     * @param serviceCtx the NC service context; not used by this implementation
+     * @return a newly constructed generator
+     */
+    @Override
+    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+        final ILSMComponentIdGenerator generator = new LSMComponentIdGenerator();
+        return generator;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
deleted file mode 100644
index f448c84..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.storage.am.lsm.common.impls;
-
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
-
-public class LSMDiskComponentId implements ILSMDiskComponentId {
-
- private final long minId;
-
- private final long maxId;
-
- public LSMDiskComponentId(long minId, long maxId) {
- this.minId = minId;
- this.maxId = maxId;
- }
-
- @Override
- public long getMinId() {
- return this.minId;
- }
-
- @Override
- public long getMaxId() {
- return this.maxId;
- }
-
- @Override
- public String toString() {
- return "[" + minId + "," + maxId + "]";
- }
-
- @Override
- public int hashCode() {
- return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof LSMDiskComponentId)) {
- return false;
- }
- LSMDiskComponentId other = (LSMDiskComponentId) obj;
- if (maxId != other.maxId) {
- return false;
- }
- if (minId != other.minId) {
- return false;
- }
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 48b6d8f..b0abeb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -95,15 +95,24 @@ public class LSMHarness implements ILSMHarness {
// Before entering the components, prune those corner cases that indeed should not proceed.
switch (opType) {
case FLUSH:
+ // if the lsm index does not have memory components allocated, then nothing to flush
+ if (!lsmIndex.isMemoryComponentsAllocated()) {
+ return false;
+ }
ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
if (!flushingComponent.isModified()) {
- //The mutable component has not been modified by any writer. There is nothing to flush.
- //since the component is empty, set its state back to READABLE_WRITABLE
if (flushingComponent.getState() == ComponentState.READABLE_UNWRITABLE) {
+ //The mutable component has not been modified by any writer. There is nothing to flush.
+ //Since the component is empty, set its state back to READABLE_WRITABLE, but only when its
+ //state has been set to READABLE_UNWRITABLE.
flushingComponent.setState(ComponentState.READABLE_WRITABLE);
opTracker.notifyAll();
+
+ // Call recycled only when the component's state is reset back to READABLE_WRITABLE.
+ // Otherwise, if the component is in another state, e.g., INACTIVE or
+ // READABLE_UNWRITABLE_FLUSHING, it is not considered as being recycled here.
+ lsmIndex.getIOOperationCallback().recycled(flushingComponent);
}
- lsmIndex.getIOOperationCallback().recycled(flushingComponent);
return false;
}
if (flushingComponent.getWriterCount() > 0) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 08b8bb6..000d5cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -21,13 +21,14 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
private final AbstractLSMIndex lsmIndex;
private final ILSMDiskComponent component;
- private final IIndexBulkLoader componentBulkLoader;
+ private final ILSMDiskComponentBulkLoader componentBulkLoader;
public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, float fillFactor, boolean verifyInput,
long numElementsHint) throws HyracksDataException {
@@ -39,11 +40,19 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true);
}
+ /** @return the disk component held by this bulk loader */
+ public ILSMDiskComponent getComponent() {
+ return this.component;
+ }
+
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
componentBulkLoader.add(tuple);
}
+ /**
+ * Forwards a delete of {@code tuple} to the underlying disk component bulk loader.
+ *
+ * @param tuple the tuple to delete
+ * @throws HyracksDataException if the underlying bulk loader fails
+ */
+ public void delete(ITupleReference tuple) throws HyracksDataException {
+ this.componentBulkLoader.delete(tuple);
+ }
+
@Override
public void end() throws HyracksDataException {
try {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index 09ca553..21d10d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
import java.util.List;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -37,6 +38,11 @@ public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFac
return NoOpIOOperationCallback.INSTANCE;
}
+ @Override
+ public void initialize(INCServiceContext ncCtx) {
+ // No op: the supplied context is intentionally not used by this factory
+ }
+
public static class NoOpIOOperationCallback implements ILSMIOOperationCallback {
private static final NoOpIOOperationCallback INSTANCE = new NoOpIOOperationCallback();