Posted to commits@hive.apache.org by se...@apache.org on 2018/04/23 18:13:32 UTC
[1/2] hive git commit: HIVE-17647 : DDLTask.generateAddMmTasks(Table tbl) and other random code should not start transactions (Sergey Shelukhin, reviewed by Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/branch-3 871c05bcc -> 0e1c66759
refs/heads/master bdb0457af -> 622440199
HIVE-17647 : DDLTask.generateAddMmTasks(Table tbl) and other random code should not start transactions (Sergey Shelukhin, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/62244019
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/62244019
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/62244019
Branch: refs/heads/master
Commit: 622440199c2207616356c03d9bf6eb94e8f6bd99
Parents: bdb0457
Author: sergey <se...@apache.org>
Authored: Mon Apr 23 11:07:40 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Apr 23 11:07:40 2018 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 2 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 23 +-
.../org/apache/hadoop/hive/ql/QueryPlan.java | 17 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 31 +-
.../hadoop/hive/ql/exec/ImportCommitTask.java | 62 ----
.../hadoop/hive/ql/exec/ImportCommitWork.java | 54 ----
.../apache/hadoop/hive/ql/exec/TaskFactory.java | 2 -
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 7 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 2 +-
.../hive/ql/parse/BaseSemanticAnalyzer.java | 5 +
.../hive/ql/parse/DDLSemanticAnalyzer.java | 18 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 37 +--
.../ql/parse/ReplicationSemanticAnalyzer.java | 4 +-
.../hadoop/hive/ql/plan/AlterTableDesc.java | 109 ++++---
.../org/apache/hadoop/hive/ql/plan/DDLDesc.java | 6 +
.../hive/ql/lockmgr/TestDbTxnManager2.java | 18 ++
.../results/clientpositive/mm_conversions.q.out | 309 -------------------
17 files changed, 161 insertions(+), 545 deletions(-)
----------------------------------------------------------------------
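In short: code paths like DDLTask.generateAddMmTasks() used to open and commit a throwaway transaction just to obtain a table write ID. After this patch the Driver, which already owns the query transaction, allocates the write ID and pushes it into the DDL descriptor before locks are acquired; the task only consumes it. Below is a minimal self-contained model of that hand-off; DescWithWriteId, MmConversionDesc and TxnManagerStub are illustrative stand-ins for the real DDLDesc.DDLDescWithWriteId, AlterTableDesc and HiveTxnManager types touched in the hunks that follow.

import java.util.concurrent.atomic.AtomicLong;

// Mirrors the new DDLDesc.DDLDescWithWriteId contract (see DDLDesc.java below).
interface DescWithWriteId {
  void setWriteId(long writeId);
  String getFullTableName();
  boolean mayNeedWriteId();
}

// Mirrors AlterTableDesc for an ALTER TABLE ... 'insert_only' conversion.
class MmConversionDesc implements DescWithWriteId {
  private Long writeId;
  public void setWriteId(long writeId) { this.writeId = writeId; }
  public String getFullTableName() { return "default.simple_to_mm"; }
  public boolean mayNeedWriteId() { return true; }
  Long getWriteId() { return writeId; } // read later by the task
}

// Mirrors HiveTxnManager.getTableWriteId() on an already-open transaction.
class TxnManagerStub {
  private final AtomicLong next = new AtomicLong(1);
  long getTableWriteId(String db, String table) { return next.getAndIncrement(); }
}

public class WriteIdHandoff {
  public static void main(String[] args) {
    TxnManagerStub txnMgr = new TxnManagerStub();
    MmConversionDesc desc = new MmConversionDesc();
    // Driver side: allocate once, before acquiring locks, and hand the ID over.
    if (desc.mayNeedWriteId()) {
      String[] dbTable = desc.getFullTableName().split("\\.");
      desc.setWriteId(txnMgr.getTableWriteId(dbTable[0], dbTable[1]));
    }
    // Task side: no transaction is opened here; the ID simply arrives in the desc.
    System.out.println("write id for MM conversion = " + desc.getWriteId());
  }
}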
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 3aaa68b..ed161da 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -192,7 +192,6 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
metadata_only_queries.q,\
metadata_only_queries_with_filters.q,\
metadataonly1.q,\
- mm_conversions.q,\
mrr.q,\
nonmr_fetch_threshold.q,\
optimize_nullscan.q,\
@@ -581,6 +580,7 @@ minillaplocal.query.files=\
mapjoin_hint.q,\
mapjoin_emit_interval.q,\
mergejoin_3way.q,\
+ mm_conversions.q,\
mm_exim.q,\
mrr.q,\
multiMapJoin1.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 9cb2ff1..a35a215 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -113,6 +113,7 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -1415,8 +1416,9 @@ public class Driver implements IDriver {
if(userFromUGI == null) {
throw createProcessorResponse(10);
}
+
// Set the table write id in all of the acid file sinks
- if (haveAcidWrite()) {
+ if (!plan.getAcidSinks().isEmpty()) {
List<FileSinkDesc> acidSinks = new ArrayList<>(plan.getAcidSinks());
//sorting makes tests easier to write since file names and ROW__IDs depend on statementId
//so this makes (file name -> data) mapping stable
@@ -1433,6 +1435,18 @@ public class Driver implements IDriver {
desc.setStatementId(queryTxnMgr.getStmtIdAndIncrement());
}
}
+
+ // Note: the sinks and DDL cannot coexist at this time; but if they could we would
+ // need to make sure we don't get two write IDs for the same table.
+ DDLDescWithWriteId acidDdlDesc = plan.getAcidDdlDesc();
+ if (acidDdlDesc != null && acidDdlDesc.mayNeedWriteId()) {
+ String fqTableName = acidDdlDesc.getFullTableName();
+ long writeId = queryTxnMgr.getTableWriteId(
+ Utilities.getDatabaseName(fqTableName), Utilities.getTableName(fqTableName));
+ acidDdlDesc.setWriteId(writeId);
+ }
+
+
/*It's imperative that {@code acquireLocks()} is called for all commands so that
HiveTxnManager can transition its state machine correctly*/
queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
@@ -1456,10 +1470,6 @@ public class Driver implements IDriver {
}
}
- private boolean haveAcidWrite() {
- return !plan.getAcidSinks().isEmpty();
- }
-
public void releaseLocksAndCommitOrRollback(boolean commit) throws LockException {
releaseLocksAndCommitOrRollback(commit, queryTxnMgr);
}
@@ -1886,7 +1896,7 @@ public class Driver implements IDriver {
return false;
}
// Lock operations themselves don't require the lock.
- if (isExplicitLockOperation()){
+ if (isExplicitLockOperation()) {
return false;
}
if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_LOCK_MAPRED_ONLY)) {
@@ -2115,7 +2125,6 @@ public class Driver implements IDriver {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
// Loop while you either have tasks running, or tasks queued up
while (driverCxt.isRunning()) {
-
// Launch upto maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index f53afaf..79e938a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -35,8 +35,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import com.google.common.annotations.VisibleForTesting;
-
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -50,6 +48,8 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
+import org.apache.hadoop.hive.ql.plan.DDLDesc;
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -61,8 +61,8 @@ import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* QueryPlan can be serialized to disk so that we can restart/resume the
@@ -112,6 +112,7 @@ public class QueryPlan implements Serializable {
private final HiveOperation operation;
private final boolean acidResourcesInQuery;
private final Set<FileSinkDesc> acidSinks; // Note: both full-ACID and insert-only sinks.
+ private final DDLDesc.DDLDescWithWriteId acidDdlDesc;
private Boolean autoCommitValue;
public QueryPlan() {
@@ -123,6 +124,7 @@ public class QueryPlan implements Serializable {
this.operation = command;
this.acidResourcesInQuery = false;
this.acidSinks = Collections.emptySet();
+ this.acidDdlDesc = null;
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
@@ -151,8 +153,8 @@ public class QueryPlan implements Serializable {
this.resultSchema = resultSchema;
this.acidResourcesInQuery = sem.hasTransactionalInQuery();
this.acidSinks = sem.getAcidFileSinks();
+ this.acidDdlDesc = sem.getAcidDdlDesc();
}
- private static final Logger LOG = LoggerFactory.getLogger(QueryPlan.class);
/**
* @return true if any acid resources are read/written
@@ -166,6 +168,11 @@ public class QueryPlan implements Serializable {
Set<FileSinkDesc> getAcidSinks() {
return acidSinks;
}
+
+ DDLDescWithWriteId getAcidDdlDesc() {
+ return acidDdlDesc;
+ }
+
public String getQueryStr() {
return queryString;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index c8cb8a4..4e10649 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -54,6 +54,7 @@ import java.util.regex.Pattern;
import java.util.concurrent.ExecutionException;
import com.google.common.collect.ImmutableSet;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -148,6 +149,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.CheckConstraint;
import org.apache.hadoop.hive.ql.metadata.CheckResult;
import org.apache.hadoop.hive.ql.metadata.DefaultConstraint;
@@ -4423,27 +4425,17 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
}
- private List<Task<?>> generateAddMmTasks(Table tbl) throws HiveException {
+ private List<Task<?>> generateAddMmTasks(Table tbl, Long writeId) throws HiveException {
// We will move all the files in the table/partition directories into the first MM
// directory, then commit the first write ID.
List<Path> srcs = new ArrayList<>(), tgts = new ArrayList<>();
- long mmWriteId = 0;
- try {
- HiveTxnManager txnManager = getTxnMgr();
- if (txnManager.isTxnOpen()) {
- mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName());
- } else {
- txnManager.openTxn(new Context(conf), conf.getUser());
- mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName());
- txnManager.commitTxn();
- }
- } catch (Exception e) {
- String errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
- console.printError(errorMessage, "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ if (writeId == null) {
+ throw new HiveException("Internal error - write ID not set for MM conversion");
}
+
int stmtId = 0;
- String mmDir = AcidUtils.deltaSubdir(mmWriteId, mmWriteId, stmtId);
+ String mmDir = AcidUtils.deltaSubdir(writeId, writeId, stmtId);
+
Hive db = getHive();
if (tbl.getPartitionKeys().size() > 0) {
PartitionIterable parts = new PartitionIterable(db, tbl, null,
@@ -4471,10 +4463,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
// Don't set inputs and outputs - the locks have already been taken so it's pointless.
MoveWork mw = new MoveWork(null, null, null, null, false);
mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null));
- ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId);
- Task<?> mv = TaskFactory.get(mw), ic = TaskFactory.get(icw);
- mv.addDependentTask(ic);
- return Lists.<Task<?>>newArrayList(mv);
+ return Lists.<Task<?>>newArrayList(TaskFactory.get(mw));
}
private List<Task<?>> alterTableAddProps(AlterTableDesc alterTbl, Table tbl,
@@ -4491,7 +4480,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps());
if (isToMmTable != null) {
if (!isFromMmTable && isToMmTable) {
- result = generateAddMmTasks(tbl);
+ result = generateAddMmTasks(tbl, alterTbl.getWriteId());
} else if (isFromMmTable && !isToMmTable) {
throw new HiveException("Cannot convert an ACID table to non-ACID");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
deleted file mode 100644
index b3c62ad..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.util.StringUtils;
-
-public class ImportCommitTask extends Task<ImportCommitWork> {
-
- private static final long serialVersionUID = 1L;
-
- public ImportCommitTask() {
- super();
- }
-
- @Override
- public int execute(DriverContext driverContext) {
- if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- Utilities.FILE_OP_LOGGER.trace("Executing ImportCommit for " + work.getWriteId());
- }
-
- try {
- if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
- return 0;
- }
- return 0;
- } catch (Exception e) {
- console.printError("Failed with exception " + e.getMessage(), "\n"
- + StringUtils.stringifyException(e));
- setException(e);
- return 1;
- }
- }
-
- @Override
- public StageType getType() {
- return StageType.MOVE; // The commit for import is normally done as part of MoveTask.
- }
-
- @Override
- public String getName() {
- return "IMPORT_COMMIT";
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java
deleted file mode 100644
index a119250..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
-@Explain(displayName = "Import Commit", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class ImportCommitWork implements Serializable {
- private static final long serialVersionUID = 1L;
- private String dbName, tblName;
- private long writeId;
- private int stmtId;
-
- public ImportCommitWork(String dbName, String tblName, long writeId, int stmtId) {
- this.writeId = writeId;
- this.stmtId = stmtId;
- this.dbName = dbName;
- this.tblName = tblName;
- }
-
- public long getWriteId() {
- return writeId;
- }
-
- public int getStmtId() {
- return stmtId;
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public String getTblName() {
- return tblName;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 10a2ed2..2da6b0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -105,8 +105,6 @@ public final class TaskFactory {
MergeFileTask.class));
taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class,
DependencyCollectionTask.class));
- taskvec.add(new TaskTuple<ImportCommitWork>(ImportCommitWork.class,
- ImportCommitTask.class));
taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4760b85..212e0a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1475,7 +1475,7 @@ public class AcidUtils {
/**
* The method for altering table props; may set the table to MM, non-MM, or not affect MM.
* todo: All such validation logic should be TransactionValidationListener
- * @param tbl object image before alter table command
+ * @param tbl object image before alter table command (or null if not retrieved yet).
* @param props prop values set in this alter table command
*/
public static Boolean isToInsertOnlyTable(Table tbl, Map<String, String> props) {
@@ -1491,17 +1491,18 @@ public class AcidUtils {
return null;
}
- if(transactional == null) {
+ if (transactional == null && tbl != null) {
transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
}
boolean isSetToTxn = "true".equalsIgnoreCase(transactional);
if (transactionalProp == null) {
- if (isSetToTxn) return false; // Assume the full ACID table.
+ if (isSetToTxn || tbl == null) return false; // Assume the full ACID table.
throw new RuntimeException("Cannot change '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL
+ "' without '" + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "'");
}
if (!"insert_only".equalsIgnoreCase(transactionalProp)) return false; // Not MM.
if (!isSetToTxn) {
+ if (tbl == null) return true; // No table information yet; looks like it could be valid.
throw new RuntimeException("Cannot set '"
+ hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "' to 'insert_only' without "
+ "setting '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL + "' to 'true'");
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 49c355b..be98446 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1134,7 +1134,7 @@ public class Hive {
tTable = getMSC().getTable(dbName, tableName);
} catch (NoSuchObjectException e) {
if (throwException) {
- LOG.error("Table " + tableName + " not found: " + e.getMessage());
+ LOG.error("Table " + dbName + "." + tableName + " not found: " + e.getMessage());
throw new InvalidTableException(tableName);
}
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index cf897a6..596edde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -2289,4 +2290,8 @@ public abstract class BaseSemanticAnalyzer {
public void setCacheUsage(CacheUsage cacheUsage) {
this.cacheUsage = cacheUsage;
}
+
+ public DDLDescWithWriteId getAcidDdlDesc() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index fb15adf..defb8be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hive.ql.plan.AlterResourcePlanDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableAlterPartDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.plan.AlterTableExchangePartition;
import org.apache.hadoop.hive.ql.plan.AlterTableSimpleDesc;
import org.apache.hadoop.hive.ql.plan.AlterWMTriggerDesc;
@@ -122,6 +123,7 @@ import org.apache.hadoop.hive.ql.plan.CreateOrAlterWMPoolDesc;
import org.apache.hadoop.hive.ql.plan.CreateOrDropTriggerToPoolMappingDesc;
import org.apache.hadoop.hive.ql.plan.CreateResourcePlanDesc;
import org.apache.hadoop.hive.ql.plan.CreateWMTriggerDesc;
+import org.apache.hadoop.hive.ql.plan.DDLDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.DescDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.DescFunctionDesc;
@@ -200,6 +202,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
private final Set<String> reservedPartitionValues;
private final HiveAuthorizationTaskFactory hiveAuthorizationTaskFactory;
private WriteEntity alterTableOutput;
+ // Equivalent to acidSinks, but for DDL operations that change data.
+ private DDLDesc.DDLDescWithWriteId ddlDescWithWriteId;
static {
TokenToTypeName.put(HiveParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME);
@@ -1579,7 +1583,6 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
truncateTblDesc.setLbCtx(lbCtx);
addInputsOutputsAlterTable(tableName, partSpec, AlterTableTypes.TRUNCATE);
-
ddlWork.setNeedLock(true);
TableDesc tblDesc = Utilities.getTableDesc(table);
// Write the output to temporary directory and move it to the final location at the end
@@ -1752,7 +1755,18 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
|| mapProp.containsKey(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc, isPotentialMmSwitch);
- rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc)));
+ DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), alterTblDesc);
+ if (isPotentialMmSwitch) {
+ this.ddlDescWithWriteId = alterTblDesc;
+ ddlWork.setNeedLock(true); // Hmm... why don't many other operations here need locks?
+ }
+
+ rootTasks.add(TaskFactory.get(ddlWork));
+ }
+
+ @Override
+ public DDLDescWithWriteId getAcidDdlDesc() {
+ return ddlDescWithWriteId;
}
private void analyzeAlterTableSerdeProps(ASTNode ast, String tableName,
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index ac44be5..b850ddc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.ImportCommitWork;
import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -480,8 +479,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
- EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm,
- Task<?> commitTask)
+ EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm)
throws MetaException, IOException, HiveException {
AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
@@ -540,9 +538,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
copyTask.addDependentTask(loadPartTask);
addPartTask.addDependentTask(loadPartTask);
x.getTasks().add(copyTask);
- if (commitTask != null) {
- loadPartTask.addDependentTask(commitTask);
- }
return addPartTask;
}
}
@@ -839,16 +834,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (table != null) {
if (table.isPartitioned()) {
x.getLOG().debug("table partitioned");
- Task<?> ict = createImportCommitTask(x.getConf(),
- table.getDbName(), table.getTableName(), writeId, stmtId,
- AcidUtils.isInsertOnlyTable(table.getParameters()));
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
} else {
throw new SemanticException(
ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -878,12 +870,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getOutputs().add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
if (isPartitioned(tblDesc)) {
- Task<?> ict = createImportCommitTask(x.getConf(),
- tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
- AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
- replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ replicationSpec, x, writeId, stmtId, isSourceMm));
}
} else {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@ -920,14 +909,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return newTable;
}
- private static Task<?> createImportCommitTask(
- HiveConf conf, String dbName, String tblName, Long writeId, int stmtId, boolean isMmTable) {
- // TODO: noop, remove?
- Task<ImportCommitWork> ict = (!isMmTable) ? null : TaskFactory.get(
- new ImportCommitWork(dbName, tblName, writeId, stmtId), conf);
- return ict;
- }
-
/**
* Create tasks for repl import
*/
@@ -1022,13 +1003,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (!replicationSpec.isMetadataOnly()) {
if (isPartitioned(tblDesc)) {
- Task<?> ict = createImportCommitTask(x.getConf(),
- tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
- AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
addPartitionDesc.setReplicationSpec(replicationSpec);
t.addDependentTask(
- addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
if (updatedMetadata != null) {
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
}
@@ -1055,13 +1033,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
addPartitionDesc.setReplicationSpec(replicationSpec);
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
- Task<?> ict = replicationSpec.isMetadataOnly() ? null : createImportCommitTask(
- x.getConf(), tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
- AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
if (updatedMetadata != null) {
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
}
@@ -1078,7 +1053,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (replicationSpec.allowReplacementInto(ptn.getParameters())){
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
} else {
x.getTasks().add(alterSinglePartition(
fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index bdecbaf..05eca1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -515,7 +515,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
String tableName,
Map<String, String> partSpec,
String replState,
- Task<? extends Serializable> preCursor) {
+ Task<? extends Serializable> preCursor) throws SemanticException {
HashMap<String, String> mapProp = new HashMap<>();
mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
@@ -563,7 +563,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private List<Task<? extends Serializable>> addUpdateReplStateTasks(
boolean isDatabaseLoad,
UpdatedMetaDataTracker updatedMetadata,
- List<Task<? extends Serializable>> importTasks) {
+ List<Task<? extends Serializable>> importTasks) throws SemanticException {
String replState = updatedMetadata.getReplicationState();
String dbName = updatedMetadata.getDatabase();
String tableName = updatedMetadata.getTable();
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
index d7b2247..a767796 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.plan;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -33,9 +35,7 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
import com.google.common.collect.ImmutableList;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -49,7 +49,7 @@ import java.util.Set;
*
*/
@Explain(displayName = "Alter Table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class AlterTableDesc extends DDLDesc implements Serializable {
+public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDLDescWithWriteId {
private static final long serialVersionUID = 1L;
/**
@@ -124,7 +124,7 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
boolean isStoredAsSubDirectories = false;
List<String> skewedColNames;
List<List<String>> skewedColValues;
- Table table;
+ Table tableForSkewedColValidation;
boolean isDropIfExists = false;
boolean isTurnOffSorting = false;
boolean isCascade = false;
@@ -137,6 +137,7 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
List<SQLDefaultConstraint> defaultConstraintsCols;
List<SQLCheckConstraint> checkConstraintsCols;
ReplicationSpec replicationSpec;
+ private Long writeId = null;
public AlterTableDesc() {
}
@@ -150,12 +151,13 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* new column name
* @param newComment
* @param newType
+ * @throws SemanticException
*/
public AlterTableDesc(String tblName, HashMap<String, String> partSpec,
String oldColName, String newColName, String newType, String newComment,
- boolean first, String afterCol, boolean isCascade) {
+ boolean first, String afterCol, boolean isCascade) throws SemanticException {
super();
- oldName = tblName;
+ setOldName(tblName);
this.partSpec = partSpec;
this.oldColName = oldColName;
this.newColName = newColName;
@@ -172,9 +174,9 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
boolean first, String afterCol, boolean isCascade, List<SQLPrimaryKey> primaryKeyCols,
List<SQLForeignKey> foreignKeyCols, List<SQLUniqueConstraint> uniqueConstraintCols,
List<SQLNotNullConstraint> notNullConstraintCols, List<SQLDefaultConstraint> defaultConstraints,
- List<SQLCheckConstraint> checkConstraints) {
+ List<SQLCheckConstraint> checkConstraints) throws SemanticException {
super();
- oldName = tblName;
+ setOldName(tblName);
this.partSpec = partSpec;
this.oldColName = oldColName;
this.newColName = newColName;
@@ -201,10 +203,11 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* Flag to denote if current table can be a view
* @param replicationSpec
* Replication specification with current event ID
+ * @throws SemanticException
*/
- public AlterTableDesc(String oldName, String newName, boolean expectView, ReplicationSpec replicationSpec) {
+ public AlterTableDesc(String oldName, String newName, boolean expectView, ReplicationSpec replicationSpec) throws SemanticException {
op = AlterTableTypes.RENAME;
- this.oldName = oldName;
+ setOldName(oldName);
this.newName = newName;
this.expectView = expectView;
this.replicationSpec = replicationSpec;
@@ -215,11 +218,12 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* name of the table
* @param newCols
* new columns to be added
+ * @throws SemanticException
*/
public AlterTableDesc(String name, HashMap<String, String> partSpec, List<FieldSchema> newCols,
- AlterTableTypes alterType, boolean isCascade) {
+ AlterTableTypes alterType, boolean isCascade) throws SemanticException {
op = alterType;
- oldName = name;
+ setOldName(name);
this.newCols = new ArrayList<FieldSchema>(newCols);
this.partSpec = partSpec;
this.isCascade = isCascade;
@@ -267,12 +271,13 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* @param outputFormat
* new table output format
* @param partSpec
+ * @throws SemanticException
*/
public AlterTableDesc(String name, String inputFormat, String outputFormat,
- String serdeName, String storageHandler, HashMap<String, String> partSpec) {
+ String serdeName, String storageHandler, HashMap<String, String> partSpec) throws SemanticException {
super();
op = AlterTableTypes.ADDFILEFORMAT;
- oldName = name;
+ setOldName(name);
this.inputFormat = inputFormat;
this.outputFormat = outputFormat;
this.serdeName = serdeName;
@@ -281,8 +286,8 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
}
public AlterTableDesc(String tableName, int numBuckets,
- List<String> bucketCols, List<Order> sortCols, HashMap<String, String> partSpec) {
- oldName = tableName;
+ List<String> bucketCols, List<Order> sortCols, HashMap<String, String> partSpec) throws SemanticException {
+ setOldName(tableName);
op = AlterTableTypes.ADDCLUSTERSORTCOLUMN;
numberBuckets = numBuckets;
bucketColumns = new ArrayList<String>(bucketCols);
@@ -290,47 +295,47 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
this.partSpec = partSpec;
}
- public AlterTableDesc(String tableName, boolean sortingOff, HashMap<String, String> partSpec) {
- oldName = tableName;
+ public AlterTableDesc(String tableName, boolean sortingOff, HashMap<String, String> partSpec) throws SemanticException {
+ setOldName(tableName);
op = AlterTableTypes.ADDCLUSTERSORTCOLUMN;
isTurnOffSorting = sortingOff;
this.partSpec = partSpec;
}
public AlterTableDesc(String tableName, String newLocation,
- HashMap<String, String> partSpec) {
+ HashMap<String, String> partSpec) throws SemanticException {
op = AlterTableTypes.ALTERLOCATION;
- this.oldName = tableName;
+ setOldName(tableName);
this.newLocation = newLocation;
this.partSpec = partSpec;
}
public AlterTableDesc(String tableName, Map<List<String>, String> locations,
- HashMap<String, String> partSpec) {
+ HashMap<String, String> partSpec) throws SemanticException {
op = AlterTableTypes.ALTERSKEWEDLOCATION;
- this.oldName = tableName;
+ setOldName(tableName);
this.skewedLocations = locations;
this.partSpec = partSpec;
}
public AlterTableDesc(String tableName, boolean turnOffSkewed,
- List<String> skewedColNames, List<List<String>> skewedColValues) {
- oldName = tableName;
+ List<String> skewedColNames, List<List<String>> skewedColValues) throws SemanticException {
+ setOldName(tableName);
op = AlterTableTypes.ADDSKEWEDBY;
this.isTurnOffSkewed = turnOffSkewed;
this.skewedColNames = new ArrayList<String>(skewedColNames);
this.skewedColValues = new ArrayList<List<String>>(skewedColValues);
}
- public AlterTableDesc(String tableName, HashMap<String, String> partSpec, int numBuckets) {
+ public AlterTableDesc(String tableName, HashMap<String, String> partSpec, int numBuckets) throws SemanticException {
op = AlterTableTypes.ALTERBUCKETNUM;
- this.oldName = tableName;
+ setOldName(tableName);
this.partSpec = partSpec;
this.numberBuckets = numBuckets;
}
- public AlterTableDesc(String tableName, String dropConstraintName, ReplicationSpec replicationSpec) {
- this.oldName = tableName;
+ public AlterTableDesc(String tableName, String dropConstraintName, ReplicationSpec replicationSpec) throws SemanticException {
+ setOldName(tableName);
this.dropConstraintName = dropConstraintName;
this.replicationSpec = replicationSpec;
op = AlterTableTypes.DROPCONSTRAINT;
@@ -338,8 +343,8 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
public AlterTableDesc(String tableName, List<SQLPrimaryKey> primaryKeyCols,
List<SQLForeignKey> foreignKeyCols, List<SQLUniqueConstraint> uniqueConstraintCols,
- ReplicationSpec replicationSpec) {
- this.oldName = tableName;
+ ReplicationSpec replicationSpec) throws SemanticException {
+ setOldName(tableName);
this.primaryKeyCols = primaryKeyCols;
this.foreignKeyCols = foreignKeyCols;
this.uniqueConstraintCols = uniqueConstraintCols;
@@ -350,8 +355,8 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
public AlterTableDesc(String tableName, List<SQLPrimaryKey> primaryKeyCols,
List<SQLForeignKey> foreignKeyCols, List<SQLUniqueConstraint> uniqueConstraintCols,
List<SQLNotNullConstraint> notNullConstraintCols, List<SQLDefaultConstraint> defaultConstraints,
- List<SQLCheckConstraint> checkConstraints, ReplicationSpec replicationSpec) {
- this.oldName = tableName;
+ List<SQLCheckConstraint> checkConstraints, ReplicationSpec replicationSpec) throws SemanticException {
+ setOldName(tableName);
this.primaryKeyCols = primaryKeyCols;
this.foreignKeyCols = foreignKeyCols;
this.uniqueConstraintCols = uniqueConstraintCols;
@@ -384,8 +389,9 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* @param oldName
* the oldName to set
*/
- public void setOldName(String oldName) {
- this.oldName = oldName;
+ public void setOldName(String oldName) throws SemanticException {
+ // Make sure we qualify the name from the outset so there's no ambiguity.
+ this.oldName = String.join(".", Utilities.getDbTableName(oldName));
}
/**
@@ -848,26 +854,19 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* @throws SemanticException
*/
public void validate() throws SemanticException {
- if (null != table) {
+ if (null != tableForSkewedColValidation) {
/* Validate skewed information. */
ValidationUtility.validateSkewedInformation(
- ParseUtils.validateColumnNameUniqueness(table.getCols()), this.getSkewedColNames(),
- this.getSkewedColValues());
+ ParseUtils.validateColumnNameUniqueness(tableForSkewedColValidation.getCols()),
+ this.getSkewedColNames(), this.getSkewedColValues());
}
}
/**
- * @return the table
- */
- public Table getTable() {
- return table;
- }
-
- /**
* @param table the table to set
*/
public void setTable(Table table) {
- this.table = table;
+ this.tableForSkewedColValidation = table;
}
/**
@@ -929,4 +928,24 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* This can result in a "ALTER IF NEWER THAN" kind of semantic
*/
public ReplicationSpec getReplicationSpec(){ return this.replicationSpec; }
+
+ @Override
+ public void setWriteId(long writeId) {
+ this.writeId = writeId;
+ }
+
+ @Override
+ public String getFullTableName() {
+ return getOldName();
+ }
+
+ @Override
+ public boolean mayNeedWriteId() {
+ return getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS
+ && AcidUtils.isToInsertOnlyTable(null, getProps());
+ }
+
+ public Long getWriteId() {
+ return this.writeId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java
index 65f4cf2..8941d97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java
@@ -26,4 +26,10 @@ import java.io.Serializable;
*/
public abstract class DDLDesc implements Serializable {
private static final long serialVersionUID = 1L;
+
+ public static interface DDLDescWithWriteId {
+ void setWriteId(long writeId);
+ String getFullTableName();
+ boolean mayNeedWriteId();
+ }
}
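Any DDL descriptor that writes data under a transaction can opt into the same Driver-side write ID allocation by implementing this interface, the way AlterTableDesc does above. A hypothetical implementer, just to show the contract (ExampleMmDesc is illustrative and not part of the patch):

import org.apache.hadoop.hive.ql.plan.DDLDesc;

public class ExampleMmDesc extends DDLDesc implements DDLDesc.DDLDescWithWriteId {
  private static final long serialVersionUID = 1L;

  private final String fqTableName; // already db-qualified, e.g. "default.t"
  private Long writeId;             // filled in by the Driver before locks are taken

  public ExampleMmDesc(String fqTableName) {
    this.fqTableName = fqTableName;
  }

  @Override
  public void setWriteId(long writeId) {
    this.writeId = writeId;
  }

  @Override
  public String getFullTableName() {
    return fqTableName;
  }

  @Override
  public boolean mayNeedWriteId() {
    return true; // this descriptor always writes data
  }

  public Long getWriteId() {
    return writeId; // consumed by the task that executes the DDL
  }
}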
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 0926663..3c2f9d4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2453,4 +2453,22 @@ public class TestDbTxnManager2 {
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks);
}
+
+ @Test
+ public void testMmConversionLocks() throws Exception {
+ dropTable(new String[] {"T"});
+ CommandProcessorResponse cpr = driver.run("create table T (a int, b int) tblproperties('transactional'='false')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into T values(0,2),(1,4)");
+ checkCmdOnDriver(cpr);
+
+ cpr = driver.compileAndRespond("ALTER TABLE T set tblproperties"
+ + "('transactional'='true', 'transactional_properties'='insert_only')", true);
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets X lock on T
+
+ List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/62244019/ql/src/test/results/clientpositive/mm_conversions.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/mm_conversions.q.out b/ql/src/test/results/clientpositive/mm_conversions.q.out
deleted file mode 100644
index 4754710..0000000
--- a/ql/src/test/results/clientpositive/mm_conversions.q.out
+++ /dev/null
@@ -1,309 +0,0 @@
-PREHOOK: query: drop table intermediate
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table intermediate
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@intermediate
-POSTHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
-POSTHOOK: type: CREATETABLE
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@intermediate
-PREHOOK: query: insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 1
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: default@intermediate@p=455
-POSTHOOK: query: insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 1
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: default@intermediate@p=455
-POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: default@intermediate@p=456
-POSTHOOK: query: insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: default@intermediate@p=456
-POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: default@intermediate@p=457
-POSTHOOK: query: insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: default@intermediate@p=457
-POSTHOOK: Lineage: intermediate PARTITION(p=457).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: drop table simple_to_mm
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table simple_to_mm
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table simple_to_mm(key int) stored as orc
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: create table simple_to_mm(key int) stored as orc
-POSTHOOK: type: CREATETABLE
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@simple_to_mm
-PREHOOK: query: insert into table simple_to_mm select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: insert into table simple_to_mm select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@simple_to_mm
-POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from simple_to_mm s1 order by key
-PREHOOK: type: QUERY
-PREHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-POSTHOOK: query: select * from simple_to_mm s1 order by key
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-0
-98
-100
-PREHOOK: query: alter table simple_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only")
-PREHOOK: type: ALTERTABLE_PROPERTIES
-PREHOOK: Input: default@simple_to_mm
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: alter table simple_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only")
-POSTHOOK: type: ALTERTABLE_PROPERTIES
-POSTHOOK: Input: default@simple_to_mm
-POSTHOOK: Output: default@simple_to_mm
-PREHOOK: query: select * from simple_to_mm s2 order by key
-PREHOOK: type: QUERY
-PREHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-POSTHOOK: query: select * from simple_to_mm s2 order by key
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-0
-98
-100
-PREHOOK: query: insert into table simple_to_mm select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: insert into table simple_to_mm select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@simple_to_mm
-POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert into table simple_to_mm select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: insert into table simple_to_mm select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@simple_to_mm
-POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from simple_to_mm s3 order by key
-PREHOOK: type: QUERY
-PREHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-POSTHOOK: query: select * from simple_to_mm s3 order by key
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-0
-0
-0
-98
-98
-98
-100
-100
-100
-PREHOOK: query: drop table simple_to_mm
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@simple_to_mm
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: drop table simple_to_mm
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@simple_to_mm
-POSTHOOK: Output: default@simple_to_mm
-PREHOOK: query: drop table part_to_mm
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table part_to_mm
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@part_to_mm
-POSTHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc
-POSTHOOK: type: CREATETABLE
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@part_to_mm
-PREHOOK: query: insert into table part_to_mm partition(key_mm='455') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@part_to_mm@key_mm=455
-POSTHOOK: query: insert into table part_to_mm partition(key_mm='455') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@part_to_mm@key_mm=455
-POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@part_to_mm@key_mm=456
-POSTHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@part_to_mm@key_mm=456
-POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from part_to_mm s1 order by key, key_mm
-PREHOOK: type: QUERY
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Input: default@part_to_mm@key_mm=455
-PREHOOK: Input: default@part_to_mm@key_mm=456
-#### A masked pattern was here ####
-POSTHOOK: query: select * from part_to_mm s1 order by key, key_mm
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Input: default@part_to_mm@key_mm=455
-POSTHOOK: Input: default@part_to_mm@key_mm=456
-#### A masked pattern was here ####
-0 455
-0 456
-98 455
-98 456
-100 455
-100 456
-PREHOOK: query: alter table part_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only")
-PREHOOK: type: ALTERTABLE_PROPERTIES
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Output: default@part_to_mm
-POSTHOOK: query: alter table part_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only")
-POSTHOOK: type: ALTERTABLE_PROPERTIES
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Output: default@part_to_mm
-PREHOOK: query: select * from part_to_mm s2 order by key, key_mm
-PREHOOK: type: QUERY
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Input: default@part_to_mm@key_mm=455
-PREHOOK: Input: default@part_to_mm@key_mm=456
-#### A masked pattern was here ####
-POSTHOOK: query: select * from part_to_mm s2 order by key, key_mm
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Input: default@part_to_mm@key_mm=455
-POSTHOOK: Input: default@part_to_mm@key_mm=456
-#### A masked pattern was here ####
-0 455
-0 456
-98 455
-98 456
-100 455
-100 456
-PREHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@part_to_mm@key_mm=456
-POSTHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@part_to_mm@key_mm=456
-POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert into table part_to_mm partition(key_mm='457') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@part_to_mm@key_mm=457
-POSTHOOK: query: insert into table part_to_mm partition(key_mm='457') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@part_to_mm@key_mm=457
-POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=457).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from part_to_mm s3 order by key, key_mm
-PREHOOK: type: QUERY
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Input: default@part_to_mm@key_mm=455
-PREHOOK: Input: default@part_to_mm@key_mm=456
-PREHOOK: Input: default@part_to_mm@key_mm=457
-#### A masked pattern was here ####
-POSTHOOK: query: select * from part_to_mm s3 order by key, key_mm
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Input: default@part_to_mm@key_mm=455
-POSTHOOK: Input: default@part_to_mm@key_mm=456
-POSTHOOK: Input: default@part_to_mm@key_mm=457
-#### A masked pattern was here ####
-0 455
-0 456
-0 456
-0 457
-98 455
-98 456
-98 456
-98 457
-100 455
-100 456
-100 456
-100 457
-PREHOOK: query: drop table part_to_mm
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Output: default@part_to_mm
-POSTHOOK: query: drop table part_to_mm
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Output: default@part_to_mm
-PREHOOK: query: drop table intermediate
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@intermediate
-PREHOOK: Output: default@intermediate
-POSTHOOK: query: drop table intermediate
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Output: default@intermediate
[2/2] hive git commit: HIVE-17647 : DDLTask.generateAddMmTasks(Table tbl) and other random code should not start transactions (Sergey Shelukhin, reviewed by Eugene Koifman)
Posted by se...@apache.org.
HIVE-17647 : DDLTask.generateAddMmTasks(Table tbl) and other random code should not start transactions (Sergey Shelukhin, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e1c6675
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e1c6675
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e1c6675
Branch: refs/heads/branch-3
Commit: 0e1c667593d093a62849b607c2fea7474bd48a94
Parents: 871c05b
Author: sergey <se...@apache.org>
Authored: Mon Apr 23 11:07:40 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Apr 23 11:13:31 2018 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 2 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 23 +-
.../org/apache/hadoop/hive/ql/QueryPlan.java | 17 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 31 +-
.../hadoop/hive/ql/exec/ImportCommitTask.java | 62 ----
.../hadoop/hive/ql/exec/ImportCommitWork.java | 54 ----
.../apache/hadoop/hive/ql/exec/TaskFactory.java | 2 -
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 7 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 2 +-
.../hive/ql/parse/BaseSemanticAnalyzer.java | 5 +
.../hive/ql/parse/DDLSemanticAnalyzer.java | 18 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 37 +--
.../ql/parse/ReplicationSemanticAnalyzer.java | 4 +-
.../hadoop/hive/ql/plan/AlterTableDesc.java | 109 ++++---
.../org/apache/hadoop/hive/ql/plan/DDLDesc.java | 6 +
.../hive/ql/lockmgr/TestDbTxnManager2.java | 18 ++
.../results/clientpositive/mm_conversions.q.out | 309 -------------------
17 files changed, 161 insertions(+), 545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index f278441..56595aa 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -192,7 +192,6 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
metadata_only_queries.q,\
metadata_only_queries_with_filters.q,\
metadataonly1.q,\
- mm_conversions.q,\
mrr.q,\
nonmr_fetch_threshold.q,\
optimize_nullscan.q,\
@@ -581,6 +580,7 @@ minillaplocal.query.files=\
mapjoin_hint.q,\
mapjoin_emit_interval.q,\
mergejoin_3way.q,\
+ mm_conversions.q,\
mm_exim.q,\
mrr.q,\
multiMapJoin1.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a630b6b..e8feeb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -113,6 +113,7 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -1412,8 +1413,9 @@ public class Driver implements IDriver {
if(userFromUGI == null) {
throw createProcessorResponse(10);
}
+
// Set the table write id in all of the acid file sinks
- if (haveAcidWrite()) {
+ if (!plan.getAcidSinks().isEmpty()) {
List<FileSinkDesc> acidSinks = new ArrayList<>(plan.getAcidSinks());
//sorting makes tests easier to write since file names and ROW__IDs depend on statementId
//so this makes (file name -> data) mapping stable
@@ -1430,6 +1432,18 @@ public class Driver implements IDriver {
desc.setStatementId(queryTxnMgr.getStmtIdAndIncrement());
}
}
+
+ // Note: the sinks and DDL cannot coexist at this time; but if they could we would
+ // need to make sure we don't get two write IDs for the same table.
+ DDLDescWithWriteId acidDdlDesc = plan.getAcidDdlDesc();
+ if (acidDdlDesc != null && acidDdlDesc.mayNeedWriteId()) {
+ String fqTableName = acidDdlDesc.getFullTableName();
+ long writeId = queryTxnMgr.getTableWriteId(
+ Utilities.getDatabaseName(fqTableName), Utilities.getTableName(fqTableName));
+ acidDdlDesc.setWriteId(writeId);
+ }
+
+
/*It's imperative that {@code acquireLocks()} is called for all commands so that
HiveTxnManager can transition its state machine correctly*/
queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
@@ -1453,10 +1467,6 @@ public class Driver implements IDriver {
}
}
- private boolean haveAcidWrite() {
- return !plan.getAcidSinks().isEmpty();
- }
-
public void releaseLocksAndCommitOrRollback(boolean commit) throws LockException {
releaseLocksAndCommitOrRollback(commit, queryTxnMgr);
}
@@ -1883,7 +1893,7 @@ public class Driver implements IDriver {
return false;
}
// Lock operations themselves don't require the lock.
- if (isExplicitLockOperation()){
+ if (isExplicitLockOperation()) {
return false;
}
if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_LOCK_MAPRED_ONLY)) {
@@ -2112,7 +2122,6 @@ public class Driver implements IDriver {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
// Loop while you either have tasks running, or tasks queued up
while (driverCxt.isRunning()) {
-
// Launch upto maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
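The Driver hunk above is the core of the fix: rather than DDLTask opening its own transaction later, the compiled plan now exposes at most one DDLDescWithWriteId and the driver allocates the table write ID inside the query's own transaction, just before locks are acquired. A minimal sketch of that ordering, assuming a HiveTxnManager-style txnMgr; splitFqName() is a hypothetical helper standing in for Utilities.getDatabaseName/getTableName:

    // Sketch only; mirrors the intent of the new Driver code, not its exact form.
    DDLDescWithWriteId acidDdlDesc = plan.getAcidDdlDesc();
    if (acidDdlDesc != null && acidDdlDesc.mayNeedWriteId()) {
      String[] dbAndTable = splitFqName(acidDdlDesc.getFullTableName()); // hypothetical helper
      long writeId = txnMgr.getTableWriteId(dbAndTable[0], dbAndTable[1]); // same txn as the query
      acidDdlDesc.setWriteId(writeId); // DDLTask later reads this instead of opening its own txn
    }
    txnMgr.acquireLocks(plan, ctx, user, lDrvState); // locks follow, with the write ID already set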
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index f53afaf..79e938a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -35,8 +35,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import com.google.common.annotations.VisibleForTesting;
-
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -50,6 +48,8 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
+import org.apache.hadoop.hive.ql.plan.DDLDesc;
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -61,8 +61,8 @@ import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* QueryPlan can be serialized to disk so that we can restart/resume the
@@ -112,6 +112,7 @@ public class QueryPlan implements Serializable {
private final HiveOperation operation;
private final boolean acidResourcesInQuery;
private final Set<FileSinkDesc> acidSinks; // Note: both full-ACID and insert-only sinks.
+ private final DDLDesc.DDLDescWithWriteId acidDdlDesc;
private Boolean autoCommitValue;
public QueryPlan() {
@@ -123,6 +124,7 @@ public class QueryPlan implements Serializable {
this.operation = command;
this.acidResourcesInQuery = false;
this.acidSinks = Collections.emptySet();
+ this.acidDdlDesc = null;
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
@@ -151,8 +153,8 @@ public class QueryPlan implements Serializable {
this.resultSchema = resultSchema;
this.acidResourcesInQuery = sem.hasTransactionalInQuery();
this.acidSinks = sem.getAcidFileSinks();
+ this.acidDdlDesc = sem.getAcidDdlDesc();
}
- private static final Logger LOG = LoggerFactory.getLogger(QueryPlan.class);
/**
* @return true if any acid resources are read/written
@@ -166,6 +168,11 @@ public class QueryPlan implements Serializable {
Set<FileSinkDesc> getAcidSinks() {
return acidSinks;
}
+
+ DDLDescWithWriteId getAcidDdlDesc() {
+ return acidDdlDesc;
+ }
+
public String getQueryStr() {
return queryString;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 9a487cd..d16944f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -54,6 +54,7 @@ import java.util.regex.Pattern;
import java.util.concurrent.ExecutionException;
import com.google.common.collect.ImmutableSet;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -148,6 +149,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.CheckConstraint;
import org.apache.hadoop.hive.ql.metadata.CheckResult;
import org.apache.hadoop.hive.ql.metadata.DefaultConstraint;
@@ -4419,27 +4421,17 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
}
- private List<Task<?>> generateAddMmTasks(Table tbl) throws HiveException {
+ private List<Task<?>> generateAddMmTasks(Table tbl, Long writeId) throws HiveException {
// We will move all the files in the table/partition directories into the first MM
// directory, then commit the first write ID.
List<Path> srcs = new ArrayList<>(), tgts = new ArrayList<>();
- long mmWriteId = 0;
- try {
- HiveTxnManager txnManager = SessionState.get().getTxnMgr();
- if (txnManager.isTxnOpen()) {
- mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName());
- } else {
- txnManager.openTxn(new Context(conf), conf.getUser());
- mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName());
- txnManager.commitTxn();
- }
- } catch (Exception e) {
- String errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
- console.printError(errorMessage, "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ if (writeId == null) {
+ throw new HiveException("Internal error - write ID not set for MM conversion");
}
+
int stmtId = 0;
- String mmDir = AcidUtils.deltaSubdir(mmWriteId, mmWriteId, stmtId);
+ String mmDir = AcidUtils.deltaSubdir(writeId, writeId, stmtId);
+
Hive db = getHive();
if (tbl.getPartitionKeys().size() > 0) {
PartitionIterable parts = new PartitionIterable(db, tbl, null,
@@ -4467,10 +4459,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
// Don't set inputs and outputs - the locks have already been taken so it's pointless.
MoveWork mw = new MoveWork(null, null, null, null, false);
mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null));
- ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId);
- Task<?> mv = TaskFactory.get(mw), ic = TaskFactory.get(icw);
- mv.addDependentTask(ic);
- return Lists.<Task<?>>newArrayList(mv);
+ return Lists.<Task<?>>newArrayList(TaskFactory.get(mw));
}
private List<Task<?>> alterTableAddProps(AlterTableDesc alterTbl, Table tbl,
@@ -4487,7 +4476,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps());
if (isToMmTable != null) {
if (!isFromMmTable && isToMmTable) {
- result = generateAddMmTasks(tbl);
+ result = generateAddMmTasks(tbl, alterTbl.getWriteId());
} else if (isFromMmTable && !isToMmTable) {
throw new HiveException("Cannot convert an ACID table to non-ACID");
}
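With the transaction-opening block removed, generateAddMmTasks now trusts the write ID handed down from the Driver and uses it only to name the first delta directory that all pre-existing files are moved into. A small sketch of that naming, using the AcidUtils.deltaSubdir call visible in the hunk above (the example write ID, statement ID, and file name are made up):

    // Directory the MM conversion moves existing files into, given the Driver-allocated ID.
    long writeId = 1L;  // in reality set via DDLDescWithWriteId.setWriteId(...)
    int stmtId = 0;
    String mmDir = AcidUtils.deltaSubdir(writeId, writeId, stmtId);
    // For a non-partitioned table the generated MoveWork then relocates, e.g.,
    //   <table-location>/000000_0  ->  <table-location>/<mmDir>/000000_0
    // so the data becomes visible under the table's first committed write ID.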
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
deleted file mode 100644
index b3c62ad..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.util.StringUtils;
-
-public class ImportCommitTask extends Task<ImportCommitWork> {
-
- private static final long serialVersionUID = 1L;
-
- public ImportCommitTask() {
- super();
- }
-
- @Override
- public int execute(DriverContext driverContext) {
- if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- Utilities.FILE_OP_LOGGER.trace("Executing ImportCommit for " + work.getWriteId());
- }
-
- try {
- if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
- return 0;
- }
- return 0;
- } catch (Exception e) {
- console.printError("Failed with exception " + e.getMessage(), "\n"
- + StringUtils.stringifyException(e));
- setException(e);
- return 1;
- }
- }
-
- @Override
- public StageType getType() {
- return StageType.MOVE; // The commit for import is normally done as part of MoveTask.
- }
-
- @Override
- public String getName() {
- return "IMPORT_COMMIT";
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java
deleted file mode 100644
index a119250..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
-@Explain(displayName = "Import Commit", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class ImportCommitWork implements Serializable {
- private static final long serialVersionUID = 1L;
- private String dbName, tblName;
- private long writeId;
- private int stmtId;
-
- public ImportCommitWork(String dbName, String tblName, long writeId, int stmtId) {
- this.writeId = writeId;
- this.stmtId = stmtId;
- this.dbName = dbName;
- this.tblName = tblName;
- }
-
- public long getWriteId() {
- return writeId;
- }
-
- public int getStmtId() {
- return stmtId;
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public String getTblName() {
- return tblName;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 10a2ed2..2da6b0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -105,8 +105,6 @@ public final class TaskFactory {
MergeFileTask.class));
taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class,
DependencyCollectionTask.class));
- taskvec.add(new TaskTuple<ImportCommitWork>(ImportCommitWork.class,
- ImportCommitTask.class));
taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4760b85..212e0a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1475,7 +1475,7 @@ public class AcidUtils {
/**
* The method for altering table props; may set the table to MM, non-MM, or not affect MM.
* todo: All such validation logic should be TransactionValidationListener
- * @param tbl object image before alter table command
+ * @param tbl object image before alter table command (or null if not retrieved yet).
* @param props prop values set in this alter table command
*/
public static Boolean isToInsertOnlyTable(Table tbl, Map<String, String> props) {
@@ -1491,17 +1491,18 @@ public class AcidUtils {
return null;
}
- if(transactional == null) {
+ if (transactional == null && tbl != null) {
transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
}
boolean isSetToTxn = "true".equalsIgnoreCase(transactional);
if (transactionalProp == null) {
- if (isSetToTxn) return false; // Assume the full ACID table.
+ if (isSetToTxn || tbl == null) return false; // Assume the full ACID table.
throw new RuntimeException("Cannot change '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL
+ "' without '" + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "'");
}
if (!"insert_only".equalsIgnoreCase(transactionalProp)) return false; // Not MM.
if (!isSetToTxn) {
+ if (tbl == null) return true; // No table information yet; looks like it could be valid.
throw new RuntimeException("Cannot set '"
+ hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "' to 'insert_only' without "
+ "setting '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL + "' to 'true'");
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 49c355b..be98446 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1134,7 +1134,7 @@ public class Hive {
tTable = getMSC().getTable(dbName, tableName);
} catch (NoSuchObjectException e) {
if (throwException) {
- LOG.error("Table " + tableName + " not found: " + e.getMessage());
+ LOG.error("Table " + dbName + "." + tableName + " not found: " + e.getMessage());
throw new InvalidTableException(tableName);
}
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index cf897a6..596edde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -2289,4 +2290,8 @@ public abstract class BaseSemanticAnalyzer {
public void setCacheUsage(CacheUsage cacheUsage) {
this.cacheUsage = cacheUsage;
}
+
+ public DDLDescWithWriteId getAcidDdlDesc() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 158c072..3533e0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hive.ql.plan.AlterResourcePlanDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableAlterPartDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
+import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.plan.AlterTableExchangePartition;
import org.apache.hadoop.hive.ql.plan.AlterTableSimpleDesc;
import org.apache.hadoop.hive.ql.plan.AlterWMTriggerDesc;
@@ -122,6 +123,7 @@ import org.apache.hadoop.hive.ql.plan.CreateOrAlterWMPoolDesc;
import org.apache.hadoop.hive.ql.plan.CreateOrDropTriggerToPoolMappingDesc;
import org.apache.hadoop.hive.ql.plan.CreateResourcePlanDesc;
import org.apache.hadoop.hive.ql.plan.CreateWMTriggerDesc;
+import org.apache.hadoop.hive.ql.plan.DDLDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.DescDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.DescFunctionDesc;
@@ -200,6 +202,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
private final Set<String> reservedPartitionValues;
private final HiveAuthorizationTaskFactory hiveAuthorizationTaskFactory;
private WriteEntity alterTableOutput;
+ // Equivalent to acidSinks, but for DDL operations that change data.
+ private DDLDesc.DDLDescWithWriteId ddlDescWithWriteId;
static {
TokenToTypeName.put(HiveParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME);
@@ -1579,7 +1583,6 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
truncateTblDesc.setLbCtx(lbCtx);
addInputsOutputsAlterTable(tableName, partSpec, AlterTableTypes.TRUNCATE);
-
ddlWork.setNeedLock(true);
TableDesc tblDesc = Utilities.getTableDesc(table);
// Write the output to temporary directory and move it to the final location at the end
@@ -1752,7 +1755,18 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
|| mapProp.containsKey(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc, isPotentialMmSwitch);
- rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc)));
+ DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), alterTblDesc);
+ if (isPotentialMmSwitch) {
+ this.ddlDescWithWriteId = alterTblDesc;
+ ddlWork.setNeedLock(true); // Hmm... why don't many other operations here need locks?
+ }
+
+ rootTasks.add(TaskFactory.get(ddlWork));
+ }
+
+ @Override
+ public DDLDescWithWriteId getAcidDdlDesc() {
+ return ddlDescWithWriteId;
}
private void analyzeAlterTableSerdeProps(ASTNode ast, String tableName,
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 4746c38..1f0ee01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.ImportCommitWork;
import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -482,8 +481,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
- EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm,
- Task<?> commitTask)
+ EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm)
throws MetaException, IOException, HiveException {
AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
@@ -542,9 +540,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
copyTask.addDependentTask(loadPartTask);
addPartTask.addDependentTask(loadPartTask);
x.getTasks().add(copyTask);
- if (commitTask != null) {
- loadPartTask.addDependentTask(commitTask);
- }
return addPartTask;
}
}
@@ -841,16 +836,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (table != null) {
if (table.isPartitioned()) {
x.getLOG().debug("table partitioned");
- Task<?> ict = createImportCommitTask(x.getConf(),
- table.getDbName(), table.getTableName(), writeId, stmtId,
- AcidUtils.isInsertOnlyTable(table.getParameters()));
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
} else {
throw new SemanticException(
ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -880,12 +872,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getOutputs().add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
if (isPartitioned(tblDesc)) {
- Task<?> ict = createImportCommitTask(x.getConf(),
- tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
- AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
- replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ replicationSpec, x, writeId, stmtId, isSourceMm));
}
} else {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@ -922,14 +911,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return newTable;
}
- private static Task<?> createImportCommitTask(
- HiveConf conf, String dbName, String tblName, Long writeId, int stmtId, boolean isMmTable) {
- // TODO: noop, remove?
- Task<ImportCommitWork> ict = (!isMmTable) ? null : TaskFactory.get(
- new ImportCommitWork(dbName, tblName, writeId, stmtId), conf);
- return ict;
- }
-
/**
* Create tasks for repl import
*/
@@ -1024,13 +1005,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (!replicationSpec.isMetadataOnly()) {
if (isPartitioned(tblDesc)) {
- Task<?> ict = createImportCommitTask(x.getConf(),
- tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
- AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
addPartitionDesc.setReplicationSpec(replicationSpec);
t.addDependentTask(
- addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
if (updatedMetadata != null) {
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
}
@@ -1057,13 +1035,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
addPartitionDesc.setReplicationSpec(replicationSpec);
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
- Task<?> ict = replicationSpec.isMetadataOnly() ? null : createImportCommitTask(
- x.getConf(), tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
- AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
if (updatedMetadata != null) {
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
}
@@ -1080,7 +1055,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (replicationSpec.allowReplacementInto(ptn.getParameters())){
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
} else {
x.getTasks().add(alterSinglePartition(
fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 1f3eab7..3c6df1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -516,7 +516,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
String tableName,
Map<String, String> partSpec,
String replState,
- Task<? extends Serializable> preCursor) {
+ Task<? extends Serializable> preCursor) throws SemanticException {
HashMap<String, String> mapProp = new HashMap<>();
mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
@@ -564,7 +564,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private List<Task<? extends Serializable>> addUpdateReplStateTasks(
boolean isDatabaseLoad,
UpdatedMetaDataTracker updatedMetadata,
- List<Task<? extends Serializable>> importTasks) {
+ List<Task<? extends Serializable>> importTasks) throws SemanticException {
String replState = updatedMetadata.getReplicationState();
String dbName = updatedMetadata.getDatabase();
String tableName = updatedMetadata.getTable();
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
index d7b2247..a767796 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.plan;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -33,9 +35,7 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
import com.google.common.collect.ImmutableList;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -49,7 +49,7 @@ import java.util.Set;
*
*/
@Explain(displayName = "Alter Table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class AlterTableDesc extends DDLDesc implements Serializable {
+public class AlterTableDesc extends DDLDesc implements Serializable, DDLDesc.DDLDescWithWriteId {
private static final long serialVersionUID = 1L;
/**
@@ -124,7 +124,7 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
boolean isStoredAsSubDirectories = false;
List<String> skewedColNames;
List<List<String>> skewedColValues;
- Table table;
+ Table tableForSkewedColValidation;
boolean isDropIfExists = false;
boolean isTurnOffSorting = false;
boolean isCascade = false;
@@ -137,6 +137,7 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
List<SQLDefaultConstraint> defaultConstraintsCols;
List<SQLCheckConstraint> checkConstraintsCols;
ReplicationSpec replicationSpec;
+ private Long writeId = null;
public AlterTableDesc() {
}
@@ -150,12 +151,13 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* new column name
* @param newComment
* @param newType
+ * @throws SemanticException
*/
public AlterTableDesc(String tblName, HashMap<String, String> partSpec,
String oldColName, String newColName, String newType, String newComment,
- boolean first, String afterCol, boolean isCascade) {
+ boolean first, String afterCol, boolean isCascade) throws SemanticException {
super();
- oldName = tblName;
+ setOldName(tblName);
this.partSpec = partSpec;
this.oldColName = oldColName;
this.newColName = newColName;
@@ -172,9 +174,9 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
boolean first, String afterCol, boolean isCascade, List<SQLPrimaryKey> primaryKeyCols,
List<SQLForeignKey> foreignKeyCols, List<SQLUniqueConstraint> uniqueConstraintCols,
List<SQLNotNullConstraint> notNullConstraintCols, List<SQLDefaultConstraint> defaultConstraints,
- List<SQLCheckConstraint> checkConstraints) {
+ List<SQLCheckConstraint> checkConstraints) throws SemanticException {
super();
- oldName = tblName;
+ setOldName(tblName);
this.partSpec = partSpec;
this.oldColName = oldColName;
this.newColName = newColName;
@@ -201,10 +203,11 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* Flag to denote if current table can be a view
* @param replicationSpec
* Replication specification with current event ID
+ * @throws SemanticException
*/
- public AlterTableDesc(String oldName, String newName, boolean expectView, ReplicationSpec replicationSpec) {
+ public AlterTableDesc(String oldName, String newName, boolean expectView, ReplicationSpec replicationSpec) throws SemanticException {
op = AlterTableTypes.RENAME;
- this.oldName = oldName;
+ setOldName(oldName);
this.newName = newName;
this.expectView = expectView;
this.replicationSpec = replicationSpec;
@@ -215,11 +218,12 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* name of the table
* @param newCols
* new columns to be added
+ * @throws SemanticException
*/
public AlterTableDesc(String name, HashMap<String, String> partSpec, List<FieldSchema> newCols,
- AlterTableTypes alterType, boolean isCascade) {
+ AlterTableTypes alterType, boolean isCascade) throws SemanticException {
op = alterType;
- oldName = name;
+ setOldName(name);
this.newCols = new ArrayList<FieldSchema>(newCols);
this.partSpec = partSpec;
this.isCascade = isCascade;
@@ -267,12 +271,13 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* @param outputFormat
* new table output format
* @param partSpec
+ * @throws SemanticException
*/
public AlterTableDesc(String name, String inputFormat, String outputFormat,
- String serdeName, String storageHandler, HashMap<String, String> partSpec) {
+ String serdeName, String storageHandler, HashMap<String, String> partSpec) throws SemanticException {
super();
op = AlterTableTypes.ADDFILEFORMAT;
- oldName = name;
+ setOldName(name);
this.inputFormat = inputFormat;
this.outputFormat = outputFormat;
this.serdeName = serdeName;
@@ -281,8 +286,8 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
}
public AlterTableDesc(String tableName, int numBuckets,
- List<String> bucketCols, List<Order> sortCols, HashMap<String, String> partSpec) {
- oldName = tableName;
+ List<String> bucketCols, List<Order> sortCols, HashMap<String, String> partSpec) throws SemanticException {
+ setOldName(tableName);
op = AlterTableTypes.ADDCLUSTERSORTCOLUMN;
numberBuckets = numBuckets;
bucketColumns = new ArrayList<String>(bucketCols);
@@ -290,47 +295,47 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
this.partSpec = partSpec;
}
- public AlterTableDesc(String tableName, boolean sortingOff, HashMap<String, String> partSpec) {
- oldName = tableName;
+ public AlterTableDesc(String tableName, boolean sortingOff, HashMap<String, String> partSpec) throws SemanticException {
+ setOldName(tableName);
op = AlterTableTypes.ADDCLUSTERSORTCOLUMN;
isTurnOffSorting = sortingOff;
this.partSpec = partSpec;
}
public AlterTableDesc(String tableName, String newLocation,
- HashMap<String, String> partSpec) {
+ HashMap<String, String> partSpec) throws SemanticException {
op = AlterTableTypes.ALTERLOCATION;
- this.oldName = tableName;
+ setOldName(tableName);
this.newLocation = newLocation;
this.partSpec = partSpec;
}
public AlterTableDesc(String tableName, Map<List<String>, String> locations,
- HashMap<String, String> partSpec) {
+ HashMap<String, String> partSpec) throws SemanticException {
op = AlterTableTypes.ALTERSKEWEDLOCATION;
- this.oldName = tableName;
+ setOldName(tableName);
this.skewedLocations = locations;
this.partSpec = partSpec;
}
public AlterTableDesc(String tableName, boolean turnOffSkewed,
- List<String> skewedColNames, List<List<String>> skewedColValues) {
- oldName = tableName;
+ List<String> skewedColNames, List<List<String>> skewedColValues) throws SemanticException {
+ setOldName(tableName);
op = AlterTableTypes.ADDSKEWEDBY;
this.isTurnOffSkewed = turnOffSkewed;
this.skewedColNames = new ArrayList<String>(skewedColNames);
this.skewedColValues = new ArrayList<List<String>>(skewedColValues);
}
- public AlterTableDesc(String tableName, HashMap<String, String> partSpec, int numBuckets) {
+ public AlterTableDesc(String tableName, HashMap<String, String> partSpec, int numBuckets) throws SemanticException {
op = AlterTableTypes.ALTERBUCKETNUM;
- this.oldName = tableName;
+ setOldName(tableName);
this.partSpec = partSpec;
this.numberBuckets = numBuckets;
}
- public AlterTableDesc(String tableName, String dropConstraintName, ReplicationSpec replicationSpec) {
- this.oldName = tableName;
+ public AlterTableDesc(String tableName, String dropConstraintName, ReplicationSpec replicationSpec) throws SemanticException {
+ setOldName(tableName);
this.dropConstraintName = dropConstraintName;
this.replicationSpec = replicationSpec;
op = AlterTableTypes.DROPCONSTRAINT;
@@ -338,8 +343,8 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
public AlterTableDesc(String tableName, List<SQLPrimaryKey> primaryKeyCols,
List<SQLForeignKey> foreignKeyCols, List<SQLUniqueConstraint> uniqueConstraintCols,
- ReplicationSpec replicationSpec) {
- this.oldName = tableName;
+ ReplicationSpec replicationSpec) throws SemanticException {
+ setOldName(tableName);
this.primaryKeyCols = primaryKeyCols;
this.foreignKeyCols = foreignKeyCols;
this.uniqueConstraintCols = uniqueConstraintCols;
@@ -350,8 +355,8 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
public AlterTableDesc(String tableName, List<SQLPrimaryKey> primaryKeyCols,
List<SQLForeignKey> foreignKeyCols, List<SQLUniqueConstraint> uniqueConstraintCols,
List<SQLNotNullConstraint> notNullConstraintCols, List<SQLDefaultConstraint> defaultConstraints,
- List<SQLCheckConstraint> checkConstraints, ReplicationSpec replicationSpec) {
- this.oldName = tableName;
+ List<SQLCheckConstraint> checkConstraints, ReplicationSpec replicationSpec) throws SemanticException {
+ setOldName(tableName);
this.primaryKeyCols = primaryKeyCols;
this.foreignKeyCols = foreignKeyCols;
this.uniqueConstraintCols = uniqueConstraintCols;
@@ -384,8 +389,9 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* @param oldName
* the oldName to set
*/
- public void setOldName(String oldName) {
- this.oldName = oldName;
+ public void setOldName(String oldName) throws SemanticException {
+ // Make sure we qualify the name from the outset so there's no ambiguity.
+ this.oldName = String.join(".", Utilities.getDbTableName(oldName));
}
/**
@@ -848,26 +854,19 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* @throws SemanticException
*/
public void validate() throws SemanticException {
- if (null != table) {
+ if (null != tableForSkewedColValidation) {
/* Validate skewed information. */
ValidationUtility.validateSkewedInformation(
- ParseUtils.validateColumnNameUniqueness(table.getCols()), this.getSkewedColNames(),
- this.getSkewedColValues());
+ ParseUtils.validateColumnNameUniqueness(tableForSkewedColValidation.getCols()),
+ this.getSkewedColNames(), this.getSkewedColValues());
}
}
/**
- * @return the table
- */
- public Table getTable() {
- return table;
- }
-
- /**
* @param table the table to set
*/
public void setTable(Table table) {
- this.table = table;
+ this.tableForSkewedColValidation = table;
}
/**
@@ -929,4 +928,24 @@ public class AlterTableDesc extends DDLDesc implements Serializable {
* This can result in a "ALTER IF NEWER THAN" kind of semantic
*/
public ReplicationSpec getReplicationSpec(){ return this.replicationSpec; }
+
+ @Override
+ public void setWriteId(long writeId) {
+ this.writeId = writeId;
+ }
+
+ @Override
+ public String getFullTableName() {
+ return getOldName();
+ }
+
+ @Override
+ public boolean mayNeedWriteId() {
+ return getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS
+ && AcidUtils.isToInsertOnlyTable(null, getProps());
+ }
+
+ public Long getWriteId() {
+ return this.writeId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java
index 65f4cf2..8941d97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLDesc.java
@@ -26,4 +26,10 @@ import java.io.Serializable;
*/
public abstract class DDLDesc implements Serializable {
private static final long serialVersionUID = 1L;
+
+ public static interface DDLDescWithWriteId {
+ void setWriteId(long writeId);
+ String getFullTableName();
+ boolean mayNeedWriteId();
+ }
}
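The new DDLDescWithWriteId contract is deliberately small: the driver only needs to ask whether a write ID is required, learn which table it is for, and hand the allocated ID back. A minimal sketch of an implementer (the class name and field layout are invented; AlterTableDesc above is the real implementation in this patch):

    // Hypothetical implementer, shown only to spell out the three-method contract.
    public class ExampleMmConversionDesc extends DDLDesc implements DDLDesc.DDLDescWithWriteId {
      private static final long serialVersionUID = 1L;
      private final String fqTableName; // expected in "db.table" form
      private Long writeId;             // null until the Driver assigns one

      public ExampleMmConversionDesc(String fqTableName) {
        this.fqTableName = fqTableName;
      }

      @Override
      public void setWriteId(long writeId) {
        this.writeId = writeId;
      }

      @Override
      public String getFullTableName() {
        return fqTableName;
      }

      @Override
      public boolean mayNeedWriteId() {
        return true; // a real desc would inspect its properties, as AlterTableDesc does above
      }

      public Long getWriteId() {
        return writeId;
      }
    }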
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 0926663..3c2f9d4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2453,4 +2453,22 @@ public class TestDbTxnManager2 {
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks);
}
+
+ @Test
+ public void testMmConversionLocks() throws Exception {
+ dropTable(new String[] {"T"});
+ CommandProcessorResponse cpr = driver.run("create table T (a int, b int) tblproperties('transactional'='false')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("insert into T values(0,2),(1,4)");
+ checkCmdOnDriver(cpr);
+
+ cpr = driver.compileAndRespond("ALTER TABLE T set tblproperties"
+ + "('transactional'='true', 'transactional_properties'='insert_only')", true);
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets X lock on T
+
+ List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e1c6675/ql/src/test/results/clientpositive/mm_conversions.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/mm_conversions.q.out b/ql/src/test/results/clientpositive/mm_conversions.q.out
deleted file mode 100644
index 4754710..0000000
--- a/ql/src/test/results/clientpositive/mm_conversions.q.out
+++ /dev/null
@@ -1,309 +0,0 @@
-PREHOOK: query: drop table intermediate
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table intermediate
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@intermediate
-POSTHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
-POSTHOOK: type: CREATETABLE
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@intermediate
-PREHOOK: query: insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 1
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: default@intermediate@p=455
-POSTHOOK: query: insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 1
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: default@intermediate@p=455
-POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: default@intermediate@p=456
-POSTHOOK: query: insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: default@intermediate@p=456
-POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: default@intermediate@p=457
-POSTHOOK: query: insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: default@intermediate@p=457
-POSTHOOK: Lineage: intermediate PARTITION(p=457).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: drop table simple_to_mm
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table simple_to_mm
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table simple_to_mm(key int) stored as orc
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: create table simple_to_mm(key int) stored as orc
-POSTHOOK: type: CREATETABLE
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@simple_to_mm
-PREHOOK: query: insert into table simple_to_mm select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: insert into table simple_to_mm select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@simple_to_mm
-POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from simple_to_mm s1 order by key
-PREHOOK: type: QUERY
-PREHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-POSTHOOK: query: select * from simple_to_mm s1 order by key
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-0
-98
-100
-PREHOOK: query: alter table simple_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only")
-PREHOOK: type: ALTERTABLE_PROPERTIES
-PREHOOK: Input: default@simple_to_mm
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: alter table simple_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only")
-POSTHOOK: type: ALTERTABLE_PROPERTIES
-POSTHOOK: Input: default@simple_to_mm
-POSTHOOK: Output: default@simple_to_mm
-PREHOOK: query: select * from simple_to_mm s2 order by key
-PREHOOK: type: QUERY
-PREHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-POSTHOOK: query: select * from simple_to_mm s2 order by key
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-0
-98
-100
-PREHOOK: query: insert into table simple_to_mm select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: insert into table simple_to_mm select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@simple_to_mm
-POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert into table simple_to_mm select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: insert into table simple_to_mm select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@simple_to_mm
-POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from simple_to_mm s3 order by key
-PREHOOK: type: QUERY
-PREHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-POSTHOOK: query: select * from simple_to_mm s3 order by key
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@simple_to_mm
-#### A masked pattern was here ####
-0
-0
-0
-98
-98
-98
-100
-100
-100
-PREHOOK: query: drop table simple_to_mm
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@simple_to_mm
-PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: drop table simple_to_mm
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@simple_to_mm
-POSTHOOK: Output: default@simple_to_mm
-PREHOOK: query: drop table part_to_mm
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table part_to_mm
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@part_to_mm
-POSTHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc
-POSTHOOK: type: CREATETABLE
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@part_to_mm
-PREHOOK: query: insert into table part_to_mm partition(key_mm='455') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@part_to_mm@key_mm=455
-POSTHOOK: query: insert into table part_to_mm partition(key_mm='455') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@part_to_mm@key_mm=455
-POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@part_to_mm@key_mm=456
-POSTHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@part_to_mm@key_mm=456
-POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from part_to_mm s1 order by key, key_mm
-PREHOOK: type: QUERY
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Input: default@part_to_mm@key_mm=455
-PREHOOK: Input: default@part_to_mm@key_mm=456
-#### A masked pattern was here ####
-POSTHOOK: query: select * from part_to_mm s1 order by key, key_mm
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Input: default@part_to_mm@key_mm=455
-POSTHOOK: Input: default@part_to_mm@key_mm=456
-#### A masked pattern was here ####
-0 455
-0 456
-98 455
-98 456
-100 455
-100 456
-PREHOOK: query: alter table part_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only")
-PREHOOK: type: ALTERTABLE_PROPERTIES
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Output: default@part_to_mm
-POSTHOOK: query: alter table part_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only")
-POSTHOOK: type: ALTERTABLE_PROPERTIES
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Output: default@part_to_mm
-PREHOOK: query: select * from part_to_mm s2 order by key, key_mm
-PREHOOK: type: QUERY
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Input: default@part_to_mm@key_mm=455
-PREHOOK: Input: default@part_to_mm@key_mm=456
-#### A masked pattern was here ####
-POSTHOOK: query: select * from part_to_mm s2 order by key, key_mm
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Input: default@part_to_mm@key_mm=455
-POSTHOOK: Input: default@part_to_mm@key_mm=456
-#### A masked pattern was here ####
-0 455
-0 456
-98 455
-98 456
-100 455
-100 456
-PREHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@part_to_mm@key_mm=456
-POSTHOOK: query: insert into table part_to_mm partition(key_mm='456') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@part_to_mm@key_mm=456
-POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert into table part_to_mm partition(key_mm='457') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@part_to_mm@key_mm=457
-POSTHOOK: query: insert into table part_to_mm partition(key_mm='457') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@part_to_mm@key_mm=457
-POSTHOOK: Lineage: part_to_mm PARTITION(key_mm=457).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from part_to_mm s3 order by key, key_mm
-PREHOOK: type: QUERY
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Input: default@part_to_mm@key_mm=455
-PREHOOK: Input: default@part_to_mm@key_mm=456
-PREHOOK: Input: default@part_to_mm@key_mm=457
-#### A masked pattern was here ####
-POSTHOOK: query: select * from part_to_mm s3 order by key, key_mm
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Input: default@part_to_mm@key_mm=455
-POSTHOOK: Input: default@part_to_mm@key_mm=456
-POSTHOOK: Input: default@part_to_mm@key_mm=457
-#### A masked pattern was here ####
-0 455
-0 456
-0 456
-0 457
-98 455
-98 456
-98 456
-98 457
-100 455
-100 456
-100 456
-100 457
-PREHOOK: query: drop table part_to_mm
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@part_to_mm
-PREHOOK: Output: default@part_to_mm
-POSTHOOK: query: drop table part_to_mm
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@part_to_mm
-POSTHOOK: Output: default@part_to_mm
-PREHOOK: query: drop table intermediate
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@intermediate
-PREHOOK: Output: default@intermediate
-POSTHOOK: query: drop table intermediate
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Output: default@intermediate