Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 16:59:43 UTC

[08/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index a629cdf,0000000..90ebfb7
mode 100644,000000..100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@@ -1,3017 -1,0 +1,3014 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.metadata.declared;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.logging.Level;
 +import java.util.logging.Logger;
 +
 +import org.apache.asterix.common.config.AsterixStorageProperties;
 +import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 +import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 +import org.apache.asterix.common.config.DatasetConfig.IndexType;
 +import org.apache.asterix.common.config.GlobalConfig;
 +import org.apache.asterix.common.config.MetadataConstants;
 +import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
 +import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 +import org.apache.asterix.common.context.TransactionSubsystemProvider;
 +import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
 +import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
 +import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
 +import org.apache.asterix.common.exceptions.AsterixException;
 +import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 +import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 +import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 +import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 +import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 +import org.apache.asterix.common.transactions.JobId;
 +import org.apache.asterix.common.utils.StoragePathUtil;
 +import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
 +import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
 +import org.apache.asterix.external.api.IAdapterFactory;
 +import org.apache.asterix.external.api.IDataSourceAdapter;
 +import org.apache.asterix.external.feed.api.ICentralFeedManager;
 +import org.apache.asterix.external.feed.management.FeedConnectionId;
 +import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 +import org.apache.asterix.external.feed.watch.FeedActivity;
 +import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
 +import org.apache.asterix.external.indexing.ExternalFile;
 +import org.apache.asterix.external.indexing.IndexingConstants;
 +import org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescriptor;
 +import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
 +import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor;
 +import org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor;
 +import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 +import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 +import org.apache.asterix.external.provider.AdapterFactoryProvider;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.FeedConstants;
 +import org.apache.asterix.formats.base.IDataFormat;
 +import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 +import org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
 +import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 +import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
 +import org.apache.asterix.metadata.MetadataException;
 +import org.apache.asterix.metadata.MetadataManager;
 +import org.apache.asterix.metadata.MetadataTransactionContext;
 +import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint;
 +import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 +import org.apache.asterix.metadata.entities.Dataset;
 +import org.apache.asterix.metadata.entities.DatasourceAdapter;
 +import org.apache.asterix.metadata.entities.Datatype;
 +import org.apache.asterix.metadata.entities.Dataverse;
 +import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 +import org.apache.asterix.metadata.entities.Feed;
 +import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 +import org.apache.asterix.metadata.entities.Index;
 +import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 +import org.apache.asterix.metadata.entities.NodeGroup;
 +import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 +import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 +import org.apache.asterix.metadata.utils.DatasetUtils;
 +import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
 +import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 +import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 +import org.apache.asterix.om.types.ARecordType;
 +import org.apache.asterix.om.types.ATypeTag;
 +import org.apache.asterix.om.types.IAType;
 +import org.apache.asterix.om.util.AsterixAppContextInfo;
 +import org.apache.asterix.om.util.AsterixClusterProperties;
 +import org.apache.asterix.om.util.NonTaggedFormatUtil;
 +import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 +import org.apache.asterix.runtime.formats.FormatUtils;
 +import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 +import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 +import org.apache.asterix.runtime.operators.AsterixLSMInvertedIndexUpsertOperatorDescriptor;
 +import org.apache.asterix.runtime.operators.AsterixLSMTreeUpsertOperatorDescriptor;
 +import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
 +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
 +import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
 +import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
 +import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
 +import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
 +import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 +import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 +import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 +import org.apache.hyracks.algebricks.common.utils.Pair;
 +import org.apache.hyracks.algebricks.common.utils.Triple;
 +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 +import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 +import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
 +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
 +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 +import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 +import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 +import org.apache.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
 +import org.apache.hyracks.algebricks.data.IAWriterFactory;
 +import org.apache.hyracks.algebricks.data.IPrinterFactory;
 +import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 +import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 +import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
 +import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 +import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 +import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
 +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 +import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 +import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 +import org.apache.hyracks.api.dataset.ResultSetId;
 +import org.apache.hyracks.api.job.JobSpecification;
 +import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 +import org.apache.hyracks.data.std.primitive.ShortPointable;
 +import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 +import org.apache.hyracks.dataflow.std.file.FileSplit;
 +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 +import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 +import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 +import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 +import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 +import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
 +import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
 +import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
 +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
 +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
 +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
 +import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 +import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
 +import org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
 +import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 +import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 +
 +public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
 +
 +    private static final Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
 +    private MetadataTransactionContext mdTxnCtx;
 +    private boolean isWriteTransaction;
 +    private final Map<String, String[]> stores;
 +    private Map<String, String> config;
 +    private IAWriterFactory writerFactory;
 +    private FileSplit outputFile;
 +    private boolean asyncResults;
 +    private ResultSetId resultSetId;
 +    private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
 +    private final ICentralFeedManager centralFeedManager;
 +
 +    private final Dataverse defaultDataverse;
 +    private JobId jobId;
 +    private Map<String, Integer> locks;
 +    private boolean isTemporaryDatasetWriteJob = true;
 +
 +    private final AsterixStorageProperties storageProperties;
 +
 +    public String getPropertyValue(String propertyName) {
 +        return config.get(propertyName);
 +    }
 +
 +    public void setConfig(Map<String, String> config) {
 +        this.config = config;
 +    }
 +
 +    public Map<String, String[]> getAllStores() {
 +        return stores;
 +    }
 +
 +    public Map<String, String> getConfig() {
 +        return config;
 +    }
 +
 +    public AqlMetadataProvider(Dataverse defaultDataverse, ICentralFeedManager centralFeedManager) {
 +        this.defaultDataverse = defaultDataverse;
 +        this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
 +        this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
 +        this.centralFeedManager = centralFeedManager;
 +    }
 +
 +    public void setJobId(JobId jobId) {
 +        this.jobId = jobId;
 +    }
 +
 +    public Dataverse getDefaultDataverse() {
 +        return defaultDataverse;
 +    }
 +
 +    public String getDefaultDataverseName() {
 +        return defaultDataverse == null ? null : defaultDataverse.getDataverseName();
 +    }
 +
 +    public void setWriteTransaction(boolean writeTransaction) {
 +        this.isWriteTransaction = writeTransaction;
 +    }
 +
 +    public void setWriterFactory(IAWriterFactory writerFactory) {
 +        this.writerFactory = writerFactory;
 +    }
 +
 +    public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
 +        this.mdTxnCtx = mdTxnCtx;
 +    }
 +
 +    public MetadataTransactionContext getMetadataTxnContext() {
 +        return mdTxnCtx;
 +    }
 +
 +    public IAWriterFactory getWriterFactory() {
 +        return this.writerFactory;
 +    }
 +
 +    public FileSplit getOutputFile() {
 +        return outputFile;
 +    }
 +
 +    public void setOutputFile(FileSplit outputFile) {
 +        this.outputFile = outputFile;
 +    }
 +
 +    public boolean getResultAsyncMode() {
 +        return asyncResults;
 +    }
 +
 +    public void setResultAsyncMode(boolean asyncResults) {
 +        this.asyncResults = asyncResults;
 +    }
 +
 +    public ResultSetId getResultSetId() {
 +        return resultSetId;
 +    }
 +
 +    public void setResultSetId(ResultSetId resultSetId) {
 +        this.resultSetId = resultSetId;
 +    }
 +
 +    public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider rafp) {
 +        this.resultSerializerFactoryProvider = rafp;
 +    }
 +
 +    public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
 +        return resultSerializerFactoryProvider;
 +    }
 +
 +    /**
 +     * Retrieve the Output RecordType, as defined by "set output-record-type".
 +     */
 +    public ARecordType findOutputRecordType() throws AlgebricksException {
 +        String outputRecordType = getPropertyValue("output-record-type");
 +        if (outputRecordType == null) {
 +            return null;
 +        }
 +        String dataverse = getDefaultDataverseName();
 +        if (dataverse == null) {
 +            throw new AlgebricksException("Cannot declare output-record-type with no dataverse!");
 +        }
 +        IAType type = findType(dataverse, outputRecordType);
 +        if (!(type instanceof ARecordType)) {
 +            throw new AlgebricksException("Type " + outputRecordType + " is not a record type!");
 +        }
 +        return (ARecordType) type;
 +    }
 +
 +    @Override
 +    public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
 +        AqlSourceId aqlId = id;
 +        try {
 +            return lookupSourceInMetadata(aqlId);
 +        } catch (MetadataException e) {
 +            throw new AlgebricksException(e);
 +        }
 +    }
 +
 +    public boolean isWriteTransaction() {
 +        // The transaction writes persistent datasets.
 +        return isWriteTransaction;
 +    }
 +
 +    public boolean isTemporaryDatasetWriteJob() {
 +        // The transaction only writes temporary datasets.
 +        return isTemporaryDatasetWriteJob;
 +    }
 +
 +    @Override
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
 +            IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
 +            List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars,
 +            List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
 +            JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
 +        try {
 +            switch (((AqlDataSource) dataSource).getDatasourceType()) {
 +                case FEED:
 +                    return buildFeedCollectRuntime(jobSpec, (FeedDataSource) dataSource);
 +                case INTERNAL_DATASET: {
 +                    // querying an internal dataset
 +                    return buildInternalDatasetScan(jobSpec, scanVariables, minFilterVars, maxFilterVars, opSchema,
 +                            typeEnv, dataSource, context, implConfig);
 +                }
 +                case EXTERNAL_DATASET: {
 +                    // querying an external dataset
 +                    Dataset dataset = ((DatasetDataSource) dataSource).getDataset();
 +                    String itemTypeName = dataset.getItemTypeName();
 +                    IAType itemType = MetadataManager.INSTANCE
 +                            .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
 +
 +                    ExternalDatasetDetails edd = (ExternalDatasetDetails) dataset.getDatasetDetails();
 +                    IAdapterFactory adapterFactory = getConfiguredAdapterFactory(dataset, edd.getAdapter(),
 +                            edd.getProperties(), (ARecordType) itemType, false, null, null);
 +                    return buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
 +                            NonTaggedDataFormat.INSTANCE);
 +                }
 +                case LOADABLE: {
 +                    // This is a load into dataset operation
 +                    LoadableDataSource alds = (LoadableDataSource) dataSource;
 +                    List<List<String>> partitioningKeys = alds.getPartitioningKeys();
 +                    boolean isPKAutoGenerated = ((InternalDatasetDetails) alds.getTargetDataset().getDatasetDetails())
 +                            .isAutogenerated();
 +                    ARecordType itemType = (ARecordType) alds.getLoadedType();
 +                    int pkIndex = 0;
 +                    IAdapterFactory adapterFactory = getConfiguredAdapterFactory(alds.getTargetDataset(),
 +                            alds.getAdapter(), alds.getAdapterProperties(), itemType, isPKAutoGenerated,
 +                            partitioningKeys, null);
 +                    RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
 +                    return buildLoadableDatasetScan(jobSpec, alds, adapterFactory, rDesc, isPKAutoGenerated,
 +                            partitioningKeys, itemType, pkIndex);
 +                }
 +                default: {
 +                    throw new IllegalArgumentException();
 +                }
 +
 +            }
 +        } catch (AsterixException e) {
 +            throw new AlgebricksException(e);
 +        }
 +    }
 +
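 +    /**
 +     * Builds the feed collect operator and its partition constraint for a feed data
 +     * source, assembling the collector's record descriptor from the feed's output
 +     * record type plus any meta and primary-key types.
 +     */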
 +    @SuppressWarnings("rawtypes")
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedCollectRuntime(JobSpecification jobSpec,
 +            FeedDataSource feedDataSource) throws AlgebricksException {
 +
 +        try {
 +            ARecordType feedOutputType = (ARecordType) feedDataSource.getItemType();
 +            ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
 +                    .getSerializerDeserializer(feedOutputType);
 +            IAType metaType = feedDataSource.getMetaItemType();
 +            List<IAType> pkTypes = feedDataSource.getPkTypes();
 +            ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
 +            serdes.add(payloadSerde);
 +            if (metaType != null) {
 +                serdes.add(AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType));
 +            }
 +            if (pkTypes != null) {
 +                for (IAType type : pkTypes) {
 +                    serdes.add(AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type));
 +                }
 +            }
 +            RecordDescriptor feedDesc = new RecordDescriptor(
 +                    serdes.toArray(new ISerializerDeserializer[serdes.size()]));
 +            FeedPolicyEntity feedPolicy = (FeedPolicyEntity) feedDataSource.getProperties()
 +                    .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
 +            if (feedPolicy == null) {
 +                throw new AlgebricksException("Feed not configured with a policy");
 +            }
 +            feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
 +            FeedConnectionId feedConnectionId = new FeedConnectionId(feedDataSource.getId().getDataverseName(),
 +                    feedDataSource.getId().getDatasourceName(), feedDataSource.getTargetDataset());
 +            FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
 +                    feedDataSource.getSourceFeedId(), feedOutputType, feedDesc, feedPolicy.getProperties(),
 +                    feedDataSource.getLocation());
 +
 +            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedCollector,
 +                    determineLocationConstraint(feedDataSource));
 +
 +        } catch (Exception e) {
 +            throw new AlgebricksException(e);
 +        }
 +    }
 +
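 +    /**
 +     * Determines the absolute partition constraint for a feed collect runtime by
 +     * resolving intake or compute locations, either from the data source itself or
 +     * from the feed activities recorded by the central feed manager.
 +     */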
 +    private AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource)
 +            throws AsterixException {
 +        String[] locationArray = null;
 +        String locations = null;
 +        switch (feedDataSource.getSourceFeedType()) {
 +            case PRIMARY:
 +                switch (feedDataSource.getLocation()) {
 +                    case COMPUTE:
 +                        if (feedDataSource.getFeed().getFeedId().equals(feedDataSource.getSourceFeedId())) {
 +                            locationArray = feedDataSource.getLocations();
 +                        } else {
 +                            Collection<FeedActivity> activities = centralFeedManager.getFeedLoadManager()
 +                                    .getFeedActivities();
 +                            Iterator<FeedActivity> it = activities.iterator();
 +                            FeedActivity activity = null;
 +                            while (it.hasNext()) {
 +                                activity = it.next();
 +                                if (activity.getDataverseName().equals(feedDataSource.getSourceFeedId().getDataverse())
 +                                        && activity.getFeedName()
 +                                                .equals(feedDataSource.getSourceFeedId().getFeedName())) {
 +                                    locations = activity.getFeedActivityDetails()
 +                                            .get(FeedActivityDetails.COMPUTE_LOCATIONS);
 +                                    locationArray = locations.split(",");
 +                                    break;
 +                                }
 +                            }
 +                        }
 +                        break;
 +                    case INTAKE:
 +                        locationArray = feedDataSource.getLocations();
 +                        break;
 +                    default:
 +                        throw new AsterixException(
 +                                "Can't subscribe to a FeedRuntime with type: " + feedDataSource.getLocation());
 +                }
 +                break;
 +            case SECONDARY:
 +                Collection<FeedActivity> activities = centralFeedManager.getFeedLoadManager().getFeedActivities();
 +                Iterator<FeedActivity> it = activities.iterator();
 +                FeedActivity activity = null;
 +                while (it.hasNext()) {
 +                    activity = it.next();
 +                    if (activity.getDataverseName().equals(feedDataSource.getSourceFeedId().getDataverse())
 +                            && activity.getFeedName().equals(feedDataSource.getSourceFeedId().getFeedName())) {
 +                        switch (feedDataSource.getLocation()) {
 +                            case INTAKE:
 +                                locations = activity.getFeedActivityDetails()
 +                                        .get(FeedActivityDetails.COLLECT_LOCATIONS);
 +                                break;
 +                            case COMPUTE:
 +                                locations = activity.getFeedActivityDetails()
 +                                        .get(FeedActivityDetails.COMPUTE_LOCATIONS);
 +                                break;
 +                            default:
 +                                throw new AsterixException(
 +                                        "Can't subscribe to a FeedRuntime with type: " + feedDataSource.getLocation());
 +                        }
 +                        break;
 +                    }
 +                }
 +
 +                if (locations != null) {
 +                    locationArray = locations.split(",");
 +                } else {
 +                    String message = "Unable to discover location(s) for source feed data hand-off "
 +                            + feedDataSource.getSourceFeedId();
 +                    if (LOGGER.isLoggable(Level.SEVERE)) {
 +                        LOGGER.severe(message);
 +                    }
 +                    throw new AsterixException(message);
 +                }
 +                break;
 +        }
 +        AlgebricksAbsolutePartitionConstraint locationConstraint = new AlgebricksAbsolutePartitionConstraint(
 +                locationArray);
 +        return locationConstraint;
 +    }
 +
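 +    /**
 +     * Builds the external data scanner used by a load-into-dataset operation and
 +     * derives its partition constraint from the adapter factory.
 +     */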
 +    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(JobSpecification jobSpec,
 +            LoadableDataSource alds, IAdapterFactory adapterFactory, RecordDescriptor rDesc, boolean isPKAutoGenerated,
 +            List<List<String>> primaryKeys, ARecordType recType, int pkIndex) throws AlgebricksException {
 +        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
 +                adapterFactory);
 +        AlgebricksPartitionConstraint constraint;
 +        try {
 +            constraint = adapterFactory.getPartitionConstraint();
 +        } catch (Exception e) {
 +            throw new AlgebricksException(e);
 +        }
 +        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
 +    }
 +
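 +    /**
 +     * Instantiates, via reflection, the data format configured for the given dataverse.
 +     */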
 +    public IDataFormat getDataFormat(String dataverseName) throws AsterixException {
 +        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
 +        IDataFormat format;
 +        try {
 +            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
 +        } catch (Exception e) {
 +            throw new AsterixException(e);
 +        }
 +        return format;
 +    }
 +
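 +    /**
 +     * Builds a primary-index B-tree scan over an internal dataset, mapping any
 +     * min/max filter variables to field indexes before delegating to buildBtreeRuntime.
 +     */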
 +    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
 +            List<LogicalVariable> outputVars, List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
 +            IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, IDataSource<AqlSourceId> dataSource,
 +            JobGenContext context, Object implConfig) throws AlgebricksException, MetadataException {
 +        AqlSourceId asid = dataSource.getId();
 +        String dataverseName = asid.getDataverseName();
 +        String datasetName = asid.getDatasourceName();
 +        Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
 +
 +        int[] minFilterFieldIndexes = null;
 +        if (minFilterVars != null && !minFilterVars.isEmpty()) {
 +            minFilterFieldIndexes = new int[minFilterVars.size()];
 +            int i = 0;
 +            for (LogicalVariable v : minFilterVars) {
 +                minFilterFieldIndexes[i] = opSchema.findVariable(v);
 +                i++;
 +            }
 +        }
 +        int[] maxFilterFieldIndexes = null;
 +        if (maxFilterVars != null && !maxFilterVars.isEmpty()) {
 +            maxFilterFieldIndexes = new int[maxFilterVars.size()];
 +            int i = 0;
 +            for (LogicalVariable v : maxFilterVars) {
 +                maxFilterFieldIndexes[i] = opSchema.findVariable(v);
 +                i++;
 +            }
 +        }
 +
 +        return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, context, true, false,
 +                ((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, true, true,
 +                implConfig, minFilterFieldIndexes, maxFilterFieldIndexes);
 +    }
 +
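 +    /**
 +     * Creates and configures the adapter factory for a dataset. If the dataset has a
 +     * files index with no pending operation, the dataset's external files are fetched
 +     * and filtered down to those with no pending operation.
 +     */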
 +    private IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
 +            Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
 +            List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
 +        try {
 +            configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
 +            IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration,
 +                    itemType, metaType);
 +
 +            // check to see if dataset is indexed
 +            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                    dataset.getDatasetName(),
 +                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
 +
 +            if (filesIndex != null && filesIndex.getPendingOp() == 0) {
 +                // get files
 +                List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
 +                Iterator<ExternalFile> iterator = files.iterator();
 +                while (iterator.hasNext()) {
 +                    if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
 +                        iterator.remove();
 +                    }
 +                }
 +                // TODO Check this call, result of merge from master!
 +                // ((IGenericAdapterFactory) adapterFactory).setFiles(files);
 +            }
 +
 +            return adapterFactory;
 +        } catch (Exception e) {
 +            throw new AlgebricksException("Unable to create adapter", e);
 +        }
 +    }
 +
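 +    /**
 +     * Builds a data scanner runtime for an external dataset of records, using the
 +     * format's serializer for the item type and the adapter factory's partition constraint.
 +     */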
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
 +            JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
 +            throws AlgebricksException {
 +        if (itemType.getTypeTag() != ATypeTag.RECORD) {
 +            throw new AlgebricksException("Can only scan datasets of records.");
 +        }
 +
 +        @SuppressWarnings("rawtypes")
 +        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
 +        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 +
 +        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
 +                adapterFactory);
 +
 +        AlgebricksPartitionConstraint constraint;
 +        try {
 +            constraint = adapterFactory.getPartitionConstraint();
 +        } catch (Exception e) {
 +            throw new AlgebricksException(e);
 +        }
 +
 +        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
 +    }
 +
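 +    /**
 +     * Builds the feed intake operator for a primary feed, choosing the internal or the
 +     * external (library-based) constructor depending on the adapter type.
 +     */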
 +    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
 +            JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
 +        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput = null;
 +        factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx);
 +        ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
 +                ExternalDataConstants.KEY_TYPE_NAME);
 +        IAdapterFactory adapterFactory = factoryOutput.first;
 +        FeedIntakeOperatorDescriptor feedIngestor = null;
 +        switch (factoryOutput.third) {
 +            case INTERNAL:
 +                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory, recordType,
 +                        policyAccessor, factoryOutput.second);
 +                break;
 +            case EXTERNAL:
 +                String libraryName = primaryFeed.getAdapterName().trim()
 +                        .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
 +                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName,
 +                        adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second);
 +                break;
 +        }
 +
 +        AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
 +        return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory>(feedIngestor,
 +                partitionConstraint, adapterFactory);
 +    }
 +
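 +    /**
 +     * Builds a B-tree search runtime over a primary or secondary index. Internal
 +     * datasets use an LSM B-tree dataflow helper; external datasets use the B-tree
 +     * with buddy B-tree variant. The search callback is chosen based on whether the
 +     * dataset is temporary and whether instant locking is requested.
 +     */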
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
 +            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
 +            JobGenContext context, boolean retainInput, boolean retainNull, Dataset dataset, String indexName,
 +            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
 +            Object implConfig, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
 +
 +        boolean isSecondary = true;
 +        int numSecondaryKeys = 0;
 +        try {
 +            boolean temp = dataset.getDatasetDetails().isTemp();
 +            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                    dataset.getDatasetName(), dataset.getDatasetName());
 +            if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) {
 +                isSecondary = !indexName.equals(primaryIndex.getIndexName());
 +            }
 +            int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
 +            RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
 +            int[] bloomFilterKeyFields;
 +            ITypeTraits[] typeTraits;
 +            IBinaryComparatorFactory[] comparatorFactories;
 +
 +            ARecordType itemType = (ARecordType) this.findType(dataset.getItemTypeDataverseName(),
 +                    dataset.getItemTypeName());
 +            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
 +            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
 +                    itemType, context.getBinaryComparatorFactoryProvider());
 +            int[] filterFields = null;
 +            int[] btreeFields = null;
 +
 +            if (isSecondary) {
 +                Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                        dataset.getDatasetName(), indexName);
 +                numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
 +                bloomFilterKeyFields = new int[numSecondaryKeys];
 +                for (int i = 0; i < numSecondaryKeys; i++) {
 +                    bloomFilterKeyFields[i] = i;
 +                }
 +                Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits = getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
 +                        secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
 +                        secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
 +                        dataset.getDatasetType());
 +                comparatorFactories = comparatorFactoriesAndTypeTraits.first;
 +                typeTraits = comparatorFactoriesAndTypeTraits.second;
 +                if (filterTypeTraits != null) {
 +                    filterFields = new int[1];
 +                    filterFields[0] = numSecondaryKeys + numPrimaryKeys;
 +                    btreeFields = new int[numSecondaryKeys + numPrimaryKeys];
 +                    for (int k = 0; k < btreeFields.length; k++) {
 +                        btreeFields[k] = k;
 +                    }
 +                }
 +
 +            } else {
 +                bloomFilterKeyFields = new int[numPrimaryKeys];
 +                for (int i = 0; i < numPrimaryKeys; i++) {
 +                    bloomFilterKeyFields[i] = i;
 +                }
- 
-                 typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
++                // get meta item type
++                ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
++                typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
 +                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
 +                        context.getBinaryComparatorFactoryProvider());
- 
 +                filterFields = DatasetUtils.createFilterFields(dataset);
 +                btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
 +            }
 +
 +            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 +            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
 +            try {
 +                spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
 +                        dataset.getDatasetName(), indexName, temp);
 +            } catch (Exception e) {
 +                throw new AlgebricksException(e);
 +            }
 +
 +            ISearchOperationCallbackFactory searchCallbackFactory = null;
 +            if (isSecondary) {
 +                searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
 +                        : new SecondaryIndexSearchOperationCallbackFactory();
 +            } else {
 +                JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
 +                int datasetId = dataset.getDatasetId();
 +                int[] primaryKeyFields = new int[numPrimaryKeys];
 +                for (int i = 0; i < numPrimaryKeys; i++) {
 +                    primaryKeyFields[i] = i;
 +                }
 +
 +                AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
 +                ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
 +                if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
 +                    searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
 +                            : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
 +                                    txnSubsystemProvider, ResourceType.LSM_BTREE);
 +                } else {
 +                    searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
 +                            : new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
 +                                    txnSubsystemProvider, ResourceType.LSM_BTREE);
 +                }
 +            }
 +            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
 +                    .getMergePolicyFactory(dataset, mdTxnCtx);
 +            AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
 +            BTreeSearchOperatorDescriptor btreeSearchOp;
 +            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
 +                btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
 +                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
 +                        spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
 +                        lowKeyInclusive, highKeyInclusive,
 +                        new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
 +                                compactionInfo.first, compactionInfo.second,
 +                                isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
 +                                        : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
 +                                rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
 +                                storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
 +                                filterCmpFactories, btreeFields, filterFields, !temp),
 +                        retainInput, retainNull, context.getNullWriterFactory(), searchCallbackFactory,
 +                        minFilterFieldIndexes, maxFilterFieldIndexes);
 +            } else {
 +                // External dataset: use the B-tree with buddy B-tree.
 +                // Note: be careful with the key start index.
 +                int[] buddyBtreeFields = new int[] { numSecondaryKeys };
 +                ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(
 +                        compactionInfo.first, compactionInfo.second,
 +                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
 +                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
 +                        LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
 +                        getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBtreeFields,
 +                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
 +                btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
 +                        rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
 +                        highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
 +                        retainNull, context.getNullWriterFactory(), searchCallbackFactory);
 +            }
 +
 +            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
 +
 +        } catch (MetadataException me) {
 +            throw new AlgebricksException(me);
 +        }
 +    }
 +
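 +    /**
 +     * Computes the comparator factories and type traits of a secondary B-tree index:
 +     * the secondary key fields first, followed by the primary key fields.
 +     */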
 +    private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
 +            IndexType indexType, List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes,
 +            List<List<String>> pidxKeyFieldNames, ARecordType recType, DatasetType dsType) throws AlgebricksException {
 +
 +        IBinaryComparatorFactory[] comparatorFactories;
 +        ITypeTraits[] typeTraits;
 +        int sidxKeyFieldCount = sidxKeyFieldNames.size();
 +        int pidxKeyFieldCount = pidxKeyFieldNames.size();
 +        typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
 +        comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount];
 +
 +        int i = 0;
 +        for (; i < sidxKeyFieldCount; ++i) {
 +            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
 +                    sidxKeyFieldNames.get(i), recType);
 +            IAType keyType = keyPairType.first;
 +            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
 +                    true);
 +            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
 +        }
 +
 +        for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
 +            IAType keyType = null;
 +            try {
 +                switch (dsType) {
 +                    case INTERNAL:
 +                        keyType = recType.getSubFieldType(pidxKeyFieldNames.get(j));
 +                        break;
 +                    case EXTERNAL:
 +                        keyType = IndexingConstants.getFieldType(j);
 +                        break;
 +                    default:
 +                        throw new AlgebricksException("Unknown Dataset Type");
 +                }
 +            } catch (AsterixException e) {
 +                throw new AlgebricksException(e);
 +            }
 +            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
 +                    true);
 +            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
 +        }
 +
 +        return new Pair<IBinaryComparatorFactory[], ITypeTraits[]>(comparatorFactories, typeTraits);
 +    }
 +
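 +    /**
 +     * Builds an R-tree search runtime over a single spatial secondary key, which is
 +     * decomposed into 2 * numDimensions nested key fields. Internal datasets use an
 +     * LSM R-tree dataflow helper; external datasets use the external R-tree variant.
 +     */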
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
 +            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
 +            JobGenContext context, boolean retainInput, boolean retainNull, Dataset dataset, String indexName,
 +            int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
 +
 +        try {
 +            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
 +            int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
 +
 +            boolean temp = dataset.getDatasetDetails().isTemp();
 +            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                    dataset.getDatasetName(), indexName);
 +            if (secondaryIndex == null) {
 +                throw new AlgebricksException(
 +                        "Code generation error: no index " + indexName + " for dataset " + dataset.getDatasetName());
 +            }
 +            List<List<String>> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
 +            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
 +            int numSecondaryKeys = secondaryKeyFields.size();
 +            if (numSecondaryKeys != 1) {
 +                throw new AlgebricksException("Cannot use " + numSecondaryKeys
 +                        + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
 +            }
 +            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
 +                    secondaryKeyFields.get(0), recType);
 +            IAType keyType = keyTypePair.first;
 +            if (keyType == null) {
 +                throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
 +            }
 +            int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
 +            int numNestedSecondaryKeyFields = numDimensions * 2;
 +            IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
 +            for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
 +                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
 +            }
 +
 +            RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
 +            // Isn't the variable below always 0?
 +            int keysStartIndex = outputRecDesc.getFieldCount() - numNestedSecondaryKeyFields - numPrimaryKeys;
 +            if (retainInput) {
 +                keysStartIndex -= numNestedSecondaryKeyFields;
 +            }
 +            IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
 +                    outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
 +            ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
 +                    numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
 +            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 +            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = splitProviderAndPartitionConstraintsForDataset(
 +                    dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
 +
 +            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
 +                    dataset, recType, context.getBinaryComparatorFactoryProvider());
 +            int[] btreeFields = new int[primaryComparatorFactories.length];
 +            for (int i = 0; i < btreeFields.length; i++) {
 +                btreeFields[i] = i + numNestedSecondaryKeyFields;
 +            }
 +
 +            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
 +            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
 +                    recType, context.getBinaryComparatorFactoryProvider());
 +            int[] filterFields = null;
 +            int[] rtreeFields = null;
 +            if (filterTypeTraits != null) {
 +                filterFields = new int[1];
 +                filterFields[0] = numNestedSecondaryKeyFields + numPrimaryKeys;
 +                rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
 +                for (int i = 0; i < rtreeFields.length; i++) {
 +                    rtreeFields[i] = i;
 +                }
 +            }
 +
 +            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
 +            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
 +                    .getMergePolicyFactory(dataset, mdTxnCtx);
 +            ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
 +                    : new SecondaryIndexSearchOperationCallbackFactory();
 +
 +            RTreeSearchOperatorDescriptor rtreeSearchOp;
 +            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
 +                rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
 +                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
 +                        spPc.first, typeTraits, comparatorFactories, keyFields,
 +                        new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
 +                                primaryComparatorFactories,
 +                                new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
 +                                compactionInfo.second,
 +                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
 +                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
 +                                LSMRTreeIOOperationCallbackFactory.INSTANCE,
 +                                proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
 +                                storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
 +                                filterTypeTraits, filterCmpFactories, filterFields, !temp),
 +                        retainInput, retainNull, context.getNullWriterFactory(), searchCallbackFactory,
 +                        minFilterFieldIndexes, maxFilterFieldIndexes);
 +
 +            } else {
 +                // External Dataset
 +                ExternalRTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalRTreeDataflowHelperFactory(
 +                        valueProviderFactories, RTreePolicyType.RTREE,
 +                        IndexingConstants.getBuddyBtreeComparatorFactories(), compactionInfo.first,
 +                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
 +                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
 +                        proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
 +                        getStorageProperties().getBloomFilterFalsePositiveRate(),
 +                        new int[] { numNestedSecondaryKeyFields },
 +                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
 +                // Create the operator
 +                rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
 +                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
 +                        spPc.first, typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput,
 +                        retainNull, context.getNullWriterFactory(), searchCallbackFactory);
 +            }
 +
 +            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
 +
 +        } catch (MetadataException me) {
 +            throw new AlgebricksException(me);
 +        }
 +    }
 +
 +    @Override
 +    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
 +            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
 +        FileSplitDataSink fsds = (FileSplitDataSink) sink;
 +        FileSplitSinkId fssi = fsds.getId();
 +        FileSplit fs = fssi.getFileSplit();
 +        File outFile = fs.getLocalFile().getFile();
 +        String nodeId = fs.getNodeName();
 +
 +        SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
 +                getWriterFactory(), inputDesc);
 +        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
 +        return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
 +    }
 +
 +    @Override
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
 +            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
 +            JobSpecification spec) throws AlgebricksException {
 +        ResultSetDataSink rsds = (ResultSetDataSink) sink;
 +        ResultSetSinkId rssId = rsds.getId();
 +        ResultSetId rsId = rssId.getResultSetId();
 +
 +        ResultWriterOperatorDescriptor resultWriter = null;
 +        try {
 +            IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
 +                    .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
 +            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(),
 +                    resultSerializedAppenderFactory);
 +        } catch (IOException e) {
 +            throw new AlgebricksException(e);
 +        }
 +
 +        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(resultWriter, null);
 +    }
 +
 +    @Override
 +    public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
 +            throws AlgebricksException {
 +        AqlDataSource ads = findDataSource(dataSourceId);
 +        Dataset dataset = ((DatasetDataSource) ads).getDataset();
 +
 +        try {
 +            String indexName = indexId;
 +            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                    dataset.getDatasetName(), indexName);
 +            if (secondaryIndex != null) {
 +                return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
 +            } else {
 +                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                        dataset.getDatasetName(), dataset.getDatasetName());
 +                if (primaryIndex.getIndexName().equals(indexId)) {
 +                    return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
 +                } else {
 +                    return null;
 +                }
 +            }
 +        } catch (MetadataException me) {
 +            throw new AlgebricksException(me);
 +        }
 +    }
 +
 +    public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException, MetadataException {
 +        Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasourceName());
 +        if (dataset == null) {
 +            throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
 +        }
 +        IAType itemType = findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
 +        IAType metaItemType = findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
 +        INodeDomain domain = findNodeDomain(dataset.getNodeGroupName());
 +        AqlDataSourceType datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
 +                ? AqlDataSourceType.EXTERNAL_DATASET : AqlDataSourceType.INTERNAL_DATASET;
 +        return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,
 +                dataset.getDatasetDetails(), domain);
 +    }
 +
 +    @Override
 +    public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
 +        boolean result = false;
 +        switch (((AqlDataSource) dataSource).getDatasourceType()) {
 +            case INTERNAL_DATASET:
 +            case EXTERNAL_DATASET:
 +                result = ((DatasetDataSource) dataSource).getDataset().getDatasetType() == DatasetType.EXTERNAL;
 +                break;
 +            case FEED:
 +                result = true;
 +                break;
 +            case LOADABLE:
 +                result = true;
 +                break;
 +            default:
 +                break;
 +        }
 +        return result;
 +    }
 +
 +    @Override
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
 +            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
 +            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
 +            JobSpecification spec) throws AlgebricksException {
 +        String dataverseName = dataSource.getId().getDataverseName();
 +        String datasetName = dataSource.getId().getDatasourceName();
 +
 +        Dataset dataset = findDataset(dataverseName, datasetName);
 +        if (dataset == null) {
 +            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
 +        }
 +
 +        int numKeys = keys.size();
 +        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
 +
 +        // move key fields to front
 +        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
 +        int[] bloomFilterKeyFields = new int[numKeys];
 +        // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
 +        int i = 0;
 +        for (LogicalVariable varKey : keys) {
 +            int idx = propagatedSchema.findVariable(varKey);
 +            fieldPermutation[i] = idx;
 +            bloomFilterKeyFields[i] = i;
 +            i++;
 +        }
 +        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 +        if (numFilterFields > 0) {
 +            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
 +            fieldPermutation[numKeys + 1] = idx;
 +        }
 +
 +        try {
 +            boolean temp = dataset.getDatasetDetails().isTemp();
 +            isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
 +
 +            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                    dataset.getDatasetName(), dataset.getDatasetName());
 +            String indexName = primaryIndex.getIndexName();
 +
 +            String itemTypeName = dataset.getItemTypeName();
 +            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
 +                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
-             ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
++            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, null);
 +            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
 +                    itemType, context.getBinaryComparatorFactoryProvider());
 +
 +            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
 +                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
 +            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 +
 +            long numElementsHint = getCardinalityPerPartitionHint(dataset);
 +
 +            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
 +            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
 +                    itemType, context.getBinaryComparatorFactoryProvider());
 +            int[] filterFields = DatasetUtils.createFilterFields(dataset);
 +            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
 +
 +            // TODO: figure out the right bulk-load behavior and then supply the
 +            // right callback (e.g., what is the expected behavior when an error
 +            // occurs during bulk load?)
 +            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
 +                    .getMergePolicyFactory(dataset, mdTxnCtx);
 +            TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
 +                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
 +                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
 +                    GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
 +                    new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
 +                            compactionInfo.first, compactionInfo.second,
 +                            new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
 +                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
 +                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
 +                            storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
 +                            filterCmpFactories, btreeFields, filterFields, !temp));
 +            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
 +                    splitsAndConstraint.second);
 +        } catch (MetadataException me) {
 +            throw new AlgebricksException(me);
 +        }
 +    }
 +
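Both getWriteResultRuntime above and getInsertOrDeleteRuntime below build a field permutation that moves the primary-key fields to the front of each tuple, followed by the payload record and, when the dataset declares one, the filter field. The following is a minimal, self-contained sketch of that layout; the class name, key counts, and schema positions are illustrative assumptions and are not part of this patch.

// Sketch of the primary-key-first field permutation (illustrative only).
// Assumes a hypothetical dataset with two primary keys and one filter field,
// and hypothetical positions for those fields in the propagated schema.
public final class FieldPermutationSketch {
    public static void main(String[] args) {
        int numKeys = 2;               // assumed number of primary keys
        int numFilterFields = 1;       // 1 if the dataset declares a filter field, else 0
        int payloadPos = 0;            // assumed schema position of the payload record
        int filterPos = 5;             // assumed schema position of the filter field
        int[] keyPositions = { 3, 4 }; // assumed schema positions of the primary keys

        // Target layout: [pk0, pk1, payload, filter]
        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
        int[] bloomFilterKeyFields = new int[numKeys];
        for (int i = 0; i < numKeys; i++) {
            fieldPermutation[i] = keyPositions[i]; // keys are moved to the front
            bloomFilterKeyFields[i] = i;           // bloom filter is built on the key slots
        }
        fieldPermutation[numKeys] = payloadPos;    // payload follows the keys
        fieldPermutation[numKeys + 1] = filterPos; // filter field, if any, goes last
        System.out.println(java.util.Arrays.toString(fieldPermutation)); // prints [3, 4, 0, 5]
    }
}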
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
 +            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
 +            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
 +            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
 +            List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
 +
 +        String datasetName = dataSource.getId().getDatasourceName();
 +        Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
 +        if (dataset == null) {
 +            throw new AlgebricksException(
 +                    "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
 +        }
 +        boolean temp = dataset.getDatasetDetails().isTemp();
 +        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
 +
 +        int numKeys = keys.size();
 +        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
 +        // Move key fields to front.
 +        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
 +                + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
 +        int[] bloomFilterKeyFields = new int[numKeys];
 +        int i = 0;
 +        for (LogicalVariable varKey : keys) {
 +            int idx = propagatedSchema.findVariable(varKey);
 +            fieldPermutation[i] = idx;
 +            bloomFilterKeyFields[i] = i;
 +            i++;
 +        }
 +        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
 +        if (numFilterFields > 0) {
 +            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
 +            fieldPermutation[i++] = idx;
 +        }
 +        if (additionalNonFilteringFields != null) {
 +            for (LogicalVariable variable : additionalNonFilteringFields) {
 +                int idx = propagatedSchema.findVariable(variable);
 +                fieldPermutation[i++] = idx;
 +            }
 +        }
 +
 +        try {
 +            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                    dataset.getDatasetName(), dataset.getDatasetName());
 +            String indexName = primaryIndex.getIndexName();
- 
-             String itemTypeName = dataset.getItemTypeName();
 +            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                     .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
- 
-             ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
++                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
++            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
++            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
 +
 +            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 +            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
 +                    itemType, context.getBinaryComparatorFactoryProvider());
 +            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
 +                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
 +
 +            // prepare callback
 +            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
 +            int datasetId = dataset.getDatasetId();
 +            int[] primaryKeyFields = new int[numKeys];
 +            for (i = 0; i < numKeys; i++) {
 +                primaryKeyFields[i] = i;
 +            }
 +
 +            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
 +            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
 +                    itemType, context.getBinaryComparatorFactoryProvider());
 +            int[] filterFields = DatasetUtils.createFilterFields(dataset);
 +            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
 +
 +            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
 +            IModificationOperationCallbackFactory modificationCallbackFactory = temp
 +                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
 +                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
 +                    : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
 +                            txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
 +
 +            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
 +                    .getMergePolicyFactory(dataset, mdTxnCtx);
 +            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
 +                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
 +                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
 +                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
 +                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
 +                    btreeFields, filterFields, !temp);
 +            IOperatorDescriptor op;
 +            if (bulkload) {
 +                long numElementsHint = getCardinalityPerPartitionHint(dataset);
 +                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
 +                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
 +                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
 +                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
 +            } else {
 +                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
 +                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
 +                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
 +                        fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
 +                        NoOpOperationCallbackFactory.INSTANCE);
 +            }
 +            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
 +
 +        } catch (MetadataException me) {
 +            throw new AlgebricksException(me);
 +        }
 +    }
 +
 +    @Override
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
 +            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
 +            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
 +            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
 +            JobSpecification spec, boolean bulkload) throws AlgebricksException {
 +        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
 +                additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
 +    }
 +
 +    @Override
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
 +            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
 +            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
 +            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
 +        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, typeEnv, keys, payload,
 +                additionalNonKeyFields, recordDesc, context, spec, false, null);
 +    }
 +
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(
 +            IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
 +            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
 +            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
 +            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
 +            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
 +        String indexName = dataSourceIndex.getId();
 +        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
 +        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
 +
 +        Dataset dataset = findDataset(dataverseName, datasetName);
 +        if (dataset == null) {
 +            throw new AlgebricksException("Unknown dataset " + datasetName);
 +        }
 +        Index secondaryIndex;
 +        try {
 +            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                    dataset.getDatasetName(), indexName);
 +        } catch (MetadataException e) {
 +            throw new AlgebricksException(e);
 +        }
 +        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
 +        switch (secondaryIndex.getIndexType()) {
 +            case BTREE: {
 +                return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
 +                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
 +                        bulkload);
 +            }
 +            case RTREE: {
 +                return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
 +                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
 +                        bulkload);
 +            }
 +            case SINGLE_PARTITION_WORD_INVIX:
 +            case SINGLE_PARTITION_NGRAM_INVIX:
 +            case LENGTH_PARTITIONED_WORD_INVIX:
 +            case LENGTH_PARTITIONED_NGRAM_INVIX: {
 +                return getInvertedIndexDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
 +                        primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec,
 +                        indexOp, secondaryIndex.getIndexType(), bulkload);
 +            }
 +            default: {
 +                throw new AlgebricksException(
 +                        "Insert and delete not implemented for index type: " + secondaryIndex.getIndexType());
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
 +            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
 +            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
 +            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
 +            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
 +            boolean bulkload) throws AlgebricksException {
 +        return getIndexInsertOrDeleteRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
 +                typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
 +                bulkload);
 +    }
 +
 +    @Override
 +    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
 +            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
 +            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
 +            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
 +            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
 +
 +        String indexName = dataSourceIndex.getId();
 +        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
 +        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
 +
 +        IOperatorSchema inputSchema = new OperatorSchemaImpl();
 +        if (inputSchemas.length > 0) {
 +            inputSchema = inputSchemas[0];
 +        } else {
 +            throw new AlgebricksException("TokenizeOperator can not operate without any input variable.");
 +        }
 +
 +        Dataset dataset = findDataset(dataverseName, datasetName);
 +        if (dataset == null) {
 +            throw new AlgebricksException("Unknown dataset " + datasetName);
 +        }
 +        Index secondaryIndex;
 +        try {
 +            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                    dataset.getDatasetName(), indexName);
 +        } catch (MetadataException e) {
 +            throw new AlgebricksException(e);
 +        }
 +        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
 +        // TokenizeOperator only supports a keyword or n-gram index.
 +        switch (secondaryIndex.getIndexType()) {
 +            case SINGLE_PARTITION_WORD_INVIX:
 +            case SINGLE_PARTITION_NGRAM_INVIX:
 +            case LENGTH_PARTITIONED_WORD_INVIX:
 +            case LENGTH_PARTITIONED_NGRAM_INVIX: {
 +                return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema,
 +                        typeEnv, primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
 +                        IndexOperation.INSERT, secondaryIndex.getIndexType(), bulkload);
 +            }
 +            default: {
 +                throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: "
 +                        + secondaryIndex.getIndexType());
 +            }
 +        }
 +
 +    }
 +
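For the inverted-index cases handled by getBinaryTokenizerRuntime below, each tokenized tuple carries one token field, plus a short partitioning field for the length-partitioned index types, followed by every propagated key. A small sketch of that width computation follows; the class name and the assumed key count are illustrative and not part of this patch.

// Sketch of the tokenized-tuple width used for inverted-index loading
// (illustrative only; the key count is an assumption, not taken from this patch).
public final class TokenizerLayoutSketch {
    public static void main(String[] args) {
        boolean isPartitioned = true; // true for the LENGTH_PARTITIONED_* index types
        int numPropagatedKeys = 2;    // assumed: e.g. one primary key plus one other variable

        // One token field, plus a short partitioning field for length-partitioned
        // indexes, plus every key propagated from the input schema.
        int numTokenKeyPairFields = (isPartitioned ? 2 : 1) + numPropagatedKeys;
        System.out.println("fields per tokenized tuple: " + numTokenKeyPairFields); // prints 4
    }
}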
 +    // Get a tokenizer for bulk-loading data into an n-gram or keyword index.
 +    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBinaryTokenizerRuntime(String dataverseName,
 +            String datasetName, String indexName, IOperatorSchema inputSchema, IOperatorSchema propagatedSchema,
 +            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
 +            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
 +            JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload)
 +            throws AlgebricksException {
 +
 +        // Sanity checks.
 +        if (primaryKeys.size() > 1) {
 +            throw new AlgebricksException("Cannot tokenize composite primary key.");
 +        }
 +        if (secondaryKeys.size() > 1) {
 +            throw new AlgebricksException("Cannot tokenize composite secondary key fields.");
 +        }
 +
 +        boolean isPartitioned = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
 +                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
 +
 +        // Number of keys that need to be propagated.
 +        int numKeys = inputSchema.getSize();
 +
 +        // Get the remaining logical variables that are neither primary nor
 +        // secondary keys, along with each variable's position.
 +        // These variables will be propagated through the TokenizeOperator.
 +        List<LogicalVariable> otherKeys = new ArrayList<LogicalVariable>();
 +        for (int k = 0; k < inputSchema.getSize(); k++) {
 +            LogicalVariable inputVar = inputSchema.getVariable(k);
 +            if (!primaryKeys.contains(inputVar) && !secondaryKeys.contains(inputVar)) {
 +                otherKeys.add(inputVar);
 +            }
 +        }
 +
 +        // For tokenization, sorting, and loading:
 +        // one token (+ an optional partitioning field) + primary keys + secondary
 +        // keys + other variables.
 +        // The secondary keys and other variables are simply passed through to the
 +        // IndexInsertDelete operator.
 +        int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys;
 +
 +        // generate field permutations for the input
 +        int[] fieldPermutation = new int[numKeys];
 +
 +        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
 +        int i = 0;
 +        int j = 0;
 +        for (LogicalVariable varKey : primaryKeys) {
 +            int idx = propagatedSchema.findVariable(varKey);
 +            fieldPermutation[i] = idx;
 +            modificationCallbackPrimaryKeyFields[j] = i;
 +            i++;
 +            j++;
 +        }
 +        for (LogicalVariable varKey : otherKeys) {
 +            int idx = propagatedSchema.findVariable(varKey);
 +            fieldPermutation[i] = idx;
 +            i++;
 +        }
 +        for (LogicalVariable varKey : secondaryKeys) {
 +            int idx = propagatedSchema.findVariable(varKey);
 +            fieldPermutation[i] = idx;
 +            i++;
 +        }
 +
 +        Dataset dataset = findDataset(dataverseName, datasetName);
 +        if (dataset == null) {
 +            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
 +        }
 +        String itemTypeName = dataset.getItemTypeName();
 +        IAType itemType;
 +        try {
 +            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
 +                    .getDatatype();
 +
 +            if (itemType.getTypeTag() != ATypeTag.RECORD) {
 +                throw new AlgebricksException("Only record types can be tokenized.");
 +            }
 +
 +            ARecordType recType = (ARecordType) itemType;
 +
 +            // Index parameters.
 +            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
 +                    dataset.getDatasetName(), indexName);
 +
 +            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
 +            List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
 +
 +            int numTokenFields = (!isPartitioned) ? secondaryKeys.size() : secondaryKeys.size() + 1;
 +            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
 +            ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
 +
 +            // Find the key type of the secondary key. If it's a derived type,
 +            // return the derived type.
 +            // e.g. UNORDERED LIST -> return UNORDERED LIST type
 +            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
 +                    secondaryKeyExprs.get(0), recType);
 +            IAType secondaryKeyType = keyPairType.first;
 +            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
 +            i = 0;
 +            for (List<String> partitioningKey : partitioningKeys) {
 +                IAType keyType = recType.getSubFieldType(partitioningKey);
 +                invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
 +                ++i;
 +            }
 +
 +            tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
 +            if (isPartitioned) {
 +                // The partitioning field is hardcoded to be a short *without*
 +                // an Asterix type tag.
 +                tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
 +            }
 +
 +            IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
 +                    secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
 +
 +            P

<TRUNCATED>