You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:44:06 UTC

[18/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
deleted file mode 100644
index 93917cc..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ /dev/null
@@ -1,3069 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.aql.translator;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.rmi.RemoteException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import edu.uci.ics.asterix.api.common.APIFramework;
-import edu.uci.ics.asterix.api.common.Job;
-import edu.uci.ics.asterix.api.common.SessionConfig;
-import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
-import edu.uci.ics.asterix.aql.base.Statement;
-import edu.uci.ics.asterix.aql.expression.CompactStatement;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
-import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
-import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
-import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DatasetDecl;
-import edu.uci.ics.asterix.aql.expression.DataverseDecl;
-import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
-import edu.uci.ics.asterix.aql.expression.DeleteStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DropStatement;
-import edu.uci.ics.asterix.aql.expression.ExternalDetailsDecl;
-import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
-import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
-import edu.uci.ics.asterix.aql.expression.FunctionDecl;
-import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
-import edu.uci.ics.asterix.aql.expression.IDatasetDetailsDecl;
-import edu.uci.ics.asterix.aql.expression.Identifier;
-import edu.uci.ics.asterix.aql.expression.IndexDropStatement;
-import edu.uci.ics.asterix.aql.expression.InsertStatement;
-import edu.uci.ics.asterix.aql.expression.InternalDetailsDecl;
-import edu.uci.ics.asterix.aql.expression.LoadStatement;
-import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
-import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
-import edu.uci.ics.asterix.aql.expression.Query;
-import edu.uci.ics.asterix.aql.expression.RefreshExternalDatasetStatement;
-import edu.uci.ics.asterix.aql.expression.RunStatement;
-import edu.uci.ics.asterix.aql.expression.SetStatement;
-import edu.uci.ics.asterix.aql.expression.SubscribeFeedStatement;
-import edu.uci.ics.asterix.aql.expression.TypeDecl;
-import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
-import edu.uci.ics.asterix.aql.expression.TypeExpression;
-import edu.uci.ics.asterix.aql.expression.WriteStatement;
-import edu.uci.ics.asterix.aql.util.FunctionUtils;
-import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
-import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
-import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
-import edu.uci.ics.asterix.common.feeds.FeedId;
-import edu.uci.ics.asterix.common.feeds.FeedJointKey;
-import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
-import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
-import edu.uci.ics.asterix.common.feeds.api.IFeedJoint.FeedJointType;
-import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
-import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
-import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
-import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.feeds.CentralFeedManager;
-import edu.uci.ics.asterix.feeds.FeedJoint;
-import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
-import edu.uci.ics.asterix.file.DatasetOperations;
-import edu.uci.ics.asterix.file.DataverseOperations;
-import edu.uci.ics.asterix.file.ExternalIndexingOperations;
-import edu.uci.ics.asterix.file.FeedOperations;
-import edu.uci.ics.asterix.file.IndexOperations;
-import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.IDatasetDetails;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
-import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
-import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints;
-import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
-import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.CompactionPolicy;
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.Datatype;
-import edu.uci.ics.asterix.metadata.entities.Dataverse;
-import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
-import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.entities.Function;
-import edu.uci.ics.asterix.metadata.entities.Index;
-import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
-import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleEventSubscriber;
-import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
-import edu.uci.ics.asterix.metadata.utils.MetadataLockManager;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.types.TypeSignature;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
-import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
-import edu.uci.ics.asterix.result.ResultReader;
-import edu.uci.ics.asterix.result.ResultUtils;
-import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
-import edu.uci.ics.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
-import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDeleteStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledInsertStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
-import edu.uci.ics.asterix.translator.TypeTranslator;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
-import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
-import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
-import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-
-/*
- * Provides functionality for executing a batch of AQL statements (queries included)
- * sequentially.
- */
-public class AqlTranslator extends AbstractAqlTranslator {
-
-    private static Logger LOGGER = Logger.getLogger(AqlTranslator.class.getName());
-
-    private enum ProgressState {
-        NO_PROGRESS,
-        ADDED_PENDINGOP_RECORD_TO_METADATA
-    }
-
-    public static enum ResultDelivery {
-        SYNC,
-        ASYNC,
-        ASYNC_DEFERRED
-    }
-
-    public static final boolean IS_DEBUG_MODE = false;//true
-    private final List<Statement> aqlStatements;
-    private final SessionConfig sessionConfig;
-    private Dataverse activeDefaultDataverse;
-    private final List<FunctionDecl> declaredFunctions;
-
-    public AqlTranslator(List<Statement> aqlStatements, SessionConfig conf) throws MetadataException, AsterixException {
-        this.aqlStatements = aqlStatements;
-        this.sessionConfig = conf;
-        declaredFunctions = getDeclaredFunctions(aqlStatements);
-    }
-
-    private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
-        List<FunctionDecl> functionDecls = new ArrayList<FunctionDecl>();
-        for (Statement st : statements) {
-            if (st.getKind().equals(Statement.Kind.FUNCTION_DECL)) {
-                functionDecls.add((FunctionDecl) st);
-            }
-        }
-        return functionDecls;
-    }
-
-    /**
-     * Compiles and submits for execution a list of AQL statements.
-     * 
-     * @param hcc
-     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
-     * @param hdc
-     *            A Hyracks dataset client object that is used to read the results.
-     * @param resultDelivery
-     *            True if the results should be read asynchronously or false if we should wait for results to be read.
-     * @return A List<QueryResult> containing a QueryResult instance corresponding to each submitted query.
-     * @throws Exception
-     */
-    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
-            throws Exception {
-        int resultSetIdCounter = 0;
-        FileSplit outputFile = null;
-        IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
-        IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
-        Map<String, String> config = new HashMap<String, String>();
-
-        for (Statement stmt : aqlStatements) {
-            validateOperation(activeDefaultDataverse, stmt);
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse,
-                    CentralFeedManager.getInstance());
-            metadataProvider.setWriterFactory(writerFactory);
-            metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
-            metadataProvider.setOutputFile(outputFile);
-            metadataProvider.setConfig(config);
-            switch (stmt.getKind()) {
-                case SET: {
-                    handleSetStatement(metadataProvider, stmt, config);
-                    break;
-                }
-                case DATAVERSE_DECL: {
-                    activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt);
-                    break;
-                }
-                case CREATE_DATAVERSE: {
-                    handleCreateDataverseStatement(metadataProvider, stmt);
-                    break;
-                }
-                case DATASET_DECL: {
-                    handleCreateDatasetStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case CREATE_INDEX: {
-                    handleCreateIndexStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case TYPE_DECL: {
-                    handleCreateTypeStatement(metadataProvider, stmt);
-                    break;
-                }
-                case NODEGROUP_DECL: {
-                    handleCreateNodeGroupStatement(metadataProvider, stmt);
-                    break;
-                }
-                case DATAVERSE_DROP: {
-                    handleDataverseDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case DATASET_DROP: {
-                    handleDatasetDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case INDEX_DROP: {
-                    handleIndexDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case TYPE_DROP: {
-                    handleTypeDropStatement(metadataProvider, stmt);
-                    break;
-                }
-                case NODEGROUP_DROP: {
-                    handleNodegroupDropStatement(metadataProvider, stmt);
-                    break;
-                }
-
-                case CREATE_FUNCTION: {
-                    handleCreateFunctionStatement(metadataProvider, stmt);
-                    break;
-                }
-
-                case FUNCTION_DROP: {
-                    handleFunctionDropStatement(metadataProvider, stmt);
-                    break;
-                }
-
-                case LOAD: {
-                    handleLoadStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case INSERT: {
-                    handleInsertStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case DELETE: {
-                    handleDeleteStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case CREATE_PRIMARY_FEED:
-                case CREATE_SECONDARY_FEED: {
-                    handleCreateFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case DROP_FEED: {
-                    handleDropFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case DROP_FEED_POLICY: {
-                    handleDropFeedPolicyStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case CONNECT_FEED: {
-                    handleConnectFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case DISCONNECT_FEED: {
-                    handleDisconnectFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case SUBSCRIBE_FEED: {
-                    handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case CREATE_FEED_POLICY: {
-                    handleCreateFeedPolicyStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case QUERY: {
-                    metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
-                    metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
-                            || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
-                    handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery);
-                    break;
-                }
-
-                case COMPACT: {
-                    handleCompactStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case EXTERNAL_DATASET_REFRESH: {
-                    handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case WRITE: {
-                    Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(metadataProvider, stmt);
-                    if (result.first != null) {
-                        writerFactory = result.first;
-                    }
-                    outputFile = result.second;
-                    break;
-                }
-
-                case RUN: {
-                    handleRunStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-            }
-        }
-    }
-
-    private void handleSetStatement(AqlMetadataProvider metadataProvider, Statement stmt, Map<String, String> config)
-            throws RemoteException, ACIDException {
-        SetStatement ss = (SetStatement) stmt;
-        String pname = ss.getPropName();
-        String pvalue = ss.getPropValue();
-        config.put(pname, pvalue);
-    }
-
-    private Pair<IAWriterFactory, FileSplit> handleWriteStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
-        WriteStatement ws = (WriteStatement) stmt;
-        File f = new File(ws.getFileName());
-        FileSplit outputFile = new FileSplit(ws.getNcName().getValue(), new FileReference(f));
-        IAWriterFactory writerFactory = null;
-        if (ws.getWriterClassName() != null) {
-            writerFactory = (IAWriterFactory) Class.forName(ws.getWriterClassName()).newInstance();
-        }
-        return new Pair<IAWriterFactory, FileSplit>(writerFactory, outputFile);
-    }
-
-    private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws Exception {
-        DataverseDecl dvd = (DataverseDecl) stmt;
-        String dvName = dvd.getDataverseName().getValue();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
-        try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-            if (dv == null) {
-                throw new MetadataException("Unknown dataverse " + dvName);
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            return dv;
-        } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            throw new MetadataException(e);
-        } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
-        }
-    }
-
-    private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
-
-        CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
-        String dvName = stmtCreateDataverse.getDataverseName().getValue();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
-        try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-            if (dv != null) {
-                if (stmtCreateDataverse.getIfNotExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
-                }
-            }
-            MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
-                    stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
-        }
-    }
-
-    private void validateCompactionPolicy(String compactionPolicy, Map<String, String> compactionPolicyProperties,
-            MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws AsterixException, Exception {
-        CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
-                MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
-        if (compactionPolicyEntity == null) {
-            throw new AsterixException("Unknown compaction policy: " + compactionPolicy);
-        }
-        String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName();
-        ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(
-                compactionPolicyFactoryClassName).newInstance();
-        if (isExternalDataset && mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
-            throw new AsterixException("The correlated-prefix merge policy cannot be used with external dataset.");
-        }
-        if (compactionPolicyProperties == null) {
-            if (mergePolicyFactory.getName().compareTo("no-merge") != 0) {
-                throw new AsterixException("Compaction policy properties are missing.");
-            }
-        } else {
-            for (Map.Entry<String, String> entry : compactionPolicyProperties.entrySet()) {
-                if (!mergePolicyFactory.getPropertiesNames().contains(entry.getKey())) {
-                    throw new AsterixException("Invalid compaction policy property: " + entry.getKey());
-                }
-            }
-            for (String p : mergePolicyFactory.getPropertiesNames()) {
-                if (!compactionPolicyProperties.containsKey(p)) {
-                    throw new AsterixException("Missing compaction policy property: " + p);
-                }
-            }
-        }
-    }
-
-    private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws AsterixException, Exception {
-
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        DatasetDecl dd = (DatasetDecl) stmt;
-        String dataverseName = getActiveDataverse(dd.getDataverse());
-        String datasetName = dd.getName().getValue();
-        DatasetType dsType = dd.getDatasetType();
-        String itemTypeName = dd.getItemTypeName().getValue();
-        Identifier ngNameId = dd.getNodegroupName();
-        String nodegroupName = getNodeGroupName(ngNameId, dd, dataverseName);
-        String compactionPolicy = dd.getCompactionPolicy();
-        Map<String, String> compactionPolicyProperties = dd.getCompactionPolicyProperties();
-        boolean defaultCompactionPolicy = (compactionPolicy == null);
-        boolean temp = dd.getDatasetDetailsDecl().isTemp();
-
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, dataverseName + "." + itemTypeName,
-                nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
-        Dataset dataset = null;
-        try {
-
-            IDatasetDetails datasetDetails = null;
-            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName);
-            if (ds != null) {
-                if (dd.getIfNotExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
-                }
-            }
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    itemTypeName);
-            if (dt == null) {
-                throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
-            }
-            String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName,
-                    mdTxnCtx);
-
-            if (compactionPolicy == null) {
-                compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
-                compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
-            } else {
-                validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false);
-            }
-            switch (dd.getDatasetType()) {
-                case INTERNAL: {
-                    IAType itemType = dt.getDatatype();
-                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                        throw new AlgebricksException("Can only partition ARecord's.");
-                    }
-                    List<List<String>> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getPartitioningExprs();
-                    boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
-                    ARecordType aRecordType = (ARecordType) itemType;
-                    List<IAType> partitioningTypes = aRecordType.validatePartitioningExpressions(partitioningExprs,
-                            autogenerated);
-
-                    List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField();
-                    if (filterField != null) {
-                        aRecordType.validateFilterField(filterField);
-                    }
-                    if (compactionPolicy == null) {
-                        if (filterField != null) {
-                            // If the dataset has a filter and the user didn't specify a merge policy, then we will pick the
-                            // correlated-prefix as the default merge policy.
-                            compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
-                            compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
-                        }
-                    }
-                    datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-                            partitioningTypes, autogenerated, filterField, temp);
-                    break;
-                }
-                case EXTERNAL: {
-                    String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-
-                    datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(),
-                            ExternalDatasetTransactionState.COMMIT);
-                    break;
-                }
-
-            }
-
-            //#. initialize DatasetIdFactory if it is not initialized.
-            if (!DatasetIdFactory.isInitialized()) {
-                DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
-            }
-
-            //#. add a new dataset with PendingAddOp
-            dataset = new Dataset(dataverseName, datasetName, itemTypeName, ngName, compactionPolicy,
-                    compactionPolicyProperties, datasetDetails, dd.getHints(), dsType,
-                    DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
-            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-
-            if (dd.getDatasetType() == DatasetType.INTERNAL) {
-                Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                        dataverseName);
-                JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
-                        metadataProvider);
-
-                //#. make metadataTxn commit before calling runJob.
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-                //#. runJob
-                runJob(hcc, jobSpec, true);
-
-                //#. begin new metadataTxn
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            }
-
-            //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
-            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
-            dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
-            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-
-                //#. execute compensation operations
-                //   remove the index in NC
-                //   [Notice]
-                //   As long as we updated(and committed) metadata, we should remove any effect of the job
-                //   because an exception occurs during runJob.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-                try {
-                    JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    bActiveTxn = false;
-
-                    runJob(hcc, jobSpec, true);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    if (bActiveTxn) {
-                        abort(e, e2, mdTxnCtx);
-                    }
-                }
-
-                //   remove the record from the metadata.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                try {
-                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
-                            + "." + datasetName + ") couldn't be removed from the metadata", e);
-                }
-            }
-
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, dataverseName + "." + itemTypeName,
-                    nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
-        }
-    }
-
-    private void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) throws AsterixException {
-        List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
-        boolean resourceInUse = false;
-        StringBuilder builder = new StringBuilder();
-
-        if (activeFeedConnections != null && !activeFeedConnections.isEmpty()) {
-            for (FeedConnectionId connId : activeFeedConnections) {
-                if (connId.getDatasetName().equals(datasetName)) {
-                    resourceInUse = true;
-                    builder.append(connId + "\n");
-                }
-            }
-        }
-
-        if (resourceInUse) {
-            throw new AsterixException("Dataset " + datasetName + " is currently being "
-                    + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported");
-        }
-
-    }
-
-    private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
-        if (ngNameId != null) {
-            return ngNameId.getValue();
-        }
-        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
-        if (hintValue == null) {
-            return MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
-        } else {
-            return (dataverse + ":" + dd.getName().getValue());
-        }
-    }
-
-    private String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx)
-            throws AsterixException {
-        int nodegroupCardinality = -1;
-        String nodegroupName;
-        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
-        if (hintValue == null) {
-            nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
-            return nodegroupName;
-        } else {
-            int numChosen = 0;
-            boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME,
-                    dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first;
-            if (!valid) {
-                throw new AsterixException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
-            } else {
-                nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
-            }
-            Set<String> nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
-            Set<String> nodeNamesClone = new HashSet<String>();
-            for (String node : nodeNames) {
-                nodeNamesClone.add(node);
-            }
-            String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
-            List<String> selectedNodes = new ArrayList<String>();
-            selectedNodes.add(metadataNodeName);
-            numChosen++;
-            nodeNamesClone.remove(metadataNodeName);
-
-            if (numChosen < nodegroupCardinality) {
-                Random random = new Random();
-                String[] nodes = nodeNamesClone.toArray(new String[] {});
-                int[] b = new int[nodeNamesClone.size()];
-                for (int i = 0; i < b.length; i++) {
-                    b[i] = i;
-                }
-
-                for (int i = 0; i < nodegroupCardinality - numChosen; i++) {
-                    int selected = i + random.nextInt(nodeNamesClone.size() - i);
-                    int selNodeIndex = b[selected];
-                    selectedNodes.add(nodes[selNodeIndex]);
-                    int temp = b[0];
-                    b[0] = b[selected];
-                    b[selected] = temp;
-                }
-            }
-            nodegroupName = dataverse + ":" + dd.getName().getValue();
-            MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
-            return nodegroupName;
-        }
-
-    }
-
-    @SuppressWarnings("unchecked")
-    private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-        String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
-        String datasetName = stmtCreateIndex.getDatasetName().getValue();
-
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
-        String indexName = null;
-        JobSpecification spec = null;
-        Dataset ds = null;
-        // For external datasets
-        ArrayList<ExternalFile> externalFilesSnapshot = null;
-        boolean firstExternalDatasetIndex = false;
-        boolean filesIndexReplicated = false;
-        Index filesIndex = null;
-        boolean datasetLocked = false;
-        try {
-            ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName);
-            if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName);
-            }
-
-            indexName = stmtCreateIndex.getIndexName().getValue();
-            Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName, indexName);
-
-            String itemTypeName = ds.getItemTypeName();
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    itemTypeName);
-            IAType itemType = dt.getDatatype();
-            ARecordType aRecordType = (ARecordType) itemType;
-
-            List<List<String>> indexFields = new ArrayList<List<String>>();
-            List<IAType> indexFieldTypes = new ArrayList<IAType>();
-            for (Pair<List<String>, TypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) {
-                IAType fieldType = null;
-                boolean isOpen = aRecordType.isOpen();
-                ARecordType subType = aRecordType;
-                int i = 0;
-                if (fieldExpr.first.size() > 1 && !isOpen) {
-                    for (; i < fieldExpr.first.size() - 1;) {
-                        subType = (ARecordType) subType.getFieldType(fieldExpr.first.get(i));
-                        i++;
-                        if (subType.isOpen()) {
-                            isOpen = true;
-                            break;
-                        };
-                    }
-                }
-                if (fieldExpr.second == null) {
-                    fieldType = subType.getSubFieldType(fieldExpr.first.subList(i, fieldExpr.first.size()));
-                } else {
-                    if (!stmtCreateIndex.isEnforced())
-                        throw new AlgebricksException("Cannot create typed index on \"" + fieldExpr.first
-                                + "\" field without enforcing it's type");
-                    if (!isOpen)
-                        throw new AlgebricksException("Typed index on \"" + fieldExpr.first
-                                + "\" field could be created only for open datatype");
-                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second,
-                            indexName, dataverseName);
-                    TypeSignature typeSignature = new TypeSignature(dataverseName, indexName);
-                    fieldType = typeMap.get(typeSignature);
-                }
-                if (fieldType == null)
-                    throw new AlgebricksException("Unknown type " + fieldExpr.second);
-
-                indexFields.add(fieldExpr.first);
-                indexFieldTypes.add(fieldType);
-            }
-
-            aRecordType.validateKeyFields(indexFields, indexFieldTypes, stmtCreateIndex.getIndexType());
-
-            if (idx != null) {
-                if (stmtCreateIndex.getIfNotExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("An index with this name " + indexName + " already exists.");
-                }
-            }
-
-            // Checks whether a user is trying to create an inverted secondary index on a dataset with a variable-length primary key.
-            // Currently, we do not support this. Therefore, as a temporary solution, we print an error message and stop.
-            if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
-                    || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
-                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                    || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-                List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(ds);
-                for (List<String> partitioningKey : partitioningKeys) {
-                    IAType keyType = aRecordType.getSubFieldType(partitioningKey);
-                    ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-
-                    // If it is not a fixed length
-                    if (typeTrait.getFixedLength() < 0) {
-                        throw new AlgebricksException("The keyword or ngram index -" + indexName
-                                + " cannot be created on the dataset -" + datasetName
-                                + " due to its variable-length primary key field - " + partitioningKey);
-                    }
-
-                }
-            }
-
-            if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                validateIfResourceIsActiveInFeed(dataverseName, datasetName);
-            } else {
-                // External dataset
-                // Check if the dataset is indexible
-                if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
-                    throw new AlgebricksException("dataset using "
-                            + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
-                            + " Adapter can't be indexed");
-                }
-                // check if the name of the index is valid
-                if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
-                    throw new AlgebricksException("external dataset index name is invalid");
-                }
-
-                // Check if the files index exist
-                filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
-                firstExternalDatasetIndex = (filesIndex == null);
-                // lock external dataset
-                ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
-                datasetLocked = true;
-                if (firstExternalDatasetIndex) {
-                    // verify that no one has created an index before we acquire the lock
-                    filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                            dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
-                    if (filesIndex != null) {
-                        ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
-                        firstExternalDatasetIndex = false;
-                        ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
-                    }
-                }
-                if (firstExternalDatasetIndex) {
-                    // Get snapshot from External File System
-                    externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
-                    // Add an entry for the files index
-                    filesIndex = new Index(dataverseName, datasetName,
-                            ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
-                            ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES,
-                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
-                            IMetadataEntity.PENDING_ADD_OP);
-                    MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                    // Add files to the external files index
-                    for (ExternalFile file : externalFilesSnapshot) {
-                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
-                    }
-                    // This is the first index for the external dataset, replicate the files index
-                    spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
-                            metadataProvider, true);
-                    if (spec == null) {
-                        throw new AsterixException(
-                                "Failed to create job spec for replicating Files Index For external dataset");
-                    }
-                    filesIndexReplicated = true;
-                    runJob(hcc, spec, true);
-                }
-            }
-
-            //check whether there exists another enforced index on the same field
-            if (stmtCreateIndex.isEnforced()) {
-                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(
-                        metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
-                for (Index index : indexes) {
-                    if (index.getKeyFieldNames().equals(indexFields)
-                            && !index.getKeyFieldTypes().equals(indexFieldTypes) && index.isEnforcingKeyFileds())
-                        throw new AsterixException("Cannot create index " + indexName + " , enforced index "
-                                + index.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',')
-                                + "\" already exist");
-                }
-            }
-
-            //#. add a new index with PendingAddOp
-            Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
-                    indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(), false,
-                    IMetadataEntity.PENDING_ADD_OP);
-            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-
-            ARecordType enforcedType = null;
-            if (stmtCreateIndex.isEnforced()) {
-                enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, index);
-            }
-
-            //#. prepare to create the index artifact in NC.
-            CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
-                    index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
-                    index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
-            spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, enforcedType, metadataProvider);
-            if (spec == null) {
-                throw new AsterixException("Failed to create job spec for creating index '"
-                        + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-
-            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-            //#. create the index artifact in NC.
-            runJob(hcc, spec, true);
-
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            bActiveTxn = true;
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-            //#. load data into the index in NC.
-            cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
-                    index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
-                    index.getGramLength(), index.getIndexType());
-            spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, enforcedType, metadataProvider);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-
-            runJob(hcc, spec, true);
-
-            //#. begin new metadataTxn
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            bActiveTxn = true;
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-            //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
-            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-                    indexName);
-            index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
-            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-            // add another new files index with PendingNoOp after deleting the index with PendingAddOp
-            if (firstExternalDatasetIndex) {
-                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName, filesIndex.getIndexName());
-                filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
-                MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                // update transaction timestamp
-                ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date());
-                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-            // If files index was replicated for external dataset, it should be cleaned up on NC side
-            if (filesIndexReplicated) {
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                        ExternalIndexingOperations.getFilesIndexName(datasetName));
-                try {
-                    JobSpecification jobSpec = ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                            metadataProvider, ds);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    bActiveTxn = false;
-                    runJob(hcc, jobSpec, true);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    if (bActiveTxn) {
-                        abort(e, e2, mdTxnCtx);
-                    }
-                }
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //   remove the index in NC
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-                try {
-                    JobSpecification jobSpec = IndexOperations
-                            .buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds);
-
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    bActiveTxn = false;
-                    runJob(hcc, jobSpec, true);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    if (bActiveTxn) {
-                        abort(e, e2, mdTxnCtx);
-                    }
-                }
-
-                if (firstExternalDatasetIndex) {
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                    try {
-                        // Drop External Files from metadata
-                        MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
-                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    } catch (Exception e2) {
-                        e.addSuppressed(e2);
-                        abort(e, e2, mdTxnCtx);
-                        throw new IllegalStateException("System is inconsistent state: pending files for("
-                                + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e);
-                    }
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                    try {
-                        // Drop the files index from metadata
-                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                                datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
-                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    } catch (Exception e2) {
-                        e.addSuppressed(e2);
-                        abort(e, e2, mdTxnCtx);
-                        throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
-                                + "." + datasetName + "." + ExternalIndexingOperations.getFilesIndexName(datasetName)
-                                + ") couldn't be removed from the metadata", e);
-                    }
-                }
-                // remove the record from the metadata.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                try {
-                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName, indexName);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is in inconsistent state: pending index(" + dataverseName
-                            + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
-                }
-            }
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName);
-            if (datasetLocked) {
-                ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
-            }
-        }
-    }
-
-    private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
-        TypeDecl stmtCreateType = (TypeDecl) stmt;
-        String dataverseName = getActiveDataverse(stmtCreateType.getDataverseName());
-        String typeName = stmtCreateType.getIdent().getValue();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName);
-        try {
-
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-            if (dv == null) {
-                throw new AlgebricksException("Unknown dataverse " + dataverseName);
-            }
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-            if (dt != null) {
-                if (!stmtCreateType.getIfNotExists()) {
-                    throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
-                }
-            } else {
-                if (builtinTypeMap.get(typeName) != null) {
-                    throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
-                } else {
-                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx,
-                            stmtCreateType.getTypeDef(), stmtCreateType.getIdent().getValue(), dataverseName);
-                    TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
-                    IAType type = typeMap.get(typeSignature);
-                    MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
-                }
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName);
-        }
-    }
-
-    private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-        String dataverseName = stmtDelete.getDataverseName().getValue();
-
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
-        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
-        try {
-
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-            if (dv == null) {
-                if (stmtDelete.getIfExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("There is no dataverse with this name " + dataverseName + ".");
-                }
-            }
-
-            //# disconnect all feeds from any datasets in the dataverse.
-            List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE
-                    .getActiveFeedConnections(null);
-            DisconnectFeedStatement disStmt = null;
-            Identifier dvId = new Identifier(dataverseName);
-            for (FeedConnectionId connection : activeFeedConnections) {
-                FeedId feedId = connection.getFeedId();
-                if (feedId.getDataverse().equals(dataverseName)) {
-                    disStmt = new DisconnectFeedStatement(dvId, new Identifier(feedId.getFeedName()), new Identifier(
-                            connection.getDatasetName()));
-                    try {
-                        handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Disconnected feed " + feedId.getFeedName() + " from dataset "
-                                    + connection.getDatasetName());
-                        }
-                    } catch (Exception exception) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to disconnect feed " + feedId.getFeedName() + " from dataset "
-                                    + connection.getDatasetName() + ". Encountered exception " + exception);
-                        }
-                    }
-                }
-            }
-
-            //#. prepare jobs which will drop corresponding datasets with indexes.
-            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
-            for (int j = 0; j < datasets.size(); j++) {
-                String datasetName = datasets.get(j).getDatasetName();
-                DatasetType dsType = datasets.get(j).getDatasetType();
-                if (dsType == DatasetType.INTERNAL) {
-                    List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
-                            datasetName);
-                    for (int k = 0; k < indexes.size(); k++) {
-                        if (indexes.get(k).isSecondaryIndex()) {
-                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                    indexes.get(k).getIndexName());
-                            jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
-                                    datasets.get(j)));
-                        }
-                    }
-
-                    CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-                    jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
-                } else {
-                    // External dataset
-                    List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
-                            datasetName);
-                    for (int k = 0; k < indexes.size(); k++) {
-                        if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
-                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                    indexes.get(k).getIndexName());
-                            jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                                    metadataProvider, datasets.get(j)));
-                        } else {
-                            CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                    indexes.get(k).getIndexName());
-                            jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
-                                    datasets.get(j)));
-                        }
-                    }
-                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
-                }
-            }
-            jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-
-            //#. mark PendingDropOp on the dataverse record by
-            //   first, deleting the dataverse record from the DATAVERSE_DATASET
-            //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
-            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-            MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dv.getDataFormat(),
-                    IMetadataEntity.PENDING_DROP_OP));
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-            for (JobSpecification jobSpec : jobsToExecute) {
-                runJob(hcc, jobSpec, true);
-            }
-
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            bActiveTxn = true;
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-            //#. finally, delete the dataverse.
-            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-            if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
-                activeDefaultDataverse = null;
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
-                    activeDefaultDataverse = null;
-                }
-
-                //#. execute compensation operations
-                //   remove the all indexes in NC
-                try {
-                    for (JobSpecification jobSpec : jobsToExecute) {
-                        runJob(hcc, jobSpec, true);
-                    }
-                } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
-                    e.addSuppressed(e2);
-                }
-
-                //   remove the record from the metadata.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                try {
-                    MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending dataverse(" + dataverseName
-                            + ") couldn't be removed from the metadata", e);
-                }
-            }
-
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
-        }
-    }
-
-    private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-        DropStatement stmtDelete = (DropStatement) stmt;
-        String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
-        String datasetName = stmtDelete.getDatasetName().getValue();
-
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName);
-        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
-        try {
-
-            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            if (ds == null) {
-                if (stmtDelete.getIfExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName
-                            + " in dataverse " + dataverseName + ".");
-                }
-            }
-
-            Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
-            if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                // prepare job spec(s) that would disconnect any active feeds involving the dataset.
-                List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
-                if (feedConnections != null && !feedConnections.isEmpty()) {
-                    for (FeedConnectionId connection : feedConnections) {
-                        Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
-                                connection);
-                        disconnectJobList.put(connection, p);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
-                                    + datasetName + " as dataset is being dropped");
-                        }
-                    }
-                }
-
-                //#. prepare jobs to drop the datatset and the indexes in NC
-                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-                for (int j = 0; j < indexes.size(); j++) {
-                    if (indexes.get(j).isSecondaryIndex()) {
-                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                indexes.get(j).getIndexName());
-                        jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-                    }
-                }
-                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-                jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
-
-                //#. mark the existing dataset as PendingDropOp
-                MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-                MetadataManager.INSTANCE.addDataset(
-                        mdTxnCtx,
-                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(), ds
-                                .getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds
-                                .getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
-
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-                //# disconnect the feeds
-                for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
-                    runJob(hcc, p.first, true);
-                }
-
-                //#. run the jobs
-                for (JobSpecification jobSpec : jobsToExecute) {
-                    runJob(hcc, jobSpec, true);
-                }
-
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            } else {
-                // External dataset
-                ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-                //#. prepare jobs to drop the datatset and the indexes in NC
-                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-                for (int j = 0; j < indexes.size(); j++) {
-                    if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
-                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                indexes.get(j).getIndexName());
-                        jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-                    } else {
-                        CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                indexes.get(j).getIndexName());
-                        jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider,
-                                ds));
-                    }
-                }
-
-                //#. mark the existing dataset as PendingDropOp
-                MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-                MetadataManager.INSTANCE.addDataset(
-                        mdTxnCtx,
-                        new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(), ds
-                                .getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds
-                                .getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
-
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-                //#. run the jobs
-                for (JobSpecification jobSpec : jobsToExecute) {
-                    runJob(hcc, jobSpec, true);
-                }
-                if (indexes.size() > 0) {
-                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-                }
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            }
-
-            //#. finally, delete the dataset.
-            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-            // Drop the associated nodegroup
-            String nodegroup = ds.getNodeGroupName();
-            if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
-                MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName);
-            }
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //   remove the all indexes in NC
-                try {
-                    for (JobSpecification jobSpec : jobsToExecute) {
-                        runJob(hcc, jobSpec, true);
-                    }
-                } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
-                    e.addSuppressed(e2);
-                }
-
-                //   remove the record from the metadata.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                try {
-                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
-                            + "." + datasetName + ") couldn't be removed from the metadata", e);
-                }
-            }
-
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
-        }
-    }
-
-    private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
-
-        IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-        String datasetName = stmtIndexDrop.getDatasetName().getValue();
-        String dataverseName = getActiveDataverse(stmtIndexDrop.getDataverseName());
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
-        String indexName = null;
-        // For external index
-        boolean dropFilesIndex = false;
-        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
-        try {
-
-            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            if (ds == null) {
-                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                        + dataverseName);
-            }
-
-            List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
-            boolean resourceInUse = false;
-            if (feedConnections != null && !feedConnections.isEmpty()) {
-                StringBuilder builder = new StringBuilder();
-                for (FeedConnectionId connection : feedConnections) {
-                    if (connection.getDatasetName().equals(datasetName)) {
-                        resourceInUse = true;
-                        builder.append(connection + "\n");
-                    }
-                }
-                if (resourceInUse) {
-                    throw new AsterixException("Dataset" + datasetName
-                            + " is currently being fed into by the following feeds " + "." + builder.toString()
-                            + "\nOperation not supported.");
-                }
-            }
-
-            if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                indexName = stmtIndexDrop.getIndexName().getValue();
-                Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                if (index == null) {
-                    if (stmtIndexDrop.getIfExists()) {
-                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                        return;
-                    } else {
-                        throw new AlgebricksException("There is no index with this name " + indexName + ".");
-                    }
-                }
-                //#. prepare a job to drop the index in NC.
-                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-                jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-
-                //#. mark PendingDropOp on the existing index
-                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                MetadataManager.INSTANCE.addIndex(
-                        mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
-                                        .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-
-                //#. commit the existing transaction before calling runJob.
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-                for (JobSpecification jobSpec : jobsToExecute) {
-                    runJob(hcc, jobSpec, true);
-                }
-
-                //#. begin a new transaction
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-                //#. finally, delete the existing index
-                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-            } else {
-                // External dataset
-                indexName = stmtIndexDrop.getIndexName().getValue();
-                Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                if (index == null) {
-                    if (stmtIndexDrop.getIfExists()) {
-                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                        return;
-                    } else {
-                        throw new AlgebricksException("There is no index with this name " + indexName + ".");
-                    }
-                } else if (ExternalIndexingOperations.isFileIndex(index)) {
-                    throw new AlgebricksException("Dropping a dataset's files index is not allowed.");
-                }
-                //#. prepare a job to drop the index in NC.
-                CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-                jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-                List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
-                        datasetName);
-                if (datasetIndexes.size() == 2) {
-                    dropFilesIndex = true;
-                    // only one index + the files index, we need to delete both of the indexes
-                    for (Index externalIndex : datasetIndexes) {
-                        if (ExternalIndexingOperations.isFileIndex(externalIndex)) {
-                            cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                    externalIndex.getIndexName());
-                            jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                                    metadataProvider, ds));
-                            //#. mark PendingDropOp on the existing files index
-                            MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
-                                    externalIndex.getIndexName());
-                            MetadataManager.INSTANCE.addIndex(
-                                    mdTxnCtx,
-                                    new Index(dataverseName, datasetName, externalIndex.getIndexName(), externalIndex
-                                            .getIndexType(), externalIndex.getKeyFieldNames(),
-                                            index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), externalIndex
-                                                    .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-                        }
-                    }
-                }
-
-                //#. mark PendingDropOp on the existing index
-                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                MetadataManager.INSTANCE.addIndex(
-                        mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
-                                        .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-
-                //#. commit the existing transaction before calling runJob.
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-                for (JobSpecification jobSpec : jobsToExecute) {
-                    runJob(hcc, jobSpec, true);
-                }
-
-                //#. begin a new transaction
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-                //#. finally, delete the existing index
-                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-                if (dropFilesIndex) {
-                    // delete the files index too
-                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
-                            ExternalIndexingOperations.getFilesIndexName(datasetName));
-                    MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
-                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-                }
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //   remove the all indexes in NC
-                try {
-                    for (JobSpecification jobSpec : jobsToExecute) {
-                        runJob(hcc, jobSpec, true);
-                    }
-                } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
-                    e.addSuppressed(e2);
-        

<TRUNCATED>