You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by lu...@apache.org on 2017/11/10 16:58:17 UTC

[2/3] asterixdb git commit: [ASTERIXDB-2115] Add Component Ids to LSM Indexes

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 2f94ad7..fb9901d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -87,6 +87,8 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -133,7 +135,6 @@ import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFa
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
@@ -595,9 +596,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             // bulkload?)
             IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-            TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
-                    new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation,
-                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory);
+            LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
+                    fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
+                    indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId());
             return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
@@ -1001,8 +1002,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
+                op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null,
+                        BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
                         null, true, modificationCallbackFactory);
@@ -1135,8 +1137,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+                op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
+                        BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else if (indexOp == IndexOperation.UPSERT) {
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
                         filterFactory, modificationCallbackFactory, prevFieldPermutation);
@@ -1237,9 +1240,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
+                op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
                         GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
-                        indexDataflowHelperFactory);
+                        indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else if (indexOp == IndexOperation.UPSERT) {
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
                         indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation);
@@ -1353,8 +1356,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory);
+                op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
+                        null, BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else if (indexOp == IndexOperation.UPSERT) {
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
                         filterFactory, modificationCallbackFactory, prevFieldPermutation);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 3fec73b..e5f97f0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -30,6 +30,7 @@ import java.util.stream.IntStream;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveNotificationHandler;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
@@ -107,6 +108,7 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -505,15 +507,15 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             case BTREE:
                 return getDatasetType() == DatasetType.EXTERNAL
                         && !index.getIndexName().equals(IndexingConstants.getFilesIndexName(getDatasetName()))
-                                ? LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
-                                : LSMBTreeIOOperationCallbackFactory.INSTANCE;
+                                ? new LSMBTreeWithBuddyIOOperationCallbackFactory(getComponentIdGeneratorFactory())
+                                : new LSMBTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory());
             case RTREE:
-                return LSMRTreeIOOperationCallbackFactory.INSTANCE;
+                return new LSMRTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory());
             case LENGTH_PARTITIONED_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case SINGLE_PARTITION_WORD_INVIX:
-                return LSMInvertedIndexIOOperationCallbackFactory.INSTANCE;
+                return new LSMInvertedIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory());
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
                         index.getIndexType().toString());
@@ -532,6 +534,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
                 : new SecondaryIndexOperationTrackerFactory(getDatasetId());
     }
 
+    public ILSMComponentIdGeneratorFactory getComponentIdGeneratorFactory() {
+        return new DatasetLSMComponentIdGeneratorFactory(getDatasetId());
+    }
+
     /**
      * Get search callback factory for this dataset with the passed index and operation
      *

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 5dac407..7701f65 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,8 +19,6 @@
 
 package org.apache.asterix.metadata.utils;
 
-import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +49,8 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -71,6 +71,8 @@ import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -124,8 +126,8 @@ public abstract class SecondaryIndexOperationsHelper {
         this.index = index;
         this.physOptConf = physOptConf;
         this.metadataProvider = metadataProvider;
-        this.itemType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
-                dataset.getItemTypeName());
+        this.itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         this.metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
         Pair<ARecordType, ARecordType> enforcedTypes = getEnforcedType(index, itemType, metaType);
         this.enforcedItemType = enforcedTypes.first;
@@ -341,11 +343,15 @@ public abstract class SecondaryIndexOperationsHelper {
         return sortOp;
     }
 
-    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
+    protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
             int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
             throws AlgebricksException {
-        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory);
+        IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory(
+                metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
+
+        LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
+                secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
+                primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId());
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return treeIndexBulkLoadOp;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
new file mode 100644
index 0000000..74590c7
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+
+/**
+ * Descriptor of the bulk load operator used for LSM indexes. On top of the arguments of
+ * {@link TreeIndexBulkLoadOperatorDescriptor} it carries the {@link BulkLoadUsage}, the dataset
+ * id and an optional helper factory for the primary index, and passes all of them to the
+ * {@link LSMIndexBulkLoadOperatorNodePushable} it creates.
+ */
+public class LSMIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The purpose of a bulk load. */
+    public enum BulkLoadUsage {
+        /** Bulk loading as part of loading a dataset. */
+        LOAD,
+        /** Bulk loading a newly created index; requires the primary index helper factory. */
+        CREATE_INDEX
+    }
+
+    /** Helper factory of the primary index; may be {@code null} for {@link BulkLoadUsage#LOAD}. */
+    protected final IIndexDataflowHelperFactory primaryIndexHelperFactory;
+
+    protected final BulkLoadUsage usage;
+
+    protected final int datasetId;
+
+    /**
+     * Creates the descriptor. All arguments up to {@code indexHelperFactory} are passed unchanged
+     * to {@link TreeIndexBulkLoadOperatorDescriptor}.
+     *
+     * @param primaryIndexHelperFactory
+     *            helper factory of the primary index; must not be {@code null} when {@code usage}
+     *            is {@link BulkLoadUsage#CREATE_INDEX}
+     * @param usage
+     *            the purpose of the bulk load; must not be {@code null}
+     * @param datasetId
+     *            id of the dataset that owns the index being loaded
+     * @throws IllegalArgumentException
+     *             if {@code usage} is {@code null}, or if it is {@link BulkLoadUsage#CREATE_INDEX}
+     *             and no primary index helper factory is given
+     */
+    public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory,
+            IIndexDataflowHelperFactory primaryIndexHelperFactory, BulkLoadUsage usage, int datasetId) {
+        super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                indexHelperFactory);
+        // Fail while the job is being built rather than with a NullPointerException when the
+        // runtime later dereferences the usage or opens the missing primary index helper.
+        if (usage == null) {
+            throw new IllegalArgumentException("usage must not be null");
+        }
+        if (usage == BulkLoadUsage.CREATE_INDEX && primaryIndexHelperFactory == null) {
+            throw new IllegalArgumentException(
+                    "primaryIndexHelperFactory is required when usage is " + BulkLoadUsage.CREATE_INDEX);
+        }
+        this.primaryIndexHelperFactory = primaryIndexHelperFactory;
+        this.usage = usage;
+        this.datasetId = datasetId;
+    }
+
+    /**
+     * Creates the {@link LSMIndexBulkLoadOperatorNodePushable} for the given partition, using the
+     * input record descriptor of this operator's input 0.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory, primaryIndexHelperFactory, ctx, partition,
+                fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, datasetId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
new file mode 100644
index 0000000..2415556
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.util.List;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
+
+/**
+ * Runtime of the bulk load operator for LSM indexes. Besides bulk loading the target index, it
+ * persists a component id into the metadata of the disk component being bulk loaded: the default
+ * component id for {@link BulkLoadUsage#LOAD}; otherwise the union of the ids of the first and the
+ * last disk components of the primary index, provided the primary index has disk components.
+ */
+public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
+    protected final BulkLoadUsage usage;
+
+    /** Helper of the primary index; {@code null} if no primary index helper factory was given. */
+    protected final IIndexDataflowHelper primaryIndexHelper;
+    protected final IDatasetLifecycleManager datasetManager;
+    protected final int datasetId;
+    protected final int partition;
+
+    /** The primary index; only assigned when the usage is not {@link BulkLoadUsage#LOAD}. */
+    protected ILSMIndex primaryIndex;
+
+    /** Set once the primary index helper has been opened, so that close() knows to release it. */
+    private boolean primaryIndexHelperOpened;
+
+    /**
+     * Creates the runtime. All arguments other than the ones documented below are passed
+     * unchanged to {@link IndexBulkLoadOperatorNodePushable}.
+     *
+     * @param primaryIndexDataflowHelperFactory
+     *            helper factory of the primary index; may be {@code null} when {@code usage} is
+     *            {@link BulkLoadUsage#LOAD}
+     * @param usage
+     *            the purpose of the bulk load; must not be {@code null}
+     * @param datasetId
+     *            id of the dataset that owns the index being loaded
+     * @throws IllegalArgumentException
+     *             if {@code usage} is {@code null}
+     */
+    public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory,
+            IIndexDataflowHelperFactory primaryIndexDataflowHelperFactory, IHyracksTaskContext ctx, int partition,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage usage, int datasetId)
+            throws HyracksDataException {
+        super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint,
+                checkIfEmptyIndex, recDesc);
+
+        if (usage == null) {
+            throw new IllegalArgumentException("usage must not be null");
+        }
+        if (primaryIndexDataflowHelperFactory != null) {
+            this.primaryIndexHelper =
+                    primaryIndexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        } else {
+            this.primaryIndexHelper = null;
+        }
+        this.usage = usage;
+        this.datasetId = datasetId;
+        this.partition = partition;
+        INcApplicationContext ncCtx =
+                (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+        datasetManager = ncCtx.getDatasetLifecycleManager();
+    }
+
+    /**
+     * Creates the bulk loader of the target index and persists the component id of the component
+     * being loaded (see the class description for how the id is chosen).
+     *
+     * @throws IllegalStateException
+     *             if the usage is not {@link BulkLoadUsage#LOAD} but there is no primary index helper
+     */
+    @Override
+    protected void initializeBulkLoader() throws HyracksDataException {
+        ILSMIndex targetIndex = (ILSMIndex) index;
+        if (usage == BulkLoadUsage.LOAD) {
+            // for a loaded dataset, we use the default Id 0 which is guaranteed to be smaller
+            // than Ids of all memory components
+
+            // TODO handle component Id for datasets loaded multiple times
+            // TODO move this piece of code to io operation callback
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
+            LSMComponentIdUtils.persist(LSMComponentId.DEFAULT_COMPONENT_ID, diskComponent.getMetadata());
+        } else {
+            if (primaryIndexHelper == null) {
+                throw new IllegalStateException(
+                        "A primary index helper is required for a bulk load with usage " + usage);
+            }
+            primaryIndexHelper.open();
+            // remember the successful open right away so that close() releases the helper even
+            // if a later step of the initialization fails
+            primaryIndexHelperOpened = true;
+            primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
+            List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            if (!primaryComponents.isEmpty()) {
+                // TODO move this piece of code to io operation callback
+                // Ideally, this should be done in io operation callback when a bulk load operation is finished
+                // However, currently we don't have an extensible callback mechanism to support this
+                ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(),
+                        primaryComponents.get(primaryComponents.size() - 1).getId());
+                ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
+                LSMComponentIdUtils.persist(bulkloadId, diskComponent.getMetadata());
+            }
+        }
+    }
+
+    /**
+     * Closes the parent runtime and then, even if that fails, the primary index helper if this
+     * operator opened it.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            super.close();
+        } finally {
+            if (primaryIndexHelperOpened) {
+                // reset first so that a repeated close() does not close the helper twice
+                primaryIndexHelperOpened = false;
+                primaryIndexHelper.close();
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index cee20ce..6d9ec47 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -19,24 +19,20 @@
 package org.apache.asterix.runtime.operators;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 
 import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 
 /**
  * This operator node is used to bulk load incoming tuples (scanned from the primary index)
@@ -56,12 +52,9 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
     private ILSMIndex primaryIndex;
     private ILSMIndex secondaryIndex;
 
-    private ILSMDiskComponent component;
-    private ILSMDiskComponentBulkLoader componentBulkLoader;
+    private LSMIndexDiskComponentBulkLoader componentBulkLoader;
     private int currentComponentPos = -1;
 
-    private ILSMDiskComponent[] diskComponents;
-
     public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
             IIndexDataflowHelperFactory primaryIndexHelperFactory,
             IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
@@ -92,7 +85,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
         super.open();
         primaryIndexHelper.open();
         primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
-        diskComponents = new ILSMDiskComponent[primaryIndex.getDiskComponents().size()];
         secondaryIndexHelper.open();
         secondaryIndex = (ILSMIndex) secondaryIndexHelper.getIndexInstance();
 
@@ -107,8 +99,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
             closeException = e;
         }
 
-        activateComponents();
-
         try {
             if (primaryIndexHelper != null) {
                 primaryIndexHelper.close();
@@ -184,24 +174,22 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
     }
 
     private void endCurrentComponent() throws HyracksDataException {
-        if (component != null) {
-            // set disk component id
-
+        if (componentBulkLoader != null) {
             componentBulkLoader.end();
-            diskComponents[currentComponentPos] = component;
-
             componentBulkLoader = null;
-            component = null;
         }
     }
 
     private void loadNewComponent(int componentPos) throws HyracksDataException {
         endCurrentComponent();
 
-        component = secondaryIndex.createBulkLoadTarget();
         int numTuples = getNumDeletedTuples(componentPos);
-        componentBulkLoader = component.createBulkLoader(1.0f, false, numTuples, false, true, true);
-
+        ILSMDiskComponent primaryComponent = primaryIndex.getDiskComponents().get(componentPos);
+        componentBulkLoader =
+                (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples, false);
+        ILSMDiskComponent diskComponent = componentBulkLoader.getComponent();
+        // TODO move this piece of code to io operation callback
+        LSMComponentIdUtils.persist(primaryComponent.getId(), diskComponent.getMetadata());
     }
 
     private void addAntiMatterTuple(ITupleReference tuple) throws HyracksDataException {
@@ -220,30 +208,6 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
 
     }
 
-    private void activateComponents() throws HyracksDataException {
-        List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
-        for (int i = diskComponents.length - 1; i >= 0; i--) {
-            // start from the oldest component to the newest component
-            if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) {
-                secondaryIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.FLUSH, null,
-                        diskComponents[i]);
-
-                // setting component id has to be place between afterOperation and addBulkLoadedComponent,
-                // since afterOperation would set a flush component id (but it's not invalid)
-                // and addBulkLoadedComponent would finalize the component
-                ILSMDiskComponentId primaryComponentId = primaryComponents.get(i).getComponentId();
-                //set component id
-                diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
-                        LongPointable.FACTORY.createPointable(primaryComponentId.getMinId()));
-                diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
-                        LongPointable.FACTORY.createPointable(primaryComponentId.getMaxId()));
-
-                ((AbstractLSMIndex) secondaryIndex).getLsmHarness().addBulkLoadedComponent(diskComponents[i]);
-
-            }
-        }
-    }
-
     private int getNumDeletedTuples(int componentPos) {
         DeletedTupleCounter counter = (DeletedTupleCounter) ctx.getStateObject(partition);
         return counter.get(componentPos);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 5fc07ad..095159b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -65,7 +65,7 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
         index = indexHelper.getIndexInstance();
         try {
             writer.open();
-            bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            initializeBulkLoader();
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -116,4 +116,8 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
             writer.fail();
         }
     }
+
+    protected void initializeBulkLoader() throws HyracksDataException {
+        bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
index 6083637..a859f68 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
@@ -57,6 +57,7 @@ public class ExternalBTreeLocalResource extends LSMBTreeLocalResource {
     public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
+        ioOpCallbackFactory.initialize(serviceCtx);
         return LSMBTreeUtil.createExternalBTree(ioManager, file, storageManager.getBufferCache(serviceCtx), typeTraits,
                 cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
index 04b63f9..9422253 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
@@ -60,6 +60,7 @@ public class ExternalBTreeWithBuddyLocalResource extends LSMBTreeLocalResource {
     public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
+        ioOpCallbackFactory.initialize(serviceCtx);
         return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, storageManager.getBufferCache(serviceCtx),
                 typeTraits, cmpFactories, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index dfa88da..1988736 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -70,6 +70,7 @@ public class LSMBTreeLocalResource extends LsmResource {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
         List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+        ioOpCallbackFactory.initialize(serviceCtx);
         //TODO: enable updateAwareness for secondary LSMBTree indexes
         boolean updateAware = false;
         return LSMBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index a8e707f..d759167 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -24,11 +24,10 @@ import java.util.Set;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
-public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent implements ILSMDiskComponent {
+public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent {
     protected final BTree btree;
 
     public LSMBTreeDiskComponent(AbstractLSMIndex lsmIndex, BTree btree, ILSMComponentFilter filter) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index a60f544..ab8e899 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -127,4 +127,11 @@ public interface ILSMComponent {
      * @return index data structure that is the stored in the component
      */
     IIndex getIndex();
+
+    /**
+     *
+     * @return id of the component
+     * @throws HyracksDataException
+     */
+    ILSMComponentId getId() throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
new file mode 100644
index 0000000..5662862
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+/**
+ * Stores the id of the disk component, which is an interval (minId, maxId).
+ * It is generated by {@link ILSMComponentIdGenerator}
+ *
+ */
+public interface ILSMComponentId {
+    public enum IdCompareResult {
+        UNKNOWN,
+        LESS_THAN,
+        GREATER_THAN,
+        INTERSECT,
+        INCLUDE
+    }
+
+    /**
+     * @return whether the id is missing
+     */
+    boolean missing();
+
+    IdCompareResult compareTo(ILSMComponentId id);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
new file mode 100644
index 0000000..5dd3061
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+/**
+ * This interface generates component Ids for LSM components (both memory and disk components).
+ */
+public interface ILSMComponentIdGenerator {
+
+    /**
+     * @return An Id for LSM component
+     */
+    public ILSMComponentId getId();
+
+    /**
+     * Refresh the component Id generator to generate the next Id.
+     * {@link #getId()} would always return the same Id before this method is called.
+     */
+    public void refresh();
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..c0f530b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+
+@FunctionalInterface
+public interface ILSMComponentIdGeneratorFactory extends Serializable {
+
+    ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 43c5482..bd2bb45 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -48,14 +48,6 @@ public interface ILSMDiskComponent extends ILSMComponent {
     int getFileReferenceCount();
 
     /**
-     * Return the component Id of this disk component from its metadata
-     *
-     * @return
-     * @throws HyracksDataException
-     */
-    ILSMDiskComponentId getComponentId() throws HyracksDataException;
-
-    /**
      * @return LsmIndex of the component
      */
     AbstractLSMIndex getLsmIndex();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
deleted file mode 100644
index 5d38ace..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.common.api;
-
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
-
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * When a disk component is formed by the flush operation, its initial minId and maxId are the same, and
- * currently are set as the flush LSN.
- * When a disk component is formed by the merge operation, its [minId, maxId] is set as the union of
- * all ids of merged disk components.
- *
- * @author luochen
- *
- */
-public interface ILSMDiskComponentId {
-
-    public static final long NOT_FOUND = -1;
-
-    public static final MutableArrayValueReference COMPONENT_ID_MIN_KEY =
-            new MutableArrayValueReference("Component_Id_Min".getBytes());
-
-    public static final MutableArrayValueReference COMPONENT_ID_MAX_KEY =
-            new MutableArrayValueReference("Component_Id_Max".getBytes());
-
-    long getMinId();
-
-    long getMaxId();
-
-    default boolean notFound() {
-        return getMinId() == NOT_FOUND || getMaxId() == NOT_FOUND;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
index b291f7c..a9dc50e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
@@ -20,7 +20,15 @@ package org.apache.hyracks.storage.am.lsm.common.api;
 
 import java.io.Serializable;
 
-@FunctionalInterface
+import org.apache.hyracks.api.application.INCServiceContext;
+
 public interface ILSMIOOperationCallbackFactory extends Serializable {
+    /**
+     * Initialize the callback factory with the given ncCtx
+     *
+     * @param ncCtx
+     */
+    void initialize(INCServiceContext ncCtx);
+
     ILSMIOOperationCallback createIoOpCallback(ILSMIndex index);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index 13543e4..f892585 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -101,4 +101,12 @@ public interface ILSMMemoryComponent extends ILSMComponent {
      * @return the size of the memory component
      */
     long getSize();
+
+    /**
+     * Reset the component Id of the memory component after it's recycled
+     *
+     * @param newId
+     * @throws HyracksDataException
+     */
+    void resetId(ILSMComponentId newId) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index a0d1c23..b664102 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -18,19 +18,29 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent {
 
+    private static final Logger LOGGER = Logger.getLogger(AbstractLSMDiskComponent.class.getName());
+
     private final DiskComponentMetadata metadata;
 
+    // a variable cache of componentId stored in metadata.
+    // since componentId is immutable, we do not want to read from metadata every time the componentId
+    // is requested.
+    private ILSMComponentId componentId;
+
     public AbstractLSMDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager,
             ILSMComponentFilter filter) {
         super(lsmIndex, filter);
@@ -109,13 +119,23 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl
     }
 
     @Override
-    public ILSMDiskComponentId getComponentId() throws HyracksDataException {
-        long minID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
-                ILSMDiskComponentId.NOT_FOUND);
-        long maxID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
-                ILSMDiskComponentId.NOT_FOUND);
-        //TODO: do we need to throw an exception when ID is not found?
-        return new LSMDiskComponentId(minID, maxID);
+    public ILSMComponentId getId() throws HyracksDataException {
+        if (componentId != null) {
+            return componentId;
+        }
+        synchronized (this) {
+            if (componentId == null) {
+                componentId = LSMComponentIdUtils.readFrom(metadata);
+            }
+        }
+        if (componentId.missing()) {
+            // For normal datasets, componentId shouldn't be missing, since otherwise it'll be a bug.
+            // However, we cannot throw an exception here to be compatible with legacy datasets.
+            // In this case, the disk component would always get a garbage Id [-1, -1], which makes the
+            // component Id-based optimization useless but still correct.
+            LOGGER.warning("Component Id not found from disk component metadata");
+        }
+        return componentId;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 2b2fe0d..b0cc318 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -44,6 +44,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
@@ -438,6 +440,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         if (c != EmptyComponent.INSTANCE) {
             diskComponents.add(0, c);
         }
+        assert checkComponentIds();
     }
 
     @Override
@@ -448,6 +451,25 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         if (newComponent != EmptyComponent.INSTANCE) {
             diskComponents.add(swapIndex, newComponent);
         }
+        assert checkComponentIds();
+    }
+
+    /**
+     * A helper method to ensure disk components have proper Ids (non-decreasing)
+     * We may get rid of this method once component Id is stabilized
+     *
+     * @throws HyracksDataException
+     */
+    private boolean checkComponentIds() throws HyracksDataException {
+        for (int i = 0; i < diskComponents.size() - 1; i++) {
+            ILSMComponentId id1 = diskComponents.get(i).getId();
+            ILSMComponentId id2 = diskComponents.get(i + 1).getId();
+            IdCompareResult cmp = id1.compareTo(id2);
+            if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) {
+                return false;
+            }
+        }
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index b7c3350..0378aae 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -22,9 +22,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 
 public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent implements ILSMMemoryComponent {
@@ -34,6 +37,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     private int writerCount;
     private boolean requestedToBeActive;
     private final MemoryComponentMetadata metadata;
+    private ILSMComponentId componentId;
 
     public AbstractLSMMemoryComponent(AbstractLSMIndex lsmIndex, IVirtualBufferCache vbc, boolean isActive,
             ILSMComponentFilter filter) {
@@ -247,6 +251,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     protected void doDeallocate() throws HyracksDataException {
         getIndex().deactivate();
         getIndex().destroy();
+        componentId = null;
     }
 
     @Override
@@ -259,4 +264,19 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
         IBufferCache virtualBufferCache = getIndex().getBufferCache();
         return virtualBufferCache.getPageBudget() * (long) virtualBufferCache.getPageSize();
     }
+
+    @Override
+    public ILSMComponentId getId() {
+        return componentId;
+    }
+
+    @Override
+    public void resetId(ILSMComponentId componentId) throws HyracksDataException {
+        if (this.componentId != null && this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) {
+            throw new IllegalStateException(
+                    "LSM memory component receives illegal id. Old id " + this.componentId + ", new id " + componentId);
+        }
+        this.componentId = componentId;
+        LSMComponentIdUtils.persist(this.componentId, metadata);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index f2751bf..e3ca9f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -25,8 +25,8 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndex;
 
@@ -83,8 +83,8 @@ public class EmptyComponent implements ILSMDiskComponent {
     }
 
     @Override
-    public ILSMDiskComponentId getComponentId() throws HyracksDataException {
-        return null;
+    public ILSMComponentId getId() {
+        return LSMComponentId.MISSING_COMPONENT_ID;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
new file mode 100644
index 0000000..dd86f65
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+/**
+ * Default {@link ILSMComponentId} implementation: a closed range {@code [minId, maxId]} of component ids.
+ * <p>
+ * Instances are mutable (see {@link #reset(long, long)}), so they must not be reset while used as keys in
+ * hash-based collections, and the shared constants below must never be reset.
+ */
+public class LSMComponentId implements ILSMComponentId {
+
+    // Sentinel value marking an id bound as absent
+    public static final long NOT_FOUND = -1;
+
+    // Used to handle legacy datasets which do not have the component Id
+    public static final ILSMComponentId MISSING_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
+
+    // A default component id used for bulk loaded component
+    public static final ILSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
+
+    private long minId;
+
+    private long maxId;
+
+    /**
+     * @param minId lower bound of the id range (inclusive)
+     * @param maxId upper bound of the id range (inclusive); must not be smaller than {@code minId}
+     */
+    public LSMComponentId(long minId, long maxId) {
+        assert minId <= maxId;
+        this.minId = minId;
+        this.maxId = maxId;
+    }
+
+    /**
+     * Overwrites both bounds of this id in place.
+     *
+     * @param minId new lower bound (inclusive)
+     * @param maxId new upper bound (inclusive); must not be smaller than {@code minId}
+     */
+    public void reset(long minId, long maxId) {
+        // keep the same invariant the constructor enforces
+        assert minId <= maxId;
+        this.minId = minId;
+        this.maxId = maxId;
+    }
+
+    public long getMinId() {
+        return this.minId;
+    }
+
+    public long getMaxId() {
+        return this.maxId;
+    }
+
+    /**
+     * @return true if either bound equals {@link #NOT_FOUND}
+     */
+    @Override
+    public boolean missing() {
+        return minId == NOT_FOUND || maxId == NOT_FOUND;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + minId + "," + maxId + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof LSMComponentId)) {
+            return false;
+        }
+        LSMComponentId other = (LSMComponentId) obj;
+        return minId == other.minId && maxId == other.maxId;
+    }
+
+    /**
+     * Compares the id range of this component with the range of {@code id}.
+     *
+     * @param id the id to compare against; may be null
+     * @return {@code UNKNOWN} if either id is missing, or if {@code id} is null or not an
+     *         {@link LSMComponentId}; {@code GREATER_THAN} / {@code LESS_THAN} if this range lies entirely
+     *         above / below the other; {@code INCLUDE} if this range encloses the other; {@code INTERSECT}
+     *         otherwise
+     */
+    @Override
+    public IdCompareResult compareTo(ILSMComponentId id) {
+        // instanceof also rejects null; ids of another implementation cannot be ordered against this one
+        if (this.missing() || !(id instanceof LSMComponentId) || id.missing()) {
+            return IdCompareResult.UNKNOWN;
+        }
+        LSMComponentId componentId = (LSMComponentId) id;
+        if (this.getMinId() > componentId.getMaxId()) {
+            return IdCompareResult.GREATER_THAN;
+        } else if (this.getMaxId() < componentId.getMinId()) {
+            return IdCompareResult.LESS_THAN;
+        } else if (this.getMinId() <= componentId.getMinId() && this.getMaxId() >= componentId.getMaxId()) {
+            return IdCompareResult.INCLUDE;
+        } else {
+            return IdCompareResult.INTERSECT;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
new file mode 100644
index 0000000..e174153
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+
+/**
+ * A default implementation of {@link ILSMComponentIdGenerator}. Ids are single-point ranges built from a
+ * strictly increasing millisecond timestamp.
+ */
+public class LSMComponentIdGenerator implements ILSMComponentIdGenerator {
+
+    // last timestamp handed out by getCurrentTimestamp(); initially -1
+    protected long previousTimestamp = -1L;
+
+    private ILSMComponentId componentId;
+
+    public LSMComponentIdGenerator() {
+        refresh();
+    }
+
+    @Override
+    public void refresh() {
+        long ts = getCurrentTimestamp();
+        componentId = new LSMComponentId(ts, ts);
+    }
+
+    @Override
+    public ILSMComponentId getId() {
+        return componentId;
+    }
+
+    /**
+     * Returns the current wall-clock time in milliseconds, waiting if necessary so that the value is
+     * strictly greater than the one returned by the previous call.
+     * <p>
+     * An interrupt received while waiting does not abort the wait; the interrupt status is restored
+     * before returning.
+     *
+     * @return a timestamp strictly greater than the previous value of {@code previousTimestamp}
+     */
+    protected long getCurrentTimestamp() {
+        // Re-asserting the interrupt inside the loop would make every following sleep() throw at once and
+        // turn the wait into a busy spin, so remember it and restore it once the wait is over.
+        boolean interrupted = false;
+        long timestamp = System.currentTimeMillis();
+        while (timestamp <= previousTimestamp) {
+            // make sure timestamp is strictly increasing
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                interrupted = true;
+            }
+            timestamp = System.currentTimeMillis();
+        }
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+        previousTimestamp = timestamp;
+        return timestamp;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..c55ef19
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+
+/**
+ * A default implementation of {@link ILSMComponentIdGeneratorFactory}.
+ *
+ */
+public class LSMComponentIdGeneratorFactory implements ILSMComponentIdGeneratorFactory {
+
+    @Override
+    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+        return new LSMComponentIdGenerator();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
deleted file mode 100644
index f448c84..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.storage.am.lsm.common.impls;
-
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
-
-public class LSMDiskComponentId implements ILSMDiskComponentId {
-
-    private final long minId;
-
-    private final long maxId;
-
-    public LSMDiskComponentId(long minId, long maxId) {
-        this.minId = minId;
-        this.maxId = maxId;
-    }
-
-    @Override
-    public long getMinId() {
-        return this.minId;
-    }
-
-    @Override
-    public long getMaxId() {
-        return this.maxId;
-    }
-
-    @Override
-    public String toString() {
-        return "[" + minId + "," + maxId + "]";
-    }
-
-    @Override
-    public int hashCode() {
-        return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (!(obj instanceof LSMDiskComponentId)) {
-            return false;
-        }
-        LSMDiskComponentId other = (LSMDiskComponentId) obj;
-        if (maxId != other.maxId) {
-            return false;
-        }
-        if (minId != other.minId) {
-            return false;
-        }
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 48b6d8f..b0abeb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -95,15 +95,24 @@ public class LSMHarness implements ILSMHarness {
                 // Before entering the components, prune those corner cases that indeed should not proceed.
                 switch (opType) {
                     case FLUSH:
+                        // if the lsm index does not have memory components allocated, then nothing to flush
+                        if (!lsmIndex.isMemoryComponentsAllocated()) {
+                            return false;
+                        }
                         ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
                         if (!flushingComponent.isModified()) {
-                            //The mutable component has not been modified by any writer. There is nothing to flush.
-                            //since the component is empty, set its state back to READABLE_WRITABLE
                             if (flushingComponent.getState() == ComponentState.READABLE_UNWRITABLE) {
+                                //The mutable component has not been modified by any writer. There is nothing to flush.
+                                //Since the component is empty, set its state back to READABLE_WRITABLE only when its
+                                //state has been set to READABLE_UNWRITABLE
                                 flushingComponent.setState(ComponentState.READABLE_WRITABLE);
                                 opTracker.notifyAll();
+
+                                // Call recycled only when the component's state is reset back to READABLE_WRITABLE.
+                                // Otherwise, if the component is in another state, e.g., INACTIVE or
+                                // READABLE_UNWRITABLE_FLUSHING, it is not considered as being recycled here.
+                                lsmIndex.getIOOperationCallback().recycled(flushingComponent);
                             }
-                            lsmIndex.getIOOperationCallback().recycled(flushingComponent);
                             return false;
                         }
                         if (flushingComponent.getWriterCount() > 0) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 08b8bb6..000d5cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -21,13 +21,14 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 
 public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
     private final AbstractLSMIndex lsmIndex;
     private final ILSMDiskComponent component;
-    private final IIndexBulkLoader componentBulkLoader;
+    private final ILSMDiskComponentBulkLoader componentBulkLoader;
 
     public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, float fillFactor, boolean verifyInput,
             long numElementsHint) throws HyracksDataException {
@@ -39,11 +40,19 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
                 component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true);
     }
 
+    public ILSMDiskComponent getComponent() {
+        return component;
+    }
+
     @Override
     public void add(ITupleReference tuple) throws HyracksDataException {
         componentBulkLoader.add(tuple);
     }
 
+    public void delete(ITupleReference tuple) throws HyracksDataException {
+        componentBulkLoader.delete(tuple);
+    }
+
     @Override
     public void end() throws HyracksDataException {
         try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/39390edc/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index 09ca553..21d10d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import java.util.List;
 
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -37,6 +38,11 @@ public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFac
         return NoOpIOOperationCallback.INSTANCE;
     }
 
+    @Override
+    public void initialize(INCServiceContext ncCtx) {
+        // No op
+    }
+
     public static class NoOpIOOperationCallback implements ILSMIOOperationCallback {
         private static final NoOpIOOperationCallback INSTANCE = new NoOpIOOperationCallback();