You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:44:06 UTC
[18/51] [partial] incubator-asterixdb git commit: Change folder
structure for Java repackage
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
deleted file mode 100644
index 93917cc..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ /dev/null
@@ -1,3069 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.aql.translator;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.rmi.RemoteException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import edu.uci.ics.asterix.api.common.APIFramework;
-import edu.uci.ics.asterix.api.common.Job;
-import edu.uci.ics.asterix.api.common.SessionConfig;
-import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
-import edu.uci.ics.asterix.aql.base.Statement;
-import edu.uci.ics.asterix.aql.expression.CompactStatement;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
-import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
-import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
-import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DatasetDecl;
-import edu.uci.ics.asterix.aql.expression.DataverseDecl;
-import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
-import edu.uci.ics.asterix.aql.expression.DeleteStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DropStatement;
-import edu.uci.ics.asterix.aql.expression.ExternalDetailsDecl;
-import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
-import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
-import edu.uci.ics.asterix.aql.expression.FunctionDecl;
-import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
-import edu.uci.ics.asterix.aql.expression.IDatasetDetailsDecl;
-import edu.uci.ics.asterix.aql.expression.Identifier;
-import edu.uci.ics.asterix.aql.expression.IndexDropStatement;
-import edu.uci.ics.asterix.aql.expression.InsertStatement;
-import edu.uci.ics.asterix.aql.expression.InternalDetailsDecl;
-import edu.uci.ics.asterix.aql.expression.LoadStatement;
-import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
-import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
-import edu.uci.ics.asterix.aql.expression.Query;
-import edu.uci.ics.asterix.aql.expression.RefreshExternalDatasetStatement;
-import edu.uci.ics.asterix.aql.expression.RunStatement;
-import edu.uci.ics.asterix.aql.expression.SetStatement;
-import edu.uci.ics.asterix.aql.expression.SubscribeFeedStatement;
-import edu.uci.ics.asterix.aql.expression.TypeDecl;
-import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
-import edu.uci.ics.asterix.aql.expression.TypeExpression;
-import edu.uci.ics.asterix.aql.expression.WriteStatement;
-import edu.uci.ics.asterix.aql.util.FunctionUtils;
-import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
-import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
-import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
-import edu.uci.ics.asterix.common.feeds.FeedId;
-import edu.uci.ics.asterix.common.feeds.FeedJointKey;
-import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
-import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
-import edu.uci.ics.asterix.common.feeds.api.IFeedJoint.FeedJointType;
-import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
-import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
-import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
-import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.feeds.CentralFeedManager;
-import edu.uci.ics.asterix.feeds.FeedJoint;
-import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
-import edu.uci.ics.asterix.file.DatasetOperations;
-import edu.uci.ics.asterix.file.DataverseOperations;
-import edu.uci.ics.asterix.file.ExternalIndexingOperations;
-import edu.uci.ics.asterix.file.FeedOperations;
-import edu.uci.ics.asterix.file.IndexOperations;
-import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
-import edu.uci.ics.asterix.metadata.IDatasetDetails;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
-import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
-import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints;
-import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
-import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.CompactionPolicy;
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.Datatype;
-import edu.uci.ics.asterix.metadata.entities.Dataverse;
-import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
-import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.entities.Function;
-import edu.uci.ics.asterix.metadata.entities.Index;
-import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
-import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleEventSubscriber;
-import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
-import edu.uci.ics.asterix.metadata.utils.MetadataLockManager;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.types.TypeSignature;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
-import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
-import edu.uci.ics.asterix.result.ResultReader;
-import edu.uci.ics.asterix.result.ResultUtils;
-import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
-import edu.uci.ics.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
-import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
-import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDeleteStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledInsertStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
-import edu.uci.ics.asterix.translator.TypeTranslator;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
-import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
-import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
-import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-
-/*
- * Provides functionality for executing a batch of AQL statements (queries included)
- * sequentially.
- */
-public class AqlTranslator extends AbstractAqlTranslator {
-
-    // Shared logger for this class; final so it cannot be swapped out at runtime.
-    private static final Logger LOGGER = Logger.getLogger(AqlTranslator.class.getName());
-
-    /**
-     * Records how far a multi-step DDL operation got, so that the failure path can tell whether a
-     * pending-op record was already committed to the metadata and therefore needs compensation.
-     */
-    private enum ProgressState {
-        NO_PROGRESS,
-        ADDED_PENDINGOP_RECORD_TO_METADATA
-    }
-
-    /**
-     * Result delivery mode requested for queries. {@code ASYNC} and {@code ASYNC_DEFERRED} switch the
-     * metadata provider into asynchronous result mode before a query is handled.
-     */
-    public static enum ResultDelivery {
-        SYNC,
-        ASYNC,
-        ASYNC_DEFERRED
-    }
-
-    public static final boolean IS_DEBUG_MODE = false;//true
-    // The statements this translator executes, in submission order.
-    private final List<Statement> aqlStatements;
-    // Session configuration supplied by the caller.
-    private final SessionConfig sessionConfig;
-    // Dataverse selected by the most recent DATAVERSE_DECL statement; null until one is executed.
-    private Dataverse activeDefaultDataverse;
-    // All FUNCTION_DECL statements found in aqlStatements, collected once at construction time.
-    private final List<FunctionDecl> declaredFunctions;
-
-    /**
-     * Creates a translator for the given batch of statements.
-     *
-     * @param aqlStatements
-     *            statements to execute; the list is kept by reference and also scanned once for
-     *            function declarations.
-     * @param conf
-     *            session configuration to associate with this translator.
-     */
-    public AqlTranslator(List<Statement> aqlStatements, SessionConfig conf) throws MetadataException, AsterixException {
-        this.sessionConfig = conf;
-        this.aqlStatements = aqlStatements;
-        this.declaredFunctions = getDeclaredFunctions(aqlStatements);
-    }
-
-    /**
-     * Collects every statement whose kind is {@code FUNCTION_DECL}, preserving the input order.
-     *
-     * @param statements
-     *            statements to scan.
-     * @return a new list holding the function declarations found (empty if there are none).
-     */
-    private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
-        List<FunctionDecl> found = new ArrayList<FunctionDecl>();
-        Iterator<Statement> it = statements.iterator();
-        while (it.hasNext()) {
-            Statement candidate = it.next();
-            if (!candidate.getKind().equals(Statement.Kind.FUNCTION_DECL)) {
-                continue;
-            }
-            found.add((FunctionDecl) candidate);
-        }
-        return found;
-    }
-
-    /**
-     * Compiles and executes the AQL statements of this translator one after another, in order.
-     * <p>
-     * A fresh {@link AqlMetadataProvider} is created for every statement. State established by earlier
-     * statements (the active dataverse, {@code set} properties, and the writer factory / output file
-     * chosen by a {@code write} statement) is handed to the providers of the statements that follow.
-     * A statement kind that has no case in the switch below receives no handling beyond
-     * {@code validateOperation}.
-     *
-     * @param hcc
-     *            Hyracks client connection passed on to the handlers that submit jobs.
-     * @param hdc
-     *            Hyracks dataset client passed on to the query handler.
-     * @param resultDelivery
-     *            delivery mode for query results; {@code ASYNC} and {@code ASYNC_DEFERRED} put the
-     *            metadata provider into asynchronous result mode.
-     * @throws Exception
-     *             if validation or a statement handler fails; the remaining statements are not executed.
-     */
-    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
-            throws Exception {
-        int nextResultSetId = 0;
-        FileSplit currentOutputFile = null;
-        IAWriterFactory currentWriterFactory = PrinterBasedWriterFactory.INSTANCE;
-        IResultSerializerFactoryProvider serializerProvider = ResultSerializerFactoryProvider.INSTANCE;
-        Map<String, String> sessionProperties = new HashMap<String, String>();
-
-        for (Statement stmt : aqlStatements) {
-            validateOperation(activeDefaultDataverse, stmt);
-
-            // Each statement gets its own provider, seeded with the session state accumulated so far.
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse,
-                    CentralFeedManager.getInstance());
-            metadataProvider.setWriterFactory(currentWriterFactory);
-            metadataProvider.setResultSerializerFactoryProvider(serializerProvider);
-            metadataProvider.setOutputFile(currentOutputFile);
-            metadataProvider.setConfig(sessionProperties);
-
-            switch (stmt.getKind()) {
-                case SET:
-                    handleSetStatement(metadataProvider, stmt, sessionProperties);
-                    break;
-                case DATAVERSE_DECL:
-                    activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt);
-                    break;
-                case CREATE_DATAVERSE:
-                    handleCreateDataverseStatement(metadataProvider, stmt);
-                    break;
-                case DATASET_DECL:
-                    handleCreateDatasetStatement(metadataProvider, stmt, hcc);
-                    break;
-                case CREATE_INDEX:
-                    handleCreateIndexStatement(metadataProvider, stmt, hcc);
-                    break;
-                case TYPE_DECL:
-                    handleCreateTypeStatement(metadataProvider, stmt);
-                    break;
-                case NODEGROUP_DECL:
-                    handleCreateNodeGroupStatement(metadataProvider, stmt);
-                    break;
-                case DATAVERSE_DROP:
-                    handleDataverseDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                case DATASET_DROP:
-                    handleDatasetDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                case INDEX_DROP:
-                    handleIndexDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                case TYPE_DROP:
-                    handleTypeDropStatement(metadataProvider, stmt);
-                    break;
-                case NODEGROUP_DROP:
-                    handleNodegroupDropStatement(metadataProvider, stmt);
-                    break;
-                case CREATE_FUNCTION:
-                    handleCreateFunctionStatement(metadataProvider, stmt);
-                    break;
-                case FUNCTION_DROP:
-                    handleFunctionDropStatement(metadataProvider, stmt);
-                    break;
-                case LOAD:
-                    handleLoadStatement(metadataProvider, stmt, hcc);
-                    break;
-                case INSERT:
-                    handleInsertStatement(metadataProvider, stmt, hcc);
-                    break;
-                case DELETE:
-                    handleDeleteStatement(metadataProvider, stmt, hcc);
-                    break;
-                case CREATE_PRIMARY_FEED:
-                case CREATE_SECONDARY_FEED:
-                    // Both feed flavours share one handler.
-                    handleCreateFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                case DROP_FEED:
-                    handleDropFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                case DROP_FEED_POLICY:
-                    handleDropFeedPolicyStatement(metadataProvider, stmt, hcc);
-                    break;
-                case CONNECT_FEED:
-                    handleConnectFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                case DISCONNECT_FEED:
-                    handleDisconnectFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                case SUBSCRIBE_FEED:
-                    handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                case CREATE_FEED_POLICY:
-                    handleCreateFeedPolicyStatement(metadataProvider, stmt, hcc);
-                    break;
-                case QUERY: {
-                    // Every query in the batch gets the next sequential result set id.
-                    metadataProvider.setResultSetId(new ResultSetId(nextResultSetId++));
-                    boolean asyncResults = resultDelivery == ResultDelivery.ASYNC
-                            || resultDelivery == ResultDelivery.ASYNC_DEFERRED;
-                    metadataProvider.setResultAsyncMode(asyncResults);
-                    handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery);
-                    break;
-                }
-                case COMPACT:
-                    handleCompactStatement(metadataProvider, stmt, hcc);
-                    break;
-                case EXTERNAL_DATASET_REFRESH:
-                    handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc);
-                    break;
-                case WRITE: {
-                    Pair<IAWriterFactory, FileSplit> writeTarget = handleWriteStatement(metadataProvider, stmt);
-                    // Keep the current writer factory unless the statement named one explicitly.
-                    if (writeTarget.first != null) {
-                        currentWriterFactory = writeTarget.first;
-                    }
-                    currentOutputFile = writeTarget.second;
-                    break;
-                }
-                case RUN:
-                    handleRunStatement(metadataProvider, stmt, hcc);
-                    break;
-            }
-        }
-    }
-
-    /**
-     * Handles a {@code set} statement by storing its property name/value pair in {@code config}.
-     *
-     * @param metadataProvider
-     *            not used by this handler.
-     * @param stmt
-     *            the statement; must be a {@link SetStatement}.
-     * @param config
-     *            the property map to update (an existing entry with the same name is overwritten).
-     */
-    private void handleSetStatement(AqlMetadataProvider metadataProvider, Statement stmt, Map<String, String> config)
-            throws RemoteException, ACIDException {
-        SetStatement setStmt = (SetStatement) stmt;
-        String name = setStmt.getPropName();
-        config.put(name, setStmt.getPropValue());
-    }
-
-    /**
-     * Handles a {@code write} statement: builds the output file split and, if the statement names a
-     * writer class, instantiates it reflectively through its no-argument constructor.
-     *
-     * @param metadataProvider
-     *            not used by this handler.
-     * @param stmt
-     *            the statement; must be a {@link WriteStatement}.
-     * @return a pair of (writer factory, output file split); the writer factory is {@code null} when
-     *         the statement does not specify a writer class.
-     * @throws ClassNotFoundException
-     *             if the named writer class cannot be loaded.
-     * @throws InstantiationException
-     *             if the writer class cannot be instantiated.
-     * @throws IllegalAccessException
-     *             if the writer class or its constructor is not accessible.
-     */
-    private Pair<IAWriterFactory, FileSplit> handleWriteStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws InstantiationException, IllegalAccessException, ClassNotFoundException {
-        WriteStatement writeStmt = (WriteStatement) stmt;
-        File target = new File(writeStmt.getFileName());
-        String nodeName = writeStmt.getNcName().getValue();
-        FileSplit split = new FileSplit(nodeName, new FileReference(target));
-
-        String writerClassName = writeStmt.getWriterClassName();
-        IAWriterFactory factory = writerClassName == null ? null : (IAWriterFactory) Class.forName(
-                writerClassName).newInstance();
-        return new Pair<IAWriterFactory, FileSplit>(factory, split);
-    }
-
-    /**
-     * Handles a {@code use dataverse} statement by looking the dataverse up in the metadata inside
-     * its own metadata transaction, while holding the dataverse read lock.
-     *
-     * @param metadataProvider
-     *            provider that receives the metadata transaction context opened here.
-     * @param stmt
-     *            the statement; must be a {@link DataverseDecl}.
-     * @return the dataverse found in the metadata.
-     * @throws Exception
-     *             a {@link MetadataException} wrapping the cause when the dataverse is unknown or the
-     *             lookup/commit fails; the transaction is aborted first.
-     */
-    private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws Exception {
-        DataverseDecl useStmt = (DataverseDecl) stmt;
-        String dvName = useStmt.getDataverseName().getValue();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
-        try {
-            Dataverse found = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                    dvName);
-            if (found != null) {
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                return found;
-            }
-            throw new MetadataException("Unknown dataverse " + dvName);
-        } catch (Exception e) {
-            // Every failure, including the "unknown dataverse" case above, is re-wrapped after the abort.
-            abort(e, e, mdTxnCtx);
-            throw new MetadataException(e);
-        } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
-        }
-    }
-
-    /**
-     * Handles a {@code create dataverse} statement inside its own metadata transaction.
-     * <p>
-     * If the dataverse already exists, the statement is a no-op when it carries {@code if not exists}
-     * and an error otherwise. A new dataverse is added with {@code PENDING_NO_OP}.
-     *
-     * @param metadataProvider
-     *            provider that receives the metadata transaction context opened here.
-     * @param stmt
-     *            the statement; must be a {@link CreateDataverseStatement}.
-     * @throws Exception
-     *             an {@link AlgebricksException} if the dataverse exists and {@code if not exists} was
-     *             not given, or whatever the metadata operations throw; the transaction is aborted first.
-     */
-    private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
-        CreateDataverseStatement createStmt = (CreateDataverseStatement) stmt;
-        String dvName = createStmt.getDataverseName().getValue();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        // NOTE(review): a read lock is taken although a dataverse may be added below - confirm this is intended.
-        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
-        try {
-            Dataverse existing = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                    dvName);
-            if (existing != null && !createStmt.getIfNotExists()) {
-                throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
-            }
-            if (existing == null) {
-                Dataverse created = new Dataverse(dvName, createStmt.getFormat(), IMetadataEntity.PENDING_NO_OP);
-                MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), created);
-            }
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
-        }
-    }
-
-    /**
-     * Checks that a user-specified compaction policy exists in the metadata dataverse and that the
-     * supplied properties match exactly the property names its merge policy factory declares.
-     *
-     * @param compactionPolicy
-     *            name of the compaction policy to validate.
-     * @param compactionPolicyProperties
-     *            properties given for the policy; may be {@code null}, which is only accepted for the
-     *            "no-merge" policy.
-     * @param mdTxnCtx
-     *            metadata transaction used for the policy lookup.
-     * @param isExternalDataset
-     *            whether the policy is meant for an external dataset; "correlated-prefix" is rejected
-     *            in that case.
-     * @throws AsterixException
-     *             if the policy is unknown, not allowed, or its properties are missing or invalid.
-     * @throws Exception
-     *             if the lookup fails or the merge policy factory class cannot be instantiated.
-     */
-    private void validateCompactionPolicy(String compactionPolicy, Map<String, String> compactionPolicyProperties,
-            MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws AsterixException, Exception {
-        CompactionPolicy policyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
-                MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
-        if (policyEntity == null) {
-            throw new AsterixException("Unknown compaction policy: " + compactionPolicy);
-        }
-
-        // The policy entity only stores the factory class name; instantiate it to query its contract.
-        String factoryClassName = policyEntity.getClassName();
-        ILSMMergePolicyFactory factory = (ILSMMergePolicyFactory) Class.forName(factoryClassName).newInstance();
-
-        if (isExternalDataset && factory.getName().equals("correlated-prefix")) {
-            throw new AsterixException("The correlated-prefix merge policy cannot be used with external dataset.");
-        }
-
-        if (compactionPolicyProperties == null) {
-            if (!factory.getName().equals("no-merge")) {
-                throw new AsterixException("Compaction policy properties are missing.");
-            }
-            return;
-        }
-
-        // First reject properties the policy does not know about ...
-        for (String given : compactionPolicyProperties.keySet()) {
-            if (!factory.getPropertiesNames().contains(given)) {
-                throw new AsterixException("Invalid compaction policy property: " + given);
-            }
-        }
-        // ... then make sure every property the policy declares was supplied.
-        for (String required : factory.getPropertiesNames()) {
-            if (!compactionPolicyProperties.containsKey(required)) {
-                throw new AsterixException("Missing compaction policy property: " + required);
-            }
-        }
-    }
-
-    /**
-     * Handles a {@code create dataset} statement.
-     * <p>
-     * The work is done in steps so that a failure can be compensated:
-     * <ol>
-     * <li>validate the request (dataset must not exist, item type must exist, compaction policy must be
-     * valid) and add the dataset to the metadata with {@code PENDING_ADD_OP};</li>
-     * <li>for internal datasets, commit that metadata transaction, run the dataset creation job and
-     * open a new metadata transaction;</li>
-     * <li>replace the pending record by one with {@code PENDING_NO_OP} and commit.</li>
-     * </ol>
-     * If a failure happens after the pending record was committed, a drop-dataset job is run and the
-     * pending record is removed from the metadata before the original exception is rethrown.
-     *
-     * @param metadataProvider
-     *            provider that receives the metadata transaction contexts opened here.
-     * @param stmt
-     *            the statement; must be a {@link DatasetDecl}.
-     * @param hcc
-     *            Hyracks client connection used to run the create/drop jobs.
-     * @throws AsterixException
-     *             if the compaction policy or the nodegroup hint is invalid.
-     * @throws IllegalStateException
-     *             if compensation could not remove the pending dataset record from the metadata.
-     * @throws Exception
-     *             any other failure, rethrown after abort/compensation.
-     */
-    private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws AsterixException, Exception {
-
-        ProgressState progress = ProgressState.NO_PROGRESS;
-        DatasetDecl dd = (DatasetDecl) stmt;
-        String dataverseName = getActiveDataverse(dd.getDataverse());
-        String datasetName = dd.getName().getValue();
-        DatasetType dsType = dd.getDatasetType();
-        String itemTypeName = dd.getItemTypeName().getValue();
-        Identifier ngNameId = dd.getNodegroupName();
-        String nodegroupName = getNodeGroupName(ngNameId, dd, dataverseName);
-        String compactionPolicy = dd.getCompactionPolicy();
-        Map<String, String> compactionPolicyProperties = dd.getCompactionPolicyProperties();
-        boolean defaultCompactionPolicy = (compactionPolicy == null);
-        boolean temp = dd.getDatasetDetailsDecl().isTemp();
-
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, dataverseName + "." + itemTypeName,
-                nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
-        Dataset dataset = null;
-        try {
-
-            IDatasetDetails datasetDetails = null;
-            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName);
-            if (ds != null) {
-                if (dd.getIfNotExists()) {
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
-                } else {
-                    throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
-                }
-            }
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    itemTypeName);
-            if (dt == null) {
-                throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
-            }
-            String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName,
-                    mdTxnCtx);
-
-            if (compactionPolicy == null) {
-                compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
-                compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
-            } else {
-                // Tell the validator what kind of dataset this is, so that policies that are not
-                // allowed for external datasets are actually rejected.
-                validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx,
-                        dsType == DatasetType.EXTERNAL);
-            }
-            switch (dd.getDatasetType()) {
-                case INTERNAL: {
-                    IAType itemType = dt.getDatatype();
-                    if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                        throw new AlgebricksException("Can only partition ARecord's.");
-                    }
-                    List<List<String>> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getPartitioningExprs();
-                    boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
-                    ARecordType aRecordType = (ARecordType) itemType;
-                    List<IAType> partitioningTypes = aRecordType.validatePartitioningExpressions(partitioningExprs,
-                            autogenerated);
-
-                    List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField();
-                    if (filterField != null) {
-                        aRecordType.validateFilterField(filterField);
-                    }
-                    // NOTE(review): compactionPolicy was already defaulted above, so this branch can
-                    // never run and filtered datasets do not get the policy described below. Left
-                    // as-is because enabling it changes the stored policy - confirm the intent.
-                    if (compactionPolicy == null) {
-                        if (filterField != null) {
-                            // If the dataset has a filter and the user didn't specify a merge policy, then we will pick the
-                            // correlated-prefix as the default merge policy.
-                            compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
-                            compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
-                        }
-                    }
-                    datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-                            partitioningTypes, autogenerated, filterField, temp);
-                    break;
-                }
-                case EXTERNAL: {
-                    String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-
-                    datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(),
-                            ExternalDatasetTransactionState.COMMIT);
-                    break;
-                }
-
-            }
-
-            //#. initialize DatasetIdFactory if it is not initialized.
-            if (!DatasetIdFactory.isInitialized()) {
-                DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
-            }
-
-            //#. add a new dataset with PendingAddOp
-            dataset = new Dataset(dataverseName, datasetName, itemTypeName, ngName, compactionPolicy,
-                    compactionPolicyProperties, datasetDetails, dd.getHints(), dsType,
-                    DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
-            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-
-            if (dd.getDatasetType() == DatasetType.INTERNAL) {
-                Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                        dataverseName);
-                JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
-                        metadataProvider);
-
-                //#. make metadataTxn commit before calling runJob.
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                bActiveTxn = false;
-                progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-                //#. runJob
-                runJob(hcc, jobSpec, true);
-
-                //#. begin new metadataTxn
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            }
-
-            //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
-            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
-            dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
-            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            // Only abort when a metadata transaction is still open at the point of failure.
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-
-            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-
-                //#. execute compensation operations
-                //   remove the index in NC
-                // [Notice]
-                // As long as we updated(and committed) metadata, we should remove any effect of the job
-                // because an exception occurs during runJob.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                bActiveTxn = true;
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-                try {
-                    JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    bActiveTxn = false;
-
-                    runJob(hcc, jobSpec, true);
-                } catch (Exception e2) {
-                    // A failed compensation job is recorded on the original exception; it does not replace it.
-                    e.addSuppressed(e2);
-                    if (bActiveTxn) {
-                        abort(e, e2, mdTxnCtx);
-                    }
-                }
-
-                //   remove the record from the metadata.
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                try {
-                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e2) {
-                    e.addSuppressed(e2);
-                    abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
-                            + "." + datasetName + ") couldn't be removed from the metadata", e);
-                }
-            }
-
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, dataverseName + "." + itemTypeName,
-                    nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
-        }
-    }
-
-    /**
-     * Fails if any active feed connection targets a dataset with the given name.
-     *
-     * @param dataverseName
-     *            currently not consulted. NOTE(review): only the dataset name is compared, so a
-     *            same-named dataset in another dataverse would also match - confirm whether intended.
-     * @param datasetName
-     *            name of the dataset to check.
-     * @throws AsterixException
-     *             listing the matching feed connections when the dataset is being fed.
-     */
-    private void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) throws AsterixException {
-        List<FeedConnectionId> connections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
-        if (connections == null) {
-            return;
-        }
-
-        // One line per connection that feeds the dataset; stays empty when there is none.
-        StringBuilder feeders = new StringBuilder();
-        for (FeedConnectionId connection : connections) {
-            if (connection.getDatasetName().equals(datasetName)) {
-                feeders.append(connection).append("\n");
-            }
-        }
-        if (feeders.length() == 0) {
-            return;
-        }
-
-        throw new AsterixException("Dataset " + datasetName + " is currently being "
-                + "fed into by the following feed(s).\n" + feeders.toString() + "\n" + "Operation not supported");
-    }
-
-    /**
-     * Determines the name of the nodegroup a dataset declaration refers to.
-     *
-     * @param ngNameId
-     *            nodegroup named explicitly in the statement, or {@code null}.
-     * @param dd
-     *            the dataset declaration, consulted for the nodegroup cardinality hint and dataset name.
-     * @param dataverse
-     *            dataverse name used to build a dataset-specific nodegroup name.
-     * @return the explicit name if given; otherwise the default metadata nodegroup name when no
-     *         cardinality hint is present, or {@code <dataverse>:<dataset>} when one is.
-     */
-    private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
-        if (ngNameId != null) {
-            return ngNameId.getValue();
-        }
-        boolean hasCardinalityHint = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME) != null;
-        return hasCardinalityHint ? (dataverse + ":" + dd.getName().getValue())
-                : MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
-    }
-
-    /**
-     * Chooses the nodegroup for a dataset that was declared without an explicit nodegroup.
-     * <p>
-     * Without a nodegroup cardinality hint the default metadata nodegroup is used. With a hint, a new
-     * nodegroup named {@code <dataverse>:<dataset>} is registered; it always contains the metadata node
-     * and is filled up to the requested cardinality with distinct, randomly chosen other nodes.
-     *
-     * @param dd
-     *            the dataset declaration, consulted for the hint and the dataset name.
-     * @param dataverse
-     *            dataverse name used to build the nodegroup name.
-     * @param mdTxnCtx
-     *            metadata transaction in which the new nodegroup is added.
-     * @return the name of the nodegroup to use for the dataset.
-     * @throws AsterixException
-     *             if the hint value is invalid or asks for more nodes than are available.
-     */
-    private String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx)
-            throws AsterixException {
-        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
-        if (hintValue == null) {
-            return MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
-        }
-
-        boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME, hintValue).first;
-        if (!valid) {
-            throw new AsterixException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
-        }
-        int nodegroupCardinality = Integer.parseInt(hintValue);
-
-        // Work on a copy so the configured node-name set is never modified.
-        Set<String> candidateNodes = new HashSet<String>(AsterixAppContextInfo.getInstance()
-                .getMetadataProperties().getNodeNames());
-        String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
-
-        // The metadata node is always part of the group and counts towards the cardinality.
-        List<String> selectedNodes = new ArrayList<String>();
-        selectedNodes.add(metadataNodeName);
-        candidateNodes.remove(metadataNodeName);
-
-        int stillNeeded = nodegroupCardinality - selectedNodes.size();
-        if (stillNeeded > candidateNodes.size()) {
-            // Previously this surfaced as an IllegalArgumentException from Random.nextInt.
-            throw new AsterixException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME
-                    + " (requested " + nodegroupCardinality + " nodes, but only " + (candidateNodes.size() + 1)
-                    + " available)");
-        }
-        if (stillNeeded > 0) {
-            Random random = new Random();
-            String[] nodes = candidateNodes.toArray(new String[candidateNodes.size()]);
-            // Partial Fisher-Yates shuffle: slot i receives a random pick from the not-yet-chosen
-            // tail [i, length). Swapping with slot i (not slot 0) is what keeps the picks distinct.
-            for (int i = 0; i < stillNeeded; i++) {
-                int picked = i + random.nextInt(nodes.length - i);
-                String chosen = nodes[picked];
-                nodes[picked] = nodes[i];
-                nodes[i] = chosen;
-                selectedNodes.add(chosen);
-            }
-        }
-
-        String nodegroupName = dataverse + ":" + dd.getName().getValue();
-        MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
-        return nodegroupName;
-    }
-
- /**
-  * Handles a CREATE INDEX statement.
-  *
-  * Flow visible in this method:
-  * 1. Under the create-index metadata lock, look up the dataset, resolve and validate the key
-  *    fields and their types, and reject unsupported combinations.
-  * 2. For an external dataset, lock it in ExternalDatasetsRegistry. If it has no files index yet,
-  *    snapshot the external files, register a files index with PENDING_ADD_OP and run the
-  *    files-index replication job.
-  * 3. Add the new index with PENDING_ADD_OP, commit, then run the index creation job followed by
-  *    the index loading job.
-  * 4. In a final transaction, replace the pending record(s) with PENDING_NO_OP records.
-  *
-  * On any exception the active transaction is aborted, compensating drop jobs are run, the
-  * pending metadata records are removed, and the original exception is rethrown with secondary
-  * failures attached as suppressed. If the pending records cannot be removed, an
-  * IllegalStateException is thrown instead.
-  *
-  * @param metadataProvider provider whose metadata transaction context is replaced several times here
-  * @param stmt the statement, cast to CreateIndexStatement
-  * @param hcc connection passed to runJob for every job this method runs
-  * @throws Exception the original failure, or IllegalStateException when compensation fails
-  */
- @SuppressWarnings("unchecked")
- private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
- ProgressState progress = ProgressState.NO_PROGRESS;
- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
- String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
- String datasetName = stmtCreateIndex.getDatasetName().getValue();
-
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- boolean bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
- String indexName = null;
- JobSpecification spec = null;
- Dataset ds = null;
- // For external datasets
- ArrayList<ExternalFile> externalFilesSnapshot = null;
- boolean firstExternalDatasetIndex = false;
- boolean filesIndexReplicated = false;
- Index filesIndex = null;
- boolean datasetLocked = false;
- try {
- ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName);
- if (ds == null) {
- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
- + dataverseName);
- }
-
- indexName = stmtCreateIndex.getIndexName().getValue();
- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName, indexName);
-
- String itemTypeName = ds.getItemTypeName();
- Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
- itemTypeName);
- // NOTE(review): dt is dereferenced without a null check; presumably the item type of an
- // existing dataset is always present in the same dataverse - confirm.
- IAType itemType = dt.getDatatype();
- ARecordType aRecordType = (ARecordType) itemType;
-
- List<List<String>> indexFields = new ArrayList<List<String>>();
- List<IAType> indexFieldTypes = new ArrayList<IAType>();
- for (Pair<List<String>, TypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) {
- IAType fieldType = null;
- boolean isOpen = aRecordType.isOpen();
- ARecordType subType = aRecordType;
- int i = 0;
- // For a nested field path on a closed record, walk down the path until an open
- // record type is reached; i ends up as the number of path components consumed.
- if (fieldExpr.first.size() > 1 && !isOpen) {
- for (; i < fieldExpr.first.size() - 1;) {
- subType = (ARecordType) subType.getFieldType(fieldExpr.first.get(i));
- i++;
- if (subType.isOpen()) {
- isOpen = true;
- break;
- };
- }
- }
- if (fieldExpr.second == null) {
- fieldType = subType.getSubFieldType(fieldExpr.first.subList(i, fieldExpr.first.size()));
- } else {
- if (!stmtCreateIndex.isEnforced())
- throw new AlgebricksException("Cannot create typed index on \"" + fieldExpr.first
- + "\" field without enforcing it's type");
- if (!isOpen)
- throw new AlgebricksException("Typed index on \"" + fieldExpr.first
- + "\" field could be created only for open datatype");
- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second,
- indexName, dataverseName);
- TypeSignature typeSignature = new TypeSignature(dataverseName, indexName);
- fieldType = typeMap.get(typeSignature);
- }
- if (fieldType == null)
- throw new AlgebricksException("Unknown type " + fieldExpr.second);
-
- indexFields.add(fieldExpr.first);
- indexFieldTypes.add(fieldType);
- }
-
- aRecordType.validateKeyFields(indexFields, indexFieldTypes, stmtCreateIndex.getIndexType());
-
- // NOTE(review): this existence check runs only after the field validation above, so
- // "if not exists" on an existing index still fails whenever that validation throws.
- if (idx != null) {
- if (stmtCreateIndex.getIfNotExists()) {
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return;
- } else {
- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
- }
- }
-
- // Checks whether a user is trying to create an inverted secondary index on a dataset with a variable-length primary key.
- // Currently, we do not support this. Therefore, as a temporary solution, we print an error message and stop.
- if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
- || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
- || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
- || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
- List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(ds);
- for (List<String> partitioningKey : partitioningKeys) {
- IAType keyType = aRecordType.getSubFieldType(partitioningKey);
- ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-
- // If it is not a fixed length
- if (typeTrait.getFixedLength() < 0) {
- throw new AlgebricksException("The keyword or ngram index -" + indexName
- + " cannot be created on the dataset -" + datasetName
- + " due to its variable-length primary key field - " + partitioningKey);
- }
-
- }
- }
-
- if (ds.getDatasetType() == DatasetType.INTERNAL) {
- validateIfResourceIsActiveInFeed(dataverseName, datasetName);
- } else {
- // External dataset
- // Check if the dataset is indexible
- if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
- throw new AlgebricksException("dataset using "
- + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
- + " Adapter can't be indexed");
- }
- // check if the name of the index is valid
- if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
- throw new AlgebricksException("external dataset index name is invalid");
- }
-
- // Check if the files index exist
- filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
- firstExternalDatasetIndex = (filesIndex == null);
- // lock external dataset
- ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
- datasetLocked = true;
- if (firstExternalDatasetIndex) {
- // verify that no one has created an index before we acquire the lock
- filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
- dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
- if (filesIndex != null) {
- // Someone else created the files index in the meantime: release the lock taken
- // with firstExternalDatasetIndex == true and re-acquire it with false.
- ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
- firstExternalDatasetIndex = false;
- ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
- }
- }
- if (firstExternalDatasetIndex) {
- // Get snapshot from External File System
- externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
- // Add an entry for the files index
- filesIndex = new Index(dataverseName, datasetName,
- ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
- ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES,
- ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
- IMetadataEntity.PENDING_ADD_OP);
- MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
- // Add files to the external files index
- for (ExternalFile file : externalFilesSnapshot) {
- MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
- }
- // This is the first index for the external dataset, replicate the files index
- spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
- metadataProvider, true);
- if (spec == null) {
- throw new AsterixException(
- "Failed to create job spec for replicating Files Index For external dataset");
- }
- filesIndexReplicated = true;
- runJob(hcc, spec, true);
- }
- }
-
- //check whether there exists another enforced index on the same field
- if (stmtCreateIndex.isEnforced()) {
- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(
- metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
- for (Index index : indexes) {
- if (index.getKeyFieldNames().equals(indexFields)
- && !index.getKeyFieldTypes().equals(indexFieldTypes) && index.isEnforcingKeyFileds())
- throw new AsterixException("Cannot create index " + indexName + " , enforced index "
- + index.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',')
- + "\" already exist");
- }
- }
-
- //#. add a new index with PendingAddOp
- Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
- indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(), false,
- IMetadataEntity.PENDING_ADD_OP);
- MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-
- ARecordType enforcedType = null;
- if (stmtCreateIndex.isEnforced()) {
- enforcedType = IntroduceSecondaryIndexInsertDeleteRule.createEnforcedType(aRecordType, index);
- }
-
- //#. prepare to create the index artifact in NC.
- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
- index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
- index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
- spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, enforcedType, metadataProvider);
- if (spec == null) {
- throw new AsterixException("Failed to create job spec for creating index '"
- + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
- }
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
-
- // From here on a failure must compensate for the committed PENDING_ADD_OP record.
- progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
- //#. create the index artifact in NC.
- runJob(hcc, spec, true);
-
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- //#. load data into the index in NC.
- cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
- index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
- index.getGramLength(), index.getIndexType());
- spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, enforcedType, metadataProvider);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
-
- runJob(hcc, spec, true);
-
- //#. begin new metadataTxn
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
- indexName);
- index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
- MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
- // add another new files index with PendingNoOp after deleting the index with PendingAddOp
- if (firstExternalDatasetIndex) {
- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName, filesIndex.getIndexName());
- filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
- MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
- // update transaction timestamp
- ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date());
- MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
- }
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
- } catch (Exception e) {
- if (bActiveTxn) {
- abort(e, e, mdTxnCtx);
- }
- // If files index was replicated for external dataset, it should be cleaned up on NC side
- if (filesIndexReplicated) {
- // NOTE(review): unlike the compensation steps below, this new transaction is not
- // installed on metadataProvider before the job spec is built - confirm intended.
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- bActiveTxn = true;
- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
- ExternalIndexingOperations.getFilesIndexName(datasetName));
- try {
- JobSpecification jobSpec = ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
- metadataProvider, ds);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
- runJob(hcc, jobSpec, true);
- } catch (Exception e2) {
- e.addSuppressed(e2);
- if (bActiveTxn) {
- abort(e, e2, mdTxnCtx);
- }
- }
- }
-
- if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
- //#. execute compensation operations
- // remove the index in NC
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
- try {
- JobSpecification jobSpec = IndexOperations
- .buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds);
-
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
- runJob(hcc, jobSpec, true);
- } catch (Exception e2) {
- e.addSuppressed(e2);
- if (bActiveTxn) {
- abort(e, e2, mdTxnCtx);
- }
- }
-
- if (firstExternalDatasetIndex) {
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- try {
- // Drop External Files from metadata
- MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e2) {
- e.addSuppressed(e2);
- abort(e, e2, mdTxnCtx);
- throw new IllegalStateException("System is inconsistent state: pending files for("
- + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e);
- }
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- try {
- // Drop the files index from metadata
- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e2) {
- e.addSuppressed(e2);
- abort(e, e2, mdTxnCtx);
- throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
- + "." + datasetName + "." + ExternalIndexingOperations.getFilesIndexName(datasetName)
- + ") couldn't be removed from the metadata", e);
- }
- }
- // remove the record from the metadata.
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- try {
- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
- datasetName, indexName);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e2) {
- e.addSuppressed(e2);
- abort(e, e2, mdTxnCtx);
- throw new IllegalStateException("System is in inconsistent state: pending index(" + dataverseName
- + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
- }
- }
- throw e;
- } finally {
- MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName);
- if (datasetLocked) {
- ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
- }
- }
- }
-
- /**
-  * Handles a CREATE TYPE statement inside a single metadata transaction, under the create-type lock.
-  *
-  * The dataverse must exist. If a datatype with the same name already exists, the statement
-  * succeeds only when it was issued with "if not exists". Otherwise the name must not be a
-  * builtin type, and the translated type is added to the metadata.
-  * On any exception the transaction is aborted and the exception is rethrown.
-  */
- private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
-     TypeDecl typeDecl = (TypeDecl) stmt;
-     String dataverseName = getActiveDataverse(typeDecl.getDataverseName());
-     String typeName = typeDecl.getIdent().getValue();
-     MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-     metadataProvider.setMetadataTxnContext(mdTxnCtx);
-     MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName);
-     try {
-         if (MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName) == null) {
-             throw new AlgebricksException("Unknown dataverse " + dataverseName);
-         }
-
-         Datatype existing = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-         if (existing == null) {
-             if (builtinTypeMap.get(typeName) != null) {
-                 throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
-             }
-             Map<TypeSignature, IAType> computedTypes = TypeTranslator.computeTypes(mdTxnCtx,
-                     typeDecl.getTypeDef(), typeDecl.getIdent().getValue(), dataverseName);
-             IAType newType = computedTypes.get(new TypeSignature(dataverseName, typeName));
-             MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, newType, false));
-         } else if (!typeDecl.getIfNotExists()) {
-             throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
-         }
-
-         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-     } catch (Exception e) {
-         abort(e, e, mdTxnCtx);
-         throw e;
-     } finally {
-         MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName);
-     }
- }
-
- /**
-  * Handles a DROP DATAVERSE statement under the dataverse write lock.
-  *
-  * Flow:
-  * 1. Return quietly if the dataverse is missing and "if exists" was given; otherwise fail.
-  * 2. Disconnect every active feed whose feed belongs to the dataverse. A failed disconnect is
-  *    logged and does not stop the drop.
-  * 3. Build drop jobs for all indexes and datasets, plus the dataverse itself.
-  * 4. Replace the dataverse record with a PENDING_DROP_OP record and commit.
-  * 5. Run the jobs, then delete the dataverse record in a new transaction.
-  *
-  * If a failure occurs after step 4, the jobs are re-run as compensation and the pending record
-  * is removed before the original exception is rethrown.
-  *
-  * @param metadataProvider provider whose metadata transaction context is set here
-  * @param stmt the statement, cast to DataverseDropStatement
-  * @param hcc connection passed to runJob
-  * @throws Exception the original failure, or IllegalStateException when the pending record
-  *         cannot be removed
-  */
- private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-         IHyracksClientConnection hcc) throws Exception {
-     DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-     String dataverseName = stmtDelete.getDataverseName().getValue();
-
-     ProgressState progress = ProgressState.NO_PROGRESS;
-     MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-     boolean bActiveTxn = true;
-     metadataProvider.setMetadataTxnContext(mdTxnCtx);
-     MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
-     List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
-     try {
-
-         Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-         if (dv == null) {
-             if (stmtDelete.getIfExists()) {
-                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                 return;
-             } else {
-                 throw new AlgebricksException("There is no dataverse with this name " + dataverseName + ".");
-             }
-         }
-
-         //# disconnect all feeds from any datasets in the dataverse.
-         List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE
-                 .getActiveFeedConnections(null);
-         // The other callers of getActiveFeedConnections in this file treat null as "no connections".
-         if (activeFeedConnections != null) {
-             Identifier dvId = new Identifier(dataverseName);
-             for (FeedConnectionId connection : activeFeedConnections) {
-                 FeedId feedId = connection.getFeedId();
-                 if (!feedId.getDataverse().equals(dataverseName)) {
-                     continue;
-                 }
-                 DisconnectFeedStatement disStmt = new DisconnectFeedStatement(dvId, new Identifier(
-                         feedId.getFeedName()), new Identifier(connection.getDatasetName()));
-                 try {
-                     handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
-                     if (LOGGER.isLoggable(Level.INFO)) {
-                         LOGGER.info("Disconnected feed " + feedId.getFeedName() + " from dataset "
-                                 + connection.getDatasetName());
-                     }
-                 } catch (Exception exception) {
-                     // Best effort: a feed that cannot be disconnected must not block the drop.
-                     if (LOGGER.isLoggable(Level.WARNING)) {
-                         LOGGER.warning("Unable to disconnect feed " + feedId.getFeedName() + " from dataset "
-                                 + connection.getDatasetName() + ". Encountered exception " + exception);
-                     }
-                 }
-             }
-         }
-
-         //#. prepare jobs which will drop corresponding datasets with indexes.
-         List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
-         for (Dataset dataset : datasets) {
-             String datasetName = dataset.getDatasetName();
-             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-             if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                 for (Index index : indexes) {
-                     if (index.isSecondaryIndex()) {
-                         CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                                 index.getIndexName());
-                         jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
-                                 dataset));
-                     }
-                 }
-
-                 CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-                 jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
-             } else {
-                 // External dataset
-                 for (Index index : indexes) {
-                     CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                             index.getIndexName());
-                     if (ExternalIndexingOperations.isFileIndex(index)) {
-                         jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                                 metadataProvider, dataset));
-                     } else {
-                         jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
-                                 dataset));
-                     }
-                 }
-                 ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(dataset);
-             }
-         }
-         jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-
-         //#. mark PendingDropOp on the dataverse record by
-         // first, deleting the dataverse record from the DATAVERSE_DATASET
-         // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
-         MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-         MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dv.getDataFormat(),
-                 IMetadataEntity.PENDING_DROP_OP));
-
-         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-         bActiveTxn = false;
-         progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-         for (JobSpecification jobSpec : jobsToExecute) {
-             runJob(hcc, jobSpec, true);
-         }
-
-         mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-         bActiveTxn = true;
-         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-         //#. finally, delete the dataverse.
-         MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-         // Compare names by value; the two strings are generally distinct objects.
-         if (activeDefaultDataverse != null && dataverseName.equals(activeDefaultDataverse.getDataverseName())) {
-             activeDefaultDataverse = null;
-         }
-         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-     } catch (Exception e) {
-         if (bActiveTxn) {
-             abort(e, e, mdTxnCtx);
-         }
-
-         if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-             if (activeDefaultDataverse != null
-                     && dataverseName.equals(activeDefaultDataverse.getDataverseName())) {
-                 activeDefaultDataverse = null;
-             }
-
-             //#. execute compensation operations
-             // remove the all indexes in NC
-             try {
-                 for (JobSpecification jobSpec : jobsToExecute) {
-                     runJob(hcc, jobSpec, true);
-                 }
-             } catch (Exception e2) {
-                 //do no throw exception since still the metadata needs to be compensated.
-                 e.addSuppressed(e2);
-             }
-
-             // remove the record from the metadata.
-             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-             try {
-                 MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-             } catch (Exception e2) {
-                 e.addSuppressed(e2);
-                 abort(e, e2, mdTxnCtx);
-                 throw new IllegalStateException("System is inconsistent state: pending dataverse(" + dataverseName
-                         + ") couldn't be removed from the metadata", e);
-             }
-         }
-
-         throw e;
-     } finally {
-         MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
-     }
- }
-
- /**
-  * Handles a DROP DATASET statement under the drop-dataset metadata lock.
-  *
-  * Flow:
-  * 1. Return quietly if the dataset is missing and "if exists" was given; otherwise fail.
-  * 2. Internal dataset: build disconnect jobs for the active feeds connected to this dataset,
-  *    drop jobs for its secondary indexes, and a drop job for the dataset itself.
-  *    External dataset: unregister it from ExternalDatasetsRegistry and build a drop job per
-  *    index, using the files-index builder for the files index and the secondary-index builder
-  *    for all others.
-  * 3. Replace the dataset record with a PENDING_DROP_OP record and commit.
-  * 4. Run the jobs, then delete the dataset record (and its nodegroup, when not the default one)
-  *    in a new transaction.
-  *
-  * If a failure occurs after step 3, the drop jobs are re-run as compensation and the pending
-  * record is removed before the original exception is rethrown.
-  *
-  * @param metadataProvider provider whose metadata transaction context is set here
-  * @param stmt the statement, cast to DropStatement
-  * @param hcc connection passed to runJob
-  * @throws Exception the original failure, or IllegalStateException when the pending record
-  *         cannot be removed
-  */
- private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-         IHyracksClientConnection hcc) throws Exception {
-     DropStatement stmtDelete = (DropStatement) stmt;
-     String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
-     String datasetName = stmtDelete.getDatasetName().getValue();
-
-     ProgressState progress = ProgressState.NO_PROGRESS;
-     MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-     boolean bActiveTxn = true;
-     metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-     MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName);
-     List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
-     try {
-
-         Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-         if (ds == null) {
-             if (stmtDelete.getIfExists()) {
-                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                 return;
-             } else {
-                 throw new AlgebricksException("There is no dataset with this name " + datasetName
-                         + " in dataverse " + dataverseName + ".");
-             }
-         }
-
-         Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
-         boolean internal = ds.getDatasetType() == DatasetType.INTERNAL;
-         List<Index> indexes;
-         if (internal) {
-             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
-             List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
-             if (feedConnections != null && !feedConnections.isEmpty()) {
-                 for (FeedConnectionId connection : feedConnections) {
-                     // Only feeds connected to this dataset are disconnected; other feeds stay up.
-                     if (!connection.getDatasetName().equals(datasetName)
-                             || !connection.getFeedId().getDataverse().equals(dataverseName)) {
-                         continue;
-                     }
-                     Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
-                             connection);
-                     disconnectJobList.put(connection, p);
-                     if (LOGGER.isLoggable(Level.INFO)) {
-                         LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
-                                 + datasetName + " as dataset is being dropped");
-                     }
-                 }
-             }
-
-             //#. prepare jobs to drop the datatset and the indexes in NC
-             indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-             for (Index index : indexes) {
-                 if (index.isSecondaryIndex()) {
-                     CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                             index.getIndexName());
-                     jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-                 }
-             }
-             CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-             jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
-         } else {
-             // External dataset
-             ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-             //#. prepare jobs to drop the datatset and the indexes in NC
-             indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-             for (Index index : indexes) {
-                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-                         index.getIndexName());
-                 // Same builder choice as the external branch of the dataverse drop.
-                 if (ExternalIndexingOperations.isFileIndex(index)) {
-                     jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider,
-                             ds));
-                 } else {
-                     jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-                 }
-             }
-         }
-
-         //#. mark the existing dataset as PendingDropOp
-         MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-         MetadataManager.INSTANCE.addDataset(
-                 mdTxnCtx,
-                 new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getNodeGroupName(), ds
-                         .getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds
-                         .getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
-
-         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-         bActiveTxn = false;
-         progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
-         //# disconnect the feeds (the map is empty for an external dataset)
-         for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
-             runJob(hcc, p.first, true);
-         }
-
-         //#. run the jobs
-         for (JobSpecification jobSpec : jobsToExecute) {
-             runJob(hcc, jobSpec, true);
-         }
-         if (!internal && indexes.size() > 0) {
-             ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-         }
-
-         mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-         bActiveTxn = true;
-         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-         //#. finally, delete the dataset.
-         MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-         // Drop the associated nodegroup
-         // NOTE(review): the nodegroup dropped is always "<dataverse>:<dataset>", even when
-         // ds.getNodeGroupName() names a different non-default nodegroup - confirm intended.
-         String nodegroup = ds.getNodeGroupName();
-         if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
-             MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName);
-         }
-
-         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-     } catch (Exception e) {
-         if (bActiveTxn) {
-             abort(e, e, mdTxnCtx);
-         }
-
-         if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-             //#. execute compensation operations
-             // remove the all indexes in NC
-             try {
-                 for (JobSpecification jobSpec : jobsToExecute) {
-                     runJob(hcc, jobSpec, true);
-                 }
-             } catch (Exception e2) {
-                 //do no throw exception since still the metadata needs to be compensated.
-                 e.addSuppressed(e2);
-             }
-
-             // remove the record from the metadata.
-             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-             try {
-                 MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                         datasetName);
-                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-             } catch (Exception e2) {
-                 e.addSuppressed(e2);
-                 abort(e, e2, mdTxnCtx);
-                 throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
-                         + "." + datasetName + ") couldn't be removed from the metadata", e);
-             }
-         }
-
-         throw e;
-     } finally {
-         MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
-     }
- }
-
- private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
-
- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
- String datasetName = stmtIndexDrop.getDatasetName().getValue();
- String dataverseName = getActiveDataverse(stmtIndexDrop.getDataverseName());
- ProgressState progress = ProgressState.NO_PROGRESS;
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- boolean bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
- String indexName = null;
- // For external index
- boolean dropFilesIndex = false;
- List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
- try {
-
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- if (ds == null) {
- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
- + dataverseName);
- }
-
- List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
- boolean resourceInUse = false;
- if (feedConnections != null && !feedConnections.isEmpty()) {
- StringBuilder builder = new StringBuilder();
- for (FeedConnectionId connection : feedConnections) {
- if (connection.getDatasetName().equals(datasetName)) {
- resourceInUse = true;
- builder.append(connection + "\n");
- }
- }
- if (resourceInUse) {
- throw new AsterixException("Dataset" + datasetName
- + " is currently being fed into by the following feeds " + "." + builder.toString()
- + "\nOperation not supported.");
- }
- }
-
- if (ds.getDatasetType() == DatasetType.INTERNAL) {
- indexName = stmtIndexDrop.getIndexName().getValue();
- Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- if (index == null) {
- if (stmtIndexDrop.getIfExists()) {
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return;
- } else {
- throw new AlgebricksException("There is no index with this name " + indexName + ".");
- }
- }
- //#. prepare a job to drop the index in NC.
- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
- jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
-
- //#. mark PendingDropOp on the existing index
- MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- MetadataManager.INSTANCE.addIndex(
- mdTxnCtx,
- new Index(dataverseName, datasetName, indexName, index.getIndexType(),
- index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
- .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-
- //#. commit the existing transaction before calling runJob.
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
- progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
- for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
- }
-
- //#. begin a new transaction
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- //#. finally, delete the existing index
- MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- } else {
- // External dataset
- indexName = stmtIndexDrop.getIndexName().getValue();
- Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- if (index == null) {
- if (stmtIndexDrop.getIfExists()) {
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return;
- } else {
- throw new AlgebricksException("There is no index with this name " + indexName + ".");
- }
- } else if (ExternalIndexingOperations.isFileIndex(index)) {
- throw new AlgebricksException("Dropping a dataset's files index is not allowed.");
- }
- //#. prepare a job to drop the index in NC.
- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
- jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
- datasetName);
- if (datasetIndexes.size() == 2) {
- dropFilesIndex = true;
- // only one index + the files index, we need to delete both of the indexes
- for (Index externalIndex : datasetIndexes) {
- if (ExternalIndexingOperations.isFileIndex(externalIndex)) {
- cds = new CompiledIndexDropStatement(dataverseName, datasetName,
- externalIndex.getIndexName());
- jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
- metadataProvider, ds));
- //#. mark PendingDropOp on the existing files index
- MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
- externalIndex.getIndexName());
- MetadataManager.INSTANCE.addIndex(
- mdTxnCtx,
- new Index(dataverseName, datasetName, externalIndex.getIndexName(), externalIndex
- .getIndexType(), externalIndex.getKeyFieldNames(),
- index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), externalIndex
- .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
- }
- }
- }
-
- //#. mark PendingDropOp on the existing index
- MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- MetadataManager.INSTANCE.addIndex(
- mdTxnCtx,
- new Index(dataverseName, datasetName, indexName, index.getIndexType(),
- index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index
- .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-
- //#. commit the existing transaction before calling runJob.
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- bActiveTxn = false;
- progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
-
- for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
- }
-
- //#. begin a new transaction
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- bActiveTxn = true;
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
- //#. finally, delete the existing index
- MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- if (dropFilesIndex) {
- // delete the files index too
- MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
- ExternalIndexingOperations.getFilesIndexName(datasetName));
- MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
- ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
- }
- }
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
- } catch (Exception e) {
- if (bActiveTxn) {
- abort(e, e, mdTxnCtx);
- }
-
- if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
- //#. execute compensation operations
- // remove the all indexes in NC
- try {
- for (JobSpecification jobSpec : jobsToExecute) {
- runJob(hcc, jobSpec, true);
- }
- } catch (Exception e2) {
- //do no throw exception since still the metadata needs to be compensated.
- e.addSuppressed(e2);
-
<TRUNCATED>