Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:43:55 UTC

[07/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
new file mode 100644
index 0000000..8f555cc
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
@@ -0,0 +1,761 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory;
+import edu.uci.ics.asterix.external.indexing.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
+import edu.uci.ics.asterix.external.indexing.operators.ExternalDatasetIndexesCommitOperatorDescriptor;
+import edu.uci.ics.asterix.external.indexing.operators.ExternalDatasetIndexesRecoverOperatorDescriptor;
+import edu.uci.ics.asterix.external.indexing.operators.IndexInfoOperatorDescriptor;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.external.FilesIndexDescription;
+import edu.uci.ics.asterix.metadata.external.IndexingConstants;
+import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.tools.external.data.ExternalFilesIndexOperatorDescriptor;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
+import edu.uci.ics.hyracks.storage.common.file.LocalResource;
+
+public class ExternalIndexingOperations {
+
+    public static final List<List<String>> FILE_INDEX_FIELD_NAMES = new ArrayList<List<String>>();
+    public static final ArrayList<IAType> FILE_INDEX_FIELD_TYPES = new ArrayList<IAType>();
+    static {
+        FILE_INDEX_FIELD_NAMES.add(new ArrayList<String>(Arrays.asList("")));
+        FILE_INDEX_FIELD_TYPES.add(BuiltinType.ASTRING);
+    }
+
+    public static boolean isIndexible(ExternalDatasetDetails ds) {
+        String adapter = ds.getAdapter();
+        if (adapter.equalsIgnoreCase("hdfs") || adapter.equalsIgnoreCase("hive")
+                || adapter.equalsIgnoreCase("edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter")
+                || adapter.equalsIgnoreCase("edu.uci.ics.asterix.external.dataset.adapter.HIVEAdapter")) {
+            return true;
+        }
+        return false;
+    }
+
+    public static boolean isRefereshActive(ExternalDatasetDetails ds) {
+        return ds.getState() != ExternalDatasetTransactionState.COMMIT;
+    }
+
+    public static boolean datasetUsesHiveAdapter(ExternalDatasetDetails ds) {
+        String adapter = ds.getAdapter();
+        return (adapter.equalsIgnoreCase("hive") || adapter
+                .equalsIgnoreCase("edu.uci.ics.asterix.external.dataset.adapter.HIVEAdapter"));
+    }
+
+    public static boolean isValidIndexName(String datasetName, String indexName) {
+        return (!datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName));
+    }
+
+    public static String getFilesIndexName(String datasetName) {
+        return datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX);
+    }
+
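+    // The RID (record id) is the set of fields that locate a record inside its external
+    // file (e.g. file number and record offset); its size depends on the input format.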
+    public static int getRIDSize(Dataset dataset) {
+        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
+        return IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT));
+    }
+
+    public static IBinaryComparatorFactory[] getComparatorFactories(Dataset dataset) {
+        ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails());
+        return IndexingConstants.getComparatorFactories((dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT)));
+    }
+
+    public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() {
+        return IndexingConstants.getBuddyBtreeComparatorFactories();
+    }
+
+    public static ArrayList<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset) throws AlgebricksException {
+        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+        ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        try {
+            // Create the file system object
+            FileSystem fs = getFileSystemObject(datasetDetails.getProperties());
+            // If dataset uses hive adapter, add path to the dataset properties
+            if (datasetUsesHiveAdapter(datasetDetails)) {
+                HiveAdapterFactory.populateConfiguration(datasetDetails.getProperties());
+            }
+            // Get paths of dataset
+            String path = datasetDetails.getProperties().get(HDFSAdapterFactory.KEY_PATH);
+            String[] paths = path.split(",");
+
+            // Add fileStatuses to files
+            for (String aPath : paths) {
+                FileStatus[] fileStatuses = fs.listStatus(new Path(aPath));
+                for (int i = 0; i < fileStatuses.length; i++) {
+                    int nextFileNumber = files.size();
+                    if (fileStatuses[i].isDirectory()) {
+                        listSubFiles(dataset, fs, fileStatuses[i], files);
+                    } else {
+                        files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(),
+                                nextFileNumber, fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i]
+                                        .getModificationTime()), fileStatuses[i].getLen(),
+                                ExternalFilePendingOp.PENDING_NO_OP));
+                    }
+                }
+            }
+            // Close file system
+            fs.close();
+            if (files.size() == 0) {
+                throw new AlgebricksException("File Snapshot retrieved from external file system is empty");
+            }
+            return files;
+        } catch (Exception e) {
+            throw new AlgebricksException("Unable to get list of HDFS files", e);
+        }
+    }
+
+    /* list all files under the directory
+     * src is expected to be a folder
+     */
+    private static void listSubFiles(Dataset dataset, FileSystem srcFs, FileStatus src, ArrayList<ExternalFile> files)
+            throws IOException {
+        Path path = src.getPath();
+        FileStatus[] fileStatuses = srcFs.listStatus(path);
+        for (int i = 0; i < fileStatuses.length; i++) {
+            int nextFileNumber = files.size();
+            if (fileStatuses[i].isDirectory()) {
+                listSubFiles(dataset, srcFs, fileStatuses[i], files);
+            } else {
+                files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
+                        fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i].getModificationTime()),
+                        fileStatuses[i].getLen(), ExternalFilePendingOp.PENDING_NO_OP));
+            }
+        }
+    }
+
+    public static FileSystem getFileSystemObject(Map<String, String> map) throws IOException {
+        Configuration conf = new Configuration();
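+        // "fs.default.name" is the legacy (pre-Hadoop-2) name for the default file system
+        // URI; Hadoop 2 still honors it but prefers "fs.defaultFS"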
+        conf.set("fs.default.name", map.get(HDFSAdapterFactory.KEY_HDFS_URL).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        return FileSystem.get(conf);
+    }
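+
+    /*
+     * Usage sketch (illustrative only; the URL and path values are assumptions):
+     *
+     *   Map<String, String> props = new HashMap<String, String>();
+     *   props.put(HDFSAdapterFactory.KEY_HDFS_URL, "hdfs://localhost:9000");
+     *   FileSystem fs = getFileSystemObject(props);
+     *   try {
+     *       FileStatus[] statuses = fs.listStatus(new Path("/external/data"));
+     *   } finally {
+     *       fs.close();
+     *   }
+     */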
+
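+    // Note: createIndex is true when the files index is first built and false when an
+    // existing files index is updated with a new snapshot (buildFilesIndexUpdateOp
+    // below passes false).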
+    public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset,
+            ArrayList<ExternalFile> externalFilesSnapshot, AqlMetadataProvider metadataProvider, boolean createIndex)
+            throws MetadataException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), getFilesIndexName(dataset.getDatasetName()), true);
+        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+        ILocalResourceMetadata localResourceMetadata = new ExternalBTreeLocalResourceMetadata(
+                FilesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
+                FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, false, dataset.getDatasetId(),
+                mergePolicyFactory, mergePolicyFactoryProperties);
+        PersistentLocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+                localResourceMetadata, LocalResource.ExternalBTreeResource);
+        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+                mergePolicyFactory, mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
+                        dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
+        ExternalFilesIndexOperatorDescriptor externalFilesOp = new ExternalFilesIndexOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, indexDataflowHelperFactory, localResourceFactoryProvider,
+                externalFilesSnapshot, createIndex);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, externalFilesOp,
+                secondarySplitsAndConstraint.second);
+        spec.addRoot(externalFilesOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    /**
+     * This method creates an indexing operator that indexes the records in HDFS.
+     * 
+     * @param jobSpec
+     * @param itemType
+     * @param dataset
+     * @param files
+     * @param indexerDesc
+     * @param metadataProvider
+     * @return a pair of the external data scan operator and its partition constraint
+     * @throws Exception
+     */
+    private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDataIndexingOperator(
+            JobSpecification jobSpec, IAType itemType, Dataset dataset, List<ExternalFile> files,
+            RecordDescriptor indexerDesc, AqlMetadataProvider metadataProvider) throws Exception {
+        HDFSIndexingAdapterFactory adapterFactory = new HDFSIndexingAdapterFactory();
+        adapterFactory.setFiles(files);
+        adapterFactory.configure(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties(),
+                (ARecordType) itemType);
+        return new Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>(
+                new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
+                adapterFactory.getPartitionConstraint());
+    }
+
+    public static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
+            JobSpecification spec, AqlMetadataProvider metadataProvider, Dataset dataset, ARecordType itemType,
+            RecordDescriptor indexerDesc, List<ExternalFile> files) throws Exception {
+        if (files == null) {
+            files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset);
+        }
+        return getExternalDataIndexingOperator(spec, itemType, dataset, files, indexerDesc, metadataProvider);
+    }
+
+    /**
+     * At the end of this method, we expect to have 4 sets as follows:
+     * metadataFiles should retain only the appended files (in their original state) and the files marked with a pending drop operation;
+     * addedFiles should contain new files, with numbers assigned starting after the max original file number;
+     * deletedFiles should contain files that are no longer present in the file system;
+     * appendedFiles should contain the new file information of existing files.
+     * The method returns true only in case of zero delta (i.e., the dataset is up to date).
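+     * Example (illustrative): if the metadata lists {A(t1, 100 bytes)} and the file system
+     * now holds {A(t1, 150 bytes), B}, then A goes to appendedFiles (same name and
+     * timestamp, larger size), B goes to addedFiles, and the method returns false.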
+     * 
+     * @param dataset
+     * @param metadataFiles
+     * @param addedFiles
+     * @param deletedFiles
+     * @param appendedFiles
+     * @return true if the dataset is up to date (zero delta), false otherwise
+     * @throws MetadataException
+     * @throws AlgebricksException
+     */
+    public static boolean isDatasetUptodate(Dataset dataset, List<ExternalFile> metadataFiles,
+            List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, List<ExternalFile> appendedFiles)
+            throws MetadataException, AlgebricksException {
+        boolean uptodate = true;
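+        // new files are numbered after the highest-numbered metadata file (the list is
+        // assumed to be ordered by file number)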
+        int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1;
+
+        ArrayList<ExternalFile> fileSystemFiles = getSnapshotFromExternalFileSystem(dataset);
+
+        // Loop over file system files (taking care of added files)
+        for (ExternalFile fileSystemFile : fileSystemFiles) {
+            boolean fileFound = false;
+            Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
+            while (mdFilesIterator.hasNext()) {
+                ExternalFile metadataFile = mdFilesIterator.next();
+                if (fileSystemFile.getFileName().equals(metadataFile.getFileName())) {
+                    // Same file name
+                    if (fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime())) {
+                        // Same timestamp
+                        if (fileSystemFile.getSize() == metadataFile.getSize()) {
+                            // Same size -> no op
+                            mdFilesIterator.remove();
+                            fileFound = true;
+                        } else {
+                            // Different size -> append op
+                            metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
+                            fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP);
+                            appendedFiles.add(fileSystemFile);
+                            fileFound = true;
+                            uptodate = false;
+                        }
+                    } else {
+                        // Same file name, Different file mod date -> delete and add
+                        metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
+                        deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile
+                                .getDatasetName(), 0, metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
+                                metadataFile.getSize(), ExternalFilePendingOp.PENDING_DROP_OP));
+                        fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
+                        fileSystemFile.setFileNumber(newFileNumber);
+                        addedFiles.add(fileSystemFile);
+                        newFileNumber++;
+                        fileFound = true;
+                        uptodate = false;
+                    }
+                }
+                if (fileFound)
+                    break;
+            }
+            if (!fileFound) {
+                // File not stored previously in metadata -> pending add op
+                fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
+                fileSystemFile.setFileNumber(newFileNumber);
+                addedFiles.add(fileSystemFile);
+                newFileNumber++;
+                uptodate = false;
+            }
+        }
+
+        // Done with files from external file system -> metadata files now contain both deleted files and appended ones
+        // first, correct number assignment to deleted and updated files
+        for (ExternalFile deletedFile : deletedFiles) {
+            deletedFile.setFileNumber(newFileNumber);
+            newFileNumber++;
+        }
+        for (ExternalFile appendedFile : appendedFiles) {
+            appendedFile.setFileNumber(newFileNumber);
+            newFileNumber++;
+        }
+
+        // include the remaining deleted files
+        Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator();
+        while (mdFilesIterator.hasNext()) {
+            ExternalFile metadataFile = mdFilesIterator.next();
+            if (metadataFile.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP) {
+                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
+                deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
+                        newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(), metadataFile
+                                .getSize(), metadataFile.getPendingOp()));
+                newFileNumber++;
+                uptodate = false;
+            }
+        }
+        return uptodate;
+    }
+
+    public static Dataset createTransactionDataset(Dataset dataset) {
+        ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        ExternalDatasetDetails dsd = new ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(),
+                originalDsd.getTimestamp(), ExternalDatasetTransactionState.BEGIN);
+        Dataset transactionDataset = new Dataset(dataset.getDataverseName(), dataset.getDatasetName(),
+                dataset.getItemTypeName(), dataset.getNodeGroupName(), dataset.getCompactionPolicy(),
+                dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(), DatasetType.EXTERNAL,
+                dataset.getDatasetId(), dataset.getPendingOp());
+        return transactionDataset;
+    }
+
+    public static boolean isFileIndex(Index index) {
+        return (index.getIndexName().equals(getFilesIndexName(index.getDatasetName())));
+    }
+
+    public static JobSpecification buildDropFilesIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
+            AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
+        String dataverseName = indexDropStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
+                : indexDropStmt.getDataverseName();
+        String datasetName = indexDropStmt.getDatasetName();
+        String indexName = indexDropStmt.getIndexName();
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true);
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
+        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                        dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
+        AlgebricksPartitionConstraintHelper
+                .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
+        spec.addRoot(btreeDrop);
+
+        return spec;
+    }
+
+    public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles,
+            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
+            AqlMetadataProvider metadataProvider) throws MetadataException, AlgebricksException {
+        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+        for (ExternalFile file : metadataFiles) {
+            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP)
+                files.add(file);
+            else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
+                for (ExternalFile appendedFile : appendedFiles) {
+                    if (appendedFile.getFileName().equals(file.getFileName())) {
+                        files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(),
+                                file.getFileNumber(), file.getFileName(), file.getLastModefiedTime(), appendedFile
+                                        .getSize(), ExternalFilePendingOp.PENDING_NO_OP));
+                    }
+                }
+            }
+        }
+        for (ExternalFile file : addedFiles) {
+            files.add(file);
+        }
+        Collections.sort(files);
+        return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, false);
+    }
+
+    public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, List<ExternalFile> metadataFiles,
+            List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles,
+            AqlMetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+        // Create files list
+        ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+
+        for (ExternalFile metadataFile : metadataFiles) {
+            // appended files are reset to no-op; every metadata file is included either way
+            if (metadataFile.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
+                metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
+            }
+            files.add(metadataFile);
+        }
+        // add new files
+        for (ExternalFile file : addedFiles) {
+            files.add(file);
+        }
+        // add appended files
+        for (ExternalFile file : appendedFiles) {
+            files.add(file);
+        }
+
+        CompiledCreateIndexStatement ccis = new CompiledCreateIndexStatement(index.getIndexName(),
+                index.getDataverseName(), index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
+                index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
+        return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, metadataProvider, files);
+    }
+
+    public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
+            throws AlgebricksException, AsterixException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
+                metadataProvider.getMetadataTxnContext());
+        boolean temp = ds.getDatasetDetails().isTemp();
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                        getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
+                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
+                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                if (index.getIndexType() == IndexType.BTREE) {
+                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, spec));
+                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                } else if (index.getIndexType() == IndexType.RTREE) {
+                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
+                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                }
+            }
+        }
+
+        ExternalDatasetIndexesCommitOperatorDescriptor op = new ExternalDatasetIndexesCommitOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
+                rtreeDataflowHelperFactories, rtreeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    private static ExternalBTreeDataflowHelperFactory getFilesIndexDataflowHelperFactory(Dataset ds,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
+            AsterixStorageProperties storageProperties, JobSpecification spec) {
+        return new ExternalBTreeDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(),
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
+    }
+
+    private static ExternalBTreeWithBuddyDataflowHelperFactory getBTreeDataflowHelperFactory(Dataset ds, Index index,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
+            AsterixStorageProperties storageProperties, JobSpecification spec) {
+        return new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static ExternalRTreeDataflowHelperFactory getRTreeDataflowHelperFactory(Dataset ds, Index index,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
+            AsterixStorageProperties storageProperties, AqlMetadataProvider metadataProvider, JobSpecification spec)
+            throws AlgebricksException, AsterixException {
+        int numPrimaryKeys = getRIDSize(ds);
+        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
+        ARecordType itemType = (ARecordType) metadataProvider.findType(ds.getDataverseName(), ds.getItemTypeName());
+        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+        }
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
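+        // a spatial key is indexed through its MBR: one minimum and one maximum
+        // coordinate per dimension, hence 2 * numDimensions nested key fields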
+        int numNestedSecondaryKeyFields = numDimensions * 2;
+        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+
+        ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+                + numNestedSecondaryKeyFields];
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        ATypeTag keyType = nestedKeyType.getTypeTag();
+
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    nestedKeyType, true);
+            secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+        }
+        // Add serializers and comparators for primary index fields.
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i);
+            secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i);
+        }
+        int[] primaryKeyFields = new int[numPrimaryKeys];
+        for (int i = 0; i < primaryKeyFields.length; i++) {
+            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
+        }
+
+        return new ExternalRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+                getBuddyBtreeComparatorFactories(), mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
+    }
+
+    public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
+            throws AlgebricksException, AsterixException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
+                metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+
+        boolean temp = ds.getDatasetDetails().isTemp();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                        getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
+                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
+                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                if (index.getIndexType() == IndexType.BTREE) {
+                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, spec));
+                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                } else if (index.getIndexType() == IndexType.RTREE) {
+                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
+                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                }
+            }
+        }
+
+        ExternalDatasetIndexesAbortOperatorDescriptor op = new ExternalDatasetIndexesAbortOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
+                rtreeDataflowHelperFactories, rtreeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+
+    }
+
+    public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
+            throws AlgebricksException, AsterixException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
+                metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        boolean temp = ds.getDatasetDetails().isTemp();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                        getFilesIndexName(ds.getDatasetName()), temp);
+        IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
+        ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
+                mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
+        IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+
+        ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+        ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<ExternalRTreeDataflowHelperFactory>();
+        ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<IndexInfoOperatorDescriptor>();
+
+        for (Index index : indexes) {
+            if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
+                        .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
+                                index.getIndexName(), temp);
+                if (index.getIndexType() == IndexType.BTREE) {
+                    btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, spec));
+                    btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                } else if (index.getIndexType() == IndexType.RTREE) {
+                    rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
+                            mergePolicyFactoryProperties, storageProperties, metadataProvider, spec));
+                    rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER));
+                }
+            }
+        }
+
+        ExternalDatasetIndexesRecoverOperatorDescriptor op = new ExternalDatasetIndexesRecoverOperatorDescriptor(spec,
+                filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos,
+                rtreeDataflowHelperFactories, rtreeInfos);
+
+        spec.addRoot(op);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op,
+                filesIndexSplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, AqlMetadataProvider metadataProvider)
+            throws MetadataException, AlgebricksException {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
+        ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
+        Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+                        dataset.getDatasetName(), getFilesIndexName(dataset.getDatasetName()), true);
+        IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+        ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+                mergePolicyFactory, mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
+                        dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
+                ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                secondaryFileSplitProvider, FilesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
+                FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, indexDataflowHelperFactory,
+                NoOpOperationCallbackFactory.INSTANCE);
+        spec.addRoot(compactOp);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondarySplitsAndConstraint.second);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
new file mode 100644
index 0000000..379a4f0
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import java.util.Collection;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitResponseMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.message.EndFeedMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.PrepareStallMessage;
+import edu.uci.ics.asterix.metadata.feeds.TerminateDataFlowMessage;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+
+/**
+ * Provides helper methods for creating job specifications for operations on a feed.
+ */
+public class FeedOperations {
+
+    /**
+     * Builds the job spec for ingesting a (primary) feed from its external source via the feed adapter.
+     * 
+     * @param primaryFeed
+     * @param metadataProvider
+     * @param policyAccessor
+     * @return JobSpecification the Hyracks job specification for receiving data from the external source
+     * @throws Exception
+     */
+    public static Pair<JobSpecification, IFeedAdapterFactory> buildFeedIntakeJobSpec(PrimaryFeed primaryFeed,
+            AqlMetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
+
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
+        IFeedAdapterFactory adapterFactory = null;
+        IOperatorDescriptor feedIngestor;
+        AlgebricksPartitionConstraint ingesterPc;
+
+        try {
+            Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory> t = metadataProvider
+                    .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
+            feedIngestor = t.first;
+            ingesterPc = t.second;
+            adapterFactory = t.third;
+        } catch (AlgebricksException e) {
+            throw new AsterixException(e);
+        }
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
+
+        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
+        spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
+        spec.addRoot(nullSink);
+        return new Pair<JobSpecification, IFeedAdapterFactory>(spec, adapterFactory);
+    }
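+
+    /*
+     * Usage sketch (illustrative; the feed, provider, and accessor instances are assumed
+     * to come from the caller):
+     *
+     *   Pair<JobSpecification, IFeedAdapterFactory> p =
+     *           buildFeedIntakeJobSpec(primaryFeed, metadataProvider, policyAccessor);
+     *   JobSpecification intakeJob = p.first;          // submitted to Hyracks by the caller
+     *   IFeedAdapterFactory adapterFactory = p.second; // used to track the intake adapter
+     */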
+
+    public static JobSpecification buildDiscontinueFeedSourceSpec(AqlMetadataProvider metadataProvider, FeedId feedId)
+            throws AsterixException, AlgebricksException {
+
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
+        List<String> locations = FeedLifecycleListener.INSTANCE.getIntakeLocations(feedId);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDiscontinueFeedMessengerRuntime(spec, feedId,
+                locations);
+        IOperatorDescriptor feedMessenger = p.first;
+        AlgebricksPartitionConstraint messengerPc = p.second;
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
+        spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
+        spec.addRoot(nullSink);
+
+        return spec;
+    }
+
+    /**
+     * Builds the job spec for sending a message to an active feed to disconnect it from
+     * its source.
+     */
+    public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(AqlMetadataProvider metadataProvider,
+            FeedConnectionId connectionId) throws AsterixException, AlgebricksException {
+
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IOperatorDescriptor feedMessenger;
+        AlgebricksPartitionConstraint messengerPc;
+        List<String> locations = null;
+        FeedRuntimeType sourceRuntimeType;
+        try {
+            FeedConnectJobInfo cInfo = FeedLifecycleListener.INSTANCE.getFeedConnectJobInfo(connectionId);
+            IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint();
+            IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint();
+
+            boolean terminateIntakeJob = false;
+            boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty();
+            if (completeDisconnect) {
+                sourceRuntimeType = FeedRuntimeType.INTAKE;
+                locations = cInfo.getCollectLocations();
+                terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1;
+            } else {
+                locations = cInfo.getComputeLocations();
+                sourceRuntimeType = FeedRuntimeType.COMPUTE;
+            }
+
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec,
+                    connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId());
+
+            feedMessenger = p.first;
+            messengerPc = p.second;
+
+            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+            NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
+            spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
+            spec.addRoot(nullSink);
+            return new Pair<JobSpecification, Boolean>(spec, terminateIntakeJob);
+
+        } catch (AlgebricksException e) {
+            throw new AsterixException(e);
+        }
+
+    }
+
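+    /**
+     * Builds a job spec that delivers the given {@link PrepareStallMessage} to the
+     * supplied collect locations of the connection identified by the message.
+     */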
+    public static JobSpecification buildPrepareStallMessageJob(PrepareStallMessage stallMessage,
+            Collection<String> collectLocations) throws AsterixException {
+        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+        try {
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+                    messageJobSpec, stallMessage.getConnectionId(), stallMessage, collectLocations);
+            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+        } catch (AlgebricksException ae) {
+            throw new AsterixException(ae);
+        }
+        return messageJobSpec;
+    }
+
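+    /**
+     * Builds a job spec that notifies the given locations that throttling has been
+     * enabled for the connection identified by the message.
+     */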
+    public static JobSpecification buildNotifyThrottlingEnabledMessageJob(
+            ThrottlingEnabledFeedMessage throttlingEnabledMesg, Collection<String> locations) throws AsterixException {
+        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+        try {
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+                    messageJobSpec, throttlingEnabledMesg.getConnectionId(), throttlingEnabledMesg, locations);
+            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+        } catch (AlgebricksException ae) {
+            throw new AsterixException(ae);
+        }
+        return messageJobSpec;
+    }
+
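+    /**
+     * Builds a job spec that delivers the given terminate-dataflow message to the
+     * supplied collect locations.
+     */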
+    public static JobSpecification buildTerminateFlowMessageJob(TerminateDataFlowMessage terminateMessage,
+            List<String> collectLocations) throws AsterixException {
+        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+        try {
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+                    messageJobSpec, terminateMessage.getConnectionId(), terminateMessage, collectLocations);
+            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+        } catch (AlgebricksException ae) {
+            throw new AsterixException(ae);
+        }
+        return messageJobSpec;
+    }
+
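+    /**
+     * Builds a job spec that delivers a tuple-commit acknowledgement response to the
+     * given target locations.
+     */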
+    public static JobSpecification buildCommitAckResponseJob(FeedTupleCommitResponseMessage commitResponseMessage,
+            Collection<String> targetLocations) throws AsterixException {
+        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+        try {
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+                    messageJobSpec, commitResponseMessage.getConnectionId(), commitResponseMessage, targetLocations);
+            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+        } catch (AlgebricksException ae) {
+            throw new AsterixException(ae);
+        }
+        return messageJobSpec;
+    }
+
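+    /**
+     * Constructs the messenger runtime that carries an end-feed message of type
+     * DISCONTINUE_SOURCE to the given intake locations.
+     */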
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDiscontinueFeedMessengerRuntime(
+            JobSpecification jobSpec, FeedId feedId, List<String> locations) throws AlgebricksException {
+        FeedConnectionId feedConnectionId = new FeedConnectionId(feedId, null);
+        IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, FeedRuntimeType.INTAKE,
+                feedConnectionId.getFeedId(), true, EndFeedMessage.EndMessageType.DISCONTINUE_SOURCE);
+        return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
+    }
+
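+    /**
+     * Creates a {@link FeedMessageOperatorDescriptor} for the given message, together
+     * with an absolute partition constraint that pins it to the supplied locations.
+     */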
+    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
+            JobSpecification jobSpec, FeedConnectionId feedConnectionId, IFeedMessage feedMessage,
+            Collection<String> locations) throws AlgebricksException {
+        AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                locations.toArray(new String[] {}));
+        FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, feedConnectionId,
+                feedMessage);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
+    }
+
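+    /**
+     * Completes a message job spec: constrains the messenger operator to the given
+     * partitions and wires it into a null sink that serves as the job's root.
+     */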
+    private static JobSpecification buildSendFeedMessageJobSpec(IOperatorDescriptor operatorDescriptor,
+            AlgebricksPartitionConstraint messengerPc, JobSpecification messageJobSpec) {
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, operatorDescriptor,
+                messengerPc);
+        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(messageJobSpec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, nullSink, messengerPc);
+        messageJobSpec.connect(new OneToOneConnectorDescriptor(messageJobSpec), operatorDescriptor, 0, nullSink, 0);
+        messageJobSpec.addRoot(nullSink);
+        return messageJobSpec;
+    }
+
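+    /**
+     * Constructs the messenger runtime that carries an end-feed message of type
+     * DISCONNECT_FEED to the given locations.
+     */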
+    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
+            JobSpecification jobSpec, FeedConnectionId feedConnectionId, List<String> locations,
+            FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, FeedId sourceFeedId)
+            throws AlgebricksException {
+        IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, sourceFeedRuntimeType, sourceFeedId,
+                completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
+        return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
new file mode 100644
index 0000000..0642151
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class IndexOperations {
+
+    private static final PhysicalOptimizationConfig physicalOptimizationConfig = OptimizationConfUtil
+            .getPhysicalOptimizationConfig();
+
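+    /**
+     * Builds the job spec that creates the secondary index described by the given
+     * CREATE INDEX statement; loading the index is handled by a separate job
+     * (see {@link #buildSecondaryIndexLoadingJobSpec}).
+     */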
+    public static JobSpecification buildSecondaryIndexCreationJobSpec(CompiledCreateIndexStatement createIndexStmt,
+            ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider)
+            throws AsterixException, AlgebricksException {
+        SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
+                .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
+                        createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
+                        createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
+                        createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
+                        physicalOptimizationConfig, recType, enforcedType);
+        return secondaryIndexHelper.buildCreationJobSpec();
+    }
+
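+    /**
+     * Builds the job spec that loads a previously created secondary index from the
+     * data of its dataset.
+     */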
+    public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt,
+            ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider)
+            throws AsterixException, AlgebricksException {
+        SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
+                .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
+                        createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
+                        createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
+                        createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
+                        physicalOptimizationConfig, recType, enforcedType);
+        return secondaryIndexHelper.buildLoadingJobSpec();
+    }
+
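+    /**
+     * Variant of the loading job spec builder for external datasets: the given list
+     * of external files is passed to the helper before the loading job is built.
+     */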
+    public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt,
+            ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider,
+            List<ExternalFile> files) throws AsterixException, AlgebricksException {
+        SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
+                .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
+                        createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
+                        createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(),
+                        createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider,
+                        physicalOptimizationConfig, recType, enforcedType);
+        secondaryIndexHelper.setExternalFiles(files);
+        return secondaryIndexHelper.buildLoadingJobSpec();
+    }
+
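+    /**
+     * Builds the job spec that drops a secondary index, resolving the dataverse name
+     * against the metadata provider's default dataverse when none is specified.
+     */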
+    public static JobSpecification buildDropSecondaryIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
+            AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
+        String dataverseName = indexDropStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
+                : indexDropStmt.getDataverseName();
+        String datasetName = indexDropStmt.getDatasetName();
+        String indexName = indexDropStmt.getIndexName();
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        boolean temp = dataset.getDatasetDetails().isTemp();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
+
+        // The index drop operation should be persistent regardless of whether the dataset is temporary or permanent.
+        IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                        dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, true));
+        AlgebricksPartitionConstraintHelper
+                .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
+        spec.addRoot(btreeDrop);
+
+        return spec;
+    }
+
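+    /**
+     * Builds the job spec that compacts the given secondary index.
+     */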
+    public static JobSpecification buildSecondaryIndexCompactJobSpec(CompiledIndexCompactStatement indexCompactStmt,
+            ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider, Dataset dataset)
+            throws AsterixException, AlgebricksException {
+        SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
+                .createIndexOperationsHelper(indexCompactStmt.getIndexType(), indexCompactStmt.getDataverseName(),
+                        indexCompactStmt.getDatasetName(), indexCompactStmt.getIndexName(),
+                        indexCompactStmt.getKeyFields(), indexCompactStmt.getKeyTypes(), indexCompactStmt.isEnforced(),
+                        indexCompactStmt.getGramLength(), metadataProvider, physicalOptimizationConfig, recType,
+                        enforcedType);
+        return secondaryIndexHelper.buildCompactJobSpec();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java b/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java
new file mode 100644
index 0000000..5e76fdf
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobSpecificationUtils {
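+    /**
+     * Creates a new job specification whose frame size is taken from the compiler
+     * properties of the Asterix application context.
+     */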
+    public static JobSpecification createJobSpecification() {
+        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+        int frameSize = compilerProperties.getFrameSize();
+        return new JobSpecification(frameSize);
+    }
+}
\ No newline at end of file