Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/04 02:34:01 UTC

svn commit: r1538485 [1/2] - in /hive/branches/tez: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/lib/ ql/src/java/org/a...

Author: gunther
Date: Mon Nov  4 01:34:01 2013
New Revision: 1538485

URL: http://svn.apache.org/r1538485
Log:
HIVE-5734: Enable merge/move tasks for Tez (Gunther Hagleitner)

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
    hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q
    hive/branches/tez/ql/src/test/results/clientpositive/tez_dml.q.out
Modified:
    hive/branches/tez/build-common.xml
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java

Modified: hive/branches/tez/build-common.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/build-common.xml?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/build-common.xml (original)
+++ hive/branches/tez/build-common.xml Mon Nov  4 01:34:01 2013
@@ -61,7 +61,7 @@
   <property name="test.junit.output.usefile" value="true"/>
   <property name="minimr.query.files" value="list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q"/>
   <property name="minimr.query.negative.files" value="cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q" />
-  <property name="minitez.query.files" value="tez_join_tests.q,tez_joins_explain.q,mrr.q"/>
+  <property name="minitez.query.files" value="tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q"/>
   <property name="test.silent" value="true"/>
   <property name="hadoopVersion" value="${hadoop.version.ant-internal}"/>
   <property name="test.serialize.qplan" value="false"/>

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Mon Nov  4 01:34:01 2013
@@ -134,16 +134,18 @@ public class HadoopJobExecHelper {
     this.jobId = jobId;
   }
 
-
-  public HadoopJobExecHelper() {
-  }
-
   public HadoopJobExecHelper(JobConf job, LogHelper console,
       Task<? extends Serializable> task, HadoopJobExecHook hookCallBack) {
     this.job = job;
     this.console = console;
     this.task = task;
     this.callBackObj = hookCallBack;
+
+    if (job != null) {
+      // Even with Tez enabled, some jobs still run as MR. Disable the flag in
+      // the conf so that the backend runs fully as MR.
+      HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ, false);
+    }
   }
 
 

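For illustration, a minimal sketch (not part of the patch) of the conf flip the new constructor performs. HiveConf.setBoolVar/getBoolVar and HIVE_OPTIMIZE_TEZ are used exactly as in the hunk above; the demo class itself is hypothetical:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class MrFallbackSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // A Tez-enabled session would have the optimizer flag turned on.
    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ, true);

    // The patched constructor disables it for any job routed through the MR
    // exec helper, so the backend operators behave as plain MapReduce.
    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ, false);

    System.out.println(
        HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ)); // prints false
  }
}
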
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Nov  4 01:34:01 2013
@@ -45,10 +45,13 @@ import org.apache.hadoop.hive.ql.io.Buck
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.Hadoop20Shims.NullOutputCommitter;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
@@ -613,6 +616,22 @@ public class DagUtils {
       return null;
     }
 
+    // initialize stats publisher if necessary
+    if (work.isGatheringStats()) {
+      StatsPublisher statsPublisher;
+      String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+      if (StatsFactory.setImplementation(statsImplementationClass, conf)) {
+        statsPublisher = StatsFactory.getStatsPublisher();
+        if (!statsPublisher.init(conf)) { // creating stats table if not exists
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+            throw
+              new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+          }
+        }
+      }
+    }
+
+
     // final vertices need to have at least one output
     if (!hasChildren) {
       v.addOutput("out_"+work.getName(),

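To show why the vertex-level init above matters, here is a rough, hypothetical sketch of the publisher lifecycle a task-side writer would follow afterwards. StatsFactory.setImplementation, getStatsPublisher and init are taken from the hunk above; connect/publishStat/closeConnection and the "numRows" key are assumptions about the StatsPublisher interface and may differ:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;

public class StatsPublisherSketch {
  static void publishRowCount(HiveConf conf, String keyPrefix, long rowCount) {
    String impl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
    if (!StatsFactory.setImplementation(impl, conf)) {
      return; // unknown stats implementation; nothing to publish
    }
    StatsPublisher publisher = StatsFactory.getStatsPublisher();
    if (publisher.connect(conf)) {             // assumed API
      Map<String, String> stats = new HashMap<String, String>();
      stats.put("numRows", Long.toString(rowCount)); // illustrative stat key
      publisher.publishStat(keyPrefix, stats); // assumed API
      publisher.closeConnection();             // assumed API
    }
  }
}
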
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Nov  4 01:34:01 2013
@@ -117,6 +117,11 @@ public class TezSessionState {
 
     LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")");
     session.start();
+
+    // In case we need to run some MR jobs, we'll run them under Tez's MR emulation. The session
+    // id lets Tez reuse the current session rather than start a new one.
+    conf.set("mapreduce.framework.name", "yarn-tez");
+    conf.set("tez.session.id", session.getApplicationId().toString());
   }
 
   /**

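As a small illustration of what those two settings buy us (the helper method below is hypothetical; the property names and values are the ones set above):

import org.apache.hadoop.conf.Configuration;

public class TezSessionReuseSketch {
  // Returns true when a later MR-style submission would be routed through
  // Tez's MR emulation into the already-open session instead of a new AM.
  static boolean reusesOpenSession(Configuration conf) {
    return "yarn-tez".equals(conf.get("mapreduce.framework.name"))
        && conf.get("tez.session.id") != null;
  }
}
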
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Mon Nov  4 01:34:01 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.session.SessionState;
 
 
 /**
@@ -37,12 +38,19 @@ public class IOContext {
     protected synchronized IOContext initialValue() { return new IOContext(); }
  };
 
+ private static IOContext ioContext = new IOContext();
+
   public static IOContext get() {
+    if (SessionState.get() == null) {
+      // This happens on the backend; only one IOContext is needed.
+      return ioContext;
+    }
     return IOContext.threadLocal.get();
   }
 
   public static void clear() {
     IOContext.threadLocal.remove();
+    ioContext = new IOContext();
   }
 
   long currentBlockStart;

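A minimal sketch of the behaviour this buys on the backend (the demo class is hypothetical; only get() and clear() from the class above are used):

import org.apache.hadoop.hive.ql.io.IOContext;

public class IOContextSketch {
  public static void main(String[] args) {
    // With no SessionState attached (as on the backend), get() hands back the
    // single shared instance instead of a per-thread one.
    IOContext first = IOContext.get();
    IOContext second = IOContext.get();
    System.out.println(first == second); // true when SessionState.get() == null

    // clear() drops the thread-local copy and swaps in a fresh shared instance.
    IOContext.clear();
    System.out.println(first == IOContext.get()); // false after clear()
  }
}
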
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java?rev=1538485&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java Mon Nov  4 01:34:01 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.lib;
+
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * CompositeProcessor. Holds a list of node processors to be fired by the same
+ * rule.
+ *
+ */
+public class CompositeProcessor implements NodeProcessor {
+
+  NodeProcessor[] procs;
+
+  public CompositeProcessor(NodeProcessor...nodeProcessors) {
+    procs = nodeProcessors;
+  }
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+      throws SemanticException {
+    for (NodeProcessor proc: procs) {
+      proc.process(nd, stack, procCtx, nodeOutputs);
+    }
+    return null;
+  }
+}

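A hypothetical usage sketch: two processors bundled so both fire for the same node. The anonymous processors are illustrative only; in practice the composite would be registered against a rule and driven by a graph walker (for example together with the new FileSinkProcessor):

import java.util.Stack;

import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class CompositeProcessorSketch {
  public static void main(String[] args) throws SemanticException {
    NodeProcessor first = new NodeProcessor() {
      @Override
      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
          Object... nodeOutputs) throws SemanticException {
        System.out.println("first processor fired");
        return null;
      }
    };
    NodeProcessor second = new NodeProcessor() {
      @Override
      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
          Object... nodeOutputs) throws SemanticException {
        System.out.println("second processor fired");
        return null;
      }
    };

    // Both processors run, in order, for every node the composite is given.
    NodeProcessor both = new CompositeProcessor(first, second);
    both.process(null, new Stack<Node>(), null);
  }
}
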
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Mon Nov  4 01:34:01 2013
@@ -21,51 +21,25 @@ package org.apache.hadoop.hive.ql.optimi
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MoveTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
-import org.apache.hadoop.hive.ql.plan.ConditionalWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.mapred.InputFormat;
 
 /**
  * Processor for the rule - table scan followed by reduce sink.
@@ -95,8 +69,7 @@ public class GenMRFileSink1 implements N
 
     FileSinkOperator fsOp = (FileSinkOperator) nd;
     boolean isInsertTable = // is INSERT OVERWRITE TABLE
-    fsOp.getConf().getTableInfo().getTableName() != null &&
-        parseCtx.getQB().getParseInfo().isInsertToTable();
+        GenMapRedUtils.isInsertInto(parseCtx, fsOp);
     HiveConf hconf = parseCtx.getConf();
 
     // Mark this task as a final map reduce task (ignoring the optional merge task)
@@ -111,49 +84,12 @@ public class GenMRFileSink1 implements N
       return true;
     }
 
-    // Has the user enabled merging of files for map-only jobs or for all jobs
-    if ((ctx.getMvTask() != null) && (!ctx.getMvTask().isEmpty())) {
-      List<Task<MoveWork>> mvTasks = ctx.getMvTask();
-
-      // In case of unions or map-joins, it is possible that the file has
-      // already been seen.
-      // So, no need to attempt to merge the files again.
-      if ((ctx.getSeenFileSinkOps() == null)
-          || (!ctx.getSeenFileSinkOps().contains(nd))) {
-
-        // no need of merging if the move is to a local file system
-        MoveTask mvTask = (MoveTask) findMoveTask(mvTasks, fsOp);
-
-        if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
-          addStatsTask(fsOp, mvTask, currTask, parseCtx.getConf());
-        }
-
-        if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) {
-          if (fsOp.getConf().isLinkedFileSink()) {
-            // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the
-            // number of reducers are few, so the number of files anyway are small.
-            // However, with this optimization, we are increasing the number of files
-            // possibly by a big margin. So, merge aggresively.
-            if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
-                hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) {
-              chDir = true;
-            }
-          } else {
-              // There are separate configuration parameters to control whether to
-              // merge for a map-only job
-              // or for a map-reduce job
-              MapredWork currWork = (MapredWork) currTask.getWork();
-              boolean mergeMapOnly =
-                  hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReduceWork() == null;
-              boolean mergeMapRed =
-                  hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
-                      currWork.getReduceWork() != null;
-              if (mergeMapOnly || mergeMapRed) {
-                chDir = true;
-              }
-          }
-        }
-      }
+    // In case of unions or map-joins, it is possible that the file has
+    // already been seen.
+    // So, no need to attempt to merge the files again.
+    if ((ctx.getSeenFileSinkOps() == null)
+        || (!ctx.getSeenFileSinkOps().contains(nd))) {
+      chDir = GenMapRedUtils.isMergeRequired(ctx.getMvTask(), hconf, fsOp, currTask, isInsertTable);
     }
 
     String finalName = processFS(fsOp, stack, opProcCtx, chDir);
@@ -162,7 +98,9 @@ public class GenMRFileSink1 implements N
       // Merge the files in the destination table/partitions by creating Map-only merge job
       // If underlying data is RCFile a RCFileBlockMerge task would be created.
       LOG.info("using CombineHiveInputformat for the merge job");
-      createMRWorkForMergingFiles(fsOp, ctx, finalName);
+      GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName,
+          ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(),
+          hconf, currTask);
     }
 
     FileSinkDesc fileSinkDesc = fsOp.getConf();
@@ -205,435 +143,6 @@ public class GenMRFileSink1 implements N
   }
 
   /**
-   * Add the StatsTask as a dependent task of the MoveTask
-   * because StatsTask will change the Table/Partition metadata. For atomicity, we
-   * should not change it before the data is actually there done by MoveTask.
-   *
-   * @param nd
-   *          the FileSinkOperator whose results are taken care of by the MoveTask.
-   * @param mvTask
-   *          The MoveTask that moves the FileSinkOperator's results.
-   * @param currTask
-   *          The MapRedTask that the FileSinkOperator belongs to.
-   * @param hconf
-   *          HiveConf
-   */
-  private void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
-      Task<? extends Serializable> currTask, HiveConf hconf) {
-
-    MoveWork mvWork = ((MoveTask) mvTask).getWork();
-    StatsWork statsWork = null;
-    if (mvWork.getLoadTableWork() != null) {
-      statsWork = new StatsWork(mvWork.getLoadTableWork());
-    } else if (mvWork.getLoadFileWork() != null) {
-      statsWork = new StatsWork(mvWork.getLoadFileWork());
-    }
-    assert statsWork != null : "Error when genereting StatsTask";
-    statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
-    MapredWork mrWork = (MapredWork) currTask.getWork();
-
-    // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
-    // in FileSinkDesc is used for stats publishing. They should be consistent.
-    statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix());
-    Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
-
-    // mark the MapredWork and FileSinkOperator for gathering stats
-    nd.getConf().setGatherStats(true);
-    mrWork.getMapWork().setGatheringStats(true);
-    if (mrWork.getReduceWork() != null) {
-      mrWork.getReduceWork().setGatheringStats(true);
-    }
-    nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
-    nd.getConf().setMaxStatsKeyPrefixLength(
-        hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
-    // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName());
-
-    // subscribe feeds from the MoveTask so that MoveTask can forward the list
-    // of dynamic partition list to the StatsTask
-    mvTask.addDependentTask(statsTask);
-    statsTask.subscribeFeed(mvTask);
-  }
-
-  /**
-   * @param fsInput The FileSink operator.
-   * @param ctx The MR processing context.
-   * @param finalName the final destination path the merge job should output.
-   * @throws SemanticException
-
-   * create a Map-only merge job using CombineHiveInputFormat for all partitions with
-   * following operators:
-   *          MR job J0:
-   *          ...
-   *          |
-   *          v
-   *          FileSinkOperator_1 (fsInput)
-   *          |
-   *          v
-   *          Merge job J1:
-   *          |
-   *          v
-   *          TableScan (using CombineHiveInputFormat) (tsMerge)
-   *          |
-   *          v
-   *          FileSinkOperator (fsMerge)
-   *
-   *          Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths
-   *          do
-   *          not contain the dynamic partitions (their parent). So after the dynamic partitions are
-   *          created (after the first job finished before the moveTask or ConditionalTask start),
-   *          we need to change the pathToPartitionInfo & pathToAlias to include the dynamic
-   *          partition
-   *          directories.
-   *
-   */
-  private void createMRWorkForMergingFiles (FileSinkOperator fsInput, GenMRProcContext ctx,
-   String finalName) throws SemanticException {
-
-    //
-    // 1. create the operator tree
-    //
-    HiveConf conf = ctx.getParseCtx().getConf();
-    FileSinkDesc fsInputDesc = fsInput.getConf();
-
-    // Create a TableScan operator
-    RowSchema inputRS = fsInput.getSchema();
-    Operator<? extends OperatorDesc> tsMerge =
-        GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
-
-    // Create a FileSink operator
-    TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
-    FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
-      conf.getBoolVar(ConfVars.COMPRESSRESULT));
-    FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
-      fsOutputDesc, inputRS, tsMerge);
-
-    // If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema
-    // needs to include the partition column, and the fsOutput should have
-    // a DynamicPartitionCtx to indicate that it needs to dynamically partitioned.
-    DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx();
-    if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
-      // adding DP ColumnInfo to the RowSchema signature
-      ArrayList<ColumnInfo> signature = inputRS.getSignature();
-      String tblAlias = fsInputDesc.getTableInfo().getTableName();
-      LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
-      StringBuilder partCols = new StringBuilder();
-      for (String dpCol : dpCtx.getDPColNames()) {
-        ColumnInfo colInfo = new ColumnInfo(dpCol,
-            TypeInfoFactory.stringTypeInfo, // all partition column type should be string
-            tblAlias, true); // partition column is virtual column
-        signature.add(colInfo);
-        colMap.put(dpCol, dpCol); // input and output have the same column name
-        partCols.append(dpCol).append('/');
-      }
-      partCols.setLength(partCols.length() - 1); // remove the last '/'
-      inputRS.setSignature(signature);
-
-      // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
-      DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
-      dpCtx2.setInputToDPCols(colMap);
-      fsOutputDesc.setDynPartCtx(dpCtx2);
-
-      // update the FileSinkOperator to include partition columns
-      fsInputDesc.getTableInfo().getProperties().setProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
-        partCols.toString()); // list of dynamic partition column names
-    } else {
-      // non-partitioned table
-      fsInputDesc.getTableInfo().getProperties().remove(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-    }
-
-    //
-    // 2. Constructing a conditional task consisting of a move task and a map reduce task
-    //
-    MoveWork dummyMv = new MoveWork(null, null, null,
-        new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
-    MapWork cplan;
-    Serializable work;
-
-    if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
-        fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
-
-      // Check if InputFormatClass is valid
-      String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
-      try {
-        Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
-
-        LOG.info("RCFile format- Using block level merge");
-        cplan = createRCFileMergeTask(fsInputDesc, finalName,
-            dpCtx != null && dpCtx.getNumDPCols() > 0);
-        work = cplan;
-      } catch (ClassNotFoundException e) {
-        String msg = "Illegal input format class: " + inputFormatClass;
-        throw new SemanticException(msg);
-      }
-
-    } else {
-      cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
-      work = new MapredWork();
-      ((MapredWork)work).setMapWork(cplan);
-      // use CombineHiveInputFormat for map-only merging
-    }
-    cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
-    // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
-    // know if merge MR2 will be triggered at execution time
-    ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, work,
-        fsInputDesc.getFinalDirName());
-
-    // keep the dynamic partition context in conditional task resolver context
-    ConditionalResolverMergeFilesCtx mrCtx =
-        (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
-    mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
-    mrCtx.setLbCtx(fsInputDesc.getLbCtx());
-
-    //
-    // 3. add the moveTask as the children of the conditional task
-    //
-    linkMoveTask(ctx, fsOutput, cndTsk);
-  }
-
-  /**
-   * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
-   * possible subtrees branching from the ConditionalTask.
-   *
-   * @param ctx
-   * @param newOutput
-   * @param cndTsk
-   */
-  private void linkMoveTask(GenMRProcContext ctx, FileSinkOperator newOutput,
-      ConditionalTask cndTsk) {
-
-    List<Task<MoveWork>> mvTasks = ctx.getMvTask();
-    Task<MoveWork> mvTask = findMoveTask(mvTasks, newOutput);
-
-    for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
-      linkMoveTask(ctx, mvTask, tsk);
-    }
-  }
-
-  /**
-   * Follows the task tree down from task and makes all leaves parents of mvTask
-   *
-   * @param ctx
-   * @param mvTask
-   * @param task
-   */
-  private void linkMoveTask(GenMRProcContext ctx, Task<MoveWork> mvTask,
-      Task<? extends Serializable> task) {
-
-    if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) {
-      // If it's a leaf, add the move task as a child
-      addDependentMoveTasks(ctx, mvTask, task);
-    } else {
-      // Otherwise, for each child run this method recursively
-      for (Task<? extends Serializable> childTask : task.getDependentTasks()) {
-        linkMoveTask(ctx, mvTask, childTask);
-      }
-    }
-  }
-
-  /**
-   * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a
-   * load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of
-   * dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as
-   * well.
-   *
-   * @param ctx
-   * @param mvTask
-   * @param parentTask
-   */
-  private void addDependentMoveTasks(GenMRProcContext ctx, Task<MoveWork> mvTask,
-      Task<? extends Serializable> parentTask) {
-
-    if (mvTask != null) {
-      if (ctx.getConf().getBoolVar(ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)) {
-        DependencyCollectionTask dependencyTask = ctx.getDependencyTaskForMultiInsert();
-        parentTask.addDependentTask(dependencyTask);
-        if (mvTask.getWork().getLoadTableWork() != null) {
-          // Moving tables/partitions depend on the dependencyTask
-          dependencyTask.addDependentTask(mvTask);
-        } else {
-          // Moving files depends on the parentTask (we still want the dependencyTask to depend
-          // on the parentTask)
-          parentTask.addDependentTask(mvTask);
-        }
-      } else {
-        parentTask.addDependentTask(mvTask);
-      }
-    }
-  }
-
-  /**
-   * Create a MapredWork based on input path, the top operator and the input
-   * table descriptor.
-   *
-   * @param conf
-   * @param topOp
-   *          the table scan operator that is the root of the MapReduce task.
-   * @param fsDesc
-   *          the file sink descriptor that serves as the input to this merge task.
-   * @param parentMR
-   *          the parent MapReduce work
-   * @param parentFS
-   *          the last FileSinkOperator in the parent MapReduce work
-   * @return the MapredWork
-   */
-  private MapWork createMRWorkForMergingFiles (HiveConf conf,
-    Operator<? extends OperatorDesc> topOp,  FileSinkDesc fsDesc) {
-
-    ArrayList<String> aliases = new ArrayList<String>();
-    String inputDir = fsDesc.getFinalDirName();
-    TableDesc tblDesc = fsDesc.getTableInfo();
-    aliases.add(inputDir); // dummy alias: just use the input path
-
-    // constructing the default MapredWork
-    MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
-    MapWork cplan = cMrPlan.getMapWork();
-    cplan.getPathToAliases().put(inputDir, aliases);
-    cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
-    cplan.getAliasToWork().put(inputDir, topOp);
-    cplan.setMapperCannotSpanPartns(true);
-
-    return cplan;
-  }
-
-  /**
-   * Create a block level merge task for RCFiles.
-   *
-   * @param fsInputDesc
-   * @param finalName
-   * @return MergeWork if table is stored as RCFile,
-   *         null otherwise
-   */
-  private MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
-      String finalName, boolean hasDynamicPartitions) throws SemanticException {
-
-    String inputDir = fsInputDesc.getFinalDirName();
-    TableDesc tblDesc = fsInputDesc.getTableInfo();
-
-    if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
-      ArrayList<String> inputDirs = new ArrayList<String>();
-      if (!hasDynamicPartitions
-          && !isSkewedStoredAsDirs(fsInputDesc)) {
-        inputDirs.add(inputDir);
-      }
-
-      MergeWork work = new MergeWork(inputDirs, finalName,
-          hasDynamicPartitions, fsInputDesc.getDynPartCtx());
-      LinkedHashMap<String, ArrayList<String>> pathToAliases =
-          new LinkedHashMap<String, ArrayList<String>>();
-      pathToAliases.put(inputDir, (ArrayList<String>) inputDirs.clone());
-      work.setMapperCannotSpanPartns(true);
-      work.setPathToAliases(pathToAliases);
-      work.setAliasToWork(
-          new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
-      if (hasDynamicPartitions
-          || isSkewedStoredAsDirs(fsInputDesc)) {
-        work.getPathToPartitionInfo().put(inputDir,
-            new PartitionDesc(tblDesc, null));
-      }
-      work.setListBucketingCtx(fsInputDesc.getLbCtx());
-
-      return work;
-    }
-
-    throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
-  }
-
-  /**
-   * check if it is skewed table and stored as dirs.
-   *
-   * @param fsInputDesc
-   * @return
-   */
-  private boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
-    return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
-        .isSkewedStoredAsDir();
-  }
-
-  /**
-   * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
-   *
-   * @param conf
-   *          HiveConf
-   * @param currTask
-   *          current leaf task
-   * @param mvWork
-   *          MoveWork for the move task
-   * @param mergeWork
-   *          MapredWork for the merge task.
-   * @param inputPath
-   *          the input directory of the merge/move task
-   * @return The conditional task
-   */
-  private ConditionalTask createCondTask(HiveConf conf,
-      Task<? extends Serializable> currTask, MoveWork mvWork,
-      Serializable mergeWork, String inputPath) {
-
-    // There are 3 options for this ConditionalTask:
-    // 1) Merge the partitions
-    // 2) Move the partitions (i.e. don't merge the partitions)
-    // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't
-    // merge others) in this case the merge is done first followed by the move to prevent
-    // conflicts.
-    Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
-    Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
-    Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
-    Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf);
-
-    // NOTE! It is necessary merge task is the parent of the move task, and not
-    // the other way around, for the proper execution of the execute method of
-    // ConditionalTask
-    mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
-
-    List<Serializable> listWorks = new ArrayList<Serializable>();
-    listWorks.add(mvWork);
-    listWorks.add(mergeWork);
-
-    ConditionalWork cndWork = new ConditionalWork(listWorks);
-
-    List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
-    listTasks.add(moveOnlyMoveTask);
-    listTasks.add(mergeOnlyMergeTask);
-    listTasks.add(mergeAndMoveMergeTask);
-
-    ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf);
-    cndTsk.setListTasks(listTasks);
-
-    // create resolver
-    cndTsk.setResolver(new ConditionalResolverMergeFiles());
-    ConditionalResolverMergeFilesCtx mrCtx =
-        new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
-    cndTsk.setResolverCtx(mrCtx);
-
-    // make the conditional task as the child of the current leaf task
-    currTask.addDependentTask(cndTsk);
-
-    return cndTsk;
-  }
-
-  private Task<MoveWork> findMoveTask(
-      List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) {
-    // find the move task
-    for (Task<MoveWork> mvTsk : mvTasks) {
-      MoveWork mvWork = mvTsk.getWork();
-      String srcDir = null;
-      if (mvWork.getLoadFileWork() != null) {
-        srcDir = mvWork.getLoadFileWork().getSourceDir();
-      } else if (mvWork.getLoadTableWork() != null) {
-        srcDir = mvWork.getLoadTableWork().getSourceDir();
-      }
-
-      String fsOpDirName = fsOp.getConf().getFinalDirName();
-      if ((srcDir != null)
-          && (srcDir.equalsIgnoreCase(fsOpDirName))) {
-        return mvTsk;
-      }
-    }
-    return null;
-  }
-
-  /**
    * Process the FileSink operator to generate a MoveTask if necessary.
    *
    * @param fsOp
@@ -651,6 +160,11 @@ public class GenMRFileSink1 implements N
       NodeProcessorCtx opProcCtx, boolean chDir) throws SemanticException {
 
     GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
+    Task<? extends Serializable> currTask = ctx.getCurrTask();
+
+    // If the directory needs to be changed, send the new directory
+    String dest = null;
+
     List<FileSinkOperator> seenFSOps = ctx.getSeenFileSinkOps();
     if (seenFSOps == null) {
       seenFSOps = new ArrayList<FileSinkOperator>();
@@ -660,49 +174,14 @@ public class GenMRFileSink1 implements N
     }
     ctx.setSeenFileSinkOps(seenFSOps);
 
-    Task<? extends Serializable> currTask = ctx.getCurrTask();
-
-    // If the directory needs to be changed, send the new directory
-    String dest = null;
-
-    if (chDir) {
-      dest = fsOp.getConf().getFinalDirName();
-
-      // generate the temporary file
-      // it must be on the same file system as the current destination
-      ParseContext parseCtx = ctx.getParseCtx();
-      Context baseCtx = parseCtx.getContext();
-      String tmpDir = baseCtx.getExternalTmpFileURI((new Path(dest)).toUri());
-
-      FileSinkDesc fileSinkDesc = fsOp.getConf();
-      // Change all the linked file sink descriptors
-      if (fileSinkDesc.isLinkedFileSink()) {
-        for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
-          String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName());
-          fsConf.setParentDir(tmpDir);
-          fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName);
-        }
-      } else {
-        fileSinkDesc.setDirName(tmpDir);
-      }
-    }
-
-    Task<MoveWork> mvTask = null;
-
-    if (!chDir) {
-      mvTask = findMoveTask(ctx.getMvTask(), fsOp);
-    }
+    dest = GenMapRedUtils.createMoveTask(ctx.getCurrTask(), chDir, fsOp, ctx.getParseCtx(),
+        ctx.getMvTask(), ctx.getConf(), ctx.getDependencyTaskForMultiInsert());
 
     Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
     String currAliasId = ctx.getCurrAliasId();
     HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
         ctx.getOpTaskMap();
 
-    // Set the move task to be dependent on the current task
-    if (mvTask != null) {
-      addDependentMoveTasks(ctx, mvTask, currTask);
-    }
-
     // In case of multi-table insert, the path to alias mapping is needed for
     // all the sources. Since there is no
     // reducer, treat it as a plan with null reducer

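The body of this processor now delegates to GenMapRedUtils so the Tez compiler can share the same logic. A rough, hypothetical sketch of that call sequence (the class and method below are illustrative; the GenMapRedUtils signatures are the ones visible in this diff):

import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MoveWork;

public class FileSinkMergeMoveSketch {
  void handleFileSink(FileSinkOperator fsOp, ParseContext parseCtx, HiveConf hconf,
      Task<? extends Serializable> currTask, List<Task<MoveWork>> mvTasks,
      DependencyCollectionTask dependencyTask) throws SemanticException {

    // Was this sink produced by an insert into a table?
    boolean isInsertTable = GenMapRedUtils.isInsertInto(parseCtx, fsOp);

    // Decide whether the sink's output files should be merged.
    boolean chDir =
        GenMapRedUtils.isMergeRequired(mvTasks, hconf, fsOp, currTask, isInsertTable);

    // Wire up the move (and, when auto-gather is on, stats) tasks; when chDir is
    // true the sink is redirected and the final destination path is returned.
    String finalName = GenMapRedUtils.createMoveTask(currTask, chDir, fsOp, parseCtx,
        mvTasks, hconf, dependencyTask);

    if (chDir) {
      // Attach the conditional merge/move plan that fills the final directory.
      GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName, dependencyTask,
          mvTasks, hconf, currTask);
    }
  }
}
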
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Nov  4 01:34:01 2013
@@ -33,11 +33,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -50,6 +54,8 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
@@ -61,20 +67,31 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.InputFormat;
 
 /**
  * General utility common functions for the Processor to convert operator into
@@ -1107,6 +1124,571 @@ public final class GenMapRedUtils {
     }
   }
 
+  /**
+   * @param fsInput The FileSink operator.
+   * @param finalName the final destination path the merge job should output.
+   * @param dependencyTask the dependency collection task shared by multi-insert move tasks.
+   * @param mvTasks the move tasks generated for the query's file sinks.
+   * @param conf HiveConf
+   * @param currTask the current leaf task the merge work is attached to.
+   * @throws SemanticException
+
+   * create a Map-only merge job using CombineHiveInputFormat for all partitions with
+   * following operators:
+   *          MR job J0:
+   *          ...
+   *          |
+   *          v
+   *          FileSinkOperator_1 (fsInput)
+   *          |
+   *          v
+   *          Merge job J1:
+   *          |
+   *          v
+   *          TableScan (using CombineHiveInputFormat) (tsMerge)
+   *          |
+   *          v
+   *          FileSinkOperator (fsMerge)
+   *
+   *          Here the pathToPartitionInfo & pathToAlias will remain the same, which means the
+   *          paths do not contain the dynamic partitions (their parent). So after the dynamic
+   *          partitions are created (after the first job finishes, before the moveTask or
+   *          ConditionalTask starts), we need to change the pathToPartitionInfo & pathToAlias
+   *          to include the dynamic partition directories.
+   *
+   */
+  public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
+   String finalName, DependencyCollectionTask dependencyTask,
+   List<Task<MoveWork>> mvTasks, HiveConf conf,
+   Task<? extends Serializable> currTask) throws SemanticException {
+
+    //
+    // 1. create the operator tree
+    //
+    FileSinkDesc fsInputDesc = fsInput.getConf();
+
+    // Create a TableScan operator
+    RowSchema inputRS = fsInput.getSchema();
+    Operator<? extends OperatorDesc> tsMerge =
+        GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
+
+    // Create a FileSink operator
+    TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
+    FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
+      conf.getBoolVar(ConfVars.COMPRESSRESULT));
+    FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
+      fsOutputDesc, inputRS, tsMerge);
+
+    // If the input FileSinkOperator has dynamic partitioning enabled, the tsMerge input schema
+    // needs to include the partition columns, and the fsOutput should have
+    // a DynamicPartitionCtx to indicate that it needs to be dynamically partitioned.
+    DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx();
+    if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
+      // adding DP ColumnInfo to the RowSchema signature
+      ArrayList<ColumnInfo> signature = inputRS.getSignature();
+      String tblAlias = fsInputDesc.getTableInfo().getTableName();
+      LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
+      StringBuilder partCols = new StringBuilder();
+      for (String dpCol : dpCtx.getDPColNames()) {
+        ColumnInfo colInfo = new ColumnInfo(dpCol,
+            TypeInfoFactory.stringTypeInfo, // all partition column type should be string
+            tblAlias, true); // partition column is virtual column
+        signature.add(colInfo);
+        colMap.put(dpCol, dpCol); // input and output have the same column name
+        partCols.append(dpCol).append('/');
+      }
+      partCols.setLength(partCols.length() - 1); // remove the last '/'
+      inputRS.setSignature(signature);
+
+      // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
+      DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
+      dpCtx2.setInputToDPCols(colMap);
+      fsOutputDesc.setDynPartCtx(dpCtx2);
+
+      // update the FileSinkOperator to include partition columns
+      fsInputDesc.getTableInfo().getProperties().setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
+        partCols.toString()); // list of dynamic partition column names
+    } else {
+      // non-partitioned table
+      fsInputDesc.getTableInfo().getProperties().remove(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+    }
+
+    //
+    // 2. Constructing a conditional task consisting of a move task and a map reduce task
+    //
+    MoveWork dummyMv = new MoveWork(null, null, null,
+        new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
+    MapWork cplan;
+    Serializable work;
+
+    if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
+        fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+
+      // Check if InputFormatClass is valid
+      String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+      try {
+        Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
+
+        LOG.info("RCFile format- Using block level merge");
+        cplan = GenMapRedUtils.createRCFileMergeTask(fsInputDesc, finalName,
+            dpCtx != null && dpCtx.getNumDPCols() > 0);
+        work = cplan;
+      } catch (ClassNotFoundException e) {
+        String msg = "Illegal input format class: " + inputFormatClass;
+        throw new SemanticException(msg);
+      }
+
+    } else {
+      cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
+      work = new MapredWork();
+      ((MapredWork)work).setMapWork(cplan);
+      // use CombineHiveInputFormat for map-only merging
+    }
+    cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
+    // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
+    // know if merge MR2 will be triggered at execution time
+    ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
+        fsInputDesc.getFinalDirName());
+
+    // keep the dynamic partition context in conditional task resolver context
+    ConditionalResolverMergeFilesCtx mrCtx =
+        (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
+    mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
+    mrCtx.setLbCtx(fsInputDesc.getLbCtx());
+
+    //
+    // 3. add the moveTask as the children of the conditional task
+    //
+    linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask);
+  }
+
+  /**
+   * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
+   * possible subtrees branching from the ConditionalTask.
+   *
+   * @param newOutput
+   * @param cndTsk
+   * @param mvTasks
+   * @param hconf
+   * @param dependencyTask
+   */
+  public static void linkMoveTask(FileSinkOperator newOutput,
+      ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf,
+      DependencyCollectionTask dependencyTask) {
+
+    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, newOutput);
+
+    for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
+      linkMoveTask(mvTask, tsk, hconf, dependencyTask);
+    }
+  }
+
+  /**
+   * Follows the task tree down from task and makes all leaves parents of mvTask
+   *
+   * @param mvTask
+   * @param task
+   * @param hconf
+   * @param dependencyTask
+   */
+  public static void linkMoveTask(Task<MoveWork> mvTask,
+      Task<? extends Serializable> task, HiveConf hconf,
+      DependencyCollectionTask dependencyTask) {
+
+    if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) {
+      // If it's a leaf, add the move task as a child
+      addDependentMoveTasks(mvTask, hconf, task, dependencyTask);
+    } else {
+      // Otherwise, for each child run this method recursively
+      for (Task<? extends Serializable> childTask : task.getDependentTasks()) {
+        linkMoveTask(mvTask, childTask, hconf, dependencyTask);
+      }
+    }
+  }
+
+  /**
+   * Adds dependencyTask as a dependent of parentTask. If mvTask loads a table or partition,
+   * and HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES is set, adds mvTask as a dependent of
+   * dependencyTask; otherwise adds mvTask as a dependent of parentTask as well.
+   *
+   * @param mvTask
+   * @param hconf
+   * @param parentTask
+   * @param dependencyTask
+   */
+  public static void addDependentMoveTasks(Task<MoveWork> mvTask, HiveConf hconf,
+      Task<? extends Serializable> parentTask, DependencyCollectionTask dependencyTask) {
+
+    if (mvTask != null) {
+      if (hconf.getBoolVar(ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)) {
+        parentTask.addDependentTask(dependencyTask);
+        if (mvTask.getWork().getLoadTableWork() != null) {
+          // Moving tables/partitions depend on the dependencyTask
+          dependencyTask.addDependentTask(mvTask);
+        } else {
+          // Moving files depends on the parentTask (we still want the dependencyTask to depend
+          // on the parentTask)
+          parentTask.addDependentTask(mvTask);
+        }
+      } else {
+        parentTask.addDependentTask(mvTask);
+      }
+    }
+  }
+
+
+  /**
+   * Add the StatsTask as a dependent task of the MoveTask
+   * because StatsTask will change the Table/Partition metadata. For atomicity, we
+   * should not change the metadata before the data is actually moved by the MoveTask.
+   *
+   * @param nd
+   *          the FileSinkOperator whose results are taken care of by the MoveTask.
+   * @param mvTask
+   *          The MoveTask that moves the FileSinkOperator's results.
+   * @param currTask
+   *          The MapRedTask that the FileSinkOperator belongs to.
+   * @param hconf
+   *          HiveConf
+   */
+  public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
+      Task<? extends Serializable> currTask, HiveConf hconf) {
+
+    MoveWork mvWork = ((MoveTask) mvTask).getWork();
+    StatsWork statsWork = null;
+    if (mvWork.getLoadTableWork() != null) {
+      statsWork = new StatsWork(mvWork.getLoadTableWork());
+    } else if (mvWork.getLoadFileWork() != null) {
+      statsWork = new StatsWork(mvWork.getLoadFileWork());
+    }
+    assert statsWork != null : "Error when generating StatsTask";
+    statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+
+    if (currTask.getWork() instanceof MapredWork) {
+      MapredWork mrWork = (MapredWork) currTask.getWork();
+      mrWork.getMapWork().setGatheringStats(true);
+      if (mrWork.getReduceWork() != null) {
+        mrWork.getReduceWork().setGatheringStats(true);
+      }
+    } else {
+      TezWork work = (TezWork) currTask.getWork();
+      for (BaseWork w: work.getAllWork()) {
+        w.setGatheringStats(true);
+      }
+    }
+
+    // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
+    // in FileSinkDesc is used for stats publishing. They should be consistent.
+    statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix());
+    Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
+
+    // mark the MapredWork and FileSinkOperator for gathering stats
+    nd.getConf().setGatherStats(true);
+    nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+    nd.getConf().setMaxStatsKeyPrefixLength(
+        hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
+    // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName());
+
+    // subscribe feeds from the MoveTask so that MoveTask can forward the list
+    // of dynamic partition list to the StatsTask
+    mvTask.addDependentTask(statsTask);
+    statsTask.subscribeFeed(mvTask);
+  }
+
+  /**
+   * Returns true iff the current query is an INSERT INTO for the given file sink.
+   *
+   * @param parseCtx
+   * @param fsOp
+   * @return
+   */
+  public static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp) {
+    return fsOp.getConf().getTableInfo().getTableName() != null &&
+        parseCtx.getQB().getParseInfo().isInsertToTable();
+  }
+
+  /**
+   * Create a MapredWork based on input path, the top operator and the input
+   * table descriptor.
+   *
+   * @param conf
+   * @param topOp
+   *          the table scan operator that is the root of the MapReduce task.
+   * @param fsDesc
+   *          the file sink descriptor that serves as the input to this merge task.
+   * @return the MapredWork
+   */
+  private static MapWork createMRWorkForMergingFiles (HiveConf conf,
+    Operator<? extends OperatorDesc> topOp,  FileSinkDesc fsDesc) {
+
+    ArrayList<String> aliases = new ArrayList<String>();
+    String inputDir = fsDesc.getFinalDirName();
+    TableDesc tblDesc = fsDesc.getTableInfo();
+    aliases.add(inputDir); // dummy alias: just use the input path
+
+    // constructing the default MapredWork
+    MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+    MapWork cplan = cMrPlan.getMapWork();
+    cplan.getPathToAliases().put(inputDir, aliases);
+    cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
+    cplan.getAliasToWork().put(inputDir, topOp);
+    cplan.setMapperCannotSpanPartns(true);
+
+    return cplan;
+  }
+
+  /**
+   * Create a block level merge task for RCFiles.
+   *
+   * @param fsInputDesc
+   * @param finalName
+   * @return MergeWork if table is stored as RCFile,
+   *         null otherwise
+   */
+  public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+      String finalName, boolean hasDynamicPartitions) throws SemanticException {
+
+    String inputDir = fsInputDesc.getFinalDirName();
+    TableDesc tblDesc = fsInputDesc.getTableInfo();
+
+    if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+      ArrayList<String> inputDirs = new ArrayList<String>();
+      if (!hasDynamicPartitions
+          && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+        inputDirs.add(inputDir);
+      }
+
+      MergeWork work = new MergeWork(inputDirs, finalName,
+          hasDynamicPartitions, fsInputDesc.getDynPartCtx());
+      LinkedHashMap<String, ArrayList<String>> pathToAliases =
+          new LinkedHashMap<String, ArrayList<String>>();
+      pathToAliases.put(inputDir, (ArrayList<String>) inputDirs.clone());
+      work.setMapperCannotSpanPartns(true);
+      work.setPathToAliases(pathToAliases);
+      work.setAliasToWork(
+          new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
+      if (hasDynamicPartitions
+          || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+        work.getPathToPartitionInfo().put(inputDir,
+            new PartitionDesc(tblDesc, null));
+      }
+      work.setListBucketingCtx(fsInputDesc.getLbCtx());
+
+      return work;
+    }
+
+    throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
+  }
+
+  /**
+   * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
+   *
+   * @param conf
+   *          HiveConf
+   * @param currTask
+   *          current leaf task
+   * @param mvWork
+   *          MoveWork for the move task
+   * @param mergeWork
+   *          the work (MapWork or MergeWork) for the merge task
+   * @param inputPath
+   *          the input directory of the merge/move task
+   * @return The conditional task
+   */
+  @SuppressWarnings("unchecked")
+  public static ConditionalTask createCondTask(HiveConf conf,
+      Task<? extends Serializable> currTask, MoveWork mvWork,
+      Serializable mergeWork, String inputPath) {
+
+    // There are 3 options for this ConditionalTask:
+    // 1) Merge the partitions
+    // 2) Move the partitions (i.e. don't merge the partitions)
+    // 3) Merge some partitions and move the others (i.e. merge some partitions
+    //    and don't merge the rest). In this case the merge is done first,
+    //    followed by the move, to prevent conflicts.
+    Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
+    Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
+    Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
+    Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf);
+
+    // NOTE! It is necessary that the merge task is the parent of the move
+    // task, and not the other way around, for the execute method of
+    // ConditionalTask to work properly.
+    mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
+
+    List<Serializable> listWorks = new ArrayList<Serializable>();
+    listWorks.add(mvWork);
+    listWorks.add(mergeWork);
+
+    ConditionalWork cndWork = new ConditionalWork(listWorks);
+
+    List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
+    listTasks.add(moveOnlyMoveTask);
+    listTasks.add(mergeOnlyMergeTask);
+    listTasks.add(mergeAndMoveMergeTask);
+
+    ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf);
+    cndTsk.setListTasks(listTasks);
+
+    // create resolver
+    cndTsk.setResolver(new ConditionalResolverMergeFiles());
+    ConditionalResolverMergeFilesCtx mrCtx =
+        new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
+    cndTsk.setResolverCtx(mrCtx);
+
+    // make the conditional task a child of the current leaf task
+    currTask.addDependentTask(cndTsk);
+
+    return cndTsk;
+  }
+
+  /**
+   * Check whether the file sink's table is skewed and stored as directories.
+   *
+   * @param fsInputDesc the file sink descriptor to inspect
+   * @return true if the table is skewed and stored as directories
+   */
+  public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
+    return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
+        .isSkewedStoredAsDir();
+  }
+
+  public static Task<MoveWork> findMoveTask(
+      List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) {
+    // find the move task
+    for (Task<MoveWork> mvTsk : mvTasks) {
+      MoveWork mvWork = mvTsk.getWork();
+      String srcDir = null;
+      if (mvWork.getLoadFileWork() != null) {
+        srcDir = mvWork.getLoadFileWork().getSourceDir();
+      } else if (mvWork.getLoadTableWork() != null) {
+        srcDir = mvWork.getLoadTableWork().getSourceDir();
+      }
+
+      String fsOpDirName = fsOp.getConf().getFinalDirName();
+      if ((srcDir != null)
+          && (srcDir.equalsIgnoreCase(fsOpDirName))) {
+        return mvTsk;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns true iff the fsOp requires a merge task for its output.
+   * @param mvTasks the candidate move tasks
+   * @param hconf the current HiveConf
+   * @param fsOp the file sink operator in question
+   * @param currTask the current leaf task
+   * @param isInsertTable whether the query inserts into a table
+   * @return true if a merge task should be created for the file sink's output
+   */
+  public static boolean isMergeRequired(List<Task<MoveWork>> mvTasks, HiveConf hconf, FileSinkOperator fsOp,
+      Task<? extends Serializable> currTask, boolean isInsertTable) {
+
+    // Has the user enabled merging of files for map-only jobs or for all jobs
+    if ((mvTasks != null) && (!mvTasks.isEmpty())) {
+
+      // no need for merging if the move is to a local file system
+      MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+
+      if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
+        GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf);
+      }
+
+      if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) {
+        if (fsOp.getConf().isLinkedFileSink()) {
+          // If the user has HIVEMERGEMAPREDFILES set to false, the assumption
+          // was that the number of reducers is small, so the number of files
+          // is small as well. However, with this optimization we may increase
+          // the number of files by a big margin. So, merge aggressively.
+          if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
+              hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) {
+            return true;
+          }
+        } else {
+          // There are separate configuration parameters to control whether to
+          // merge for a map-only job or for a map-reduce job.
+          ReduceWork reduceWork = currTask.getWork() instanceof MapredWork
+              ? ((MapredWork) currTask.getWork()).getReduceWork() : null;
+          boolean mergeMapOnly =
+              hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
+          boolean mergeMapRed =
+              hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
+              reduceWork != null;
+          if (mergeMapOnly || mergeMapRed) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Create and add any dependent move tasks.
+   *
+   * @param currTask the current leaf task
+   * @param chDir whether the file sink output should be redirected to a temporary directory
+   * @param fsOp the file sink operator
+   * @param parseCtx the parse context
+   * @param mvTasks the candidate move tasks
+   * @param hconf the current HiveConf
+   * @param dependencyTask the task used to collect dependent move tasks
+   * @return the original final directory of the file sink if chDir is true, null otherwise
+   */
+  public static String createMoveTask(Task<? extends Serializable> currTask, boolean chDir,
+      FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks,
+      HiveConf hconf, DependencyCollectionTask dependencyTask) {
+
+    String dest = null;
+
+    if (chDir) {
+      dest = fsOp.getConf().getFinalDirName();
+
+      // generate the temporary file
+      // it must be on the same file system as the current destination
+      Context baseCtx = parseCtx.getContext();
+      String tmpDir = baseCtx.getExternalTmpFileURI((new Path(dest)).toUri());
+
+      FileSinkDesc fileSinkDesc = fsOp.getConf();
+      // Change all the linked file sink descriptors
+      if (fileSinkDesc.isLinkedFileSink()) {
+        for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
+          String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName());
+          fsConf.setParentDir(tmpDir);
+          fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName);
+        }
+      } else {
+        fileSinkDesc.setDirName(tmpDir);
+      }
+    }
+
+    Task<MoveWork> mvTask = null;
+
+    if (!chDir) {
+      mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+    }
+
+    // Set the move task to be dependent on the current task
+    if (mvTask != null) {
+      GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask);
+    }
+
+    return dest;
+  }
+
   private GenMapRedUtils() {
     // prevent instantiation
   }

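The comment in createCondTask above lists the three alternatives the resulting
ConditionalTask can take at run time. As a rough, self-contained illustration
(the class and method names below are hypothetical, not Hive APIs, and the real
decision is made by ConditionalResolverMergeFiles), the choice can be thought of
as a per-directory check of average file size against a small-file threshold
such as hive.merge.smallfiles.avgsize:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the merge-vs-move decision; not Hive code. */
public class MergeMoveDecisionSketch {

  enum Choice { MOVE_ONLY, MERGE_ONLY, MERGE_THEN_MOVE }

  /**
   * Picks one of the three conditional branches, assuming the decision is
   * driven by the average file size produced in each output directory.
   */
  static Choice decide(List<Long> avgFileSizePerDir, long smallFileThreshold) {
    List<Boolean> needsMerge = new ArrayList<Boolean>();
    for (long avg : avgFileSizePerDir) {
      needsMerge.add(avg < smallFileThreshold);
    }
    boolean any = needsMerge.contains(Boolean.TRUE);
    boolean all = !needsMerge.contains(Boolean.FALSE);
    if (!any) {
      return Choice.MOVE_ONLY;       // nothing to merge, just move
    } else if (all) {
      return Choice.MERGE_ONLY;      // every directory gets merged
    } else {
      return Choice.MERGE_THEN_MOVE; // merge some dirs first, then move
    }
  }

  public static void main(String[] args) {
    // two directories full of small files, one with large files
    System.out.println(decide(Arrays.asList(10L, 20L, 5000L), 100L));
  }
}

This is only meant to show why three tasks go into listTasks and why, in the
mixed case, the merge task must be the parent of the move task.
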
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1538485&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Mon Nov  4 01:34:01 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+
+/**
+ * FileSinkProcessor handles the addition of merge, move and stats tasks for file sinks.
+ *
+ */
+public class FileSinkProcessor implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
+
+  /*
+   * (non-Javadoc)
+   * We should ideally not modify the tree we traverse. However, since we
+   * need to walk the tree anyway when we modify an operator, we might as
+   * well do it here.
+   */
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procCtx, Object... nodeOutputs)
+      throws SemanticException {
+
+    GenTezProcContext context = (GenTezProcContext) procCtx;
+    FileSinkOperator fileSink = (FileSinkOperator) nd;
+    ParseContext parseContext = context.parseContext;
+
+
+    boolean isInsertTable = // is INSERT OVERWRITE TABLE
+        GenMapRedUtils.isInsertInto(parseContext, fileSink);
+    HiveConf hconf = parseContext.getConf();
+
+    boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
+        hconf, fileSink, context.currentTask, isInsertTable);
+
+    String finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+        chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
+
+    if (chDir) {
+      // Merge the files in the destination table/partitions by creating a map-only merge job.
+      // If the underlying data is RCFile, an RCFileBlockMerge task will be created.
+      LOG.info("using CombineHiveInputformat for the merge job");
+      GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
+          context.dependencyTask, context.moveTask,
+          hconf, context.currentTask);
+    }
+
+    return true;
+  }
+}
\ No newline at end of file

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Mon Nov  4 01:34:01 2013
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.hooks.R
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -97,6 +99,9 @@ public class GenTezProcContext implement
   // remember the dummy ops we created
   public final Map<Operator<?>, List<Operator<?>>> linkChildOpWithDummyOp;
 
+  // used to group dependent tasks for multi-table inserts
+  public final DependencyCollectionTask dependencyTask;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -115,5 +120,7 @@ public class GenTezProcContext implement
     this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
     this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
     this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
+    this.dependencyTask = (DependencyCollectionTask)
+        TaskFactory.get(new DependencyCollectionWork(), conf);
   }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Mon Nov  4 01:34:01 2013
@@ -71,6 +71,12 @@ public class GenTezWork implements NodeP
     // packing into a vertex, typically a table scan, union or join
     Operator<?> root = context.currentRootOperator;
     if (root == null) {
+      // if there are no more rootOperators we're dealing with multiple
+      // file sinks off of the same table scan. Bail.
+      if (context.rootOperators.isEmpty()) {
+        return null;
+      }
+
       // null means that we're starting with a new table scan
       // the graph walker walks the rootOperators in the same
       // order so we can just take the next

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1538485&r1=1538484&r2=1538485&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Mon Nov  4 01:34:01 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -126,16 +127,17 @@ public class TezCompiler extends TaskCom
     // the operator stack.
     // The dispatcher generates the plan from the operator tree
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp(new String("Split Work - ReduceSink"),
+    opRules.put(new RuleRegExp("Split Work - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"),
         genTezWork);
-    opRules.put(new RuleRegExp(new String("No more walking on ReduceSink-MapJoin"),
+
+    opRules.put(new RuleRegExp("No more walking on ReduceSink-MapJoin",
         ReduceSinkOperator.getOperatorName() + "%" +
         MapJoinOperator.getOperatorName() + "%"), new ReduceSinkMapJoinProc());
-    opRules.put(new RuleRegExp(new String("Split Work - FileSink"),
-        FileSinkOperator.getOperatorName() + "%"),
-        genTezWork);
 
+    opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
+        FileSinkOperator.getOperatorName() + "%"),
+        new CompositeProcessor(new FileSinkProcessor(), genTezWork));
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along

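TezCompiler now fires a CompositeProcessor on the FileSink rule so that
FileSinkProcessor runs together with GenTezWork. CompositeProcessor.java is
added by this commit but is not part of this excerpt; the sketch below is an
assumption of what such a composite NodeProcessor could look like, namely one
that invokes each wrapped processor in the order given and returns the last
result (the class name is made up to avoid confusion with the real file):

package org.apache.hadoop.hive.ql.lib;

import java.util.Stack;

import org.apache.hadoop.hive.ql.parse.SemanticException;

/**
 * Sketch only: chains several NodeProcessors; the actual CompositeProcessor
 * added by this commit may differ.
 */
public class CompositeProcessorSketch implements NodeProcessor {

  private final NodeProcessor[] procs;

  public CompositeProcessorSketch(NodeProcessor... procs) {
    this.procs = procs;
  }

  @Override
  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
      Object... nodeOutputs) throws SemanticException {
    Object result = null;
    // run each wrapped processor in the order it was passed to the constructor
    for (NodeProcessor proc : procs) {
      result = proc.process(nd, stack, procCtx, nodeOutputs);
    }
    return result;
  }
}

Under that assumption, the rule registered above would let FileSinkProcessor
attach the merge/move/stats tasks before GenTezWork splits the plan into Tez
work, matching the order of the constructor arguments.
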
Added: hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q?rev=1538485&view=auto
==============================================================================
--- hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q (added)
+++ hive/branches/tez/ql/src/test/queries/clientpositive/tez_dml.q Mon Nov  4 01:34:01 2013
@@ -0,0 +1,37 @@
+set hive.optimize.tez=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+-- CTAS
+EXPLAIN CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt;
+CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt;
+
+SELECT * FROM tmp_src;
+
+-- dyn partitions
+CREATE TABLE tmp_src_part (c string) PARTITIONED BY (d int);
+EXPLAIN INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src;
+INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src;
+
+SELECT * FROM tmp_src_part;
+
+-- multi insert
+CREATE TABLE even (c int, d string);
+CREATE TABLE odd (c int, d string);
+
+EXPLAIN
+FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0 
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1;
+
+FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0 
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1;
+
+SELECT * FROM even;
+SELECT * FROM odd;
+
+-- drop the tables
+DROP TABLE even;
+DROP TABLE odd;
+DROP TABLE tmp_src;
+DROP TABLE tmp_src_part;