Posted to commits@hive.apache.org by se...@apache.org on 2017/02/02 02:03:39 UTC
[40/50] [abbrv] hive git commit: HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin) UNVERIFIED (build only)
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 5107a89,24d1681..a777475
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@@ -93,8 -96,8 +96,9 @@@ import org.apache.hadoop.hive.ql.plan.F
import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
@@@ -1358,45 -1350,38 +1362,41 @@@ public final class GenMapRedUtils
cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
// NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
// know if merge MR2 will be triggered at execution time
- Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOutput);
+ MoveWork dummyMv = null;
+ if (srcMmWriteId == null) {
+ // Only create the MoveWork for a non-MM table. No action is needed for an MM table.
+ Utilities.LOG14535.info("creating dummy movetask for merge (with lfd)");
+ dummyMv = new MoveWork(null, null, null,
+ new LoadFileDesc(inputDirName, finalName, true, null, null), false);
+ } else {
+ // TODO# create the noop MoveWork to avoid q file changes for now. else should be removed.
+ dummyMv = new MoveWork(null, null, null,
+ new LoadFileDesc(inputDirName, finalName, true, null, null), false);
+ dummyMv.setNoop(true);
+ }
- ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
- fsInputDesc.getMergeInputDirName().toString());
-
- // keep the dynamic partition context in conditional task resolver context
- ConditionalResolverMergeFilesCtx mrCtx =
- (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
- mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
- mrCtx.setLbCtx(fsInputDesc.getLbCtx());
-
- //
- // 3. add the moveTask as the children of the conditional task
- //
+ // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the
+ // MM directory, the original MoveTask still commits based on the parent. Note that this path
+ // can only be triggered for a merge that's part of insert for now; MM tables do not support
+ // concatenate. Keeping the old logic for non-MM tables with temp directories and stuff.
+ // TODO# is this correct?
+ Path fsopPath = srcMmWriteId != null ? fsInputDesc.getFinalDirName() : finalName;
+ Utilities.LOG14535.info("Looking for MoveTask to make it dependant on the conditional tasks");
++
+ Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(
+ mvTasks, fsopPath, fsInputDesc.isMmTable());
++ // TODO# questionable master merge here
+ ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
- fsInputDesc.getFinalDirName(), finalName, mvTask, dependencyTask);
++ fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask);
+
+ // keep the dynamic partition context in conditional task resolver context
+ ConditionalResolverMergeFilesCtx mrCtx =
+ (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
+ mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
+ mrCtx.setLbCtx(fsInputDesc.getLbCtx());
- }
-
- /**
- * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
- * possible subtrees branching from the ConditionalTask.
- *
- * @param newOutput
- * @param cndTsk
- * @param mvTasks
- * @param hconf
- * @param dependencyTask
- */
- public static void linkMoveTask(FileSinkOperator newOutput,
- ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf,
- DependencyCollectionTask dependencyTask) {
-
- Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, newOutput);
-- for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
- linkMoveTask(mvTask, tsk, conf, dependencyTask);
- linkMoveTask(mvTask, tsk, hconf, dependencyTask);
-- }
}
++
/**
* Follows the task tree down from task and makes all leaves parents of mvTask
*
@@@ -1644,15 -1706,29 +1725,33 @@@
* MoveWork for the move task
* @param mergeWork
* MapredWork for the merge task.
- * @param inputPath
+ * @param condInputPath
* the input directory of the merge/move task
+ * @param condOutputPath
+ * the output directory of the merge/move task
+ * @param moveTaskToLink
+ * a MoveTask that may be linked to the conditional sub-tasks
+ * @param dependencyTask
+ * a dependency task that may be linked to the conditional sub-tasks
* @return The conditional task
*/
+ @SuppressWarnings("unchecked")
- public static ConditionalTask createCondTask(HiveConf conf,
- Task<? extends Serializable> currTask, MoveWork mvWork,
- Serializable mergeWork, String inputPath) {
- Utilities.LOG14535.info("Creating conditional merge task for " + inputPath);
+ private static ConditionalTask createCondTask(HiveConf conf,
- Task<? extends Serializable> currTask, MoveWork dummyMoveWork, Serializable mergeWork,
- Path condInputPath, Path condOutputPath, Task<MoveWork> moveTaskToLink, DependencyCollectionTask dependencyTask) {
++ Task<? extends Serializable> currTask, MoveWork mvWork, Serializable mergeWork,
++ Path condInputPath, Path condOutputPath, Task<MoveWork> moveTaskToLink,
++ DependencyCollectionTask dependencyTask) {
++ Utilities.LOG14535.info("Creating conditional merge task for " + condInputPath);
++ // Create a dummy task if no move is needed.
++ Serializable moveWork = mvWork != null ? mvWork : new DependencyCollectionWork();
+
++ // TODO: this should never happen for mm tables.
+ boolean shouldMergeMovePaths = (moveTaskToLink != null && dependencyTask == null
+ && shouldMergeMovePaths(conf, condInputPath, condOutputPath, moveTaskToLink.getWork()));
+
- MoveWork workForMoveOnlyTask;
++ Serializable workForMoveOnlyTask = moveWork;
+ if (shouldMergeMovePaths) {
+ workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork());
- } else {
- workForMoveOnlyTask = dummyMoveWork;
+ }
// There are 3 options for this ConditionalTask:
// 1) Merge the partitions
@@@ -1660,14 -1736,10 +1759,12 @@@
// 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't
// merge others) in this case the merge is done first followed by the move to prevent
// conflicts.
+ // TODO: if we are not dealing with concatenate DDL, we should not create a merge+move path
+ // because it should be impossible to get incompatible outputs.
- // Create a dummy task if no move is needed.
- Serializable moveWork = mvWork != null ? mvWork : new DependencyCollectionWork();
Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
- Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(moveWork, conf);
+ Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(workForMoveOnlyTask, conf);
Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
- Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(dummyMoveWork, conf);
+ Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(moveWork, conf);
// NOTE! It is necessary merge task is the parent of the move task, and not
// the other way around, for the proper execution of the execute method of
@@@ -1697,6 -1769,20 +1794,21 @@@
// make the conditional task as the child of the current leaf task
currTask.addDependentTask(cndTsk);
+ if (shouldMergeMovePaths) {
+ // If a new MoveWork was created, then we should link all dependent tasks from the MoveWork to link.
+ if (moveTaskToLink.getDependentTasks() != null) {
+ for (Task dependentTask : moveTaskToLink.getDependentTasks()) {
+ moveOnlyMoveTask.addDependentTask(dependentTask);
+ }
+ }
+ } else {
+ addDependentMoveTasks(moveTaskToLink, conf, moveOnlyMoveTask, dependencyTask);
+ }
+
++
+ addDependentMoveTasks(moveTaskToLink, conf, mergeOnlyMergeTask, dependencyTask);
+ addDependentMoveTasks(moveTaskToLink, conf, mergeAndMoveMoveTask, dependencyTask);
+
return cndTsk;
}
@@@ -1820,30 -1906,26 +1932,29 @@@
Path dest = null;
+ FileSinkDesc fileSinkDesc = fsOp.getConf();
+ boolean isMmTable = fileSinkDesc.isMmTable();
if (chDir) {
- FileSinkDesc fileSinkDesc = fsOp.getConf();
- dest = fileSinkDesc.getFinalDirName();
-
- // generate the temporary file
- // it must be on the same file system as the current destination
- Context baseCtx = parseCtx.getContext();
-
- // Create the required temporary file in the HDFS location if the destination
- // path of the FileSinkOperator table is a blobstore path.
- Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath(), true);
--
- // Change all the linked file sink descriptors
- if (fileSinkDesc.isLinkedFileSink()) {
- for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
- fsConf.setParentDir(tmpDir);
- fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName()));
+ dest = fileSinkDesc.getMergeInputDirName();
+ if (!isMmTable) {
+ // generate the temporary file
+ // it must be on the same file system as the current destination
+ Context baseCtx = parseCtx.getContext();
+
+ // Create the required temporary file in the HDFS location if the destination
+ // path of the FileSinkOperator table is a blobstore path.
- Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath());
++ Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath(), true);
+
+ // Change all the linked file sink descriptors
+ if (fileSinkDesc.isLinkedFileSink()) {
+ for (FileSinkDesc fsConf : fileSinkDesc.getLinkedFileSinkDesc()) {
+ fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName()));
+ Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + fsConf.getDirName() + "; dest was " + fileSinkDesc.getDestPath());
+ }
+ } else {
+ fileSinkDesc.setDirName(tmpDir);
+ Utilities.LOG14535.info("createMoveTask setting tmpDir chDir " + tmpDir + "; dest was " + fileSinkDesc.getDestPath());
}
- } else {
- fileSinkDesc.setDirName(tmpDir);
}
}
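
For reference, the hunk above reworks GenMapRedUtils.createCondTask() so that the conditional merge step always has three alternatives (move only, merge only, merge followed by move), with a no-op DependencyCollectionWork standing in when no MoveWork is supplied, plus an optional blobstore optimization that collapses two moves into one. Below is a minimal, self-contained sketch of that layout; the classes are simplified stand-ins, not Hive's actual Task/MoveWork/ConditionalTask types.

import java.util.Arrays;
import java.util.List;

// Simplified stand-in for Hive's work objects (MoveWork, MapredWork, ...).
class Work {
  final String name;
  Work(String name) { this.name = name; }
}

public class CondTaskSketch {
  // Mirrors the choice in createCondTask(): if no MoveWork is supplied,
  // a no-op placeholder (DependencyCollectionWork in the real code) is used instead.
  static Work chooseMoveWork(Work mvWork) {
    return mvWork != null ? mvWork : new Work("dependency-collection (no-op)");
  }

  public static void main(String[] args) {
    Work mergeWork = new Work("merge");              // the merge MapredWork
    Work moveWork = chooseMoveWork(null);            // dummy because no move is needed here
    boolean shouldMergeMovePaths = false;            // blobstore path-merging optimization, off here
    Work moveOnly = shouldMergeMovePaths
        ? new Work("merged move (condInputPath -> final target)")
        : moveWork;

    // The ConditionalTask resolver picks exactly one of these branches at run time:
    // 1) move only, 2) merge only, 3) merge followed by move.
    List<List<Work>> branches = Arrays.asList(
        Arrays.asList(moveOnly),
        Arrays.asList(mergeWork),
        Arrays.asList(mergeWork, moveWork));
    for (List<Work> branch : branches) {
      System.out.println(branch.stream().map(w -> w.name)
          .reduce((a, b) -> a + " -> " + b).orElse(""));
    }
  }
}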
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
index 5a9c72f,32d1de1..b58db88
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
@@@ -318,11 -342,6 +342,12 @@@ public class StatsOptimizer extends Tra
return null; // todo we can collapse this part of tree into single TS
}
++ /* TODO# seems to be removed in master?
+ Table tbl = tsOp.getConf().getTableMetadata();
+ if (AcidUtils.isFullAcidTable(tbl)) {
+ Logger.info("Table " + tbl.getTableName() + " is ACID table. Skip StatsOptimizer.");
+ return null;
- }
++ }*/
List<Object> oneRow = new ArrayList<Object>();
Hive hive = Hive.get(pctx.getConf());
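
The block commented out above ("TODO# seems to be removed in master?") guards StatsOptimizer against answering queries from metastore stats for full-ACID tables. A tiny standalone sketch of that guard pattern follows; TableInfo and tryOptimize are hypothetical stand-ins, while the real code uses AcidUtils.isFullAcidTable(Table) and returns null to skip the optimization.

// Hypothetical stand-ins for the guard in StatsOptimizer; not Hive's API.
class TableInfo {
  final String name;
  final boolean fullAcid;
  TableInfo(String name, boolean fullAcid) { this.name = name; this.fullAcid = fullAcid; }
}

public class StatsOptimizerGuardSketch {
  // Returning null mirrors "skip this optimization" in the hunk above.
  static Object tryOptimize(TableInfo tbl) {
    if (tbl.fullAcid) {
      System.out.println("Table " + tbl.name + " is an ACID table. Skip StatsOptimizer.");
      return null;
    }
    return "answer computed from metastore stats";   // placeholder for the stats-only result
  }

  public static void main(String[] args) {
    System.out.println(tryOptimize(new TableInfo("acid_tab", true)));
    System.out.println(tryOptimize(new TableInfo("plain_tab", false)));
  }
}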
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 28c8eea,796ccc8..103f822
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@@ -211,30 -209,31 +210,35 @@@ public class EximUtil
}
}
- public static String relativeToAbsolutePath(HiveConf conf, String location) throws SemanticException {
- boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE)
- || conf.getBoolVar(HiveConf.ConfVars.HIVEEXIMTESTMODE);
- if (testMode) {
- URI uri = new Path(location).toUri();
- String scheme = uri.getScheme();
- String authority = uri.getAuthority();
- String path = uri.getPath();
- if (!path.startsWith("/")) {
- path = (new Path(System.getProperty("test.tmp.dir"),
- path)).toUri().getPath();
- }
- if (StringUtils.isEmpty(scheme)) {
+ public static String relativeToAbsolutePath(HiveConf conf, String location)
+ throws SemanticException {
+ try {
- boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
++ boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE)
++ || conf.getBoolVar(HiveConf.ConfVars.HIVEEXIMTESTMODE);
+ if (testMode) {
+ URI uri = new Path(location).toUri();
+ FileSystem fs = FileSystem.get(uri, conf);
+ String scheme = fs.getScheme();
+ String authority = uri.getAuthority();
+ String path = uri.getPath();
+ if (!path.startsWith("/")) {
+ path = (new Path(System.getProperty("test.tmp.dir"), path)).toUri().getPath();
+ }
++ if (StringUtils.isEmpty(scheme)) {
+ scheme = "pfile";
++ }
+ try {
+ uri = new URI(scheme, authority, path, null, null);
+ } catch (URISyntaxException e) {
+ throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+ }
+ return uri.toString();
+ } else {
+ // no-op for non-test mode for now
+ return location;
}
- try {
- uri = new URI(scheme, authority, path, null, null);
- } catch (URISyntaxException e) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
- }
- return uri.toString();
- } else {
- //no-op for non-test mode for now
- return location;
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.IO_ERROR.getMsg() + ": " + e.getMessage(), e);
}
}
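
The rewritten EximUtil.relativeToAbsolutePath() above, in test mode, roots relative export locations under test.tmp.dir and defaults a missing scheme to "pfile" (the real code also resolves the scheme through FileSystem.get(uri, conf)). A minimal JDK-only sketch of that normalization, assuming plain java.net.URI and java.nio in place of Hadoop's Path:

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;

public class RelativeToAbsoluteSketch {
  // Test-mode normalization: relative paths are rooted under testTmpDir and a
  // missing scheme falls back to "pfile". Simplified; no FileSystem lookup here.
  static String relativeToAbsolutePath(String location, String testTmpDir) throws URISyntaxException {
    URI uri = URI.create(location);
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    String path = uri.getPath() != null ? uri.getPath() : location;
    if (!path.startsWith("/")) {
      path = Paths.get(testTmpDir, path).toUri().getPath();
    }
    if (scheme == null || scheme.isEmpty()) {
      scheme = "pfile";
    }
    return new URI(scheme, authority, path, null, null).toString();
  }

  public static void main(String[] args) throws URISyntaxException {
    System.out.println(relativeToAbsolutePath("exports/t1", "/tmp/hive-test"));            // relative
    System.out.println(relativeToAbsolutePath("/user/hive/exports/t1", "/tmp/hive-test")); // absolute
  }
}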
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index b46f615,08bad63..b5820d6
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@@ -235,25 -202,13 +235,26 @@@ public class ExportSemanticAnalyzer ext
}
} else {
Path fromPath = ts.tableHandle.getDataLocation();
- Path toDataPath = new Path(parentPath, "data");
+ Path toDataPath = new Path(parentPath, EximUtil.DATA_PATH_NAME);
- Task<? extends Serializable> rTask =
- ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
- rootTasks.add(rTask);
+ Task<?> copyTask = null;
+ if (replicationSpec.isInReplicationScope()) {
+ if (isMmTable) {
+ // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
+ throw new SemanticException(
+ "Not supported right now because Replication is completely screwed");
+ }
+ copyTask = ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
+ } else {
++ // TODO# master merge - did master remove this path or did it never exist? we need it for MM
+ CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath, conf);
+ copyTask = TaskFactory.get(cw, conf);
+ }
+ rootTasks.add(copyTask);
inputs.add(new ReadEntity(ts.tableHandle));
}
- outputs.add(toWriteEntity(parentPath,conf));
+ outputs.add(toWriteEntity(parentPath, conf));
+ } catch (HiveException | IOException ex) {
+ throw new SemanticException(ex);
}
}
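
The export hunk above splits data-copy task creation: replication-scope exports keep using ReplCopyTask (and reject MM tables for now), while everything else goes through a plain CopyWork. A small standalone sketch of that decision, with an enum standing in for the two task kinds:

public class ExportCopySketch {
  enum CopyKind { REPL_COPY, PLAIN_COPY }   // stand-ins for ReplCopyTask vs. a CopyWork-backed task

  static CopyKind chooseCopyTask(boolean inReplicationScope, boolean isMmTable) {
    if (inReplicationScope) {
      if (isMmTable) {
        // Mirrors the SemanticException thrown in the hunk: MM export under
        // replication is not supported by this patch.
        throw new IllegalStateException("MM tables are not supported under replication yet");
      }
      return CopyKind.REPL_COPY;
    }
    return CopyKind.PLAIN_COPY;
  }

  public static void main(String[] args) {
    System.out.println(chooseCopyTask(false, true));   // PLAIN_COPY
    System.out.println(chooseCopyTask(true, false));   // REPL_COPY
  }
}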
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index bceef45,7bb48a9..6df9ad5
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@@ -207,12 -213,12 +218,16 @@@ public class ImportSemanticAnalyzer ext
// Create table associated with the import
// Executed if relevant, and used to contain all the other details about the table if not.
CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
+ boolean isSourceMm = MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps());
+ if ((replicationSpec != null) && replicationSpec.isInReplicationScope()) {
+ tblDesc.setReplicationSpec(replicationSpec);
+ }
+
if (isExternalSet){
+ if (isSourceMm) {
+ throw new SemanticException("Cannot import an MM table as external");
+ }
tblDesc.setExternal(isExternalSet);
// This condition-check could have been avoided, but to honour the old
// default of not calling if it wasn't set, we retain that behaviour.
@@@ -296,8 -296,8 +316,8 @@@
} else {
createReplImportTasks(
tblDesc, partitionDescs,
- isPartSpecSet, replicationSpec, waitOnCreateDb, table,
+ isPartSpecSet, replicationSpec, waitOnPrecursor, table,
- fromURI, fs, wh, x);
+ fromURI, fs, wh, x, mmWriteId, isSourceMm);
}
return tableExists;
}
@@@ -355,35 -355,16 +375,35 @@@
return tblDesc;
}
+
private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
- ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) {
+ ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
+ Long mmWriteId, boolean isSourceMm) {
- Path dataPath = new Path(fromURI.toString(), "data");
+ Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
- Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
- Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf());
- LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
- Utilities.getTableDesc(table), new TreeMap<String, String>(),
- replace);
- Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
- x.getOutputs(), loadTableWork, null, false), x.getConf());
+ Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtPath)
+ : new Path(tgtPath, ValidWriteIds.getMmFilePrefix(mmWriteId));
+ Utilities.LOG14535.info("adding import work for table with source location: "
+ + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm "
+ + mmWriteId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
+
+ Task<?> copyTask = null;
+ if (replicationSpec.isInReplicationScope()) {
+ if (isSourceMm || mmWriteId != null) {
+ // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
+ throw new RuntimeException(
+ "Not supported right now because Replication is completely screwed");
+ }
- ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf());
++ copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf());
+ } else {
+ CopyWork cw = new CopyWork(dataPath, destPath, false);
+ cw.setSkipSourceMmDirs(isSourceMm);
+ copyTask = TaskFactory.get(cw, x.getConf());
+ }
+
+ LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
+ Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, mmWriteId);
+ MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
+ Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
copyTask.addDependentTask(loadTableTask);
x.getTasks().add(copyTask);
return loadTableTask;
@@@ -861,12 -814,12 +887,12 @@@
private static void createReplImportTasks(
CreateTableDesc tblDesc,
List<AddPartitionDesc> partitionDescs,
- boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb,
+ boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor,
Table table, URI fromURI, FileSystem fs, Warehouse wh,
- EximUtil.SemanticAnalyzerWrapperContext x)
+ EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm)
throws HiveException, URISyntaxException, IOException, MetaException {
- Task dr = null;
+ Task<?> dr = null;
WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
if ((table != null) && (isPartitioned(tblDesc) != table.isPartitioned())){
@@@ -931,11 -884,10 +957,12 @@@
if (!replicationSpec.isMetadataOnly()) {
if (isPartitioned(tblDesc)) {
+ Task<?> ict = createImportCommitTask(
+ tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
+ addPartitionDesc.setReplicationSpec(replicationSpec);
t.addDependentTask(
- addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
+ addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict));
}
} else {
x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@@ -955,10 -907,10 +982,11 @@@
if (table.isPartitioned()) {
x.getLOG().debug("table partitioned");
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
+ addPartitionDesc.setReplicationSpec(replicationSpec);
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
-
+ Task<?> ict = replicationSpec.isMetadataOnly() ? null : createImportCommitTask(
+ tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
@@@ -997,10 -949,9 +1025,10 @@@
return; // silently return, table is newer than our replacement.
}
if (!replicationSpec.isMetadataOnly()) {
- loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into
+ // repl-imports are replace-into
+ loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x, mmWriteId, isSourceMm);
} else {
- x.getTasks().add(alterTableTask(tblDesc, x));
+ x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
}
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
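
In the import path above, loadTable() now picks the copy destination based on whether the target is an MM (insert-only) table: non-MM imports land in an external tmp directory and are moved in afterwards, while MM imports are copied straight into a write-id directory under the table location (ValidWriteIds.getMmFilePrefix(mmWriteId) in the real code). A small sketch of that choice; the "mm_<writeId>" directory name here is an assumption, not the actual prefix format:

import java.nio.file.Path;
import java.nio.file.Paths;

public class ImportDestSketch {
  // Placeholder for ValidWriteIds.getMmFilePrefix(writeId); the actual format may differ.
  static String mmDirName(long writeId) {
    return "mm_" + writeId;
  }

  // Non-MM: copy into an external tmp dir (a MoveTask relocates it later).
  // MM: copy directly into a write-id directory under the table location.
  static Path chooseCopyDestination(Path externalTmpPath, Path tableLocation, Long mmWriteId) {
    return mmWriteId == null ? externalTmpPath : tableLocation.resolve(mmDirName(mmWriteId));
  }

  public static void main(String[] args) {
    Path tmp = Paths.get("/tmp/hive/_tmp_space/xyz");
    Path tbl = Paths.get("/warehouse/imported_tab");
    System.out.println(chooseCopyDestination(tmp, tbl, null));   // external tmp dir
    System.out.println(chooseCopyDestination(tmp, tbl, 5L));     // /warehouse/imported_tab/mm_5
  }
}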
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 31bee14,248dd63..f5b1c23
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@@ -6585,13 -6735,11 +6693,13 @@@ public class SemanticAnalyzer extends B
}
boolean isNonNativeTable = dest_tab.isNonNative();
- if (isNonNativeTable) {
+ isMmTable = MetaStoreUtils.isInsertOnlyTable(dest_tab.getParameters());
+ if (isNonNativeTable || isMmTable) {
queryTmpdir = dest_path;
} else {
- queryTmpdir = ctx.getTempDirForPath(dest_path);
+ queryTmpdir = ctx.getTempDirForPath(dest_path, true);
}
+ Utilities.LOG14535.info("create filesink w/DEST_TABLE specifying " + queryTmpdir + " from " + dest_path);
if (dpCtx != null) {
// set the root of the temporary path where dynamic partition columns will populate
dpCtx.setRootPath(queryTmpdir);
@@@ -6632,10 -6775,48 +6740,11 @@@
// This is a non-native table.
// We need to set stats as inaccurate.
setStatsForNonNativeTable(dest_tab);
+ createInsertDesc(dest_tab, !qb.getParseInfo().isInsertIntoTable(dest_tab.getTableName()));
}
- WriteEntity output = null;
-
- // Here only register the whole table for post-exec hook if no DP present
- // in the case of DP, we will register WriteEntity in MoveTask when the
- // list of dynamically created partitions are known.
- if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
- output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest));
- if (!outputs.add(output)) {
- throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
- .getMsg(dest_tab.getTableName()));
- }
- }
- if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
- // No static partition specified
- if (dpCtx.getNumSPCols() == 0) {
- output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest), false);
- outputs.add(output);
- output.setDynamicPartitionWrite(true);
- }
- // part of the partition specified
- // Create a DummyPartition in this case. Since, the metastore does not store partial
- // partitions currently, we need to store dummy partitions
- else {
- try {
- String ppath = dpCtx.getSPPath();
- ppath = ppath.substring(0, ppath.length() - 1);
- DummyPartition p =
- new DummyPartition(dest_tab, dest_tab.getDbName()
- + "@" + dest_tab.getTableName() + "@" + ppath,
- partSpec);
- output = new WriteEntity(p, getWriteType(dest), false);
- output.setDynamicPartitionWrite(true);
- outputs.add(output);
- } catch (HiveException e) {
- throw new SemanticException(e.getMessage(), e);
- }
- }
- }
-
+ WriteEntity output = generateTableWriteEntity(
+ dest, dest_tab, partSpec, ltd, dpCtx, isNonNativeTable);
ctx.getLoadTableOutputMap().put(ltd, output);
break;
}
@@@ -6657,9 -6858,7 +6766,9 @@@
dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
.getAuthority(), partPath.toUri().getPath());
- queryTmpdir = ctx.getTempDirForPath(dest_path, true);
+ isMmTable = MetaStoreUtils.isInsertOnlyTable(dest_tab.getParameters());
- queryTmpdir = isMmTable ? dest_path : ctx.getTempDirForPath(dest_path);
++ queryTmpdir = isMmTable ? dest_path : ctx.getTempDirForPath(dest_path, true);
+ Utilities.LOG14535.info("create filesink w/DEST_PARTITION specifying " + queryTmpdir + " from " + dest_path);
table_desc = Utilities.getTableDesc(dest_tab);
// Add sorting/bucketing if needed
@@@ -6703,6 -6896,26 +6812,8 @@@
case QBMetaData.DEST_DFS_FILE: {
dest_path = new Path(qbm.getDestFileForAlias(dest));
- if (isLocal) {
- // for local directory - we always write to map-red intermediate
- // store and then copy to local fs
- queryTmpdir = ctx.getMRTmpPath();
- } else {
- // otherwise write to the file system implied by the directory
- // no copy is required. we may want to revisit this policy in future
-
- try {
- Path qPath = FileUtils.makeQualified(dest_path, conf);
- queryTmpdir = ctx.getTempDirForPath(qPath, true);
- } catch (Exception e) {
- throw new SemanticException("Error creating temporary folder on: "
- + dest_path, e);
- }
- }
- String cols = "";
- String colTypes = "";
+ ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos();
+
// CTAS case: the file output format and serde are defined by the create
// table command rather than taking the default value
List<FieldSchema> field_schemas = null;
@@@ -6725,21 -6930,56 +6836,21 @@@
destTableIsTemporary = false;
}
- boolean first = true;
- for (ColumnInfo colInfo : colInfos) {
- String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
-
- if (nm[1] != null) { // non-null column alias
- colInfo.setAlias(nm[1]);
- }
-
- String colName = colInfo.getInternalName(); //default column name
- if (field_schemas != null) {
- FieldSchema col = new FieldSchema();
- if (!("".equals(nm[0])) && nm[1] != null) {
- colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove ``
- }
- colName = fixCtasColumnName(colName);
- col.setName(colName);
- String typeName = colInfo.getType().getTypeName();
- // CTAS should NOT create a VOID type
- if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) {
- throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE
- .getMsg(colName));
- }
- col.setType(typeName);
- field_schemas.add(col);
- }
-
- if (!first) {
- cols = cols.concat(",");
- colTypes = colTypes.concat(":");
- }
-
- first = false;
- cols = cols.concat(colName);
-
- // Replace VOID type with string when the output is a temp table or
- // local files.
- // A VOID type can be generated under the query:
- //
- // select NULL from tt;
- // or
- // insert overwrite local directory "abc" select NULL from tt;
- //
- // where there is no column type to which the NULL value should be
- // converted.
- //
- String tName = colInfo.getType().getTypeName();
- if (tName.equals(serdeConstants.VOID_TYPE_NAME)) {
- colTypes = colTypes.concat(serdeConstants.STRING_TYPE_NAME);
- } else {
- colTypes = colTypes.concat(tName);
+ if (isLocal) {
+ assert !isMmTable;
+ // for local directory - we always write to map-red intermediate
+ // store and then copy to local fs
+ queryTmpdir = ctx.getMRTmpPath();
+ } else {
+ // otherwise write to the file system implied by the directory
+ // no copy is required. we may want to revisit this policy in future
+ try {
+ Path qPath = FileUtils.makeQualified(dest_path, conf);
- queryTmpdir = isMmTable ? qPath : ctx.getTempDirForPath(qPath);
++ queryTmpdir = isMmTable ? qPath : ctx.getTempDirForPath(qPath, true);
+ Utilities.LOG14535.info("Setting query directory " + queryTmpdir + " from " + dest_path + " (" + isMmTable + ")");
+ } catch (Exception e) {
+ throw new SemanticException("Error creating temporary folder on: "
+ + dest_path, e);
}
}
@@@ -7016,14 -7137,22 +7127,14 @@@
} else if (dpCtx != null) {
fileSinkDesc.setStaticSpec(dpCtx.getSPPath());
}
+ return fileSinkDesc;
+ }
- if (isHiveServerQuery &&
- null != table_desc &&
- table_desc.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
- HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
- fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(true);
- } else {
- fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(false);
- }
-
- Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
- fileSinkDesc, fsRS, input), inputRR);
-
+ private void handleLineage(LoadTableDesc ltd, Operator output)
+ throws SemanticException {
if (ltd != null && SessionState.get() != null) {
SessionState.get().getLineageState()
- .mapDirToFop(ltd.getSourcePath(), (FileSinkOperator) output);
+ .mapDirToOp(ltd.getSourcePath(), (FileSinkOperator) output);
} else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) {
Path tlocation = null;
@@@ -7036,116 -7165,43 +7147,124 @@@
}
SessionState.get().getLineageState()
- .mapDirToFop(tlocation, (FileSinkOperator) output);
+ .mapDirToOp(tlocation, (FileSinkOperator) output);
}
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
- + dest_path + " row schema: " + inputRR.toString());
+ private WriteEntity generateTableWriteEntity(String dest, Table dest_tab,
+ Map<String, String> partSpec, LoadTableDesc ltd,
+ DynamicPartitionCtx dpCtx, boolean isNonNativeTable)
+ throws SemanticException {
+ WriteEntity output = null;
+
+ // Here only register the whole table for post-exec hook if no DP present
+ // in the case of DP, we will register WriteEntity in MoveTask when the
+ // list of dynamically created partitions are known.
+ if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
+ output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest));
+ if (!outputs.add(output)) {
+ throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
+ .getMsg(dest_tab.getTableName()));
+ }
}
- FileSinkOperator fso = (FileSinkOperator) output;
- fso.getConf().setTable(dest_tab);
- fsopToTable.put(fso, dest_tab);
- // the following code is used to collect column stats when
- // hive.stats.autogather=true
- // and it is an insert overwrite or insert into table
- if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
- && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
- && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
- if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
- genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
- .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
- } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
- genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
- .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
-
+ if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
+ // No static partition specified
+ if (dpCtx.getNumSPCols() == 0) {
+ output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest), false);
+ outputs.add(output);
+ output.setDynamicPartitionWrite(true);
+ }
+ // part of the partition specified
+ // Create a DummyPartition in this case. Since, the metastore does not store partial
+ // partitions currently, we need to store dummy partitions
+ else {
+ try {
+ String ppath = dpCtx.getSPPath();
+ ppath = ppath.substring(0, ppath.length() - 1);
+ DummyPartition p =
+ new DummyPartition(dest_tab, dest_tab.getDbName()
+ + "@" + dest_tab.getTableName() + "@" + ppath,
+ partSpec);
+ output = new WriteEntity(p, getWriteType(dest), false);
+ output.setDynamicPartitionWrite(true);
+ outputs.add(output);
+ } catch (HiveException e) {
+ throw new SemanticException(e.getMessage(), e);
+ }
}
}
return output;
}
+ private void checkExternalTable(Table dest_tab) throws SemanticException {
+ if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
+ (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) {
+ throw new SemanticException(
+ ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
+ }
+ }
+
+ private void checkImmutableTable(QB qb, Table dest_tab, Path dest_path, boolean isPart)
+ throws SemanticException {
+ // If the query here is an INSERT_INTO and the target is an immutable table,
+ // verify that our destination is empty before proceeding
+ if (!dest_tab.isImmutable() || !qb.getParseInfo().isInsertIntoTable(
+ dest_tab.getDbName(), dest_tab.getTableName())) {
+ return;
+ }
+ try {
+ FileSystem fs = dest_path.getFileSystem(conf);
+ if (! MetaStoreUtils.isDirEmpty(fs,dest_path)){
+ LOG.warn("Attempted write into an immutable table : "
+ + dest_tab.getTableName() + " : " + dest_path);
+ throw new SemanticException(
+ ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Error while trying to determine if immutable table "
+ + (isPart ? "partition " : "") + "has any data : " + dest_tab.getTableName()
+ + " : " + dest_path);
+ throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
+ }
+ }
+
+ private DynamicPartitionCtx checkDynPart(QB qb, QBMetaData qbm, Table dest_tab,
+ Map<String, String> partSpec, String dest) throws SemanticException {
+ List<FieldSchema> parts = dest_tab.getPartitionKeys();
+ if (parts == null || parts.isEmpty()) return null; // table is not partitioned
+ if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
+ throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest),
+ ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
+ }
+ DynamicPartitionCtx dpCtx = qbm.getDPCtx(dest);
+ if (dpCtx == null) {
+ dest_tab.validatePartColumnNames(partSpec, false);
+ dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
+ conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
+ conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
+ qbm.setDPCtx(dest, dpCtx);
+ }
+
+ if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
+ throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest),
+ ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
+ }
+ if ((dest_tab.getNumBuckets() > 0)) {
+ dpCtx.setNumBuckets(dest_tab.getNumBuckets());
+ }
+ return dpCtx;
+ }
+
+
+ private void createInsertDesc(Table table, boolean overwrite) {
+ Task<? extends Serializable>[] tasks = new Task[this.rootTasks.size()];
+ tasks = this.rootTasks.toArray(tasks);
+ InsertTableDesc insertTableDesc = new InsertTableDesc(table.getTTable(), overwrite);
+ TaskFactory
+ .getAndMakeChild(new DDLWork(getInputs(), getOutputs(), insertTableDesc), conf, tasks);
+ }
+
private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException {
String tableName = table_desc.getTableName();
@@@ -11892,10 -11972,15 +12034,15 @@@
if(curFs != null) {
locStats = curFs.getFileStatus(locPath);
}
- if(locStats != null && locStats.isDir()) {
+ if (locStats != null && locStats.isDir()) {
FileStatus[] lStats = curFs.listStatus(locPath);
if(lStats != null && lStats.length != 0) {
- throw new SemanticException(ErrorMsg.CTAS_LOCATION_NONEMPTY.getMsg(location));
+ // Don't throw an exception if the target location only contains the staging-dirs
+ for (FileStatus lStat : lStats) {
+ if (!lStat.getPath().getName().startsWith(HiveConf.getVar(conf, HiveConf.ConfVars.STAGINGDIR))) {
+ throw new SemanticException(ErrorMsg.CTAS_LOCATION_NONEMPTY.getMsg(location));
+ }
+ }
}
}
} catch (FileNotFoundException nfe) {
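
The last SemanticAnalyzer hunk above relaxes the CTAS location check: a non-empty target directory is rejected only if it contains something other than staging directories. A standalone sketch of that check follows; the ".hive-staging" prefix is assumed here as the usual default of hive.exec.stagingdir, whereas the real code reads HiveConf.ConfVars.STAGINGDIR.

import java.io.File;
import java.nio.file.Files;

public class CtasLocationCheckSketch {
  // Assumed default of hive.exec.stagingdir; the real code reads it from HiveConf.
  static final String STAGING_PREFIX = ".hive-staging";

  // The CTAS target may exist, but only staging directories are tolerated inside it.
  static void checkCtasLocation(File location) {
    File[] children = location.listFiles();
    if (children == null) {
      return;                                    // location does not exist yet: fine
    }
    for (File child : children) {
      if (!child.getName().startsWith(STAGING_PREFIX)) {
        throw new IllegalStateException(
            "CTAS target location is not empty: " + child.getAbsolutePath());
      }
    }
  }

  public static void main(String[] args) throws Exception {
    File dir = Files.createTempDirectory("ctas-check").toFile();
    new File(dir, ".hive-staging_hive_123").mkdir();   // tolerated
    checkCtasLocation(dir);                            // passes
    new File(dir, "000000_0").createNewFile();         // a real data file
    try {
      checkCtasLocation(dir);
    } catch (IllegalStateException e) {
      System.out.println("rejected as expected: " + e.getMessage());
    }
  }
}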
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 7039f1f,aa77850..684f1c1
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@@ -47,9 -45,20 +47,21 @@@ public class LoadTableDesc extends org.
// TODO: the below seems like they should just be combined into partitionDesc
private org.apache.hadoop.hive.ql.plan.TableDesc table;
private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map
+ private boolean commitMmWriteId = true;
- private LoadTableDesc(final Path sourcePath,
+ public LoadTableDesc(final LoadTableDesc o) {
+ super(o.getSourcePath());
+
+ this.replace = o.replace;
+ this.dpCtx = o.dpCtx;
+ this.lbCtx = o.lbCtx;
+ this.inheritTableSpecs = o.inheritTableSpecs;
+ this.writeType = o.writeType;
+ this.table = o.table;
+ this.partitionSpec = o.partitionSpec;
+ }
+
+ public LoadTableDesc(final Path sourcePath,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
final Map<String, String> partitionSpec,
final boolean replace,
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index ff450db,8ce211f..50adc42
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@@ -81,9 -77,22 +81,19 @@@ public class MoveWork implements Serial
public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
boolean checkFileFormat) {
- this(inputs, outputs);
- this.loadTableWork = loadTableWork;
- this.loadFileWork = loadFileWork;
- this.checkFileFormat = checkFileFormat;
+ this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false);
}
+ public MoveWork(final MoveWork o) {
+ loadTableWork = o.getLoadTableWork();
+ loadFileWork = o.getLoadFileWork();
+ loadMultiFilesWork = o.getLoadMultiFilesWork();
+ checkFileFormat = o.getCheckFileFormat();
+ srcLocal = o.isSrcLocal();
+ inputs = o.getInputs();
+ outputs = o.getOutputs();
+ }
+
@Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public LoadTableDesc getLoadTableWork() {
return loadTableWork;
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --cc ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 5fafa57,a90dd35..a40b085
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@@ -555,9 -564,7 +564,8 @@@ public class TestTxnCommands
return sb.toString();
}
- private static final Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
private List<String> runStatementOnDriver(String stmt) throws Exception {
+ LOG.info("Running " + stmt);
CommandProcessorResponse cpr = d.run(stmt);
if(cpr.getResponseCode() != 0) {
throw new RuntimeException(stmt + " failed: " + cpr);
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --cc ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index cf2ab66,aa23df8..3482dba
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@@ -21,7 -21,12 +21,11 @@@ import static org.junit.Assert.*
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.FileNotFoundException;
import java.io.IOException;
+ import java.net.URI;
+ import java.net.URISyntaxException;
+ import java.nio.ByteBuffer;
+ import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.sql.Date;
import java.sql.Timestamp;
@@@ -29,17 -34,29 +33,18 @@@ import java.text.SimpleDateFormat
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.TimeZone;
-import java.util.TreeSet;
import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@@ -99,12 -120,9 +108,14 @@@ import org.apache.hadoop.mapred.OutputF
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.MockFileSystem;
+import org.apache.hive.common.util.MockFileSystem.MockBlock;
+import org.apache.hive.common.util.MockFileSystem.MockFile;
+import org.apache.hive.common.util.MockFileSystem.MockOutputStream;
+import org.apache.hive.common.util.MockFileSystem.MockPath;
- import org.apache.orc.OrcProto;
+ import org.apache.hadoop.util.Progressable;
+ import org.apache.orc.*;
+ import org.apache.orc.impl.PhysicalFsWriter;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/hive/blob/748c1bd2/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
----------------------------------------------------------------------
diff --cc ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
index 0000000,68ccda9..7bc9073
mode 000000,100644..100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
@@@ -1,0 -1,289 +1,290 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hive.ql.optimizer;
+
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.ql.CompilationOpContext;
+ import org.apache.hadoop.hive.ql.exec.*;
+ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+ import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+ import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+ import org.apache.hadoop.hive.ql.parse.SemanticException;
+ import org.apache.hadoop.hive.ql.plan.*;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ import java.io.Serializable;
+ import java.util.Arrays;
+ import java.util.List;
+ import java.util.Properties;
+
+ import static org.junit.Assert.*;
+ import static org.mockito.Mockito.mock;
+ import static org.mockito.Mockito.reset;
+ import static org.mockito.Mockito.when;
+
+ public class TestGenMapRedUtilsCreateConditionalTask {
+ private static HiveConf hiveConf;
+
+ private Task dummyMRTask;
+
+ @BeforeClass
+ public static void initializeSessionState() {
+ hiveConf = new HiveConf();
+ }
+
+ @Before
+ public void setUp() {
+ dummyMRTask = new MapRedTask();
+ }
+
+ @Test
+ public void testMovePathsThatCannotBeMerged() {
+ final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
+ final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
+ final MoveWork mockWork = mock(MoveWork.class);
+
+ assertFalse("A MoveWork null object cannot be merged.",
+ GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, null));
+
+ hiveConf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, "false");
+ assertFalse("Merging paths is not allowed when BlobStorage optimizations are disabled.",
+ GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+
+ // Enable BlobStore optimizations for the rest of tests
+ hiveConf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, "true");
+
+ reset(mockWork);
+ when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc());
+ assertFalse("Merging paths is not allowed when MultiFileWork is found in the MoveWork object.",
+ GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+
+ reset(mockWork);
+ when(mockWork.getLoadFileWork()).thenReturn(mock(LoadFileDesc.class));
+ when(mockWork.getLoadTableWork()).thenReturn(mock(LoadTableDesc.class));
+ assertFalse("Merging paths is not allowed when both LoadFileWork & LoadTableWork are found in the MoveWork object.",
+ GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+
+ reset(mockWork);
+ when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condInputPath, condOutputPath, false, "", ""));
+ assertFalse("Merging paths is not allowed when both conditional output path is not equals to MoveWork input path.",
+ GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+
+ reset(mockWork);
+ when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condOutputPath, new Path("unused"), false, "", ""));
+ assertFalse("Merging paths is not allowed when conditional input path is not a BlobStore path.",
+ GenMapRedUtils.shouldMergeMovePaths(hiveConf, new Path("hdfs://hdfs-path"), condOutputPath, mockWork));
+
+ reset(mockWork);
+ when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condOutputPath, new Path("hdfs://hdfs-path"), false, "", ""));
+ assertFalse("Merging paths is not allowed when MoveWork output path is not a BlobStore path.",
+ GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+ }
+
+ @Test
+ public void testMovePathsThatCanBeMerged() {
+ final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
+ final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
+ final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003");
+ final MoveWork mockWork = mock(MoveWork.class);
+
+ when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condOutputPath, targetMoveWorkPath, false, "", ""));
+
+ assertTrue("Merging BlobStore paths should be allowed.",
+ GenMapRedUtils.shouldMergeMovePaths(hiveConf, condInputPath, condOutputPath, mockWork));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMergePathWithInvalidMoveWorkThrowsException() {
+ final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
+ final MoveWork mockWork = mock(MoveWork.class);
+
+ when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc());
+ GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+ }
+
+ @Test
+ public void testMergePathValidMoveWorkReturnsNewMoveWork() {
+ final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
+ final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
+ final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003");
+ final MoveWork mockWork = mock(MoveWork.class);
+ MoveWork newWork;
+
+ // test using loadFileWork
+ when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(condOutputPath, targetMoveWorkPath, false, "", ""));
+ newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+ assertNotNull(newWork);
+ assertNotEquals(newWork, mockWork);
+ assertEquals(condInputPath, newWork.getLoadFileWork().getSourcePath());
+ assertEquals(targetMoveWorkPath, newWork.getLoadFileWork().getTargetDir());
+
+ // test using loadTableWork
+ TableDesc tableDesc = new TableDesc();
+ reset(mockWork);
- when(mockWork.getLoadTableWork()).thenReturn(new LoadTableDesc(condOutputPath, tableDesc, null));
++ when(mockWork.getLoadTableWork()).thenReturn(new LoadTableDesc(
++ condOutputPath, tableDesc, null, null));
+ newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+ assertNotNull(newWork);
+ assertNotEquals(newWork, mockWork);
+ assertEquals(condInputPath, newWork.getLoadTableWork().getSourcePath());
+ assertTrue(newWork.getLoadTableWork().getTable().equals(tableDesc));
+ }
+
+ @Test
+ public void testConditionalMoveTaskIsOptimized() throws SemanticException {
+ hiveConf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, "true");
+
+ Path sinkDirName = new Path("s3a://bucket/scratch/-ext-10002");
+ FileSinkOperator fileSinkOperator = createFileSinkOperator(sinkDirName);
+
+ Path finalDirName = new Path("s3a://bucket/scratch/-ext-10000");
+ Path tableLocation = new Path("s3a://bucket/warehouse/table");
+ Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
+ List<Task<MoveWork>> moveTaskList = Arrays.asList(moveTask);
+
+ GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+ ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
+ Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
+ Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
+ Task<? extends Serializable> mergeAndMoveTask = conditionalTask.getListTasks().get(2);
+
+ /*
+ * OPTIMIZATION
+ * The ConditionalTask avoids linking 2 MoveTask that are expensive on blobstorage systems. Instead of
+ * linking, it creates one MoveTask where the source is the first MoveTask source, and target is the
+ * second MoveTask target.
+ */
+
+ // Verify moveOnlyTask is optimized
+ assertNull(moveOnlyTask.getChildTasks());
+ verifyMoveTask(moveOnlyTask, sinkDirName, tableLocation);
+
+ // Verify mergeOnlyTask is NOT optimized (a merge task writes directly to finalDirName, then a MoveTask is executed)
+ assertEquals(1, mergeOnlyTask.getChildTasks().size());
+ verifyMoveTask(mergeOnlyTask.getChildTasks().get(0), finalDirName, tableLocation);
+
+ // Verify mergeAndMoveTask is NOT optimized
+ assertEquals(1, mergeAndMoveTask.getChildTasks().size());
+ assertEquals(1, mergeAndMoveTask.getChildTasks().get(0).getChildTasks().size());
+ verifyMoveTask(mergeAndMoveTask.getChildTasks().get(0), sinkDirName, finalDirName);
+ verifyMoveTask(mergeAndMoveTask.getChildTasks().get(0).getChildTasks().get(0), finalDirName, tableLocation);
+ }
+
+ @Test
+ public void testConditionalMoveTaskIsNotOptimized() throws SemanticException {
+ hiveConf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, "false");
+
+ Path sinkDirName = new Path("s3a://bucket/scratch/-ext-10002");
+ FileSinkOperator fileSinkOperator = createFileSinkOperator(sinkDirName);
+
+ Path finalDirName = new Path("s3a://bucket/scratch/-ext-10000");
+ Path tableLocation = new Path("s3a://bucket/warehouse/table");
+ Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
+ List<Task<MoveWork>> moveTaskList = Arrays.asList(moveTask);
+
+ GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+ ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
+ Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
+ Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
+ Task<? extends Serializable> mergeAndMoveTask = conditionalTask.getListTasks().get(2);
+
+ // Verify moveOnlyTask is NOT optimized
+ assertEquals(1, moveOnlyTask.getChildTasks().size());
+ verifyMoveTask(moveOnlyTask, sinkDirName, finalDirName);
+ verifyMoveTask(moveOnlyTask.getChildTasks().get(0), finalDirName, tableLocation);
+
+ // Verify mergeOnlyTask is NOT optimized
+ assertEquals(1, mergeOnlyTask.getChildTasks().size());
+ verifyMoveTask(mergeOnlyTask.getChildTasks().get(0), finalDirName, tableLocation);
+
+ // Verify mergeAndMoveTask is NOT optimized
+ assertEquals(1, mergeAndMoveTask.getChildTasks().size());
+ assertEquals(1, mergeAndMoveTask.getChildTasks().get(0).getChildTasks().size());
+ verifyMoveTask(mergeAndMoveTask.getChildTasks().get(0), sinkDirName, finalDirName);
+ verifyMoveTask(mergeAndMoveTask.getChildTasks().get(0).getChildTasks().get(0), finalDirName, tableLocation);
+ }
+
+ @Test
+ public void testConditionalMoveOnHdfsIsNotOptimized() throws SemanticException {
+ hiveConf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, "true");
+
+ Path sinkDirName = new Path("hdfs://bucket/scratch/-ext-10002");
+ FileSinkOperator fileSinkOperator = createFileSinkOperator(sinkDirName);
+
+ Path finalDirName = new Path("hdfs://bucket/scratch/-ext-10000");
+ Path tableLocation = new Path("hdfs://bucket/warehouse/table");
+ Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
+ List<Task<MoveWork>> moveTaskList = Arrays.asList(moveTask);
+
+ GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+ ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
+ Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
+ Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
+ Task<? extends Serializable> mergeAndMoveTask = conditionalTask.getListTasks().get(2);
+
+ // Verify moveOnlyTask is NOT optimized
+ assertEquals(1, moveOnlyTask.getChildTasks().size());
+ verifyMoveTask(moveOnlyTask, sinkDirName, finalDirName);
+ verifyMoveTask(moveOnlyTask.getChildTasks().get(0), finalDirName, tableLocation);
+
+ // Verify mergeOnlyTask is NOT optimized
+ assertEquals(1, mergeOnlyTask.getChildTasks().size());
+ verifyMoveTask(mergeOnlyTask.getChildTasks().get(0), finalDirName, tableLocation);
+
+ // Verify mergeAndMoveTask is NOT optimized
+ assertEquals(1, mergeAndMoveTask.getChildTasks().size());
+ assertEquals(1, mergeAndMoveTask.getChildTasks().get(0).getChildTasks().size());
+ verifyMoveTask(mergeAndMoveTask.getChildTasks().get(0), sinkDirName, finalDirName);
+ verifyMoveTask(mergeAndMoveTask.getChildTasks().get(0).getChildTasks().get(0), finalDirName, tableLocation);
+ }
+
+ private FileSinkOperator createFileSinkOperator(Path finalDirName) {
+ FileSinkOperator fileSinkOperator = mock(FileSinkOperator.class);
+
+ TableDesc tableDesc = new TableDesc(HiveInputFormat.class, HiveOutputFormat.class, new Properties());
+ FileSinkDesc fileSinkDesc = new FileSinkDesc(finalDirName, tableDesc, false);
+ fileSinkDesc.setDirName(finalDirName);
+
+ when(fileSinkOperator.getConf()).thenReturn(fileSinkDesc);
+ when(fileSinkOperator.getSchema()).thenReturn(mock(RowSchema.class));
+ fileSinkDesc.setTableInfo(tableDesc);
+
+ when(fileSinkOperator.getCompilationOpContext()).thenReturn(mock(CompilationOpContext.class));
+
+ return fileSinkOperator;
+ }
+
+ private Task<MoveWork> createMoveTask(Path source, Path destination) {
+ Task<MoveWork> moveTask = mock(MoveTask.class);
+ MoveWork moveWork = new MoveWork();
+ moveWork.setLoadFileWork(new LoadFileDesc(source, destination, true, null, null));
+
+ when(moveTask.getWork()).thenReturn(moveWork);
+
+ return moveTask;
+ }
+
+ private void verifyMoveTask(Task<? extends Serializable> task, Path source, Path target) {
+ MoveTask moveTask = (MoveTask)task;
+ assertEquals(source, moveTask.getWork().getLoadFileWork().getSourcePath());
+ assertEquals(target, moveTask.getWork().getLoadFileWork().getTargetDir());
+ }
+ }