Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/04 02:34:01 UTC
svn commit: r1538485 [1/2] - in /hive/branches/tez: ./
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hive/ql/lib/ ql/src/java/org/a...
Author: gunther
Date: Mon Nov 4 01:34:01 2013
New Revision: 1538485
URL: http://svn.apache.org/r1538485
Log:
HIVE-5734: Enable merge/move tasks for Tez (Gunther Hagleitner)
Added:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q
hive/branches/tez/ql/src/test/results/clientpositive/tez_dml.q.out
Modified:
hive/branches/tez/build-common.xml
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
Modified: hive/branches/tez/build-common.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/build-common.xml?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/build-common.xml (original)
+++ hive/branches/tez/build-common.xml Mon Nov 4 01:34:01 2013
@@ -61,7 +61,7 @@
<property name="test.junit.output.usefile" value="true"/>
<property name="minimr.query.files" value="list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q"/>
<property name="minimr.query.negative.files" value="cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q" />
- <property name="minitez.query.files" value="tez_join_tests.q,tez_joins_explain.q,mrr.q"/>
+ <property name="minitez.query.files" value="tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q"/>
<property name="test.silent" value="true"/>
<property name="hadoopVersion" value="${hadoop.version.ant-internal}"/>
<property name="test.serialize.qplan" value="false"/>
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Mon Nov 4 01:34:01 2013
@@ -134,16 +134,18 @@ public class HadoopJobExecHelper {
this.jobId = jobId;
}
-
- public HadoopJobExecHelper() {
- }
-
public HadoopJobExecHelper(JobConf job, LogHelper console,
Task<? extends Serializable> task, HadoopJobExecHook hookCallBack) {
this.job = job;
this.console = console;
this.task = task;
this.callBackObj = hookCallBack;
+
+ if (job != null) {
+ // even with tez on, some jobs are run as MR. Disable the flag in
+ // the conf, so that the backend runs fully as MR.
+ HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ, false);
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Nov 4 01:34:01 2013
@@ -45,10 +45,13 @@ import org.apache.hadoop.hive.ql.io.Buck
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.shims.Hadoop20Shims.NullOutputCommitter;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
@@ -613,6 +616,22 @@ public class DagUtils {
return null;
}
+ // initialize stats publisher if necessary
+ if (work.isGatheringStats()) {
+ StatsPublisher statsPublisher;
+ String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+ if (StatsFactory.setImplementation(statsImplementationClass, conf)) {
+ statsPublisher = StatsFactory.getStatsPublisher();
+ if (!statsPublisher.init(conf)) { // creating stats table if not exists
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+ throw
+ new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+ }
+ }
+ }
+ }
+
+
// final vertices need to have at least one output
if (!hasChildren) {
v.addOutput("out_"+work.getName(),
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Nov 4 01:34:01 2013
@@ -117,6 +117,11 @@ public class TezSessionState {
LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")");
session.start();
+
+ // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
+ // id is used for tez to reuse the current session rather than start a new one.
+ conf.set("mapreduce.framework.name", "yarn-tez");
+ conf.set("tez.session.id", session.getApplicationId().toString());
}
/**
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Mon Nov 4 01:34:01 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.session.SessionState;
/**
@@ -37,12 +38,19 @@ public class IOContext {
protected synchronized IOContext initialValue() { return new IOContext(); }
};
+ private static IOContext ioContext = new IOContext();
+
public static IOContext get() {
+ if (SessionState.get() == null) {
+ // this happens on the backend. only one io context needed.
+ return ioContext;
+ }
return IOContext.threadLocal.get();
}
public static void clear() {
IOContext.threadLocal.remove();
+ ioContext = new IOContext();
}
long currentBlockStart;
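The intent of the IOContext change is easier to see in isolation: on the client each thread gets its own context from the thread-local, while on the backend (where SessionState.get() returns null, e.g. inside a container) every caller shares one process-wide instance. Below is a minimal, self-contained sketch of that fallback pattern with hypothetical names; it is not the Hive class itself.

public class FallbackContextRegistry {

  /** stands in for IOContext; per-query IO bookkeeping would live here */
  public static class Ctx { }

  private static final ThreadLocal<Ctx> threadLocal = new ThreadLocal<Ctx>() {
    @Override
    protected Ctx initialValue() {
      return new Ctx();
    }
  };

  // single shared instance used when there is no per-session state
  private static Ctx shared = new Ctx();

  /** hasSession plays the role of SessionState.get() != null */
  public static Ctx get(boolean hasSession) {
    return hasSession ? threadLocal.get() : shared;
  }

  public static void clear() {
    threadLocal.remove();
    shared = new Ctx(); // reset the shared backend instance as well
  }
}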
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java?rev=1538485&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java Mon Nov 4 01:34:01 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.lib;
+
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * CompositeProcessor. Holds a list of node processors to be fired by the same
+ * rule.
+ *
+ */
+public class CompositeProcessor implements NodeProcessor {
+
+ NodeProcessor[] procs;
+
+ public CompositeProcessor(NodeProcessor...nodeProcessors) {
+ procs = nodeProcessors;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
+ for (NodeProcessor proc: procs) {
+ proc.process(nd, stack, procCtx, nodeOutputs);
+ }
+ return null;
+ }
+}
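For context, a minimal sketch of how a CompositeProcessor can be wired into Hive's rule-dispatch framework so that two processors fire off the same rule during one operator-tree walk. The rule pattern and the two processor instances are placeholders, not taken from this commit; only the dispatcher/walker classes are Hive's standard ones.

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class CompositeProcessorExample {

  // procA and procB are placeholder processors; the point is that both are
  // fired, in order, by a single rule match.
  public static void walk(Collection<Node> topNodes, NodeProcessorCtx procCtx,
      NodeProcessor procA, NodeProcessor procB) throws SemanticException {

    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();

    // one rule, two processors: both run on every FileSinkOperator
    opRules.put(new RuleRegExp("FS", FileSinkOperator.getOperatorName() + "%"),
        new CompositeProcessor(procA, procB));

    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
    GraphWalker walker = new DefaultGraphWalker(disp);
    walker.startWalking(new ArrayList<Node>(topNodes), null);
  }
}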
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Mon Nov 4 01:34:01 2013
@@ -21,51 +21,25 @@ package org.apache.hadoop.hive.ql.optimi
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
-import org.apache.hadoop.hive.ql.plan.ConditionalWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.mapred.InputFormat;
/**
* Processor for the rule - table scan followed by reduce sink.
@@ -95,8 +69,7 @@ public class GenMRFileSink1 implements N
FileSinkOperator fsOp = (FileSinkOperator) nd;
boolean isInsertTable = // is INSERT OVERWRITE TABLE
- fsOp.getConf().getTableInfo().getTableName() != null &&
- parseCtx.getQB().getParseInfo().isInsertToTable();
+ GenMapRedUtils.isInsertInto(parseCtx, fsOp);
HiveConf hconf = parseCtx.getConf();
// Mark this task as a final map reduce task (ignoring the optional merge task)
@@ -111,49 +84,12 @@ public class GenMRFileSink1 implements N
return true;
}
- // Has the user enabled merging of files for map-only jobs or for all jobs
- if ((ctx.getMvTask() != null) && (!ctx.getMvTask().isEmpty())) {
- List<Task<MoveWork>> mvTasks = ctx.getMvTask();
-
- // In case of unions or map-joins, it is possible that the file has
- // already been seen.
- // So, no need to attempt to merge the files again.
- if ((ctx.getSeenFileSinkOps() == null)
- || (!ctx.getSeenFileSinkOps().contains(nd))) {
-
- // no need of merging if the move is to a local file system
- MoveTask mvTask = (MoveTask) findMoveTask(mvTasks, fsOp);
-
- if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
- addStatsTask(fsOp, mvTask, currTask, parseCtx.getConf());
- }
-
- if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) {
- if (fsOp.getConf().isLinkedFileSink()) {
- // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the
- // number of reducers are few, so the number of files anyway are small.
- // However, with this optimization, we are increasing the number of files
- // possibly by a big margin. So, merge aggresively.
- if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) {
- chDir = true;
- }
- } else {
- // There are separate configuration parameters to control whether to
- // merge for a map-only job
- // or for a map-reduce job
- MapredWork currWork = (MapredWork) currTask.getWork();
- boolean mergeMapOnly =
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReduceWork() == null;
- boolean mergeMapRed =
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
- currWork.getReduceWork() != null;
- if (mergeMapOnly || mergeMapRed) {
- chDir = true;
- }
- }
- }
- }
+ // In case of unions or map-joins, it is possible that the file has
+ // already been seen.
+ // So, no need to attempt to merge the files again.
+ if ((ctx.getSeenFileSinkOps() == null)
+ || (!ctx.getSeenFileSinkOps().contains(nd))) {
+ chDir = GenMapRedUtils.isMergeRequired(ctx.getMvTask(), hconf, fsOp, currTask, isInsertTable);
}
String finalName = processFS(fsOp, stack, opProcCtx, chDir);
@@ -162,7 +98,9 @@ public class GenMRFileSink1 implements N
// Merge the files in the destination table/partitions by creating Map-only merge job
// If underlying data is RCFile a RCFileBlockMerge task would be created.
LOG.info("using CombineHiveInputformat for the merge job");
- createMRWorkForMergingFiles(fsOp, ctx, finalName);
+ GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName,
+ ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(),
+ hconf, currTask);
}
FileSinkDesc fileSinkDesc = fsOp.getConf();
@@ -205,435 +143,6 @@ public class GenMRFileSink1 implements N
}
/**
- * Add the StatsTask as a dependent task of the MoveTask
- * because StatsTask will change the Table/Partition metadata. For atomicity, we
- * should not change it before the data is actually there done by MoveTask.
- *
- * @param nd
- * the FileSinkOperator whose results are taken care of by the MoveTask.
- * @param mvTask
- * The MoveTask that moves the FileSinkOperator's results.
- * @param currTask
- * The MapRedTask that the FileSinkOperator belongs to.
- * @param hconf
- * HiveConf
- */
- private void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
- Task<? extends Serializable> currTask, HiveConf hconf) {
-
- MoveWork mvWork = ((MoveTask) mvTask).getWork();
- StatsWork statsWork = null;
- if (mvWork.getLoadTableWork() != null) {
- statsWork = new StatsWork(mvWork.getLoadTableWork());
- } else if (mvWork.getLoadFileWork() != null) {
- statsWork = new StatsWork(mvWork.getLoadFileWork());
- }
- assert statsWork != null : "Error when genereting StatsTask";
- statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
- MapredWork mrWork = (MapredWork) currTask.getWork();
-
- // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
- // in FileSinkDesc is used for stats publishing. They should be consistent.
- statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix());
- Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
-
- // mark the MapredWork and FileSinkOperator for gathering stats
- nd.getConf().setGatherStats(true);
- mrWork.getMapWork().setGatheringStats(true);
- if (mrWork.getReduceWork() != null) {
- mrWork.getReduceWork().setGatheringStats(true);
- }
- nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
- nd.getConf().setMaxStatsKeyPrefixLength(
- hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
- // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName());
-
- // subscribe feeds from the MoveTask so that MoveTask can forward the list
- // of dynamic partition list to the StatsTask
- mvTask.addDependentTask(statsTask);
- statsTask.subscribeFeed(mvTask);
- }
-
- /**
- * @param fsInput The FileSink operator.
- * @param ctx The MR processing context.
- * @param finalName the final destination path the merge job should output.
- * @throws SemanticException
-
- * create a Map-only merge job using CombineHiveInputFormat for all partitions with
- * following operators:
- * MR job J0:
- * ...
- * |
- * v
- * FileSinkOperator_1 (fsInput)
- * |
- * v
- * Merge job J1:
- * |
- * v
- * TableScan (using CombineHiveInputFormat) (tsMerge)
- * |
- * v
- * FileSinkOperator (fsMerge)
- *
- * Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths
- * do
- * not contain the dynamic partitions (their parent). So after the dynamic partitions are
- * created (after the first job finished before the moveTask or ConditionalTask start),
- * we need to change the pathToPartitionInfo & pathToAlias to include the dynamic
- * partition
- * directories.
- *
- */
- private void createMRWorkForMergingFiles (FileSinkOperator fsInput, GenMRProcContext ctx,
- String finalName) throws SemanticException {
-
- //
- // 1. create the operator tree
- //
- HiveConf conf = ctx.getParseCtx().getConf();
- FileSinkDesc fsInputDesc = fsInput.getConf();
-
- // Create a TableScan operator
- RowSchema inputRS = fsInput.getSchema();
- Operator<? extends OperatorDesc> tsMerge =
- GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
-
- // Create a FileSink operator
- TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
- FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
- conf.getBoolVar(ConfVars.COMPRESSRESULT));
- FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
- fsOutputDesc, inputRS, tsMerge);
-
- // If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema
- // needs to include the partition column, and the fsOutput should have
- // a DynamicPartitionCtx to indicate that it needs to dynamically partitioned.
- DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx();
- if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
- // adding DP ColumnInfo to the RowSchema signature
- ArrayList<ColumnInfo> signature = inputRS.getSignature();
- String tblAlias = fsInputDesc.getTableInfo().getTableName();
- LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
- StringBuilder partCols = new StringBuilder();
- for (String dpCol : dpCtx.getDPColNames()) {
- ColumnInfo colInfo = new ColumnInfo(dpCol,
- TypeInfoFactory.stringTypeInfo, // all partition column type should be string
- tblAlias, true); // partition column is virtual column
- signature.add(colInfo);
- colMap.put(dpCol, dpCol); // input and output have the same column name
- partCols.append(dpCol).append('/');
- }
- partCols.setLength(partCols.length() - 1); // remove the last '/'
- inputRS.setSignature(signature);
-
- // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
- DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
- dpCtx2.setInputToDPCols(colMap);
- fsOutputDesc.setDynPartCtx(dpCtx2);
-
- // update the FileSinkOperator to include partition columns
- fsInputDesc.getTableInfo().getProperties().setProperty(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
- partCols.toString()); // list of dynamic partition column names
- } else {
- // non-partitioned table
- fsInputDesc.getTableInfo().getProperties().remove(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- }
-
- //
- // 2. Constructing a conditional task consisting of a move task and a map reduce task
- //
- MoveWork dummyMv = new MoveWork(null, null, null,
- new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
- MapWork cplan;
- Serializable work;
-
- if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
- fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
-
- // Check if InputFormatClass is valid
- String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
- try {
- Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
-
- LOG.info("RCFile format- Using block level merge");
- cplan = createRCFileMergeTask(fsInputDesc, finalName,
- dpCtx != null && dpCtx.getNumDPCols() > 0);
- work = cplan;
- } catch (ClassNotFoundException e) {
- String msg = "Illegal input format class: " + inputFormatClass;
- throw new SemanticException(msg);
- }
-
- } else {
- cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
- work = new MapredWork();
- ((MapredWork)work).setMapWork(cplan);
- // use CombineHiveInputFormat for map-only merging
- }
- cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
- // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
- // know if merge MR2 will be triggered at execution time
- ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, work,
- fsInputDesc.getFinalDirName());
-
- // keep the dynamic partition context in conditional task resolver context
- ConditionalResolverMergeFilesCtx mrCtx =
- (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
- mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
- mrCtx.setLbCtx(fsInputDesc.getLbCtx());
-
- //
- // 3. add the moveTask as the children of the conditional task
- //
- linkMoveTask(ctx, fsOutput, cndTsk);
- }
-
- /**
- * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
- * possible subtrees branching from the ConditionalTask.
- *
- * @param ctx
- * @param newOutput
- * @param cndTsk
- */
- private void linkMoveTask(GenMRProcContext ctx, FileSinkOperator newOutput,
- ConditionalTask cndTsk) {
-
- List<Task<MoveWork>> mvTasks = ctx.getMvTask();
- Task<MoveWork> mvTask = findMoveTask(mvTasks, newOutput);
-
- for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
- linkMoveTask(ctx, mvTask, tsk);
- }
- }
-
- /**
- * Follows the task tree down from task and makes all leaves parents of mvTask
- *
- * @param ctx
- * @param mvTask
- * @param task
- */
- private void linkMoveTask(GenMRProcContext ctx, Task<MoveWork> mvTask,
- Task<? extends Serializable> task) {
-
- if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) {
- // If it's a leaf, add the move task as a child
- addDependentMoveTasks(ctx, mvTask, task);
- } else {
- // Otherwise, for each child run this method recursively
- for (Task<? extends Serializable> childTask : task.getDependentTasks()) {
- linkMoveTask(ctx, mvTask, childTask);
- }
- }
- }
-
- /**
- * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a
- * load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of
- * dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as
- * well.
- *
- * @param ctx
- * @param mvTask
- * @param parentTask
- */
- private void addDependentMoveTasks(GenMRProcContext ctx, Task<MoveWork> mvTask,
- Task<? extends Serializable> parentTask) {
-
- if (mvTask != null) {
- if (ctx.getConf().getBoolVar(ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)) {
- DependencyCollectionTask dependencyTask = ctx.getDependencyTaskForMultiInsert();
- parentTask.addDependentTask(dependencyTask);
- if (mvTask.getWork().getLoadTableWork() != null) {
- // Moving tables/partitions depend on the dependencyTask
- dependencyTask.addDependentTask(mvTask);
- } else {
- // Moving files depends on the parentTask (we still want the dependencyTask to depend
- // on the parentTask)
- parentTask.addDependentTask(mvTask);
- }
- } else {
- parentTask.addDependentTask(mvTask);
- }
- }
- }
-
- /**
- * Create a MapredWork based on input path, the top operator and the input
- * table descriptor.
- *
- * @param conf
- * @param topOp
- * the table scan operator that is the root of the MapReduce task.
- * @param fsDesc
- * the file sink descriptor that serves as the input to this merge task.
- * @param parentMR
- * the parent MapReduce work
- * @param parentFS
- * the last FileSinkOperator in the parent MapReduce work
- * @return the MapredWork
- */
- private MapWork createMRWorkForMergingFiles (HiveConf conf,
- Operator<? extends OperatorDesc> topOp, FileSinkDesc fsDesc) {
-
- ArrayList<String> aliases = new ArrayList<String>();
- String inputDir = fsDesc.getFinalDirName();
- TableDesc tblDesc = fsDesc.getTableInfo();
- aliases.add(inputDir); // dummy alias: just use the input path
-
- // constructing the default MapredWork
- MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
- MapWork cplan = cMrPlan.getMapWork();
- cplan.getPathToAliases().put(inputDir, aliases);
- cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
- cplan.getAliasToWork().put(inputDir, topOp);
- cplan.setMapperCannotSpanPartns(true);
-
- return cplan;
- }
-
- /**
- * Create a block level merge task for RCFiles.
- *
- * @param fsInputDesc
- * @param finalName
- * @return MergeWork if table is stored as RCFile,
- * null otherwise
- */
- private MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
- String finalName, boolean hasDynamicPartitions) throws SemanticException {
-
- String inputDir = fsInputDesc.getFinalDirName();
- TableDesc tblDesc = fsInputDesc.getTableInfo();
-
- if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
- ArrayList<String> inputDirs = new ArrayList<String>();
- if (!hasDynamicPartitions
- && !isSkewedStoredAsDirs(fsInputDesc)) {
- inputDirs.add(inputDir);
- }
-
- MergeWork work = new MergeWork(inputDirs, finalName,
- hasDynamicPartitions, fsInputDesc.getDynPartCtx());
- LinkedHashMap<String, ArrayList<String>> pathToAliases =
- new LinkedHashMap<String, ArrayList<String>>();
- pathToAliases.put(inputDir, (ArrayList<String>) inputDirs.clone());
- work.setMapperCannotSpanPartns(true);
- work.setPathToAliases(pathToAliases);
- work.setAliasToWork(
- new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
- if (hasDynamicPartitions
- || isSkewedStoredAsDirs(fsInputDesc)) {
- work.getPathToPartitionInfo().put(inputDir,
- new PartitionDesc(tblDesc, null));
- }
- work.setListBucketingCtx(fsInputDesc.getLbCtx());
-
- return work;
- }
-
- throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
- }
-
- /**
- * check if it is skewed table and stored as dirs.
- *
- * @param fsInputDesc
- * @return
- */
- private boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
- return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
- .isSkewedStoredAsDir();
- }
-
- /**
- * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
- *
- * @param conf
- * HiveConf
- * @param currTask
- * current leaf task
- * @param mvWork
- * MoveWork for the move task
- * @param mergeWork
- * MapredWork for the merge task.
- * @param inputPath
- * the input directory of the merge/move task
- * @return The conditional task
- */
- private ConditionalTask createCondTask(HiveConf conf,
- Task<? extends Serializable> currTask, MoveWork mvWork,
- Serializable mergeWork, String inputPath) {
-
- // There are 3 options for this ConditionalTask:
- // 1) Merge the partitions
- // 2) Move the partitions (i.e. don't merge the partitions)
- // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't
- // merge others) in this case the merge is done first followed by the move to prevent
- // conflicts.
- Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
- Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
- Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
- Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf);
-
- // NOTE! It is necessary merge task is the parent of the move task, and not
- // the other way around, for the proper execution of the execute method of
- // ConditionalTask
- mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
-
- List<Serializable> listWorks = new ArrayList<Serializable>();
- listWorks.add(mvWork);
- listWorks.add(mergeWork);
-
- ConditionalWork cndWork = new ConditionalWork(listWorks);
-
- List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
- listTasks.add(moveOnlyMoveTask);
- listTasks.add(mergeOnlyMergeTask);
- listTasks.add(mergeAndMoveMergeTask);
-
- ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf);
- cndTsk.setListTasks(listTasks);
-
- // create resolver
- cndTsk.setResolver(new ConditionalResolverMergeFiles());
- ConditionalResolverMergeFilesCtx mrCtx =
- new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
- cndTsk.setResolverCtx(mrCtx);
-
- // make the conditional task as the child of the current leaf task
- currTask.addDependentTask(cndTsk);
-
- return cndTsk;
- }
-
- private Task<MoveWork> findMoveTask(
- List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) {
- // find the move task
- for (Task<MoveWork> mvTsk : mvTasks) {
- MoveWork mvWork = mvTsk.getWork();
- String srcDir = null;
- if (mvWork.getLoadFileWork() != null) {
- srcDir = mvWork.getLoadFileWork().getSourceDir();
- } else if (mvWork.getLoadTableWork() != null) {
- srcDir = mvWork.getLoadTableWork().getSourceDir();
- }
-
- String fsOpDirName = fsOp.getConf().getFinalDirName();
- if ((srcDir != null)
- && (srcDir.equalsIgnoreCase(fsOpDirName))) {
- return mvTsk;
- }
- }
- return null;
- }
-
- /**
* Process the FileSink operator to generate a MoveTask if necessary.
*
* @param fsOp
@@ -651,6 +160,11 @@ public class GenMRFileSink1 implements N
NodeProcessorCtx opProcCtx, boolean chDir) throws SemanticException {
GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
+ Task<? extends Serializable> currTask = ctx.getCurrTask();
+
+ // If the directory needs to be changed, send the new directory
+ String dest = null;
+
List<FileSinkOperator> seenFSOps = ctx.getSeenFileSinkOps();
if (seenFSOps == null) {
seenFSOps = new ArrayList<FileSinkOperator>();
@@ -660,49 +174,14 @@ public class GenMRFileSink1 implements N
}
ctx.setSeenFileSinkOps(seenFSOps);
- Task<? extends Serializable> currTask = ctx.getCurrTask();
-
- // If the directory needs to be changed, send the new directory
- String dest = null;
-
- if (chDir) {
- dest = fsOp.getConf().getFinalDirName();
-
- // generate the temporary file
- // it must be on the same file system as the current destination
- ParseContext parseCtx = ctx.getParseCtx();
- Context baseCtx = parseCtx.getContext();
- String tmpDir = baseCtx.getExternalTmpFileURI((new Path(dest)).toUri());
-
- FileSinkDesc fileSinkDesc = fsOp.getConf();
- // Change all the linked file sink descriptors
- if (fileSinkDesc.isLinkedFileSink()) {
- for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
- String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName());
- fsConf.setParentDir(tmpDir);
- fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName);
- }
- } else {
- fileSinkDesc.setDirName(tmpDir);
- }
- }
-
- Task<MoveWork> mvTask = null;
-
- if (!chDir) {
- mvTask = findMoveTask(ctx.getMvTask(), fsOp);
- }
+ dest = GenMapRedUtils.createMoveTask(ctx.getCurrTask(), chDir, fsOp, ctx.getParseCtx(),
+ ctx.getMvTask(), ctx.getConf(), ctx.getDependencyTaskForMultiInsert());
Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
String currAliasId = ctx.getCurrAliasId();
HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
ctx.getOpTaskMap();
- // Set the move task to be dependent on the current task
- if (mvTask != null) {
- addDependentMoveTasks(ctx, mvTask, currTask);
- }
-
// In case of multi-table insert, the path to alias mapping is needed for
// all the sources. Since there is no
// reducer, treat it as a plan with null reducer
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Nov 4 01:34:01 2013
@@ -33,11 +33,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -50,6 +54,8 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
@@ -61,20 +67,31 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.InputFormat;
/**
* General utility common functions for the Processor to convert operator into
@@ -1107,6 +1124,571 @@ public final class GenMapRedUtils {
}
}
+ /**
+ * @param fsInput The FileSink operator.
+ * @param ctx The MR processing context.
+ * @param finalName the final destination path the merge job should output.
+ * @param dependencyTask
+ * @param mvTasks
+ * @param conf
+ * @param currTask
+ * @throws SemanticException
+
+ * create a Map-only merge job using CombineHiveInputFormat for all partitions with
+ * following operators:
+ * MR job J0:
+ * ...
+ * |
+ * v
+ * FileSinkOperator_1 (fsInput)
+ * |
+ * v
+ * Merge job J1:
+ * |
+ * v
+ * TableScan (using CombineHiveInputFormat) (tsMerge)
+ * |
+ * v
+ * FileSinkOperator (fsMerge)
+ *
+ * Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths
+ * do
+ * not contain the dynamic partitions (their parent). So after the dynamic partitions are
+ * created (after the first job finished before the moveTask or ConditionalTask start),
+ * we need to change the pathToPartitionInfo & pathToAlias to include the dynamic
+ * partition
+ * directories.
+ *
+ */
+ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
+ String finalName, DependencyCollectionTask dependencyTask,
+ List<Task<MoveWork>> mvTasks, HiveConf conf,
+ Task<? extends Serializable> currTask) throws SemanticException {
+
+ //
+ // 1. create the operator tree
+ //
+ FileSinkDesc fsInputDesc = fsInput.getConf();
+
+ // Create a TableScan operator
+ RowSchema inputRS = fsInput.getSchema();
+ Operator<? extends OperatorDesc> tsMerge =
+ GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
+
+ // Create a FileSink operator
+ TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
+ FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
+ conf.getBoolVar(ConfVars.COMPRESSRESULT));
+ FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
+ fsOutputDesc, inputRS, tsMerge);
+
+ // If the input FileSinkOperator has dynamic partitioning enabled, the tsMerge input schema
+ // needs to include the partition columns, and the fsOutput should have
+ // a DynamicPartitionCtx to indicate that it needs to be dynamically partitioned.
+ DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx();
+ if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
+ // adding DP ColumnInfo to the RowSchema signature
+ ArrayList<ColumnInfo> signature = inputRS.getSignature();
+ String tblAlias = fsInputDesc.getTableInfo().getTableName();
+ LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
+ StringBuilder partCols = new StringBuilder();
+ for (String dpCol : dpCtx.getDPColNames()) {
+ ColumnInfo colInfo = new ColumnInfo(dpCol,
+ TypeInfoFactory.stringTypeInfo, // all partition column type should be string
+ tblAlias, true); // partition column is virtual column
+ signature.add(colInfo);
+ colMap.put(dpCol, dpCol); // input and output have the same column name
+ partCols.append(dpCol).append('/');
+ }
+ partCols.setLength(partCols.length() - 1); // remove the last '/'
+ inputRS.setSignature(signature);
+
+ // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
+ DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
+ dpCtx2.setInputToDPCols(colMap);
+ fsOutputDesc.setDynPartCtx(dpCtx2);
+
+ // update the FileSinkOperator to include partition columns
+ fsInputDesc.getTableInfo().getProperties().setProperty(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
+ partCols.toString()); // list of dynamic partition column names
+ } else {
+ // non-partitioned table
+ fsInputDesc.getTableInfo().getProperties().remove(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+ }
+
+ //
+ // 2. Constructing a conditional task consisting of a move task and a map reduce task
+ //
+ MoveWork dummyMv = new MoveWork(null, null, null,
+ new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
+ MapWork cplan;
+ Serializable work;
+
+ if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
+ fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+
+ // Check if InputFormatClass is valid
+ String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+ try {
+ Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
+
+ LOG.info("RCFile format- Using block level merge");
+ cplan = GenMapRedUtils.createRCFileMergeTask(fsInputDesc, finalName,
+ dpCtx != null && dpCtx.getNumDPCols() > 0);
+ work = cplan;
+ } catch (ClassNotFoundException e) {
+ String msg = "Illegal input format class: " + inputFormatClass;
+ throw new SemanticException(msg);
+ }
+
+ } else {
+ cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
+ work = new MapredWork();
+ ((MapredWork)work).setMapWork(cplan);
+ // use CombineHiveInputFormat for map-only merging
+ }
+ cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
+ // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
+ // know if merge MR2 will be triggered at execution time
+ ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
+ fsInputDesc.getFinalDirName());
+
+ // keep the dynamic partition context in conditional task resolver context
+ ConditionalResolverMergeFilesCtx mrCtx =
+ (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
+ mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
+ mrCtx.setLbCtx(fsInputDesc.getLbCtx());
+
+ //
+ // 3. add the moveTask as the children of the conditional task
+ //
+ linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask);
+ }
+
+ /**
+ * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
+ * possible subtrees branching from the ConditionalTask.
+ *
+ * @param newOutput
+ * @param cndTsk
+ * @param mvTasks
+ * @param hconf
+ * @param dependencyTask
+ */
+ public static void linkMoveTask(FileSinkOperator newOutput,
+ ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf,
+ DependencyCollectionTask dependencyTask) {
+
+ Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, newOutput);
+
+ for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
+ linkMoveTask(mvTask, tsk, hconf, dependencyTask);
+ }
+ }
+
+ /**
+ * Follows the task tree down from task and makes all leaves parents of mvTask
+ *
+ * @param mvTask
+ * @param task
+ * @param hconf
+ * @param dependencyTask
+ */
+ public static void linkMoveTask(Task<MoveWork> mvTask,
+ Task<? extends Serializable> task, HiveConf hconf,
+ DependencyCollectionTask dependencyTask) {
+
+ if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) {
+ // If it's a leaf, add the move task as a child
+ addDependentMoveTasks(mvTask, hconf, task, dependencyTask);
+ } else {
+ // Otherwise, for each child run this method recursively
+ for (Task<? extends Serializable> childTask : task.getDependentTasks()) {
+ linkMoveTask(mvTask, childTask, hconf, dependencyTask);
+ }
+ }
+ }
+
+ /**
+ * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a
+ * load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of
+ * dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as
+ * well.
+ *
+ * @param mvTask
+ * @param hconf
+ * @param parentTask
+ * @param dependencyTask
+ */
+ public static void addDependentMoveTasks(Task<MoveWork> mvTask, HiveConf hconf,
+ Task<? extends Serializable> parentTask, DependencyCollectionTask dependencyTask) {
+
+ if (mvTask != null) {
+ if (hconf.getBoolVar(ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)) {
+ parentTask.addDependentTask(dependencyTask);
+ if (mvTask.getWork().getLoadTableWork() != null) {
+ // Moving tables/partitions depend on the dependencyTask
+ dependencyTask.addDependentTask(mvTask);
+ } else {
+ // Moving files depends on the parentTask (we still want the dependencyTask to depend
+ // on the parentTask)
+ parentTask.addDependentTask(mvTask);
+ }
+ } else {
+ parentTask.addDependentTask(mvTask);
+ }
+ }
+ }
+
+
+ /**
+ * Add the StatsTask as a dependent task of the MoveTask
+ * because StatsTask will change the Table/Partition metadata. For atomicity, we
+ * should not change it before the data is actually there done by MoveTask.
+ *
+ * @param nd
+ * the FileSinkOperator whose results are taken care of by the MoveTask.
+ * @param mvTask
+ * The MoveTask that moves the FileSinkOperator's results.
+ * @param currTask
+ * The MapRedTask that the FileSinkOperator belongs to.
+ * @param hconf
+ * HiveConf
+ */
+ public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
+ Task<? extends Serializable> currTask, HiveConf hconf) {
+
+ MoveWork mvWork = ((MoveTask) mvTask).getWork();
+ StatsWork statsWork = null;
+ if (mvWork.getLoadTableWork() != null) {
+ statsWork = new StatsWork(mvWork.getLoadTableWork());
+ } else if (mvWork.getLoadFileWork() != null) {
+ statsWork = new StatsWork(mvWork.getLoadFileWork());
+ }
+ assert statsWork != null : "Error when generating StatsTask";
+ statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+
+ if (currTask.getWork() instanceof MapredWork) {
+ MapredWork mrWork = (MapredWork) currTask.getWork();
+ mrWork.getMapWork().setGatheringStats(true);
+ if (mrWork.getReduceWork() != null) {
+ mrWork.getReduceWork().setGatheringStats(true);
+ }
+ } else {
+ TezWork work = (TezWork) currTask.getWork();
+ for (BaseWork w: work.getAllWork()) {
+ w.setGatheringStats(true);
+ }
+ }
+
+ // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
+ // in FileSinkDesc is used for stats publishing. They should be consistent.
+ statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix());
+ Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
+
+ // mark the MapredWork and FileSinkOperator for gathering stats
+ nd.getConf().setGatherStats(true);
+ nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+ nd.getConf().setMaxStatsKeyPrefixLength(
+ hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
+ // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName());
+
+ // subscribe feeds from the MoveTask so that MoveTask can forward the list
+ // of dynamic partition list to the StatsTask
+ mvTask.addDependentTask(statsTask);
+ statsTask.subscribeFeed(mvTask);
+ }
+
+ /**
+ * Returns true iff current query is an insert into for the given file sink
+ *
+ * @param parseCtx
+ * @param fsOp
+ * @return
+ */
+ public static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp) {
+ return fsOp.getConf().getTableInfo().getTableName() != null &&
+ parseCtx.getQB().getParseInfo().isInsertToTable();
+ }
+
+ /**
+ * Create a MapredWork based on input path, the top operator and the input
+ * table descriptor.
+ *
+ * @param conf
+ * @param topOp
+ * the table scan operator that is the root of the MapReduce task.
+ * @param fsDesc
+ * the file sink descriptor that serves as the input to this merge task.
+ * @param parentMR
+ * the parent MapReduce work
+ * @param parentFS
+ * the last FileSinkOperator in the parent MapReduce work
+ * @return the MapredWork
+ */
+ private static MapWork createMRWorkForMergingFiles (HiveConf conf,
+ Operator<? extends OperatorDesc> topOp, FileSinkDesc fsDesc) {
+
+ ArrayList<String> aliases = new ArrayList<String>();
+ String inputDir = fsDesc.getFinalDirName();
+ TableDesc tblDesc = fsDesc.getTableInfo();
+ aliases.add(inputDir); // dummy alias: just use the input path
+
+ // constructing the default MapredWork
+ MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+ MapWork cplan = cMrPlan.getMapWork();
+ cplan.getPathToAliases().put(inputDir, aliases);
+ cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
+ cplan.getAliasToWork().put(inputDir, topOp);
+ cplan.setMapperCannotSpanPartns(true);
+
+ return cplan;
+ }
+
+ /**
+ * Create a block level merge task for RCFiles.
+ *
+ * @param fsInputDesc
+ * @param finalName
+ * @return MergeWork if table is stored as RCFile,
+ * null otherwise
+ */
+ public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+ String finalName, boolean hasDynamicPartitions) throws SemanticException {
+
+ String inputDir = fsInputDesc.getFinalDirName();
+ TableDesc tblDesc = fsInputDesc.getTableInfo();
+
+ if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+ ArrayList<String> inputDirs = new ArrayList<String>();
+ if (!hasDynamicPartitions
+ && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+ inputDirs.add(inputDir);
+ }
+
+ MergeWork work = new MergeWork(inputDirs, finalName,
+ hasDynamicPartitions, fsInputDesc.getDynPartCtx());
+ LinkedHashMap<String, ArrayList<String>> pathToAliases =
+ new LinkedHashMap<String, ArrayList<String>>();
+ pathToAliases.put(inputDir, (ArrayList<String>) inputDirs.clone());
+ work.setMapperCannotSpanPartns(true);
+ work.setPathToAliases(pathToAliases);
+ work.setAliasToWork(
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
+ if (hasDynamicPartitions
+ || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+ work.getPathToPartitionInfo().put(inputDir,
+ new PartitionDesc(tblDesc, null));
+ }
+ work.setListBucketingCtx(fsInputDesc.getLbCtx());
+
+ return work;
+ }
+
+ throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
+ }
+
+ /**
+ * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
+ *
+ * @param conf
+ * HiveConf
+ * @param currTask
+ * current leaf task
+ * @param mvWork
+ * MoveWork for the move task
+ * @param mergeWork
+ * MapredWork for the merge task.
+ * @param inputPath
+ * the input directory of the merge/move task
+ * @return The conditional task
+ */
+ @SuppressWarnings("unchecked")
+ public static ConditionalTask createCondTask(HiveConf conf,
+ Task<? extends Serializable> currTask, MoveWork mvWork,
+ Serializable mergeWork, String inputPath) {
+
+ // There are 3 options for this ConditionalTask:
+ // 1) Merge the partitions
+ // 2) Move the partitions (i.e. don't merge the partitions)
+ // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't
+ // merge others) in this case the merge is done first followed by the move to prevent
+ // conflicts.
+ Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
+ Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
+ Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
+ Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf);
+
+ // NOTE! It is necessary merge task is the parent of the move task, and not
+ // the other way around, for the proper execution of the execute method of
+ // ConditionalTask
+ mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
+
+ List<Serializable> listWorks = new ArrayList<Serializable>();
+ listWorks.add(mvWork);
+ listWorks.add(mergeWork);
+
+ ConditionalWork cndWork = new ConditionalWork(listWorks);
+
+ List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
+ listTasks.add(moveOnlyMoveTask);
+ listTasks.add(mergeOnlyMergeTask);
+ listTasks.add(mergeAndMoveMergeTask);
+
+ ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf);
+ cndTsk.setListTasks(listTasks);
+
+ // create resolver
+ cndTsk.setResolver(new ConditionalResolverMergeFiles());
+ ConditionalResolverMergeFilesCtx mrCtx =
+ new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
+ cndTsk.setResolverCtx(mrCtx);
+
+ // make the conditional task as the child of the current leaf task
+ currTask.addDependentTask(cndTsk);
+
+ return cndTsk;
+ }
+
+ /**
+ * check if it is skewed table and stored as dirs.
+ *
+ * @param fsInputDesc
+ * @return
+ */
+ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
+ return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
+ .isSkewedStoredAsDir();
+ }
+
+ public static Task<MoveWork> findMoveTask(
+ List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) {
+ // find the move task
+ for (Task<MoveWork> mvTsk : mvTasks) {
+ MoveWork mvWork = mvTsk.getWork();
+ String srcDir = null;
+ if (mvWork.getLoadFileWork() != null) {
+ srcDir = mvWork.getLoadFileWork().getSourceDir();
+ } else if (mvWork.getLoadTableWork() != null) {
+ srcDir = mvWork.getLoadTableWork().getSourceDir();
+ }
+
+ String fsOpDirName = fsOp.getConf().getFinalDirName();
+ if ((srcDir != null)
+ && (srcDir.equalsIgnoreCase(fsOpDirName))) {
+ return mvTsk;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns true iff the fsOp requires a merge
+ * @param mvTasks
+ * @param hconf
+ * @param fsOp
+ * @param currTask
+ * @param isInsertTable
+ * @return
+ */
+ public static boolean isMergeRequired(List<Task<MoveWork>> mvTasks, HiveConf hconf, FileSinkOperator fsOp,
+ Task<? extends Serializable> currTask, boolean isInsertTable) {
+
+ // Has the user enabled merging of files for map-only jobs or for all jobs
+ if ((mvTasks != null) && (!mvTasks.isEmpty())) {
+
+ // no need of merging if the move is to a local file system
+ MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+
+ if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
+ GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf);
+ }
+
+ if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) {
+ if (fsOp.getConf().isLinkedFileSink()) {
+ // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the
+ // number of reducers are few, so the number of files anyway are small.
+ // However, with this optimization, we are increasing the number of files
+ // possibly by a big margin. So, merge aggressively.
+ if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) {
+ return true;
+ }
+ } else {
+ // There are separate configuration parameters to control whether to
+ // merge for a map-only job
+ // or for a map-reduce job
+ ReduceWork reduceWork = currTask.getWork() instanceof MapredWork
+ ? ((MapredWork) currTask.getWork()).getReduceWork() : null;
+ boolean mergeMapOnly =
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
+ boolean mergeMapRed =
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
+ reduceWork != null;
+ if (mergeMapOnly || mergeMapRed) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Create and add any dependent move tasks.
+ *
+ * @param currTask the task the file sink belongs to
+ * @param chDir whether the file sink output should first go to a temporary directory
+ * @param fsOp the file sink operator
+ * @param parseCtx the parse context
+ * @param mvTasks the move tasks generated so far
+ * @param hconf the current Hive configuration
+ * @param dependencyTask optional task used to group dependent move tasks
+ * @return the final destination directory if chDir is true, null otherwise
+ */
+ public static String createMoveTask(Task<? extends Serializable> currTask, boolean chDir,
+ FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks,
+ HiveConf hconf, DependencyCollectionTask dependencyTask) {
+
+ String dest = null;
+
+ if (chDir) {
+ dest = fsOp.getConf().getFinalDirName();
+
+ // generate the temporary file
+ // it must be on the same file system as the current destination
+ Context baseCtx = parseCtx.getContext();
+ String tmpDir = baseCtx.getExternalTmpFileURI((new Path(dest)).toUri());
+
+ FileSinkDesc fileSinkDesc = fsOp.getConf();
+ // Change all the linked file sink descriptors
+ if (fileSinkDesc.isLinkedFileSink()) {
+ for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
+ String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName());
+ fsConf.setParentDir(tmpDir);
+ fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName);
+ }
+ } else {
+ fileSinkDesc.setDirName(tmpDir);
+ }
+ }
+
+ Task<MoveWork> mvTask = null;
+
+ if (!chDir) {
+ mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+ }
+
+ // Set the move task to be dependent on the current task
+ if (mvTask != null) {
+ GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask);
+ }
+
+ return dest;
+ }
+
private GenMapRedUtils() {
// prevent instantiation
}
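For context, the isMergeRequired change above keys the merge decision off two user-facing settings, hive.merge.mapfiles (HIVEMERGEMAPFILES) and hive.merge.mapredfiles (HIVEMERGEMAPREDFILES). A minimal sketch of reading them, assuming a HiveConf instance is at hand; the class and method names below are illustrative, not part of this commit:

import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative sketch only: the two settings consulted by isMergeRequired.
public final class MergeFlagsSketch {
  // merge the small files produced by map-only work
  public static boolean mergeMapOnlyOutput(HiveConf hconf) {
    return hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES);
  }

  // merge the small files produced by work that has a reduce phase
  public static boolean mergeMapRedOutput(HiveConf hconf) {
    return hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES);
  }
}

For a linked file sink, either setting is enough to trigger the merge, as noted in the comment above.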
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1538485&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Mon Nov 4 01:34:01 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+
+/**
+ * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
+ *
+ */
+public class FileSinkProcessor implements NodeProcessor {
+
+ static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
+
+ @Override
+ /*
+ * (non-Javadoc)
+ * Ideally we should not modify the tree we traverse. However, since we
+ * have to walk the tree anyway when we modify an operator, we might as
+ * well do the modification here.
+ */
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
+
+ GenTezProcContext context = (GenTezProcContext) procCtx;
+ FileSinkOperator fileSink = (FileSinkOperator) nd;
+ ParseContext parseContext = context.parseContext;
+
+
+ boolean isInsertTable = // is INSERT OVERWRITE TABLE
+ GenMapRedUtils.isInsertInto(parseContext, fileSink);
+ HiveConf hconf = parseContext.getConf();
+
+ boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
+ hconf, fileSink, context.currentTask, isInsertTable);
+
+ String finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+ chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
+
+ if (chDir) {
+ // Merge the files in the destination table/partitions by creating a map-only merge job.
+ // If the underlying data is RCFile, an RCFileBlockMerge task would be created instead.
+ LOG.info("using CombineHiveInputformat for the merge job");
+ GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
+ context.dependencyTask, context.moveTask,
+ hconf, context.currentTask);
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Mon Nov 4 01:34:01 2013
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.hooks.R
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -97,6 +99,9 @@ public class GenTezProcContext implement
// remember the dummy ops we created
public final Map<Operator<?>, List<Operator<?>>> linkChildOpWithDummyOp;
+ // used to group dependent tasks for multi-table inserts
+ public final DependencyCollectionTask dependencyTask;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -115,5 +120,7 @@ public class GenTezProcContext implement
this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
+ this.dependencyTask = (DependencyCollectionTask)
+ TaskFactory.get(new DependencyCollectionWork(), conf);
}
}
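The new dependencyTask gives multi-insert queries a single point to collect on: producing tasks are made parents of it, and the per-destination move tasks are hung off of it (see GenMapRedUtils.addDependentMoveTasks, called from createMoveTask above). A rough sketch of that wiring, assuming the task objects already exist; the helper below is illustrative, not the committed logic:

import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.plan.MoveWork;

// Illustrative wiring only: the producing task fans into the shared dependency
// task, and the move task waits on it, so nothing is moved into place until
// all branches of the multi-insert have finished.
public final class DependencyWiringSketch {
  public static void link(Task<?> producer,
      DependencyCollectionTask dependencyTask, Task<MoveWork> moveTask) {
    producer.addDependentTask(dependencyTask);
    dependencyTask.addDependentTask(moveTask);
  }
}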
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Mon Nov 4 01:34:01 2013
@@ -71,6 +71,12 @@ public class GenTezWork implements NodeP
// packing into a vertex, typically a table scan, union or join
Operator<?> root = context.currentRootOperator;
if (root == null) {
+ // if there are no more rootOperators, we're dealing with multiple
+ // file sinks off of the same table scan. Bail out.
+ if (context.rootOperators.isEmpty()) {
+ return null;
+ }
+
// null means that we're starting with a new table scan
// the graph walker walks the rootOperators in the same
// order so we can just take the next
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Mon Nov 4 01:34:01 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -126,16 +127,17 @@ public class TezCompiler extends TaskCom
// the operator stack.
// The dispatcher generates the plan from the operator tree
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp(new String("Split Work - ReduceSink"),
+ opRules.put(new RuleRegExp("Split Work - ReduceSink",
ReduceSinkOperator.getOperatorName() + "%"),
genTezWork);
- opRules.put(new RuleRegExp(new String("No more walking on ReduceSink-MapJoin"),
+
+ opRules.put(new RuleRegExp("No more walking on ReduceSink-MapJoin",
ReduceSinkOperator.getOperatorName() + "%" +
MapJoinOperator.getOperatorName() + "%"), new ReduceSinkMapJoinProc());
- opRules.put(new RuleRegExp(new String("Split Work - FileSink"),
- FileSinkOperator.getOperatorName() + "%"),
- genTezWork);
+ opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
+ FileSinkOperator.getOperatorName() + "%"),
+ new CompositeProcessor(new FileSinkProcessor(), genTezWork));
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
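CompositeProcessor, which the rule above relies on, is added by this revision but its body is not shown in this part of the diff. A hedged sketch of what such a composite NodeProcessor typically looks like, assuming it simply fires its delegates in order; the class name, arity and return handling below are illustrative and may differ from the committed file:

import java.util.Stack;

import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;

// Illustrative sketch only: runs two delegate processors in order for every
// node the rule matches and returns the result of the last one.
public class CompositeProcessorSketch implements NodeProcessor {
  private final NodeProcessor first;
  private final NodeProcessor second;

  public CompositeProcessorSketch(NodeProcessor first, NodeProcessor second) {
    this.first = first;
    this.second = second;
  }

  @Override
  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
      Object... nodeOutputs) throws SemanticException {
    first.process(nd, stack, procCtx, nodeOutputs);
    return second.process(nd, stack, procCtx, nodeOutputs);
  }
}

In TezCompiler the FileSinkOperator rule now pairs FileSinkProcessor (merge/move/stats tasks) with genTezWork (work splitting) on the same match.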
Added: hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q?rev=1538485&view=auto
==============================================================================
--- hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q (added)
+++ hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q Mon Nov 4 01:34:01 2013
@@ -0,0 +1,37 @@
+set hive.optimize.tez=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+-- CTAS
+EXPLAIN CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt;
+CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt;
+
+SELECT * FROM tmp_src;
+
+-- dyn partitions
+CREATE TABLE tmp_src_part (c string) PARTITIONED BY (d int);
+EXPLAIN INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src;
+INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src;
+
+SELECT * FROM tmp_src_part;
+
+-- multi insert
+CREATE TABLE even (c int, d string);
+CREATE TABLE odd (c int, d string);
+
+EXPLAIN
+FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1;
+
+FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1;
+
+SELECT * FROM even;
+SELECT * FROM odd;
+
+-- drop the tables
+DROP TABLE even;
+DROP TABLE odd;
+DROP TABLE tmp_src;
+DROP TABLE tmp_src_part;