You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/09/02 13:56:44 UTC

[2/3] asterixdb git commit: ASTERIXDB-1238: Refactor AqlMetadataProvider

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/55a558f2/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 72360b6..0975163 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -68,17 +68,14 @@ import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint;
-import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
@@ -123,11 +120,9 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
 import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
@@ -174,23 +169,27 @@ import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 
 public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
 
+    private final AsterixStorageProperties storageProperties;
+    private final ILibraryManager libraryManager;
+    private final Dataverse defaultDataverse;
+
     private MetadataTransactionContext mdTxnCtx;
     private boolean isWriteTransaction;
-    private final Map<String, String[]> stores;
     private Map<String, String> config;
     private IAWriterFactory writerFactory;
     private FileSplit outputFile;
     private boolean asyncResults;
     private ResultSetId resultSetId;
     private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
-
-    private final Dataverse defaultDataverse;
     private JobId jobId;
     private Map<String, Integer> locks;
     private boolean isTemporaryDatasetWriteJob = true;
 
-    private final AsterixStorageProperties storageProperties;
-    private final ILibraryManager libraryManager;
+    public AqlMetadataProvider(Dataverse defaultDataverse) {
+        this.defaultDataverse = defaultDataverse;
+        this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
+    }
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -200,21 +199,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         this.config = config;
     }
 
-    public Map<String, String[]> getAllStores() {
-        return stores;
-    }
-
     public Map<String, String> getConfig() {
         return config;
     }
 
-    public AqlMetadataProvider(Dataverse defaultDataverse) {
-        this.defaultDataverse = defaultDataverse;
-        this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
-        this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
-        this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
-    }
-
     public ILibraryManager getLibraryManager() {
         return libraryManager;
     }
@@ -283,43 +271,106 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return resultSerializerFactoryProvider;
     }
 
+    public boolean isWriteTransaction() {
+        // The transaction writes persistent datasets.
+        return isWriteTransaction;
+    }
+
+    public boolean isTemporaryDatasetWriteJob() {
+        // The transaction only writes temporary datasets.
+        return isTemporaryDatasetWriteJob;
+    }
+
+    public IDataFormat getFormat() {
+        return FormatUtils.getDefaultFormat();
+    }
+
+    public AsterixStorageProperties getStorageProperties() {
+        return storageProperties;
+    }
+
+    public Map<String, Integer> getLocks() {
+        return locks;
+    }
+
+    public void setLocks(Map<String, Integer> locks) {
+        this.locks = locks;
+    }
+
     /**
      * Retrieve the Output RecordType, as defined by "set output-record-type".
      */
     public ARecordType findOutputRecordType() throws AlgebricksException {
-        String outputRecordType = getPropertyValue("output-record-type");
-        if (outputRecordType == null) {
+        return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(),
+                getPropertyValue("output-record-type"));
+    }
+
+    public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
+        String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
+                : dataverse;
+        if (dv == null) {
             return null;
         }
-        String dataverse = getDefaultDataverseName();
-        if (dataverse == null) {
-            throw new AlgebricksException("Cannot declare output-record-type with no dataverse!");
-        }
-        IAType type = findType(dataverse, outputRecordType);
-        if (!(type instanceof ARecordType)) {
-            throw new AlgebricksException("Type " + outputRecordType + " is not a record type!");
-        }
-        return (ARecordType) type;
+        return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset);
+    }
+
+    public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
+        return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
+    }
+
+    public IAType findType(String dataverse, String typeName) throws AlgebricksException {
+        return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
+    }
+
+    public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
+        return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
+    }
+
+    public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
+        return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverse, policyName);
     }
 
     @Override
     public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
-        AqlSourceId aqlId = id;
+        return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
+    }
+
+    public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException {
+        return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
+    }
+
+    @Override
+    public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
+            throws AlgebricksException {
+        AqlDataSource ads = findDataSource(dataSourceId);
+        Dataset dataset = ((DatasetDataSource) ads).getDataset();
         try {
-            return lookupSourceInMetadata(aqlId);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
+            String indexName = indexId;
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+            if (secondaryIndex != null) {
+                return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+            } else {
+                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                        dataset.getDatasetName(), dataset.getDatasetName());
+                if (primaryIndex.getIndexName().equals(indexId)) {
+                    return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+                } else {
+                    return null;
+                }
+            }
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
         }
     }
 
-    public boolean isWriteTransaction() {
-        // The transaction writes persistent datasets.
-        return isWriteTransaction;
+    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
+        return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
     }
 
-    public boolean isTemporaryDatasetWriteJob() {
-        // The transaction only writes temporary datasets.
-        return isTemporaryDatasetWriteJob;
+    @Override
+    public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+        return AsterixBuiltinFunctions.lookupFunction(fid);
     }
 
     @Override
@@ -337,8 +388,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         }
     }
 
-    public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource)
-            throws AsterixException {
+    public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource) {
         return new AlgebricksAbsolutePartitionConstraint(feedDataSource.getLocations());
     }
 
@@ -367,64 +417,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return format;
     }
 
-    protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
-            Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
-            List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
-        try {
-            configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
-            IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
-                    configuration, itemType, metaType);
-
-            // check to see if dataset is indexed
-            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(),
-                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
-
-            if (filesIndex != null && filesIndex.getPendingOp() == 0) {
-                // get files
-                List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
-                Iterator<ExternalFile> iterator = files.iterator();
-                while (iterator.hasNext()) {
-                    if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
-                        iterator.remove();
-                    }
-                }
-                // TODO Check this call, result of merge from master!
-                // ((IGenericAdapterFactory) adapterFactory).setFiles(files);
-            }
-
-            return adapterFactory;
-        } catch (Exception e) {
-            throw new AlgebricksException("Unable to create adapter", e);
-        }
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
-            JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
-            throws AlgebricksException {
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Can only scan datasets of records.");
-        }
-
-        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
-        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
-                adapterFactory);
-
-        AlgebricksPartitionConstraint constraint;
-        try {
-            constraint = adapterFactory.getPartitionConstraint();
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
-    }
-
     public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
             JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
-        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput = null;
+        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
         factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx,
                 libraryManager);
         ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
@@ -445,8 +440,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         }
 
         AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
-        return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory>(feedIngestor,
-                partitionConstraint, adapterFactory);
+        return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
@@ -454,7 +448,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
             Object implConfig, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-
         boolean isSecondary = true;
         int numSecondaryKeys = 0;
         try {
@@ -496,10 +489,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                 }
                 Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
                         getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
-                                secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
-                                secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
-                                dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
-                                secondaryIndex.getKeyFieldSourceIndicators(), metaType);
+                        secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
+                        DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+                        dataset.hasMetaPart(), primaryKeyIndicators, secondaryIndex.getKeyFieldSourceIndicators(),
+                        metaType);
                 comparatorFactories = comparatorFactoriesAndTypeTraits.first;
                 typeTraits = comparatorFactoriesAndTypeTraits.second;
                 if (filterTypeTraits != null) {
@@ -519,22 +512,18 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                 // get meta item type
                 ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
                 typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
-                        metaItemType, context.getBinaryComparatorFactoryProvider());
+                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
+                        context.getBinaryComparatorFactoryProvider());
                 filterFields = DatasetUtils.createFilterFields(dataset);
                 btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
             }
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-            try {
-                spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
-                        dataset.getDatasetName(), indexName, temp);
-            } catch (Exception e) {
-                throw new AlgebricksException(e);
-            }
+            spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
+                    indexName, temp);
 
-            ISearchOperationCallbackFactory searchCallbackFactory = null;
+            ISearchOperationCallbackFactory searchCallbackFactory;
             if (isSecondary) {
                 searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
                         : new SecondaryIndexSearchOperationCallbackFactory();
@@ -580,83 +569,29 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                 int[] buddyBreeFields = new int[] { numSecondaryKeys };
                 ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
                         new ExternalBTreeWithBuddyDataflowHelperFactory(
-                                compactionInfo.first, compactionInfo.second,
-                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                                getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
-                                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+                        compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                        getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
                 btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
                         rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
                         highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
                         retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
             }
-
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
-
+            return new Pair<>(btreeSearchOp, spPc.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
     }
 
-    private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
-            IndexType indexType, List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes,
-            List<List<String>> pidxKeyFieldNames, ARecordType recType, DatasetType dsType, boolean hasMeta,
-            List<Integer> primaryIndexKeyIndicators, List<Integer> secondaryIndexIndicators, ARecordType metaType)
-            throws AlgebricksException {
-
-        IBinaryComparatorFactory[] comparatorFactories;
-        ITypeTraits[] typeTraits;
-        int sidxKeyFieldCount = sidxKeyFieldNames.size();
-        int pidxKeyFieldCount = pidxKeyFieldNames.size();
-        typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
-        comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount];
-
-        int i = 0;
-        for (; i < sidxKeyFieldCount; ++i) {
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
-                    sidxKeyFieldNames.get(i),
-                    (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
-            IAType keyType = keyPairType.first;
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    true);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-
-        for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
-            IAType keyType = null;
-            try {
-                switch (dsType) {
-                    case INTERNAL:
-                        keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
-                                ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
-                                : recType.getSubFieldType(pidxKeyFieldNames.get(j));
-                        break;
-                    case EXTERNAL:
-                        keyType = IndexingConstants.getFieldType(j);
-                        break;
-                    default:
-                        throw new AlgebricksException("Unknown Dataset Type");
-                }
-            } catch (AsterixException e) {
-                throw new AlgebricksException(e);
-            }
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    true);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-
-        return new Pair<IBinaryComparatorFactory[], ITypeTraits[]>(comparatorFactories, typeTraits);
-    }
-
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
             JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
             int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-
         try {
-            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(),
-                    dataset.getItemTypeName());
+            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
             int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
 
             boolean temp = dataset.getDatasetDetails().isTemp();
@@ -670,9 +605,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
             int numSecondaryKeys = secondaryKeyFields.size();
             if (numSecondaryKeys != 1) {
-                throw new AlgebricksException("Cannot use " + numSecondaryKeys
-                        + " fields as a key for the R-tree index. "
-                        + "There can be only one field as a key for the R-tree index.");
+                throw new AlgebricksException(
+                        "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
+                                + "There can be only one field as a key for the R-tree index.");
             }
             Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
                     secondaryKeyFields.get(0), recType);
@@ -701,8 +636,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName, temp);
             ARecordType metaType = null;
             if (dataset.hasMetaPart()) {
                 metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
@@ -770,28 +705,12 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                         retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
             }
 
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
-
+            return new Pair<>(rtreeSearchOp, spPc.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
     }
 
-    private IBinaryComparatorFactory[] getMergedComparatorFactories(IBinaryComparatorFactory[] comparatorFactories,
-            IBinaryComparatorFactory[] primaryComparatorFactories) {
-        IBinaryComparatorFactory[] btreeCompFactories = null;
-        int btreeCompFactoriesCount = comparatorFactories.length + primaryComparatorFactories.length;
-        btreeCompFactories = new IBinaryComparatorFactory[btreeCompFactoriesCount];
-        int i = 0;
-        for (; i < comparatorFactories.length; i++) {
-            btreeCompFactories[i] = comparatorFactories[i];
-        }
-        for (int j = 0; i < btreeCompFactoriesCount; i++, j++) {
-            btreeCompFactories[i] = primaryComparatorFactories[j];
-        }
-        return btreeCompFactories;
-    }
-
     @Override
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
@@ -804,7 +723,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
                 getWriterFactory(), inputDesc);
         AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
-        return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
+        return new Pair<>(runtime, apc);
     }
 
     @Override
@@ -814,7 +733,6 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         ResultSetDataSink rsds = (ResultSetDataSink) sink;
         ResultSetSinkId rssId = rsds.getId();
         ResultSetId rsId = rssId.getResultSetId();
-
         ResultWriterOperatorDescriptor resultWriter = null;
         try {
             IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
@@ -824,70 +742,24 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
-
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(resultWriter, null);
+        return new Pair<>(resultWriter, null);
     }
 
     @Override
-    public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
-            throws AlgebricksException {
-        AqlDataSource ads = findDataSource(dataSourceId);
-        Dataset dataset = ((DatasetDataSource) ads).getDataset();
-
-        try {
-            String indexName = indexId;
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-            if (secondaryIndex != null) {
-                return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
-            } else {
-                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                        dataset.getDatasetName(), dataset.getDatasetName());
-                if (primaryIndex.getIndexName().equals(indexId)) {
-                    return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
-                } else {
-                    return null;
-                }
-            }
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException, MetadataException {
-        Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasourceName());
-        if (dataset == null) {
-            throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
-        }
-        IAType itemType = findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
-        IAType metaItemType = findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
-        INodeDomain domain = findNodeDomain(dataset.getNodeGroupName());
-        byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
-                ? AqlDataSourceType.EXTERNAL_DATASET : AqlDataSourceType.INTERNAL_DATASET;
-        return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,
-                dataset.getDatasetDetails(), domain);
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
-            JobSpecification spec) throws AlgebricksException {
-        String dataverseName = dataSource.getId().getDataverseName();
-        String datasetName = dataSource.getId().getDatasourceName();
-
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
-        }
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        String dataverseName = dataSource.getId().getDataverseName();
+        String datasetName = dataSource.getId().getDatasourceName();
 
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         int numKeys = keys.size();
         int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
 
         // move key fields to front
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
         int[] bloomFilterKeyFields = new int[numKeys];
-        // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
         int i = 0;
         for (LogicalVariable varKey : keys) {
             int idx = propagatedSchema.findVariable(varKey);
@@ -923,8 +795,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                     itemType, metaType, context.getBinaryComparatorFactoryProvider());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
+                            datasetName, indexName, temp);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -953,114 +825,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                             LSMBTreeIOOperationCallbackFactory.INSTANCE,
                             storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
                             filterCmpFactories, btreeFields, filterFields, !temp));
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
-                    splitsAndConstraint.second);
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
-            List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
-
-        String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException(
-                    "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
-        }
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
-        int numKeys = keys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-        // Move key fields to front.
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
-                + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
-        int[] bloomFilterKeyFields = new int[numKeys];
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
-        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[i++] = idx;
-        }
-        if (additionalNonFilteringFields != null) {
-            for (LogicalVariable variable : additionalNonFilteringFields) {
-                int idx = propagatedSchema.findVariable(variable);
-                fieldPermutation[i++] = idx;
-            }
-        }
-
-        try {
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            String indexName = primaryIndex.getIndexName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataSource.getId().getDataverseName(), datasetName, indexName, temp);
-
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            int[] primaryKeyFields = new int[numKeys];
-            for (i = 0; i < numKeys; i++) {
-                primaryKeyFields[i] = i;
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = DatasetUtils.createFilterFields(dataset);
-            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
-            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
-                    : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
-                            txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
-                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
-            } else {
-                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE);
-            }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-
+            return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
@@ -1072,7 +837,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
             List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
             JobSpecification spec, boolean bulkload) throws AlgebricksException {
-        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
+        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload,
                 additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
     }
 
@@ -1081,58 +846,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
-        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, typeEnv, keys, payload,
+        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload,
                 additionalNonKeyFields, recordDesc, context, spec, false, null);
     }
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(
-            IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
-        String indexName = dataSourceIndex.getId();
-        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
-        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
-
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        Index secondaryIndex;
-        try {
-            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
-        switch (secondaryIndex.getIndexType()) {
-            case BTREE: {
-                return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
-                        bulkload);
-            }
-            case RTREE: {
-                return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
-                        bulkload);
-            }
-            case SINGLE_PARTITION_WORD_INVIX:
-            case SINGLE_PARTITION_NGRAM_INVIX:
-            case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX: {
-                return getInvertedIndexDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
-                        primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec,
-                        indexOp, secondaryIndex.getIndexType(), bulkload);
-            }
-            default: {
-                throw new AlgebricksException(
-                        "Insert and delete not implemented for index type: " + secondaryIndex.getIndexType());
-            }
-        }
-    }
-
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
             IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
@@ -1140,9 +857,34 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
             boolean bulkload) throws AlgebricksException {
-        return getIndexInsertOrDeleteRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
-                typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
-                bulkload);
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+                context, spec, bulkload, null, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+                context, spec, false, null, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+            ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
+                context, spec, false, prevSecondaryKeys, prevAdditionalFilteringKey);
     }
 
     @Override
@@ -1156,17 +898,14 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
         String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
 
-        IOperatorSchema inputSchema = new OperatorSchemaImpl();
+        IOperatorSchema inputSchema;
         if (inputSchemas.length > 0) {
             inputSchema = inputSchemas[0];
         } else {
             throw new AlgebricksException("TokenizeOperator can not operate without any input variable.");
         }
 
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         Index secondaryIndex;
         try {
             secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -1174,1171 +913,890 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
         }
-        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
         // TokenizeOperator only supports a keyword or n-gram index.
         switch (secondaryIndex.getIndexType()) {
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX: {
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
                 return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema,
-                        typeEnv, primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
-                        IndexOperation.INSERT, secondaryIndex.getIndexType(), bulkload);
-            }
-            default: {
+                        primaryKeys, secondaryKeys, recordDesc, spec, secondaryIndex.getIndexType());
+            default:
                 throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: "
                         + secondaryIndex.getIndexType());
-            }
         }
-
     }
 
-    // Get a Tokenizer for the bulk-loading data into a n-gram or keyword index.
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBinaryTokenizerRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema inputSchema, IOperatorSchema propagatedSchema,
-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload)
-            throws AlgebricksException {
-
-        // Sanity checks.
-        if (primaryKeys.size() > 1) {
-            throw new AlgebricksException("Cannot tokenize composite primary key.");
+    /**
+     * Calculate an estimate size of the bloom filter. Note that this is an
+     * estimation which assumes that the data is going to be uniformly
+     * distributed across all partitions.
+     *
+     * @param dataset
+     * @return Number of elements that will be used to create a bloom filter per
+     *         dataset per partition
+     * @throws MetadataException
+     * @throws AlgebricksException
+     */
+    public long getCardinalityPerPartitionHint(Dataset dataset) throws MetadataException, AlgebricksException {
+        String numElementsHintString = dataset.getHints().get(DatasetCardinalityHint.NAME);
+        long numElementsHint;
+        if (numElementsHintString == null) {
+            numElementsHint = DatasetCardinalityHint.DEFAULT;
+        } else {
+            numElementsHint = Long.parseLong(numElementsHintString);
         }
-        if (secondaryKeys.size() > 1) {
-            throw new AlgebricksException("Cannot tokenize composite secondary key fields.");
+        int numPartitions = 0;
+        List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                .getNodeNames();
+        for (String nd : nodeGroup) {
+            numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
         }
+        numElementsHint = numElementsHint / numPartitions;
+        return numElementsHint;
+    }
 
-        boolean isPartitioned;
-        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-            isPartitioned = true;
-        } else {
-            isPartitioned = false;
-        }
+    protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
+            Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
+            List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
+        try {
+            configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
+            IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
+                    configuration, itemType, metaType);
 
-        // Number of Keys that needs to be propagated
-        int numKeys = inputSchema.getSize();
+            // check to see if dataset is indexed
+            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(),
+                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
 
-        // Get the rest of Logical Variables that are not (PK or SK) and each
-        // variable's positions.
-        // These variables will be propagated through TokenizeOperator.
-        List<LogicalVariable> otherKeys = new ArrayList<LogicalVariable>();
-        if (inputSchema.getSize() > 0) {
-            for (int k = 0; k < inputSchema.getSize(); k++) {
-                boolean found = false;
-                for (LogicalVariable varKey : primaryKeys) {
-                    if (varKey.equals(inputSchema.getVariable(k))) {
-                        found = true;
-                        break;
-                    } else {
-                        found = false;
+            if (filesIndex != null && filesIndex.getPendingOp() == 0) {
+                // get files
+                List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+                Iterator<ExternalFile> iterator = files.iterator();
+                while (iterator.hasNext()) {
+                    if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
+                        iterator.remove();
                     }
                 }
-                if (!found) {
-                    for (LogicalVariable varKey : secondaryKeys) {
-                        if (varKey.equals(inputSchema.getVariable(k))) {
-                            found = true;
-                            break;
-                        } else {
-                            found = false;
-                        }
-                    }
-                }
-                if (!found) {
-                    otherKeys.add(inputSchema.getVariable(k));
-                }
             }
-        }
-
-        // For tokenization, sorting and loading.
-        // One token (+ optional partitioning field) + primary keys + secondary
-        // keys + other variables
-        // secondary keys and other variables will be just passed to the
-        // IndexInsertDelete Operator.
-        int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys;
-
-        // generate field permutations for the input
-        int[] fieldPermutation = new int[numKeys];
-
-        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
-        int i = 0;
-        int j = 0;
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            modificationCallbackPrimaryKeyFields[j] = i;
-            i++;
-            j++;
-        }
-        for (LogicalVariable varKey : otherKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        for (LogicalVariable varKey : secondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
 
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+            return adapterFactory;
+        } catch (Exception e) {
+            throw new AlgebricksException("Unable to create adapter", e);
         }
-        String itemTypeName = dataset.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
-                    .getDatatype();
-
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be tokenized.");
-            }
-
-            ARecordType recType = (ARecordType) itemType;
-
-            // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-
-            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
+    }
 
-            int numTokenFields = (!isPartitioned) ? secondaryKeys.size() : secondaryKeys.size() + 1;
-            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
-            ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
+    public JobId getJobId() {
+        return jobId;
+    }
 
-            // Find the key type of the secondary key. If it's a derived type,
-            // return the derived type.
-            // e.g. UNORDERED LIST -> return UNORDERED LIST type
-            IAType secondaryKeyType = null;
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
-                    secondaryKeyExprs.get(0), recType);
-            secondaryKeyType = keyPairType.first;
-            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-            i = 0;
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
+    public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
+            throws AlgebricksException {
+        return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
+                numKeyFields / 2);
+    }
 
-            tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
-            if (isPartitioned) {
-                // The partitioning field is hardcoded to be a short *without*
-                // an Asterix type tag.
-                tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
-            }
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
+            String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+        FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
+        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+    }
 
-            IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
-                    secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
+            String dataverse) {
+        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse);
+    }
 
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
+    public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
+            String targetIdxName, boolean temp) throws AlgebricksException {
+        return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
+    }
 
-            // Generate Output Record format
-            ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
-            ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
-            ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
+    public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
+            throws MetadataException {
+        DatasourceAdapter adapter;
+        // search in default namespace (built-in adapter)
+        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
 
-            // The order of the output record: propagated variables (including
-            // PK and SK), token, and number of token.
-            // #1. propagate all input variables
-            for (int k = 0; k < recordDesc.getFieldCount(); k++) {
-                tokenKeyPairFields[k] = recordDesc.getFields()[k];
-                tokenKeyPairTypeTraits[k] = recordDesc.getTypeTraits()[k];
-            }
-            int tokenOffset = recordDesc.getFieldCount();
+        // search in dataverse (user-defined adapter)
+        if (adapter == null) {
+            adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
+        }
+        return adapter;
+    }
 
-            // #2. Specify the token type
-            tokenKeyPairFields[tokenOffset] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
-            tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[0];
-            tokenOffset++;
+    public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        return AsterixClusterProperties.INSTANCE.getClusterLocations();
+    }
 
-            // #3. Specify the length-partitioning key: number of token
-            if (isPartitioned) {
-                tokenKeyPairFields[tokenOffset] = ShortSerializerDeserializer.INSTANCE;
-                tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[1];
-            }
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
+            String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+        return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
+                datasetName, targetIdxName, create);
+    }
 
-            RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
-            IOperatorDescriptor tokenizerOp;
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
+            JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
+            JobGenContext context, AqlMetadataProvider metadataProvider, boolean retainMissing)
+            throws AlgebricksException {
+        try {
+            // Get data type
+            IAType itemType;
+            itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                    dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
 
-            // Keys to be tokenized : SK
-            int docField = fieldPermutation[fieldPermutation.length - 1];
+            // Create the adapter factory <- right now there is only one. if there are more in the future, we can create
+            // a map->
+            ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+            LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory(libraryManager,
+                    datasetDetails.getProperties(), (ARecordType) itemType, ridIndexes, retainInput, retainMissing,
+                    context.getMissingWriterFactory());
 
-            // Keys to be propagated
-            int[] keyFields = new int[numKeys];
-            for (int k = 0; k < keyFields.length; k++) {
-                keyFields[k] = k;
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
+            try {
+                compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+            } catch (MetadataException e) {
+                throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e);
             }
 
-            tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField,
-                    keyFields, isPartitioned, true);
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(tokenizerOp,
-                    splitsAndConstraint.second);
+            boolean temp = datasetDetails.isTemp();
+            // Create the file index data flow helper
+            ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+                    compactionInfo.first, compactionInfo.second,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
+                    ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp);
 
+            // Create the out record descriptor, appContext and fileSplitProvider for the files index
+            RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+            spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                    dataset.getDatasetName(),
+                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
+            ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                    : new SecondaryIndexSearchOperationCallbackFactory();
+            // Create the operator
+            ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
+                    outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
+                    appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
+                    metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
+                    retainMissing, context.getMissingWriterFactory());
+            return new Pair<>(op, spPc.second);
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
     }
 
     @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
-            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
-        return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
-                typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
-                false);
-    }
-
-    private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
-            IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
-            throws AlgebricksException {
-        // No filtering condition.
-        if (filterExpr == null) {
-            return null;
-        }
-        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
-        IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
-                typeEnv, inputSchemas, context);
-        return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory());
-    }
-
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
-            boolean bulkload) throws AlgebricksException {
-
-        Dataset dataset = findDataset(dataverseName, datasetName);
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
+            List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
         if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+            throw new AlgebricksException(
+                    "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
         }
         boolean temp = dataset.getDatasetDetails().isTemp();
         isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
 
-        int numKeys = primaryKeys.size() + secondaryKeys.size();
+        int numKeys = primaryKeys.size();
         int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-
-        // generate field permutations
-        int[] fieldPermutation = new int[numKeys + numFilterFields];
-        int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
-        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
+        // Move key fields to front. [keys, record, filters]
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
+        int[] bloomFilterKeyFields = new int[numKeys];
         int i = 0;
-        int j = 0;
-        for (LogicalVariable varKey : secondaryKeys) {
+        // set the keys' permutations
+        for (LogicalVariable varKey : primaryKeys) {
             int idx = propagatedSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
             bloomFilterKeyFields[i] = i;
             i++;
         }
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            modificationCallbackPrimaryKeyFields[j] = i;
-            i++;
-            j++;
-        }
+        // set the record permutation
+        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+        // set the filters' permutations.
         if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[numKeys] = idx;
+            int idx = propagatedSchema.findVariable(filterKeys.get(0));
+            fieldPermutation[i++] = idx;
         }
 
-        String itemTypeName = dataset.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
-                    .getDatatype();
-
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
-            }
-
-            ARecordType recType = (ARecordType) itemType;
-
-            // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] btreeFields = null;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numKeys;
-                btreeFields = new int[numKeys];
-                for (int k = 0; k < btreeFields.length; k++) {
-                    btreeFields[k] = k;
-                }
+        if (additionalNonFilterFields != null) {
+            for (LogicalVariable var : additionalNonFilterFields) {
+                int idx = propagatedSchema.findVariable(var);
+                fieldPermutation[i++] = idx;
             }
+        }
 
-            List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
-            for (i = 0; i < secondaryKeys.size(); ++i) {
-                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
-                        secondaryKeyNames.get(i), recType);
-                IAType keyType = keyPairType.first;
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                        true);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
-            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                        true);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
+        try {
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            String indexName = primaryIndex.getIndexName();
 
+            String itemTypeName = dataset.getItemTypeName();
+            String itemTypeDataverseName = dataset.getItemTypeDataverseName();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype();
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataverseName, datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+                            indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
+            int[] primaryKeyFields = new int[numKeys];
+            for (i = 0; i < numKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = DatasetUtils.createFilterFields(dataset);
+            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
-                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
-                            dataset.hasMetaPart());
+                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
+                    : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
+                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
+
+            LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
+                    jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
                     .getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
                     new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
-                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                     AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
                     btreeFields, filterFields, !temp);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
-            } else {
-                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp,
-                        new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
-                                compactionInfo.first, compactionInfo.second,
-                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                                storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
-                                filterCmpFactories, btreeFields, filterFields, !temp),
-                        filterFactory, false, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE);
+            AsterixLSMTreeUpsertOperatorDescriptor op;
+
+            ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+            ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+                outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
+                outputSerDes[j] = recordDesc.getFields()[j];
             }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-    }
+            outputSerDes[outputSerDes.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
+                    .getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+            outputTypeTraits[outputTypeTraits.length - (dataset.hasMetaPart() ? 2 : 1) - numFilterFields] = FormatUtils
+                    .getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexDmlRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
-            IndexType indexType, boolean bulkload) throws AlgebricksException {
+            if (dataset.hasMetaPart()) {
+                outputSerDes[outputSerDes.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
+                        .getSerdeProvider().getSerializerDeserializer(metaItemType);
+                outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
+                        .getTypeTraitProvider().getTypeTrait(metaItemType);
+            }
 
-        // Check the index is length-partitioned or not.
-        boolean isPartitioned;
-        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-            isPartitioned = true;
-        } else {
-            isPartitioned = false;
-        }
+            int fieldIdx = -1;
+            if (numFilterFields > 0) {
+                String filterField = DatasetUtils.getFilterField(dataset).get(0);
+                for (i = 0; i < itemType.getFieldNames().length; i++) {
+                    if (itemType.getFieldNames()[i].equals(filterField)) {
+                        break;
+                    }
+                }
+                fieldIdx = i;
+                outputTypeTraits[outputTypeTraits.length - 1] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
+                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+                outputSerDes[outputSerDes.length - 1] = FormatUtils.getDefaultFormat().getSerdeProvider()
+                        .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+            }
 
-        // Sanity checks.
-        if (primaryKeys.size() > 1) {
-            throw new AlgebricksException("Cannot create inverted index on dataset with composite primary key.");
-        }
-        // The size of secondaryKeys can be two if it receives input from its
-        // TokenizeOperator- [token, number of token]
-        if (secondaryKeys.size() > 1 && !isPartitioned) {
-            throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
-        } else if (secondaryKeys.size() > 2 && isPartitioned) {
-            throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
-        }
+            RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
+            op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc,
+                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                    idfh, null, true, indexName, context.getMissingWriterFactory(), modificationCallbackFactory,
+                    searchCallbackFactory, null);
+            op.setType(itemType);
+            op.setFilterIndex(fieldIdx);
+            return new Pair<>(op, splitsAndConstraint.second);
 
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new Alge

<TRUNCATED>