You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/01 09:31:28 UTC

[09/13] incubator-asterixdb git commit: Add Support for Upsert Operation

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 14a3e2a..d523c6e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -636,11 +636,11 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
                     token = admLexer.next();
                     this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput());
                     if (openRecordField) {
-                        if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
+                        if (fieldValueBuffer.getByteArray()[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                             recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
                         }
                     } else if (NonTaggedFormatUtil.isOptional(recType)) {
-                        if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
+                        if (fieldValueBuffer.getByteArray()[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                             recBuilder.addField(fieldId, fieldValueBuffer);
                         }
                     } else {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterix-lang-aql/src/main/javacc/AQL.jj b/asterix-lang-aql/src/main/javacc/AQL.jj
index 8f62f74..93e3f68 100644
--- a/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -124,6 +124,7 @@ import org.apache.asterix.lang.common.statement.SetStatement;
 import org.apache.asterix.lang.common.statement.TypeDecl;
 import org.apache.asterix.lang.common.statement.TypeDropStatement;
 import org.apache.asterix.lang.common.statement.UpdateStatement;
+import org.apache.asterix.lang.common.statement.UpsertStatement;
 import org.apache.asterix.lang.common.statement.WriteStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.QuantifiedPair;
@@ -884,12 +885,17 @@ InsertStatement InsertStatement() throws ParseException:
 {
   Pair<Identifier,Identifier> nameComponents = null;
   Query query;
+  boolean upsert = false;
 }
 {
-  "insert" "into" <DATASET> nameComponents = QualifiedName() query = Query()
+  ("insert"|"upsert"{ upsert = true; }) "into" <DATASET> nameComponents = QualifiedName() query = Query()
     {
       query.setTopLevel(true);
-      return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+      if(upsert){
+        return new UpsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+      } else{
+        return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+      }
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
----------------------------------------------------------------------
diff --git a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index 7cf12c7..3184d1e 100644
--- a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -26,6 +26,7 @@ public interface Statement extends ILangExpression {
         DATASET_DROP,
         DELETE,
         INSERT,
+        UPSERT,
         UPDATE,
         DML_CMD_LIST,
         FUNCTION_DECL,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
new file mode 100644
index 0000000..f415951
--- /dev/null
+++ b/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.lang.common.struct.Identifier;
+
+public class UpsertStatement extends InsertStatement {
+
+    public UpsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter) {
+        super(dataverseName, datasetName, query, varCounter);
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.UPSERT;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index f15540a..f3523da 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -101,6 +101,9 @@ import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.AsterixLSMInvertedIndexUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.operators.AsterixLSMTreeUpsertOperatorDescriptor;
+import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -110,6 +113,7 @@ import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOpera
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -1184,7 +1188,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                 op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
                         appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
                         splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp, idfh, null, modificationCallbackFactory, true, indexName);
+                        fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE);
             }
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
 
@@ -1670,7 +1675,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                                 LSMBTreeIOOperationCallbackFactory.INSTANCE,
                                 storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
                                 filterCmpFactories, btreeFields, filterFields, !temp),
-                        filterFactory, modificationCallbackFactory, false, indexName);
+                        filterFactory, false, indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE);
             }
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
         } catch (MetadataException e) {
@@ -2029,7 +2035,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                                 proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
                                 storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
                                 filterTypeTraits, filterCmpFactories, filterFields, !temp),
-                        filterFactory, modificationCallbackFactory, false, indexName);
+                        filterFactory, false, indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE);
             }
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
         } catch (MetadataException | IOException e) {
@@ -2237,4 +2244,721 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             throw new AlgebricksException(e);
         }
     }
+
+    //TODO: refactor this method
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
+            LogicalVariable prevPayload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+                    throws AlgebricksException {
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException(
+                    "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = primaryKeys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+        // Move key fields to front. {keys, record, filters}
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        // set the keys' permutations
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        // set the record permutation
+        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+        // set the filters' permutations.
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(filterKeys.get(0));
+            fieldPermutation[numKeys + 1] = idx;
+        }
+
+        try {
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            String indexName = primaryIndex.getIndexName();
+
+            String itemTypeName = dataset.getItemTypeName();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, dataSource.getId().getDataverseName(), itemTypeName).getDatatype();
+
+            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
+                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            int[] primaryKeyFields = new int[numKeys];
+            for (i = 0; i < numKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = DatasetUtils.createFilterFields(dataset);
+            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = temp
+                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
+                    : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
+                            IndexOperation.UPSERT, ResourceType.LSM_BTREE);
+
+            LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
+                    jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                    btreeFields, filterFields, !temp);
+            AsterixLSMTreeUpsertOperatorDescriptor op;
+
+            ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount() + 1 + numFilterFields];
+            ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount() + 1
+                    + numFilterFields];
+            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+                outputTypeTraits[j] = recordDesc.getTypeTraits()[j];
+                outputSerDes[j] = recordDesc.getFields()[j];
+            }
+            outputSerDes[outputSerDes.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat().getSerdeProvider()
+                    .getSerializerDeserializer(itemType);
+            outputTypeTraits[outputTypeTraits.length - 1 - numFilterFields] = FormatUtils.getDefaultFormat()
+                    .getTypeTraitProvider().getTypeTrait(itemType);
+            int fieldIdx = -1;
+            if (numFilterFields > 0) {
+                String filterField = DatasetUtils.getFilterField(dataset).get(0);
+                for (i = 0; i < itemType.getFieldNames().length; i++) {
+                    if (itemType.getFieldNames()[i].equals(filterField)) {
+                        break;
+                    }
+                }
+                fieldIdx = i;
+                outputTypeTraits[outputTypeTraits.length - 1] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
+                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+                outputSerDes[outputSerDes.length - 1] = FormatUtils.getDefaultFormat().getSerdeProvider()
+                        .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+            }
+
+            RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
+            op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc,
+                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                    idfh, null, true, indexName, context.getNullWriterFactory(), modificationCallbackFactory,
+                    searchCallbackFactory, null);
+            op.setType(itemType);
+            op.setFilterIndex(fieldIdx);
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    // TODO refactor this method
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+            ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        String indexName = dataSourceIndex.getId();
+        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
+        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
+
+        Dataset dataset = findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        Index secondaryIndex;
+        try {
+            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
+        ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
+        if (prevAdditionalFilteringKey != null) {
+            prevAdditionalFilteringKeys = new ArrayList<LogicalVariable>();
+            prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey);
+        }
+        switch (secondaryIndex.getIndexType()) {
+            case BTREE: {
+                return getBTreeUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
+                        primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
+                        prevSecondaryKeys, prevAdditionalFilteringKeys);
+            }
+            case RTREE: {
+                return getRTreeUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
+                        primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
+                        prevSecondaryKeys, prevAdditionalFilteringKeys);
+            }
+            case SINGLE_PARTITION_WORD_INVIX:
+            case SINGLE_PARTITION_NGRAM_INVIX:
+            case LENGTH_PARTITIONED_WORD_INVIX:
+            case LENGTH_PARTITIONED_NGRAM_INVIX: {
+                return getInvertedIndexUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
+                        primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
+                        secondaryIndex.getIndexType(), prevSecondaryKeys, prevAdditionalFilteringKeys);
+            }
+            default: {
+                throw new AlgebricksException(
+                        "upsert is not implemented for index type: " + secondaryIndex.getIndexType());
+            }
+        }
+    }
+
+    //TODO: refactor this method
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexUpsertRuntime(String dataverseName,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexType indexType,
+            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+                    throws AlgebricksException {
+        // Check the index is length-partitioned or not.
+        boolean isPartitioned;
+        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+            isPartitioned = true;
+        } else {
+            isPartitioned = false;
+        }
+
+        // Sanity checks.
+        if (primaryKeys.size() > 1) {
+            throw new AlgebricksException("Cannot create inverted index on dataset with composite primary key.");
+        }
+        // The size of secondaryKeys can be two if it receives input from its
+        // TokenizeOperator- [token, number of token]
+        if (secondaryKeys.size() > 1 && !isPartitioned) {
+            throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
+        } else if (secondaryKeys.size() > 2 && isPartitioned) {
+            throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
+        }
+
+        Dataset dataset = findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        // For tokenization, sorting and loading.
+        // One token (+ optional partitioning field) + primary keys: [token,
+        // number of token, PK]
+        int numKeys = primaryKeys.size() + secondaryKeys.size();
+        int numTokenKeyPairFields = (!isPartitioned) ? 1 + primaryKeys.size() : 2 + primaryKeys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+
+        // generate field permutations
+        int[] fieldPermutation = new int[numKeys + numFilterFields];
+        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+        int i = 0;
+        int j = 0;
+
+        // If the index is partitioned: [token, number of token]
+        // Otherwise: [token]
+        for (LogicalVariable varKey : secondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            modificationCallbackPrimaryKeyFields[j] = i;
+            i++;
+            j++;
+        }
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
+            fieldPermutation[numKeys] = idx;
+        }
+
+        // Find permutations for prev value
+        int[] prevFieldPermutation = new int[numKeys + numFilterFields];
+        i = 0;
+
+        // If the index is partitioned: [token, number of token]
+        // Otherwise: [token]
+        for (LogicalVariable varKey : prevSecondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            prevFieldPermutation[i] = idx;
+            i++;
+        }
+
+        for (int k = 0; k < primaryKeys.size(); k++) {
+            prevFieldPermutation[k + i] = fieldPermutation[k + i];
+            i++;
+        }
+
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+            prevFieldPermutation[numKeys] = idx;
+        }
+
+        String itemTypeName = dataset.getItemTypeName();
+        IAType itemType;
+        try {
+            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
+                    .getDatatype();
+
+            if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                throw new AlgebricksException("Only record types can be indexed.");
+            }
+
+            ARecordType recType = (ARecordType) itemType;
+
+            // Index parameters.
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+
+            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
+            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+
+            int numTokenFields = 0;
+
+            // SecondaryKeys.size() can be two if it comes from the bulkload.
+            // In this case, [token, number of token] are the secondaryKeys.
+            if (!isPartitioned || secondaryKeys.size() > 1) {
+                numTokenFields = secondaryKeys.size();
+            } else if (isPartitioned && secondaryKeys.size() == 1) {
+                numTokenFields = secondaryKeys.size() + 1;
+            }
+
+            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
+            ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
+            IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
+            IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                    dataset, recType, context.getBinaryComparatorFactoryProvider());
+
+            IAType secondaryKeyType = null;
+
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyExprs.get(0), recType);
+            secondaryKeyType = keyPairType.first;
+
+            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+
+            i = 0;
+            for (List<String> partitioningKey : partitioningKeys) {
+                IAType keyType = recType.getSubFieldType(partitioningKey);
+                invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+                ++i;
+            }
+
+            tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+            tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+            if (isPartitioned) {
+                // The partitioning field is hardcoded to be a short *without*
+                // an Asterix type tag.
+                tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+                tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+            }
+            IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
+                    secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    recType, context.getBinaryComparatorFactoryProvider());
+
+            int[] filterFields = null;
+            int[] invertedIndexFields = null;
+            int[] filterFieldsForNonBulkLoadOps = null;
+            int[] invertedIndexFieldsForNonBulkLoadOps = null;
+            if (filterTypeTraits != null) {
+                filterFields = new int[1];
+                filterFields[0] = numTokenFields + primaryKeys.size();
+                invertedIndexFields = new int[numTokenFields + primaryKeys.size()];
+                for (int k = 0; k < invertedIndexFields.length; k++) {
+                    invertedIndexFields[k] = k;
+                }
+
+                filterFieldsForNonBulkLoadOps = new int[numFilterFields];
+                filterFieldsForNonBulkLoadOps[0] = numTokenKeyPairFields;
+                invertedIndexFieldsForNonBulkLoadOps = new int[numTokenKeyPairFields];
+                for (int k = 0; k < invertedIndexFieldsForNonBulkLoadOps.length; k++) {
+                    invertedIndexFieldsForNonBulkLoadOps[k] = k;
+                }
+            }
+
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, temp);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = temp
+                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+                            ResourceType.LSM_INVERTED_INDEX)
+                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+                            ResourceType.LSM_INVERTED_INDEX);
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory indexDataFlowFactory;
+            if (!isPartitioned) {
+                indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(
+                        new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+                        filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+                        invertedIndexFieldsForNonBulkLoadOps, !temp);
+            } else {
+                indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                        new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+                        filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+                        invertedIndexFieldsForNonBulkLoadOps, !temp);
+            }
+            IOperatorDescriptor op = new AsterixLSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc,
+                    appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+                    appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+                    invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation,
+                    indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName, prevFieldPermutation);
+
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    //TODO: refactor this method
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeUpsertRuntime(String dataverseName,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+                    throws AlgebricksException {
+        try {
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+            String itemTypeName = dataset.getItemTypeName();
+            IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, itemTypeName).getDatatype();
+            if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                throw new AlgebricksException("Only record types can be indexed.");
+            }
+            ARecordType recType = (ARecordType) itemType;
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+
+            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
+            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyExprs.get(0), recType);
+            IAType spatialType = keyPairType.first;
+
+            int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+            int numSecondaryKeys = dimension * 2;
+            int numPrimaryKeys = primaryKeys.size();
+            int numKeys = numSecondaryKeys + numPrimaryKeys;
+            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
+
+            int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+            int[] fieldPermutation = new int[numKeys + numFilterFields];
+            int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+            int i = 0;
+            int j = 0;
+
+            // Get field permutation for new value
+            for (LogicalVariable varKey : secondaryKeys) {
+                int idx = propagatedSchema.findVariable(varKey);
+                fieldPermutation[i] = idx;
+                i++;
+            }
+            for (LogicalVariable varKey : primaryKeys) {
+                int idx = propagatedSchema.findVariable(varKey);
+                fieldPermutation[i] = idx;
+                modificationCallbackPrimaryKeyFields[j] = i;
+                i++;
+                j++;
+            }
+
+            if (numFilterFields > 0) {
+                int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
+                fieldPermutation[numKeys] = idx;
+            }
+
+            // Get field permutation for previous value
+            int[] prevFieldPermutation = new int[numKeys + numFilterFields];
+            i = 0;
+
+            // Get field permutation for new value
+            for (LogicalVariable varKey : prevSecondaryKeys) {
+                int idx = propagatedSchema.findVariable(varKey);
+                prevFieldPermutation[i] = idx;
+                i++;
+            }
+            for (int k = 0; k < numPrimaryKeys; k++) {
+                prevFieldPermutation[k + i] = fieldPermutation[k + i];
+                i++;
+            }
+
+            if (numFilterFields > 0) {
+                int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+                prevFieldPermutation[numKeys] = idx;
+            }
+
+            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+            IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
+            for (i = 0; i < numSecondaryKeys; i++) {
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                        .getBinaryComparatorFactory(nestedKeyType, true);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+            }
+            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+            for (List<String> partitioningKey : partitioningKeys) {
+                IAType keyType = recType.getSubFieldType(partitioningKey);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+                ++i;
+            }
+
+            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                    dataset, recType, context.getBinaryComparatorFactoryProvider());
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, temp);
+            int[] btreeFields = new int[primaryComparatorFactories.length];
+            for (int k = 0; k < btreeFields.length; k++) {
+                btreeFields[k] = k + numSecondaryKeys;
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    recType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = null;
+            int[] rtreeFields = null;
+            if (filterTypeTraits != null) {
+                filterFields = new int[1];
+                filterFields[0] = numSecondaryKeys + numPrimaryKeys;
+                rtreeFields = new int[numSecondaryKeys + numPrimaryKeys];
+                for (int k = 0; k < rtreeFields.length; k++) {
+                    rtreeFields[k] = k;
+                }
+            }
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = temp
+                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+                            ResourceType.LSM_RTREE)
+                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+                            ResourceType.LSM_RTREE);
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            AsterixLSMTreeUpsertOperatorDescriptor op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
+                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                    splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation,
+                    new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+                            primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                            compactionInfo.first, compactionInfo.second,
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                            proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+                            storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
+                            filterTypeTraits, filterCmpFactories, filterFields, !temp),
+                    filterFactory, false, indexName, null, modificationCallbackFactory,
+                    NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+        } catch (MetadataException | IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    //TODO: refactor this method
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeUpsertRuntime(String dataverseName,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+                    throws AlgebricksException {
+        // we start with the btree
+        Dataset dataset = findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = primaryKeys.size() + secondaryKeys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+
+        // generate field permutations
+        int[] fieldPermutation = new int[numKeys + numFilterFields];
+        int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
+        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+        int i = 0;
+        int j = 0;
+        for (LogicalVariable varKey : secondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            modificationCallbackPrimaryKeyFields[j] = i;
+            i++;
+            j++;
+        }
+        // Filter can only be one field!
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
+            fieldPermutation[numKeys] = idx;
+        }
+
+        // generate field permutations for prev record
+        int[] prevFieldPermutation = new int[numKeys + numFilterFields];
+        int k = 0;
+        for (LogicalVariable varKey : prevSecondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            prevFieldPermutation[k] = idx;
+            k++;
+        }
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            prevFieldPermutation[k] = idx;
+            k++;
+        }
+        // Filter can only be one field!
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+            prevFieldPermutation[numKeys] = idx;
+        }
+
+        String itemTypeName = dataset.getItemTypeName();
+        IAType itemType;
+        try {
+            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
+                    .getDatatype();
+
+            if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                throw new AlgebricksException("Only record types can be indexed.");
+            }
+
+            ARecordType recType = (ARecordType) itemType;
+
+            // Index parameters.
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    recType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = null;
+            int[] btreeFields = null;
+            if (filterTypeTraits != null) {
+                filterFields = new int[1];
+                filterFields[0] = numKeys;
+                btreeFields = new int[numKeys];
+                for (int l = 0; l < btreeFields.length; l++) {
+                    btreeFields[l] = l;
+                }
+            }
+
+            List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
+            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+            for (i = 0; i < secondaryKeys.size(); ++i) {
+                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
+                        secondaryKeyNames.get(i), recType);
+                IAType keyType = keyPairType.first;
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                        true);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            }
+            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+            for (List<String> partitioningKey : partitioningKeys) {
+                IAType keyType = recType.getSubFieldType(partitioningKey);
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                        true);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+                ++i;
+            }
+
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
+                    dataverseName, datasetName, indexName, temp);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = temp
+                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+                            ResourceType.LSM_BTREE)
+                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+                            ResourceType.LSM_BTREE);
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+                    btreeFields, filterFields, !temp);
+            AsterixLSMTreeUpsertOperatorDescriptor op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
+                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                    idfh, filterFactory, false, indexName, null, modificationCallbackFactory,
+                    NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
index 8a95f87..03fc4c0 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
@@ -391,7 +391,7 @@ public class MetadataLockManager {
         releaseDataverseReadLock(dataverseName);
     }
 
-    public void insertDeleteBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+    public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
             List<String> datasets) {
         dataverses.add(dataverseName);
         datasets.add(datasetFullyQualifiedName);
@@ -420,7 +420,7 @@ public class MetadataLockManager {
         }
     }
 
-    public void insertDeleteEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+    public void insertDeleteUpsertEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
             List<String> datasets) {
         String previous = null;
         for (int i = dataverses.size() - 1; i >= 0; i--) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java b/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
index a74f0fa..2b05761 100644
--- a/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
+++ b/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
@@ -32,9 +32,6 @@ import org.apache.hyracks.data.std.util.GrowableArray;
 import org.apache.hyracks.storage.am.common.ophelpers.IntArrayList;
 
 public abstract class AbstractListBuilder implements IAsterixListBuilder {
-
-    protected static final byte serNullTypeTag = ATypeTag.NULL.serialize();
-
     protected final GrowableArray outputStorage;
     protected final DataOutputStream outputStream;
     protected final IntArrayList offsets;
@@ -80,13 +77,13 @@ public abstract class AbstractListBuilder implements IAsterixListBuilder {
     @Override
     public void addItem(IValueReference item) throws HyracksDataException {
         try {
-            if (!fixedSize && (item.getByteArray()[0] != serNullTypeTag || itemTypeTag == ATypeTag.ANY))
+            if (!fixedSize && (item.getByteArray()[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG || itemTypeTag == ATypeTag.ANY))
                 this.offsets.add(outputStorage.getLength());
             if (itemTypeTag == ATypeTag.ANY
-                    || (itemTypeTag == ATypeTag.NULL && item.getByteArray()[0] == serNullTypeTag)) {
+                    || (itemTypeTag == ATypeTag.NULL && item.getByteArray()[0] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)) {
                 this.numberOfItems++;
                 this.outputStream.write(item.getByteArray(), item.getStartOffset(), item.getLength());
-            } else if (item.getByteArray()[0] != serNullTypeTag) {
+            } else if (item.getByteArray()[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
                 this.numberOfItems++;
                 this.outputStream.write(item.getByteArray(), item.getStartOffset() + 1, item.getLength() - 1);
             }
@@ -110,8 +107,8 @@ public abstract class AbstractListBuilder implements IAsterixListBuilder {
             if (!fixedSize) {
                 offsetPosition += 8;
                 for (int i = 0; i < offsets.size(); i++) {
-                    SerializerDeserializerUtil.writeIntToByteArray(offsetArray, offsets.get(i) + metadataInfoSize
-                            + headerSize, offsetPosition);
+                    SerializerDeserializerUtil.writeIntToByteArray(offsetArray,
+                            offsets.get(i) + metadataInfoSize + headerSize, offsetPosition);
                     offsetPosition += 4;
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java b/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
index e3ed6fb..a36238d 100644
--- a/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
+++ b/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
@@ -44,8 +44,6 @@ import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerD
 
 public class RecordBuilder implements IARecordBuilder {
     private final static int DEFAULT_NUM_OPEN_FIELDS = 10;
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-    private final static byte RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
     private final UTF8StringSerializerDeserializer utf8SerDer = new UTF8StringSerializerDeserializer();
 
     private int openPartOffsetArraySize;
@@ -163,7 +161,7 @@ public class RecordBuilder implements IARecordBuilder {
         // +1 because we do not store the value tag.
         closedPartOutputStream.write(value.getByteArray(), value.getStartOffset() + 1, len);
         numberOfClosedFields++;
-        if (isNullable && value.getByteArray()[value.getStartOffset()] != SER_NULL_TYPE_TAG) {
+        if (isNullable && value.getByteArray()[value.getStartOffset()] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
             nullBitMap[id / 8] |= (byte) (1 << (7 - (id % 8)));
         }
     }
@@ -173,7 +171,7 @@ public class RecordBuilder implements IARecordBuilder {
         // We assume the tag is not included (closed field)
         closedPartOutputStream.write(value, 0, value.length);
         numberOfClosedFields++;
-        if (isNullable && value[0] != SER_NULL_TYPE_TAG) {
+        if (isNullable && value[0] != ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
             nullBitMap[id / 8] |= (byte) (1 << (7 - (id % 8)));
         }
     }
@@ -255,7 +253,7 @@ public class RecordBuilder implements IARecordBuilder {
 
         // write the record header
         if (writeTypeTag) {
-            out.writeByte(RECORD_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
         }
         out.writeInt(recordLength);
         if (isOpen) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java
index ed19224..8fa881a 100644
--- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/AqlNullWriterFactory.java
@@ -42,7 +42,7 @@ public class AqlNullWriterFactory implements INullWriterFactory {
             @Override
             public void writeNull(DataOutput out) throws HyracksDataException {
                 try {
-                    out.writeByte(ATypeTag.NULL.serialize());
+                    out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
index 6a24cf3..4ce9617 100644
--- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
@@ -113,13 +113,13 @@ public class AObjectAscBinaryComparatorFactory implements IBinaryComparatorFacto
                 // Normally, comparing between NULL and non-NULL values should return UNKNOWN as the result.
                 // However, at this point, we assume that NULL check between two types is already done.
                 // Therefore, inside this method, we return an order between two values even if one value is NULL.
-                if (b1[s1] == ATypeTag.NULL.serialize()) {
-                    if (b2[s2] == ATypeTag.NULL.serialize())
+                if (b1[s1] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+                    if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
                         return 0;
                     else
                         return -1;
                 } else {
-                    if (b2[s2] == ATypeTag.NULL.serialize())
+                    if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
                         return 1;
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
index 77b8b3a..8907bca 100644
--- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
@@ -29,8 +29,8 @@ import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
 import org.apache.hyracks.data.std.primitive.DoublePointable;
 import org.apache.hyracks.data.std.primitive.FloatPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringLowercasePointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 
 public class ListItemBinaryComparatorFactory implements IBinaryComparatorFactory {
 
@@ -84,13 +84,13 @@ public class ListItemBinaryComparatorFactory implements IBinaryComparatorFactory
             @Override
             public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) throws HyracksDataException {
 
-                if (b1[s1] == ATypeTag.NULL.serialize()) {
-                    if (b2[s2] == ATypeTag.NULL.serialize())
+                if (b1[s1] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+                    if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
                         return 0;
                     else
                         return -1;
                 } else {
-                    if (b2[s2] == ATypeTag.NULL.serialize())
+                    if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
                         return 1;
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java b/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java
index 6d09bf8..247e6fd 100644
--- a/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java
+++ b/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryBooleanInspectorImpl.java
@@ -34,14 +34,12 @@ public class AqlBinaryBooleanInspectorImpl implements IBinaryBooleanInspector {
         }
     };
 
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-
     private AqlBinaryBooleanInspectorImpl() {
     }
 
     @Override
     public boolean getBooleanValue(byte[] bytes, int offset, int length) {
-        if (bytes[offset] == SER_NULL_TYPE_TAG)
+        if (bytes[offset] == ATypeTag.SERIALIZED_NULL_TYPE_TAG)
             return false;
         /** check if the runtime type is boolean */
         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[offset]);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java b/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java
index e73f769..52248c8 100644
--- a/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java
+++ b/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlBinaryTokenizerFactoryProvider.java
@@ -35,10 +35,12 @@ public class AqlBinaryTokenizerFactoryProvider implements IBinaryTokenizerFactor
     public static final AqlBinaryTokenizerFactoryProvider INSTANCE = new AqlBinaryTokenizerFactoryProvider();
 
     private static final IBinaryTokenizerFactory aqlStringTokenizer = new DelimitedUTF8StringBinaryTokenizerFactory(
-            true, true, new UTF8WordTokenFactory(ATypeTag.STRING.serialize(), ATypeTag.INT32.serialize()));
+            true, true,
+            new UTF8WordTokenFactory(ATypeTag.SERIALIZED_STRING_TYPE_TAG, ATypeTag.SERIALIZED_INT32_TYPE_TAG));
 
     private static final IBinaryTokenizerFactory aqlHashingStringTokenizer = new DelimitedUTF8StringBinaryTokenizerFactory(
-            true, true, new HashedUTF8WordTokenFactory(ATypeTag.INT32.serialize(), ATypeTag.INT32.serialize()));
+            true, true,
+            new HashedUTF8WordTokenFactory(ATypeTag.SERIALIZED_INT32_TYPE_TAG, ATypeTag.SERIALIZED_INT32_TYPE_TAG));
 
     private static final IBinaryTokenizerFactory orderedListTokenizer = new AOrderedListBinaryTokenizerFactory(
             new AListElementTokenFactory());
@@ -77,7 +79,8 @@ public class AqlBinaryTokenizerFactoryProvider implements IBinaryTokenizerFactor
                     return null;
                 } else {
                     return new NGramUTF8StringBinaryTokenizerFactory(gramLength, usePrePost, true, true,
-                            new UTF8NGramTokenFactory(ATypeTag.STRING.serialize(), ATypeTag.INT32.serialize()));
+                            new UTF8NGramTokenFactory(ATypeTag.SERIALIZED_STRING_TYPE_TAG,
+                                    ATypeTag.SERIALIZED_INT32_TYPE_TAG));
                 }
             }
             case ORDEREDLIST: {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java b/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
index 03e3895..67b62d6 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
@@ -19,6 +19,11 @@
 
 package org.apache.asterix.om.pointables;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.AqlNullWriterFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
@@ -35,11 +40,6 @@ import org.apache.asterix.om.util.container.IObjectFactory;
 import org.apache.hyracks.api.dataflow.value.INullWriter;
 import org.apache.hyracks.util.string.UTF8StringWriter;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * This class interprets the binary data representation of a record. One can
  * call getFieldNames, getFieldTypeTags and getFieldValues to get pointable
@@ -52,6 +52,7 @@ public class ARecordVisitablePointable extends AbstractVisitablePointable {
      * object pool based allocator, in order to have object reuse
      */
     static IObjectFactory<IVisitablePointable, IAType> FACTORY = new IObjectFactory<IVisitablePointable, IAType>() {
+        @Override
         public IVisitablePointable create(IAType type) {
             return new ARecordVisitablePointable((ARecordType) type);
         }
@@ -114,7 +115,7 @@ public class ARecordVisitablePointable extends AbstractVisitablePointable {
 
                 // add type name Reference (including a astring type tag)
                 int nameStart = typeBos.size();
-                typeDos.writeByte(ATypeTag.STRING.serialize());
+                typeDos.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
                 utf8Writer.writeUTF8(fieldNameStrs[i], typeDos);
                 int nameEnd = typeBos.size();
                 IVisitablePointable typeNameReference = AFlatValuePointable.FACTORY.create(null);
@@ -183,8 +184,7 @@ public class ARecordVisitablePointable extends AbstractVisitablePointable {
                 boolean hasNullableFields = NonTaggedFormatUtil.hasNullableField(inputRecType);
                 if (hasNullableFields) {
                     nullBitMapOffset = s;
-                    offsetArrayOffset = s
-                            + (this.numberOfSchemaFields % 8 == 0 ? numberOfSchemaFields / 8
+                    offsetArrayOffset = s + (this.numberOfSchemaFields % 8 == 0 ? numberOfSchemaFields / 8
                             : numberOfSchemaFields / 8 + 1);
                 } else {
                     offsetArrayOffset = s;
@@ -238,7 +238,7 @@ public class ARecordVisitablePointable extends AbstractVisitablePointable {
                     int fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, ATypeTag.STRING,
                             false);
                     int fnstart = dataBos.size();
-                    dataDos.writeByte(ATypeTag.STRING.serialize());
+                    dataDos.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
                     dataDos.write(b, fieldOffset, fieldValueLength);
                     int fnend = dataBos.size();
                     IVisitablePointable fieldName = allocator.allocateEmpty();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java b/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
index d89ae6a..d632d70 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
@@ -19,6 +19,14 @@
 
 package org.apache.asterix.om.pointables.cast;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.TypeException;
@@ -47,14 +55,6 @@ import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
 import org.apache.hyracks.util.string.UTF8StringWriter;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * This class is to do the runtime type cast for a record. It is ONLY visible to
  * ACastVisitor.
@@ -75,8 +75,8 @@ class ARecordCaster {
     private final IVisitablePointable nullReference = allocator.allocateEmpty();
     private final IVisitablePointable nullTypeTag = allocator.allocateEmpty();
 
-    private final IBinaryComparator fieldNameComparator = PointableBinaryComparatorFactory.of(
-            UTF8StringPointable.FACTORY).createBinaryComparator();
+    private final IBinaryComparator fieldNameComparator = PointableBinaryComparatorFactory
+            .of(UTF8StringPointable.FACTORY).createBinaryComparator();
 
     private final ByteArrayAccessibleOutputStream outputBos = new ByteArrayAccessibleOutputStream();
     private final DataOutputStream outputDos = new DataOutputStream(outputBos);
@@ -107,7 +107,7 @@ class ARecordCaster {
             int end = bos.size();
             nullReference.set(bos.getByteArray(), start, end - start);
             start = bos.size();
-            dos.write(ATypeTag.NULL.serialize());
+            dos.write(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
             end = bos.size();
             nullTypeTag.set(bos.getByteArray(), start, end - start);
         } catch (IOException e) {
@@ -115,8 +115,8 @@ class ARecordCaster {
         }
     }
 
-    public void castRecord(ARecordVisitablePointable recordAccessor, IVisitablePointable resultAccessor, ARecordType reqType,
-            ACastVisitor visitor) throws IOException, TypeException {
+    public void castRecord(ARecordVisitablePointable recordAccessor, IVisitablePointable resultAccessor,
+            ARecordType reqType, ACastVisitor visitor) throws IOException, TypeException {
         List<IVisitablePointable> fieldNames = recordAccessor.getFieldNames();
         List<IVisitablePointable> fieldTypeTags = recordAccessor.getFieldTypeTags();
         List<IVisitablePointable> fieldValues = recordAccessor.getFieldValues();
@@ -214,15 +214,15 @@ class ARecordCaster {
                 IVisitablePointable reqFieldTypeTag = reqFieldTypeTags.get(reqFnPos);
                 if (fieldTypeTag.equals(reqFieldTypeTag) || (
                 // match the null type of optional field
-                        optionalFields[reqFnPos] && fieldTypeTag.equals(nullTypeTag))) {
+                optionalFields[reqFnPos] && fieldTypeTag.equals(nullTypeTag))) {
                     fieldPermutation[reqFnPos] = fnPos;
                     openFields[fnPos] = false;
                 } else {
                     // if mismatch, check whether input type can be promoted to the required type
-                    ATypeTag inputTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(fieldTypeTag
-                            .getByteArray()[fieldTypeTag.getStartOffset()]);
-                    ATypeTag requiredTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(reqFieldTypeTag
-                            .getByteArray()[reqFieldTypeTag.getStartOffset()]);
+                    ATypeTag inputTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                            .deserialize(fieldTypeTag.getByteArray()[fieldTypeTag.getStartOffset()]);
+                    ATypeTag requiredTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                            .deserialize(reqFieldTypeTag.getByteArray()[reqFieldTypeTag.getStartOffset()]);
 
                     if (ATypeHierarchy.canPromote(inputTypeTag, requiredTypeTag)
                             || ATypeHierarchy.canDemote(inputTypeTag, requiredTypeTag)) {
@@ -255,8 +255,8 @@ class ARecordCaster {
 
                 //print the field type
                 IVisitablePointable fieldType = fieldTypeTags.get(i);
-                ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(fieldType.getByteArray()[fieldType
-                        .getStartOffset()]);
+                ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                        .deserialize(fieldType.getByteArray()[fieldType.getStartOffset()]);
                 ps.print(typeTag);
 
                 //collect the output message
@@ -281,8 +281,8 @@ class ARecordCaster {
     }
 
     private void writeOutput(List<IVisitablePointable> fieldNames, List<IVisitablePointable> fieldTypeTags,
-            List<IVisitablePointable> fieldValues, DataOutput output, ACastVisitor visitor) throws IOException,
-            AsterixException {
+            List<IVisitablePointable> fieldValues, DataOutput output, ACastVisitor visitor)
+                    throws IOException, AsterixException {
         // reset the states of the record builder
         recBuilder.reset(cachedReqType);
         recBuilder.init();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index ee91aab..ac50312 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -216,10 +216,10 @@ public class ARecordPointable extends AbstractPointable {
     // Closed field accessors.
     // -----------------------
 
-    public void getClosedFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException,
-            AsterixException {
+    public void getClosedFieldValue(ARecordType recordType, int fieldId, DataOutput dOut)
+            throws IOException, AsterixException {
         if (isClosedFieldNull(recordType, fieldId)) {
-            dOut.writeByte(ATypeTag.NULL.serialize());
+            dOut.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
         } else {
             dOut.write(getClosedFieldTag(recordType, fieldId));
             dOut.write(bytes, getClosedFieldOffset(recordType, fieldId), getClosedFieldSize(recordType, fieldId));
@@ -231,7 +231,7 @@ public class ARecordPointable extends AbstractPointable {
     }
 
     public void getClosedFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
-        dOut.writeByte(ATypeTag.STRING.serialize());
+        dOut.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
         utf8Writer.writeUTF8(getClosedFieldName(recordType, fieldId), dOut);
     }
 
@@ -281,8 +281,8 @@ public class ARecordPointable extends AbstractPointable {
     // Open field accessors.
     // -----------------------
 
-    public void getOpenFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException,
-            AsterixException {
+    public void getOpenFieldValue(ARecordType recordType, int fieldId, DataOutput dOut)
+            throws IOException, AsterixException {
         dOut.write(bytes, getOpenFieldValueOffset(recordType, fieldId), getOpenFieldValueSize(recordType, fieldId));
     }
 
@@ -297,7 +297,7 @@ public class ARecordPointable extends AbstractPointable {
     }
 
     public void getOpenFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
-        dOut.writeByte(ATypeTag.STRING.serialize());
+        dOut.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
         dOut.write(bytes, getOpenFieldNameOffset(recordType, fieldId), getOpenFieldNameSize(recordType, fieldId));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/5dc958ce/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java b/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
index 0a341f0..6196c60 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
@@ -24,9 +24,7 @@ import java.util.List;
 
 /**
  * There is a unique tag for each primitive type and for each kind of
- * non-primitive type in the object model.
- *
- * @author Nicola
+ * non-primitive type in the object model
  */
 public enum ATypeTag implements IEnumSerializer {
     INT8(1),
@@ -69,19 +67,39 @@ public enum ATypeTag implements IEnumSerializer {
     UUID(38),
     SHORTWITHOUTTYPEINFO(40);
 
-    private byte value;
-
-    private ATypeTag(int value) {
-        this.value = (byte) value;
-    }
-
-    @Override
-    public byte serialize() {
-        return value;
-    }
-
+    /*
+     * Serialized Tags begin
+     */
+    public static final byte SERIALIZED_STRING_TYPE_TAG = STRING.serialize();
+    public static final byte SERIALIZED_NULL_TYPE_TAG = NULL.serialize();
+    public static final byte SERIALIZED_DOUBLE_TYPE_TAG = DOUBLE.serialize();
+    public static final byte SERIALIZED_RECORD_TYPE_TAG = RECORD.serialize();
+    public static final byte SERIALIZED_INT32_TYPE_TAG = INT32.serialize();
+    public static final byte SERIALIZED_ORDEREDLIST_TYPE_TAG = ORDEREDLIST.serialize();
+    public static final byte SERIALIZED_UNORDEREDLIST_TYPE_TAG = UNORDEREDLIST.serialize();
+    public static final byte SERIALIZED_POLYGON_TYPE_TAG = POLYGON.serialize();
+    public static final byte SERIALIZED_DATE_TYPE_TAG = DATE.serialize();
+    public static final byte SERIALIZED_TIME_TYPE_TAG = TIME.serialize();
+    public static final byte SERIALIZED_DATETIME_TYPE_TAG = DATETIME.serialize();
+    public static final byte SERIALIZED_SYSTEM_NULL_TYPE_TAG = SYSTEM_NULL.serialize();
+    public static final byte SERIALIZED_DURATION_TYPE_TAG = DURATION.serialize();
+    public static final byte SERIALIZED_DAY_TIME_DURATION_TYPE_TAG = DAYTIMEDURATION.serialize();
+    public static final byte SERIALIZED_POINT_TYPE_TAG = POINT.serialize();
+    public static final byte SERIALIZED_INTERVAL_TYPE_TAG = INTERVAL.serialize();
+    public static final byte SERIALIZED_CIRCLE_TYPE_TAG = CIRCLE.serialize();
+    public static final byte SERIALIZED_YEAR_MONTH_DURATION_TYPE_TAG = YEARMONTHDURATION.serialize();
+    public static final byte SERIALIZED_LINE_TYPE_TAG = LINE.serialize();
+    public static final byte SERIALIZED_RECTANGLE_TYPE_TAG = RECTANGLE.serialize();
+    public static final byte SERIALIZED_BOOLEAN_TYPE_TAG = BOOLEAN.serialize();
+    public static final byte SERIALIZED_INT8_TYPE_TAG = INT8.serialize();
+    public static final byte SERIALIZED_INT16_TYPE_TAG = INT16.serialize();
+    public static final byte SERIALIZED_INT64_TYPE_TAG = INT64.serialize();
+    public static final byte SERIALIZED_FLOAT_TYPE_TAG = FLOAT.serialize();
+    /*
+     * Serialized Tags end
+     */
     public static final int TYPE_COUNT = ATypeTag.values().length;
-
+    private byte value;
     public static final ATypeTag[] VALUE_TYPE_MAPPING;
 
     static {
@@ -96,6 +114,15 @@ public enum ATypeTag implements IEnumSerializer {
         VALUE_TYPE_MAPPING = typeList.toArray(new ATypeTag[typeList.size()]);
     }
 
+    private ATypeTag(int value) {
+        this.value = (byte) value;
+    }
+
+    @Override
+    public byte serialize() {
+        return value;
+    }
+
     public boolean isDerivedType() {
         if (this == ATypeTag.RECORD || this == ATypeTag.ORDEREDLIST || this == ATypeTag.UNORDEREDLIST
                 || this == ATypeTag.UNION)