Posted to commits@hive.apache.org by se...@apache.org on 2016/09/08 01:52:50 UTC
[30/38] hive git commit: HIVE-14637 : edit or split MoveTask to commit job results to metastore (Sergey Shelukhin)
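
In broad strokes, this patch replaces the boolean isMmTable flag that was threaded through FileSinkDesc/LoadTableDesc with a nullable Long mmWriteId obtained at compile time, so output files can be prefixed with the write ID and later committed to the metastore. A minimal sketch of that pattern, mirroring the helper the diff adds to SemanticAnalyzer (class wrapper and argument handling are illustrative, not part of the patch):

    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.metadata.Table;

    final class MmWriteIdSketch {
      private MmWriteIdSketch() {}

      // For micromanaged (MM) tables, fetch the next table write ID from the
      // metastore-backed Hive client at compile time. Non-MM tables get null,
      // so mmWriteId also serves as the MM flag downstream (mmWriteId != null).
      static Long getMmWriteId(Table tbl, boolean isMmTable) throws HiveException {
        if (!isMmTable) {
          return null;
        }
        // Files written for this query will be prefixed with this write ID.
        return Hive.get().getNextTableWriteId(tbl.getDbName(), tbl.getTableName());
      }
    }
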
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 4e44d49..bb7001a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1305,6 +1305,7 @@ public final class GenMapRedUtils {
//
// 2. Constructing a conditional task consisting of a move task and a map reduce task
//
+ // TODO# movetask is created here; handle MM tables
MoveWork dummyMv = new MoveWork(null, null, null,
new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
MapWork cplan;
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
index ee67443..e2887fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
@@ -71,6 +71,7 @@ public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver {
Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
Set<Operator<? extends OperatorDesc>> ops = new HashSet<>();
+ /* TODO# wtf
if (currTask instanceof MapRedTask) {
MapRedTask mr = (MapRedTask) currTask;
ops.addAll(mr.getWork().getAllOperators());
@@ -84,7 +85,7 @@ public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver {
for (BaseWork w : sparkWork.getAllWork()) {
ops.addAll(w.getAllOperators());
}
- }
+ }*/
setOrAnnotateStats(ops, physicalContext.getParseContext());
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 9329e00..d5a6d2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1056,6 +1056,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
partSpec == null ? new HashMap<String, String>() : partSpec);
ltd.setLbCtx(lbCtx);
+ // TODO# movetask is created here; handle MM tables
Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false),
conf);
truncateTask.addDependentTask(moveTsk);
@@ -1668,6 +1669,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
partSpec == null ? new HashMap<String, String>() : partSpec);
ltd.setLbCtx(lbCtx);
+ // TODO# movetask is created here; handle MM tables
Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false),
conf);
mergeTask.addDependentTask(moveTsk);
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index d562ddf..6cefbfc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -326,6 +326,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
Utilities.getTableDesc(table), new TreeMap<String, String>(),
replace);
+ // TODO# movetask is created here; handle MM tables
Task<?> loadTableTask = TaskFactory.get(new MoveWork(getInputs(),
getOutputs(), loadTableWork, null, false), conf);
copyTask.addDependentTask(loadTableTask);
@@ -400,6 +401,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
Utilities.getTableDesc(table),
partSpec.getPartSpec(), true);
loadTableWork.setInheritTableSpecs(false);
+ // TODO# movetask is created here; handle MM tables
Task<?> loadPartTask = TaskFactory.get(new MoveWork(
getInputs(), getOutputs(), loadTableWork, null, false),
conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index a49b813..9c9e6fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -270,6 +270,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
loadTableWork.setInheritTableSpecs(false);
}
+ // TODO# movetask is created here; handle MM tables
Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(),
getOutputs(), loadTableWork, null, true, isLocal), conf);
if (rTask != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 4353d3a..92ad50d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -331,14 +331,6 @@ public class ParseContext {
}
/**
- * @param loadTableWork
- * the loadTableWork to set
- */
- public void setLoadTableWork(List<LoadTableDesc> loadTableWork) {
- this.loadTableWork = loadTableWork;
- }
-
- /**
* @return the loadFileWork
*/
public List<LoadFileDesc> getLoadFileWork() {
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 499530e..b550235 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -267,7 +267,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
private final Map<JoinOperator, QBJoinTree> joinContext;
private final Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
private final HashMap<TableScanOperator, Table> topToTable;
- private final Map<FileSinkOperator, Table> fsopToTable;
private final List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
private final HashMap<TableScanOperator, Map<String, String>> topToTableProps;
private QB qb;
@@ -367,7 +366,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
// Must be deterministic order map for consistent q-test output across Java versions
topToTable = new LinkedHashMap<TableScanOperator, Table>();
- fsopToTable = new HashMap<FileSinkOperator, Table>();
reduceSinkOperatorsAddedByEnforceBucketingSorting = new ArrayList<ReduceSinkOperator>();
topToTableProps = new HashMap<TableScanOperator, Map<String, String>>();
destTableId = 1;
@@ -426,7 +424,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
opToPartToSkewedPruner.clear();
opToSamplePruner.clear();
nameToSplitSample.clear();
- fsopToTable.clear();
resultSchema = null;
createVwDesc = null;
viewsExpanded = null;
@@ -6547,6 +6544,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
ListBucketingCtx lbCtx = null;
Map<String, String> partSpec = null;
boolean isMmTable = false;
+ Long mmWriteId = null;
switch (dest_type.intValue()) {
case QBMetaData.DEST_TABLE: {
@@ -6570,7 +6568,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
boolean isNonNativeTable = dest_tab.isNonNative();
- isMmTable = isMmTable(dest_tab);
+ isMmTable = AcidUtils.isMmTable(dest_tab);
if (isNonNativeTable || isMmTable) {
queryTmpdir = dest_path;
} else {
@@ -6603,9 +6601,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
acidOp = getAcidType(table_desc.getOutputFileFormatClass());
checkAcidConstraints(qb, table_desc, dest_tab);
}
+ try {
+ mmWriteId = getMmWriteId(dest_tab, isMmTable);
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
dest_tab.getDbName(), dest_tab.getTableName());
- ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, isMmTable);
+ ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, mmWriteId);
ltd.setLbCtx(lbCtx);
loadTableWork.add(ltd);
} else {
@@ -6638,7 +6641,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
.getAuthority(), partPath.toUri().getPath());
- isMmTable = isMmTable(dest_tab);
+ isMmTable = AcidUtils.isMmTable(dest_tab);
queryTmpdir = isMmTable ? dest_path : ctx.getTempDirForPath(dest_path);
Utilities.LOG14535.info("createFS for partition specifying " + queryTmpdir + " from " + dest_path);
table_desc = Utilities.getTableDesc(dest_tab);
@@ -6658,7 +6661,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
acidOp = getAcidType(table_desc.getOutputFileFormatClass());
checkAcidConstraints(qb, table_desc, dest_tab);
}
- ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, isMmTable);
+ try {
+ mmWriteId = getMmWriteId(dest_tab, isMmTable);
+ } catch (HiveException e) {
+ // How is this a semantic exception? Stupid Java and signatures.
+ throw new SemanticException(e);
+ }
+ ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, mmWriteId);
ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
dest_tab.getTableName()));
ltd.setLbCtx(lbCtx);
@@ -6856,13 +6865,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
}
+ assert isMmTable == (mmWriteId != null);
FileSinkDesc fileSinkDesc = createFileSinkDesc(table_desc, dest_part,
dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
- canBeMerged, isMmTable);
- if (isMmTable) {
- fileSinkDesc.setExecutionPrefix(ctx.getExecutionPrefix());
- }
+ canBeMerged, mmWriteId);
Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
fileSinkDesc, fsRS, input), inputRR);
@@ -6876,7 +6883,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
FileSinkOperator fso = (FileSinkOperator) output;
fso.getConf().setTable(dest_tab);
- fsopToTable.put(fso, dest_tab);
// the following code is used to collect column stats when
// hive.stats.autogather=true
// and it is an insert overwrite or insert into table
@@ -6895,10 +6901,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return output;
}
- private static boolean isMmTable(Table table) {
- // TODO: perhaps it should be a 3rd value for 'transactional'?
- String value = table.getProperty(hive_metastoreConstants.TABLE_IS_MM);
- return value != null && value.equalsIgnoreCase("true");
+ private static Long getMmWriteId(Table tbl, boolean isMmTable) throws HiveException {
+ if (!isMmTable) return null;
+ // Get the next write ID for this table. We will prefix files with this write ID.
+ return Hive.get().getNextTableWriteId(tbl.getDbName(), tbl.getTableName());
}
private FileSinkDesc createFileSinkDesc(TableDesc table_desc,
@@ -6906,7 +6912,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
boolean destTableIsAcid, boolean destTableIsTemporary,
boolean destTableIsMaterialization, Path queryTmpdir,
SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
- RowSchema fsRS, boolean canBeMerged, boolean isMmTable) throws SemanticException {
+ RowSchema fsRS, boolean canBeMerged, Long mmWriteId) throws SemanticException {
FileSinkDesc fileSinkDesc = new FileSinkDesc(
queryTmpdir,
table_desc,
@@ -6919,7 +6925,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
rsCtx.getPartnCols(),
dpCtx,
dest_path,
- isMmTable);
+ mmWriteId);
fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
// If this is an insert, update, or delete on an ACID table then mark that so the
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index fb5ca57..723719d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -206,6 +206,7 @@ public abstract class TaskCompiler {
}
} else if (!isCStats) {
for (LoadTableDesc ltd : loadTableWork) {
+ // TODO# HERE
Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
mvTask.add(tsk);
// Check to see if we are stale'ing any indexes and auto-update them if we want
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index f51999d..63cc0cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -96,8 +96,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
private transient Table table;
private Path destPath;
private boolean isHiveServerQuery;
- private boolean isMmTable;
- private String executionPrefix;
+ private Long mmWriteId;
public FileSinkDesc() {
}
@@ -109,7 +108,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
final boolean compressed, final int destTableId, final boolean multiFileSpray,
final boolean canBeMerged, final int numFiles, final int totalFiles,
final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
- boolean isMmTable) {
+ Long mmWriteId) {
this.dirName = dirName;
this.tableInfo = tableInfo;
@@ -123,7 +122,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
this.dpCtx = dpCtx;
this.dpSortState = DPSortState.NONE;
this.destPath = destPath;
- this.isMmTable = isMmTable;
+ this.mmWriteId = mmWriteId;
}
public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -145,7 +144,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
public Object clone() throws CloneNotSupportedException {
FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
- partitionCols, dpCtx, destPath, isMmTable);
+ partitionCols, dpCtx, destPath, mmWriteId);
ret.setCompressCodec(compressCodec);
ret.setCompressType(compressType);
ret.setGatherStats(gatherStats);
@@ -159,7 +158,6 @@ public class FileSinkDesc extends AbstractOperatorDesc {
ret.setWriteType(writeType);
ret.setTransactionId(txnId);
ret.setStatsTmpDir(statsTmpDir);
- ret.setExecutionPrefix(executionPrefix);
return ret;
}
@@ -254,7 +252,11 @@ public class FileSinkDesc extends AbstractOperatorDesc {
}
public boolean isMmTable() {
- return isMmTable;
+ return mmWriteId != null;
+ }
+
+ public long getMmWriteId() {
+ return mmWriteId;
}
public boolean isMaterialization() {
@@ -482,12 +484,4 @@ public class FileSinkDesc extends AbstractOperatorDesc {
public void setStatsTmpDir(String statsCollectionTempDir) {
this.statsTmpDir = statsCollectionTempDir;
}
-
- public String getExecutionPrefix() {
- return this.executionPrefix;
- }
-
- public void setExecutionPrefix(String value) {
- this.executionPrefix = value;
- }
}
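
For orientation, a hedged sketch of how a caller would use the revised FileSinkDesc constructor above; variable names are placeholders assumed to be in scope with the appropriate types:

    // Illustrative only: a non-null mmWriteId marks the sink as writing to an MM table.
    FileSinkDesc desc = new FileSinkDesc(queryTmpdir, tableInfo, /* compressed */ false,
        destTableId, /* multiFileSpray */ false, /* canBeMerged */ false,
        /* numFiles */ 1, /* totalFiles */ 1, partitionCols, dpCtx, destPath, mmWriteId);
    boolean isMm = desc.isMmTable();   // true exactly when mmWriteId != null
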
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 3b49197..fc8726c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -42,7 +42,7 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
// Need to remember whether this is an acid compliant operation, and if so whether it is an
// insert, update, or delete.
private AcidUtils.Operation writeType;
- private boolean isMmTable;
+ private Long mmWriteId;
// TODO: the below seems like they should just be combined into partitionDesc
private org.apache.hadoop.hive.ql.plan.TableDesc table;
@@ -52,10 +52,10 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
final org.apache.hadoop.hive.ql.plan.TableDesc table,
final Map<String, String> partitionSpec,
final boolean replace,
- final AcidUtils.Operation writeType, boolean isMmTable) {
+ final AcidUtils.Operation writeType, Long mmWriteId) {
super(sourcePath);
Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/);
- init(table, partitionSpec, replace, writeType, isMmTable);
+ init(table, partitionSpec, replace, writeType, mmWriteId);
}
/**
@@ -70,15 +70,14 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
final Map<String, String> partitionSpec,
final boolean replace) {
// TODO# we assume mm=false here
- this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, false);
+ this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, null);
}
public LoadTableDesc(final Path sourcePath,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
final Map<String, String> partitionSpec,
- final AcidUtils.Operation writeType, boolean isMmTable) {
- // TODO# we assume mm=false here
- this(sourcePath, table, partitionSpec, true, writeType, isMmTable);
+ final AcidUtils.Operation writeType, Long mmWriteId) {
+ this(sourcePath, table, partitionSpec, true, writeType, mmWriteId);
}
/**
@@ -91,22 +90,21 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
final org.apache.hadoop.hive.ql.plan.TableDesc table,
final Map<String, String> partitionSpec) {
// TODO# we assume mm=false here
- this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, false);
+ this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, null);
}
public LoadTableDesc(final Path sourcePath,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
final DynamicPartitionCtx dpCtx,
final AcidUtils.Operation writeType,
- boolean isReplace,
- boolean isMmTable) {
+ boolean isReplace, Long mmWriteId) {
super(sourcePath);
Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/);
this.dpCtx = dpCtx;
if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
- init(table, dpCtx.getPartSpec(), isReplace, writeType, isMmTable);
+ init(table, dpCtx.getPartSpec(), isReplace, writeType, mmWriteId);
} else {
- init(table, new LinkedHashMap<String, String>(), isReplace, writeType, isMmTable);
+ init(table, new LinkedHashMap<String, String>(), isReplace, writeType, mmWriteId);
}
}
@@ -114,12 +112,12 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
final org.apache.hadoop.hive.ql.plan.TableDesc table,
final Map<String, String> partitionSpec,
final boolean replace,
- AcidUtils.Operation writeType, boolean isMmTable) {
+ AcidUtils.Operation writeType, Long mmWriteId) {
this.table = table;
this.partitionSpec = partitionSpec;
this.replace = replace;
this.writeType = writeType;
- this.isMmTable = isMmTable;
+ this.mmWriteId = mmWriteId;
}
@Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -147,7 +145,7 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
@Explain(displayName = "micromanaged table")
public boolean isMmTable() {
- return isMmTable;
+ return mmWriteId != null;
}
public void setReplace(boolean replace) {
@@ -187,4 +185,8 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
public AcidUtils.Operation getWriteType() {
return writeType;
}
+
+ public Long getMmWriteId() {
+ return mmWriteId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 227b0d2..9f498c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
*/
@Explain(displayName = "Move Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public class MoveWork implements Serializable {
- // TODO# all the places where MoveWork is created need to be handled.
private static final long serialVersionUID = 1L;
private LoadTableDesc loadTableWork;
private LoadFileDesc loadFileWork;
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index d3c3611..066d2b6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -146,7 +146,7 @@ public class TestExecDriver extends TestCase {
db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
db.createTable(src, cols, null, TextInputFormat.class,
HiveIgnoreKeyTextOutputFormat.class);
- db.loadTable(hadoopDataFile[i], src, false, true, false, false, false);
+ db.loadTable(hadoopDataFile[i], src, false, true, false, false, false, null);
i++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 1c27873..909114c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -285,7 +285,7 @@ public class TestFileSinkOperator {
partColMap.put(PARTCOL_NAME, null);
DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
//todo: does this need the finalDestination?
- desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, false);
+ desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, null);
} else {
desc = new FileSinkDesc(basePath, tableDesc, false);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q
index 11259cb..8d19df6 100644
--- a/ql/src/test/queries/clientpositive/mm_current.q
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -4,18 +4,30 @@ set hive.exec.dynamic.partition.mode=nonstrict;
set hive.fetch.task.conversion=none;
set tez.grouping.min-size=1;
set tez.grouping.max-size=2;
-set hive.tez.auto.reducer.parallelism=false;
+set hive.tez.auto.reducer.parallelism=false;
+
+drop table part_mm;
+drop table simple_mm;
+drop table intermediate;
create table intermediate(key int) partitioned by (p int) stored as orc;
-insert into table intermediate partition(p='455') select key from src limit 3;
-insert into table intermediate partition(p='456') select key from src limit 3;
-insert into table intermediate partition(p='457') select key from src limit 3;
+insert into table intermediate partition(p='455') select key from src limit 2;
+insert into table intermediate partition(p='456') select key from src limit 2;
-create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true');
+create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true');
+
+explain insert into table part_mm partition(key_mm='455') select key from intermediate;
+insert into table part_mm partition(key_mm='455') select key from intermediate;
+insert into table part_mm partition(key_mm='456') select key from intermediate;
+insert into table part_mm partition(key_mm='455') select key from intermediate;
+select * from part_mm;
-explain insert into table simple_mm partition(key_mm='455') select key from intermediate;
-insert into table simple_mm partition(key_mm='455') select key from intermediate;
+create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true');
+insert into table simple_mm select key from intermediate;
+insert into table simple_mm select key from intermediate;
+select * from simple_mm;
+drop table part_mm;
drop table simple_mm;
drop table intermediate;
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out b/ql/src/test/results/clientpositive/llap/mm_current.q.out
index 8f1af4c..f357020 100644
--- a/ql/src/test/results/clientpositive/llap/mm_current.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -1,3 +1,15 @@
+PREHOOK: query: drop table part_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table part_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table simple_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table simple_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table intermediate
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table intermediate
+POSTHOOK: type: DROPTABLE
PREHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
@@ -6,44 +18,35 @@ POSTHOOK: query: create table intermediate(key int) partitioned by (p int) store
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@intermediate
-PREHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3
+PREHOOK: query: insert into table intermediate partition(p='455') select key from src limit 2
PREHOOK: type: QUERY
PREHOOK: Input: default@src
PREHOOK: Output: default@intermediate@p=455
-POSTHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3
+POSTHOOK: query: insert into table intermediate partition(p='455') select key from src limit 2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
POSTHOOK: Output: default@intermediate@p=455
POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3
+PREHOOK: query: insert into table intermediate partition(p='456') select key from src limit 2
PREHOOK: type: QUERY
PREHOOK: Input: default@src
PREHOOK: Output: default@intermediate@p=456
-POSTHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3
+POSTHOOK: query: insert into table intermediate partition(p='456') select key from src limit 2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
POSTHOOK: Output: default@intermediate@p=456
POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: default@intermediate@p=457
-POSTHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: default@intermediate@p=457
-POSTHOOK: Lineage: intermediate PARTITION(p=457).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: query: create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
-PREHOOK: Output: default@simple_mm
-POSTHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: Output: default@part_mm
+POSTHOOK: query: create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
-POSTHOOK: Output: default@simple_mm
-PREHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: Output: default@part_mm
+PREHOOK: query: explain insert into table part_mm partition(key_mm='455') select key from intermediate
PREHOOK: type: QUERY
-POSTHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: query: explain insert into table part_mm partition(key_mm='455') select key from intermediate
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -60,19 +63,19 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: intermediate
- Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: key (type: int)
outputColumnNames: _col0
- Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
- name: default.simple_mm
+ name: default.part_mm
Execution mode: llap
LLAP IO: all inputs
@@ -89,27 +92,133 @@ STAGE PLANS:
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
- name: default.simple_mm
+ name: default.part_mm
micromanaged table: true
Stage: Stage-3
Stats-Aggr Operator
-PREHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate
+PREHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: Lineage: part_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table part_mm partition(key_mm='456') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@part_mm@key_mm=456
+POSTHOOK: query: insert into table part_mm partition(key_mm='456') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@part_mm@key_mm=456
+POSTHOOK: Lineage: part_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: Lineage: part_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from part_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@part_mm
+PREHOOK: Input: default@part_mm@key_mm=455
+PREHOOK: Input: default@part_mm@key_mm=456
+#### A masked pattern was here ####
+POSTHOOK: query: select * from part_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@part_mm
+POSTHOOK: Input: default@part_mm@key_mm=455
+POSTHOOK: Input: default@part_mm@key_mm=456
+#### A masked pattern was here ####
+0 455
+455 455
+0 455
+455 455
+0 455
+455 455
+0 455
+455 455
+0 456
+455 456
+0 456
+455 456
+PREHOOK: query: create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: insert into table simple_mm select key from intermediate
PREHOOK: type: QUERY
PREHOOK: Input: default@intermediate
PREHOOK: Input: default@intermediate@p=455
PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Input: default@intermediate@p=457
-PREHOOK: Output: default@simple_mm@key_mm=455
-POSTHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: insert into table simple_mm select key from intermediate
POSTHOOK: type: QUERY
POSTHOOK: Input: default@intermediate
POSTHOOK: Input: default@intermediate@p=455
POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Input: default@intermediate@p=457
-POSTHOOK: Output: default@simple_mm@key_mm=455
-POSTHOOK: Lineage: simple_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Output: default@simple_mm
+POSTHOOK: Lineage: simple_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table simple_mm select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: insert into table simple_mm select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@simple_mm
+POSTHOOK: Lineage: simple_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_mm
+#### A masked pattern was here ####
+0
+455
+0
+455
+0
+455
+0
+455
+PREHOOK: query: drop table part_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part_mm
+PREHOOK: Output: default@part_mm
+POSTHOOK: query: drop table part_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part_mm
+POSTHOOK: Output: default@part_mm
PREHOOK: query: drop table simple_mm
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@simple_mm