Posted to commits@hive.apache.org by gu...@apache.org on 2014/02/17 08:58:48 UTC

svn commit: r1568894 [1/3] - in /hive/branches/tez: itests/qtest/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/ ql/src/java/org/apache/hadoop/hive/ql/pa...

Author: gunther
Date: Mon Feb 17 07:58:47 2014
New Revision: 1568894

URL: http://svn.apache.org/r1568894
Log:
HIVE-6362: Support union all on tez (Gunther Hagleitner)
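
The approach, in brief: each branch of a union all becomes its own vertex, the branches are grouped into a Tez VertexGroup, and downstream work connects to the group through a GroupInputEdge whose merged input presents the branches as a single logical input. A minimal sketch of the resulting DAG wiring, using the Tez API calls that appear in the diff below (vertex construction elided, names illustrative):

    // two map vertices make up the union; a downstream reducer reads the
    // group as if it were one logical input
    VertexGroup union = dag.createVertexGroup("Union 1", mapVertex1, mapVertex2);
    GroupInputEdge edge = new GroupInputEdge(union, reducerVertex,
        edgeProperty, // e.g. a scatter-gather property for a SIMPLE_EDGE
        new InputDescriptor(TezMergedLogicalInput.class.getName()));
    dag.addEdge(edge);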

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
    hive/branches/tez/ql/src/test/results/clientpositive/tez/union2.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/union3.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/union4.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/union5.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/union6.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/union7.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/union8.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/union9.q.out
Modified:
    hive/branches/tez/itests/qtest/pom.xml
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
    hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/insert1.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/insert_into1.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/insert_into2.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/join1.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/load_dyn_part1.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/mapreduce1.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/mapreduce2.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/merge1.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/merge2.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/sample1.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/tez/tez_dml.q.out

Modified: hive/branches/tez/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/itests/qtest/pom.xml?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/itests/qtest/pom.xml (original)
+++ hive/branches/tez/itests/qtest/pom.xml Mon Feb 17 07:58:47 2014
@@ -39,7 +39,7 @@
     <minimr.query.files>stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q</minimr.query.files>
     <minimr.query.negative.files>cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q</minimr.query.negative.files>
     <minitez.query.files>tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q</minitez.query.files>
-    <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q</minitez.query.files.shared>
+    <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
     <beeline.positive.exclude>add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q</beeline.positive.exclude>
   </properties>
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Feb 17 07:58:47 2014
@@ -777,6 +777,14 @@ public final class Utilities {
     }
   }
 
+  public static Set<Operator<?>> cloneOperatorTree(Configuration conf, Set<Operator<?>> roots) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    serializePlan(roots, baos, conf, true);
+    Set<Operator<?>> result = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+        roots.getClass(), conf, true);
+    return result;
+  }
+
   private static void serializePlan(Object plan, OutputStream out, Configuration conf, boolean cloningPlan) {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
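
The new cloneOperatorTree helper deep-copies an operator tree by round-tripping it through the plan serializer. A minimal usage sketch, mirroring how removeUnionOperators (later in this commit) pairs each original root with its clone; it assumes the deserialized set iterates in the same order as the input:

    Set<Operator<?>> roots = work.getAllRootOperators();
    Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);

    // pair originals with clones; relies on matching iteration order
    Map<Operator<?>, Operator<?>> replacementMap =
        new HashMap<Operator<?>, Operator<?>>();
    Iterator<Operator<?>> it = newRoots.iterator();
    for (Operator<?> orig : roots) {
      replacementMap.put(orig, it.next());
    }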

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Feb 17 07:58:47 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.ErrorMs
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.exec.tez.tools.TezMergedLogicalInput;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -82,6 +83,7 @@ import org.apache.tez.dag.api.EdgeProper
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -90,6 +92,7 @@ import org.apache.tez.dag.api.VertexLoca
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -98,6 +101,8 @@ import org.apache.tez.mapreduce.hadoop.M
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@@ -189,9 +194,56 @@ public class DagUtils {
   }
 
   /**
+   * Given a VertexGroup and a vertex, createEdge will create a
+   * GroupInputEdge between them.
+   *
+   * @param group The parent VertexGroup
+   * @param wConf The job conf of the child vertex
+   * @param w The child vertex
+   * @param edgeType the type of connection between the two
+   * endpoints.
+   */
+  public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+      Vertex w, EdgeType edgeType)
+      throws IOException {
+    
+    Class mergeInputClass;
+    
+    LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
+    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
+    switch (edgeType) {
+    case BROADCAST_EDGE:
+      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+      break;
+
+    case SIMPLE_EDGE:
+    default:
+      mergeInputClass = TezMergedLogicalInput.class;
+      break;
+    }
+
+    return new GroupInputEdge(group, w, createEdgeProperty(edgeType),
+         new InputDescriptor(mergeInputClass.getName()));
+  }
+
+  /**
+   * Given two vertices a and b, update their configurations to be used in an Edge a-b.
+   */
+  public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) 
+    throws IOException {
+
+    // Tez needs to set up each output/subsequent-input pair correctly
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+
+    // update payloads (configuration for the vertices might have changed)
+    v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
+    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+  }
+
+  /**
    * Given two vertices and their respective configuration objects createEdge
-   * will create an Edge object that connects the two. Currently the edge will
-   * always be a stable bi-partite edge.
+   * will create an Edge object that connects the two.
    *
    * @param vConf JobConf of the first vertex
    * @param v The first vertex (source)
@@ -203,13 +255,15 @@ public class DagUtils {
       EdgeType edgeType)
       throws IOException {
 
-    // Tez needs to setup output subsequent input pairs correctly
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+    updateConfigurationForEdge(vConf, v, wConf, w);
 
-    // update payloads (configuration for the vertices might have changed)
-    v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
-    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+    return new Edge(v, w, createEdgeProperty(edgeType));
+  }
 
+  /*
+   * Helper function to create an edge property from an edge type.
+   */
+  private EdgeProperty createEdgeProperty(EdgeType edgeType) {
     DataMovementType dataMovementType;
     Class logicalInputClass;
     Class logicalOutputClass;
@@ -235,7 +289,8 @@ public class DagUtils {
             SchedulingType.SEQUENTIAL,
             new OutputDescriptor(logicalOutputClass.getName()),
             new InputDescriptor(logicalInputClass.getName()));
-    return new Edge(v, w, edgeProperty);
+
+    return edgeProperty;
   }
 
   /*
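
The new createEdge overload picks the merged-input class by edge type: a broadcast edge concatenates the grouped inputs via Tez's ConcatenatedMergedKeyValueInput, while a shuffle edge sort-merges them through the new TezMergedLogicalInput. A hedged usage sketch (variable names illustrative):

    // connect a union's VertexGroup to the vertex that consumes it
    GroupInputEdge e = dagUtils.createEdge(group, childConf, childVertex,
        EdgeType.SIMPLE_EDGE); // union branches are shuffled into the child
    dag.addEdge(e);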

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Mon Feb 17 07:58:47 2014
@@ -55,7 +55,6 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
 /**
  * Process input from tez LogicalInput and write output - for a map plan
@@ -184,15 +183,19 @@ public class ReduceRecordProcessor  exte
 
   @Override
   void run() throws IOException{
-    List<ShuffledMergedInput> shuffleInputs = getShuffleInputs(inputs);
+    List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
     KeyValuesReader kvsReader;
 
-    if(shuffleInputs.size() == 1){
-      //no merging of inputs required
-      kvsReader = shuffleInputs.get(0).getReader();
-    }else {
-      //get a sort merged input
-      kvsReader = new InputMerger(shuffleInputs);
+    try {
+      if(shuffleInputs.size() == 1){
+        //no merging of inputs required
+        kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader();
+      }else {
+        //get a sort merged input
+        kvsReader = new InputMerger(shuffleInputs);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
     }
 
     while(kvsReader.next()){
@@ -211,12 +214,12 @@ public class ReduceRecordProcessor  exte
    * @param inputs
    * @return
    */
-  private List<ShuffledMergedInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
+  private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
     //the reduce plan inputs have tags, add all inputs that have tags
     Map<Integer, String> tag2input = redWork.getTagToInput();
-    ArrayList<ShuffledMergedInput> shuffleInputs = new ArrayList<ShuffledMergedInput>();
+    ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
     for(String inpStr : tag2input.values()){
-      shuffleInputs.add((ShuffledMergedInput)inputs.get(inpStr));
+      shuffleInputs.add((LogicalInput)inputs.get(inpStr));
     }
     return shuffleInputs;
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Feb 17 07:58:47 2014
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.log.Per
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -54,9 +56,11 @@ import org.apache.tez.common.counters.Te
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 
@@ -97,9 +101,6 @@ public class TezTask extends Task<TezWor
     DAGClient client = null;
     TezSessionState session = null;
 
-    // Tez requires us to use RPC for the query plan
-    HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
-
     try {
       // Get or create Context object. If we create it we have to clean
       // it later as well.
@@ -216,24 +217,68 @@ public class TezTask extends Task<TezWor
 
       // translate work to vertex
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-      JobConf wxConf = utils.initializeVertexConf(conf, w);
-      Vertex wx = utils.createVertex(wxConf, w, tezDir,
-         appJarLr, additionalLr, fs, ctx, !isFinal);
-      dag.addVertex(wx);
-      utils.addCredentials(w, dag);
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-      workToVertex.put(w, wx);
-      workToConf.put(w, wxConf);
-
-      // add all dependencies (i.e.: edges) to the graph
-      for (BaseWork v: work.getChildren(w)) {
-        assert workToVertex.containsKey(v);
-        Edge e = null;
 
-        EdgeType edgeType = work.getEdgeProperty(w, v);
+      if (w instanceof UnionWork) {
+        // Special case for unions. These items translate to VertexGroups
+
+        List<BaseWork> unionWorkItems = new LinkedList<BaseWork>();
+        List<BaseWork> children = new LinkedList<BaseWork>();
+
+        // split the children into vertices that make up the union and vertices that are
+        // proper children of the union
+        for (BaseWork v: work.getChildren(w)) {
+          EdgeType type = work.getEdgeProperty(w, v);
+          if (type == EdgeType.CONTAINS) {
+            unionWorkItems.add(v);
+          } else {
+            children.add(v);
+          }
+        }
+
+        // create VertexGroup
+        Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
 
-        e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
-        dag.addEdge(e);
+        int i = 0;
+        for (BaseWork v: unionWorkItems) {
+          vertexArray[i++] = workToVertex.get(v);
+        }
+        VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+        
+        // now hook up the children
+        for (BaseWork v: children) {
+          // need to pairwise patch up the configuration of the vertices
+          for (BaseWork part: unionWorkItems) {
+            utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part), 
+                 workToConf.get(v), workToVertex.get(v));
+          }
+          
+          // finally we can create the grouped edge
+          GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
+               workToVertex.get(v), work.getEdgeProperty(w, v));
+
+          dag.addEdge(e);
+        }
+      } else {
+        // Regular vertices
+        JobConf wxConf = utils.initializeVertexConf(conf, w);
+        Vertex wx = utils.createVertex(wxConf, w, tezDir, appJarLr, 
+          additionalLr, fs, ctx, !isFinal);
+        dag.addVertex(wx);
+        utils.addCredentials(w, dag);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
+        workToVertex.put(w, wx);
+        workToConf.put(w, wxConf);
+        
+        // add all dependencies (i.e.: edges) to the graph
+        for (BaseWork v: work.getChildren(w)) {
+          assert workToVertex.containsKey(v);
+          Edge e = null;
+
+          EdgeType edgeType = work.getEdgeProperty(w, v);
+          
+          e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
+          dag.addEdge(e);
+        }
       }
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java Mon Feb 17 07:58:47 2014
@@ -26,12 +26,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor;
 import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
 /**
  * A KeyValuesReader implementation that returns a sorted stream of key-values
- * by doing a sorted merge of the key-value in ShuffledMergedInputs.
+ * by doing a sorted merge of the key-values in LogicalInputs.
  * Tags are in the last byte of the key, so no special handling for tags is required.
  * Uses a priority queue to pick the KeyValuesReader of the input that is next in
  * sort order.
@@ -42,12 +43,12 @@ public class InputMerger implements KeyV
   private PriorityQueue<KeyValuesReader> pQueue = null;
   private KeyValuesReader nextKVReader = null;
 
-  public InputMerger(List<ShuffledMergedInput> shuffleInputs) throws IOException {
-    //get KeyValuesReaders from the ShuffledMergedInput and add them to priority queue
+  public InputMerger(List<? extends Input> shuffleInputs) throws Exception {
+    //get KeyValuesReaders from the LogicalInputs and add them to the priority queue
     int initialCapacity = shuffleInputs.size();
     pQueue = new PriorityQueue<KeyValuesReader>(initialCapacity, new KVReaderComparator());
-    for(ShuffledMergedInput input : shuffleInputs){
-      addToQueue(input.getReader());
+    for(Input input : shuffleInputs){
+      addToQueue((KeyValuesReader)input.getReader());
     }
   }
 

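InputMerger keeps one KeyValuesReader per input in a priority queue ordered by the readers' current keys. The merge step itself is untouched by this commit; purely as context, a sketch of how such a merge typically advances (hypothetical, not part of this diff):

    public boolean next() throws IOException {
      if (nextKVReader != null) {
        addToQueue(nextKVReader);   // reinsert the reader we just consumed
      }
      nextKVReader = pQueue.poll(); // reader with the smallest current key
      return nextKVReader != null;
    }
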
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1568894&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Mon Feb 17 07:58:47 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.tools;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * TezMergedLogicalInput is an adapter that makes a union's inputs look like
+ * a single input in Tez.
+ */
+public class TezMergedLogicalInput extends MergedLogicalInput {
+
+  @Override
+  public Reader getReader() throws Exception {
+    return new InputMerger(getInputs());
+  }
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Mon Feb 17 07:58:47 2014
@@ -51,28 +51,9 @@ public class FileSinkProcessor implement
 
     GenTezProcContext context = (GenTezProcContext) procCtx;
     FileSinkOperator fileSink = (FileSinkOperator) nd;
-    ParseContext parseContext = context.parseContext;
-
-
-    boolean isInsertTable = // is INSERT OVERWRITE TABLE
-        GenMapRedUtils.isInsertInto(parseContext, fileSink);
-    HiveConf hconf = parseContext.getConf();
-
-    boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
-        hconf, fileSink, context.currentTask, isInsertTable);
-
-    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
-        chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
-
-    if (chDir) {
-      // Merge the files in the destination table/partitions by creating Map-only merge job
-      // If underlying data is RCFile a RCFileBlockMerge task would be created.
-      LOG.info("using CombineHiveInputformat for the merge job");
-      GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
-          context.dependencyTask, context.moveTask,
-          hconf, context.currentTask);
-    }
-
+    
+    // just remember it for later processing
+    context.fileSinkSet.add(fileSink);
     return true;
   }
 }
\ No newline at end of file
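
The merge/move logic removed here is not gone: it moves into GenTezUtils.processFileSink (below) and now runs once per recorded file sink after the whole operator graph has been walked and union operators have been removed. From the TezCompiler change later in this commit:

    // runs after the graph walk in TezCompiler
    for (BaseWork w : procCtx.workWithUnionOperators) {
      GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
    }
    for (FileSinkOperator fileSink : procCtx.fileSinkSet) {
      GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
    }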

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Mon Feb 17 07:58:47 2014
@@ -20,14 +20,19 @@ package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -39,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.Ba
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 
 /**
@@ -105,6 +111,15 @@ public class GenTezProcContext implement
   // used to group dependent tasks for multi table inserts
   public final DependencyCollectionTask dependencyTask;
 
+  // used to hook up unions
+  public final Map<Operator<?>, BaseWork> unionWorkMap;
+  public final List<UnionOperator> currentUnionOperators;
+  public final Set<BaseWork> workWithUnionOperators;
+
+  // we link file sinks that will write to the same final location
+  public final Map<Path, List<FileSinkDesc>> linkedFileSinks;
+  public final Set<FileSinkOperator> fileSinkSet;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -126,6 +141,11 @@ public class GenTezProcContext implement
     this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
     this.dependencyTask = (DependencyCollectionTask)
         TaskFactory.get(new DependencyCollectionWork(), conf);
+    this.unionWorkMap = new HashMap<Operator<?>, BaseWork>();
+    this.currentUnionOperators = new LinkedList<UnionOperator>();
+    this.workWithUnionOperators = new HashSet<BaseWork>();
+    this.linkedFileSinks = new HashMap<Path, List<FileSinkDesc>>();
+    this.fileSinkSet = new HashSet<FileSinkOperator>();
 
     rootTasks.add(currentTask);
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Mon Feb 17 07:58:47 2014
@@ -18,17 +18,35 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 
 /**
@@ -59,6 +77,13 @@ public class GenTezUtils {
     sequenceNumber = 0;
   }
 
+  public UnionWork createUnionWork(GenTezProcContext context, Operator<?> operator, TezWork tezWork) {
+    UnionWork unionWork = new UnionWork("Union "+ (++sequenceNumber));
+    context.unionWorkMap.put(operator, unionWork);
+    tezWork.add(unionWork);
+    return unionWork;
+  }
+
   public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root, TezWork tezWork) {
     assert !root.getParentOperators().isEmpty();
     ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
@@ -128,4 +153,111 @@ public class GenTezUtils {
     GenMapRedUtils.setMapWork(mapWork, context.parseContext,
         context.inputs, partitions, root, alias, context.conf, false);
   }
+
+  // clones the plan and removes any union operators
+  public void removeUnionOperators(Configuration conf, GenTezProcContext context, 
+      BaseWork work) 
+    throws SemanticException {
+
+    Set<Operator<?>> roots = work.getAllRootOperators();
+    
+    // need to clone the plan.
+    Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+
+    Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+
+    Iterator<Operator<?>> it = newRoots.iterator();
+    for (Operator<?> orig: roots) {
+      replacementMap.put(orig,it.next());
+    }
+
+    // now we remove all the unions. we throw away any branch that's not reachable from
+    // the current set of roots. The reason is that those branches will be handled in
+    // different tasks.
+    Deque<Operator<?>> operators = new LinkedList<Operator<?>>();
+    operators.addAll(newRoots);
+
+    Set<Operator<?>> seen = new HashSet<Operator<?>>();
+
+    while(!operators.isEmpty()) {
+      Operator<?> current = operators.pop();
+      seen.add(current);
+
+      if (current instanceof FileSinkOperator) {
+        FileSinkOperator fileSink = (FileSinkOperator)current;
+        
+        // remember it for additional processing later
+        context.fileSinkSet.add(fileSink);
+
+        FileSinkDesc desc = fileSink.getConf();
+        Path path = desc.getDirName();
+        List<FileSinkDesc> linked;
+        
+        if (!context.linkedFileSinks.containsKey(path)) {
+          linked = new ArrayList<FileSinkDesc>();
+          context.linkedFileSinks.put(path, linked);
+        }
+        linked = context.linkedFileSinks.get(path);
+        linked.add(desc);
+
+        desc.setDirName(new Path(path, ""+linked.size()));
+        desc.setLinkedFileSinkDesc(linked);
+      }
+      
+      if (current instanceof UnionOperator) {
+        Operator<?> parent = null;
+        int count = 0;
+
+        for (Operator<?> op: current.getParentOperators()) {
+          if (seen.contains(op)) {
+            ++count;
+            parent = op;
+          }
+        }
+        
+        // we should have been able to reach the union from only one side.
+        assert count <= 1;
+
+        if (parent == null) {
+          // root operator is union (can happen in reducers)
+          replacementMap.put(current, current.getChildOperators().get(0));
+        } else {
+          parent.removeChildAndAdoptItsChildren(current);
+        }
+      }
+
+      if (current instanceof FileSinkOperator
+          || current instanceof ReduceSinkOperator) {
+        current.setChildOperators(null);
+      } else {
+        operators.addAll(current.getChildOperators());
+      }
+    }   
+    work.replaceRoots(replacementMap);
+  }
+
+  public void processFileSink(GenTezProcContext context, FileSinkOperator fileSink)
+      throws SemanticException {
+
+    ParseContext parseContext = context.parseContext;
+    
+    boolean isInsertTable = // is INSERT OVERWRITE TABLE
+        GenMapRedUtils.isInsertInto(parseContext, fileSink);
+    HiveConf hconf = parseContext.getConf();
+    
+    boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
+        hconf, fileSink, context.currentTask, isInsertTable);
+
+    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+        chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
+
+    if (chDir) {
+      // Merge the files in the destination table/partitions by creating Map-only merge job
+      // If underlying data is RCFile a RCFileBlockMerge task would be created.
+      LOG.info("using CombineHiveInputformat for the merge job");
+      GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
+          context.dependencyTask, context.moveTask,
+          hconf, context.currentTask);
+    }
+  }
 }
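
Since each cloned union branch keeps its own file sink, removeUnionOperators points every linked sink at a numbered subdirectory of the shared output path; the recursive-input settings added to TezCompiler.init (below) let readers find those files again. An illustrative walk-through, assuming two branches whose sinks both target a final location /warehouse/dest:

    // first branch:  /warehouse/dest -> /warehouse/dest/1
    // second branch: /warehouse/dest -> /warehouse/dest/2
    Path path = desc.getDirName();                       // /warehouse/dest
    List<FileSinkDesc> linked = context.linkedFileSinks.get(path);
    linked.add(desc);
    desc.setDirName(new Path(path, "" + linked.size())); // numbered subdir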

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Mon Feb 17 07:58:47 2014
@@ -19,15 +19,21 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -37,6 +43,7 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 
 /**
@@ -106,6 +113,41 @@ public class GenTezWork implements NodeP
       context.rootToWorkMap.put(root, work);
     }
 
+    // This is where we cut the tree as described above. We also remember that
+    // we might have to connect parent work with this work later.
+    for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
+      context.leafOperatorToFollowingWork.put(parent, work);
+      LOG.debug("Removing " + parent + " as parent from " + root);
+      root.removeParent(parent);
+    }
+
+    if (!context.currentUnionOperators.isEmpty()) {      
+      // if there are union all operators we need to add the work to the set
+      // of union operators.
+
+      UnionWork unionWork;
+      if (context.unionWorkMap.containsKey(operator)) {
+        // we've seen this terminal before and have created a union work object.
+        // just need to add this work to it. There will be no children of this one
+        // since we've already visited this operator.
+        assert operator.getChildOperators().isEmpty();
+        unionWork = (UnionWork) context.unionWorkMap.get(operator);
+
+      } else {
+        // first time through. we need to create a union work object and add this
+        // work to it. Subsequent work should reference the union and not the actual
+        // work.
+        unionWork = utils.createUnionWork(context, operator, tezWork);
+      }
+
+      // finally hook everything up
+      tezWork.connect(unionWork, work, EdgeType.CONTAINS);
+      unionWork.addUnionOperators(context.currentUnionOperators);
+      context.currentUnionOperators.clear();
+      context.workWithUnionOperators.add(work);
+      work = unionWork;
+    }
+
     // We're scanning a tree from roots to leaf (this is not technically
     // correct, demux and mux operators might form a diamond shape, but
     // we will only scan one path and ignore the others, because the
@@ -134,16 +176,10 @@ public class GenTezWork implements NodeP
       // remember the output name of the reduce sink
       rs.getConf().setOutputName(rWork.getName());
 
-      // add dependency between the two work items
-      tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
-    }
-
-    // This is where we cut the tree as described above. We also remember that
-    // we might have to connect parent work with this work later.
-    for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
-      context.leafOperatorToFollowingWork.put(parent, work);
-      LOG.debug("Removing " + parent + " as parent from " + root);
-      root.removeParent(parent);
+      if (!context.unionWorkMap.containsKey(operator)) {
+        // add dependency between the two work items
+        tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
+      }
     }
 
     // No children means we're at the bottom. If there are more operators to scan
@@ -182,7 +218,7 @@ public class GenTezWork implements NodeP
       for (BaseWork parentWork : linkWorkList) {
         tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
 
-        // need to set up output name for reduce sink not that we know the name
+        // need to set up output name for reduce sink now that we know the name
         // of the downstream work
         for (ReduceSinkOperator r:
                context.linkWorkWithReduceSinkMap.get(parentWork)) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Mon Feb 17 07:58:47 2014
@@ -31,6 +31,7 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
 /**
  * TezCompiler translates the operator plan into TezTasks.
@@ -75,6 +78,22 @@ public class TezCompiler extends TaskCom
   }
 
   @Override
+  public void init(HiveConf conf, LogHelper console, Hive db) {
+    super.init(conf, console, db);
+    
+    // Tez requires us to use RPC for the query plan
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
+
+    // We require the use of recursive input dirs for union processing
+    conf.setBoolean("mapred.input.dir.recursive", true);
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
+
+    // Don't auto-merge files in tez
+    HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPFILES, false);
+    HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPREDFILES, false);
+  }
+
+  @Override
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
 
@@ -138,14 +157,18 @@ public class TezCompiler extends TaskCom
         TableScanOperator.getOperatorName() + "%"),
         new ProcessAnalyzeTable(GenTezUtils.getUtils()));
 
-    opRules.put(new RuleRegExp("Bail on Union",
+    opRules.put(new RuleRegExp("Handle union",
         UnionOperator.getOperatorName() + "%"), new NodeProcessor()
     {
       @Override
       public Object process(Node n, Stack<Node> s,
           NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-        throw new SemanticException("Unions not yet supported on Tez."
-            +" Please use MR for this query");
+        GenTezProcContext context = (GenTezProcContext) procCtx;
+        UnionOperator union = (UnionOperator) n;
+
+        // simply need to remember that we've seen a union.
+        context.currentUnionOperators.add(union);
+        return null;
       }
     });
 
@@ -156,20 +179,31 @@ public class TezCompiler extends TaskCom
     topNodes.addAll(pCtx.getTopOps().values());
     GraphWalker ogw = new GenTezWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
+
+    // we need to clone some operator plans and remove union operators still
+    for (BaseWork w: procCtx.workWithUnionOperators) {
+      GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+    }
+
+    // finally make sure the file sink operators are set up right
+    for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
+      GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
+    }
   }
 
   @Override
   protected void setInputFormat(Task<? extends Serializable> task) {
     if (task instanceof TezTask) {
       TezWork work = ((TezTask)task).getWork();
-      Set<BaseWork> roots = work.getRoots();
-      for (BaseWork w: roots) {
-        assert w instanceof MapWork;
-        MapWork mapWork = (MapWork)w;
-        HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
-        if (!opMap.isEmpty()) {
-          for (Operator<? extends OperatorDesc> op : opMap.values()) {
-            setInputFormat(mapWork, op);
+      List<BaseWork> all = work.getAllWork();
+      for (BaseWork w: all) {
+        if (w instanceof MapWork) {
+          MapWork mapWork = (MapWork) w;
+          HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
+          if (!opMap.isEmpty()) {
+            for (Operator<? extends OperatorDesc> op : opMap.values()) {
+              setInputFormat(mapWork, op);
+            }
           }
         }
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Mon Feb 17 07:58:47 2014
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
@@ -82,7 +83,9 @@ public abstract class BaseWork extends A
     dummyOps.add(dummyOp);
   }
 
-  protected abstract Set<Operator<?>> getAllRootOperators();
+  public abstract void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap);
+
+  public abstract Set<Operator<?>> getAllRootOperators();
 
   public Set<Operator<?>> getAllOperators() {
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Mon Feb 17 07:58:47 2014
@@ -305,6 +305,17 @@ public class MapWork extends BaseWork {
   }
 
   @Override
+  public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+    LinkedHashMap<String, Operator<?>> newAliasToWork = new LinkedHashMap<String, Operator<?>>();
+
+    for (Map.Entry<String, Operator<?>> entry: aliasToWork.entrySet()) {
+      newAliasToWork.put(entry.getKey(), replacementMap.get(entry.getValue()));
+    }
+
+    setAliasToWork(newAliasToWork);
+  }
+
+  @Override
   @Explain(displayName = "Map Operator Tree")
   public Set<Operator<?>> getAllRootOperators() {
     Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java Mon Feb 17 07:58:47 2014
@@ -129,7 +129,13 @@ public class ReduceWork extends BaseWork
   }
 
   @Override
-  protected Set<Operator<?>> getAllRootOperators() {
+  public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+    assert replacementMap.size() == 1;
+    setReducer(replacementMap.get(getReducer()));
+  }
+
+  @Override
+  public Set<Operator<?>> getAllRootOperators() {
     Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
     opSet.add(getReducer());
     return opSet;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Mon Feb 17 07:58:47 2014
@@ -45,7 +45,8 @@ public class TezWork extends AbstractOpe
 
   public enum EdgeType {
     SIMPLE_EDGE,
-    BROADCAST_EDGE
+    BROADCAST_EDGE,
+    CONTAINS
   }
 
   private static transient final Log LOG = LogFactory.getLog(TezWork.class);

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java?rev=1568894&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java Mon Feb 17 07:58:47 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+
+/**
+ * Simple wrapper for union all cases. All work contributing to a union all
+ * is collected here. Downstream work will connect to the union, not to the
+ * individual work items.
+ */
+public class UnionWork extends BaseWork {
+  
+  private final Set<UnionOperator> unionOperators = new HashSet<UnionOperator>();
+
+  public UnionWork() {
+    super();
+  }
+  
+  public UnionWork(String name) {
+    super(name);
+  }
+
+  @Explain(displayName = "Vertex")
+  @Override
+  public String getName() {
+    return super.getName();
+  }
+
+  @Override
+  public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+  }
+
+  @Override
+  public Set<Operator<?>> getAllRootOperators() {
+    return new HashSet<Operator<?>>();
+  }
+
+  public void addUnionOperators(Collection<UnionOperator> unions) {
+    unionOperators.addAll(unions);
+  }
+
+  public Set<UnionOperator> getUnionOperators() {
+    return unionOperators;
+  }
+}

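A short usage sketch of the class added above; findUnionOperators and
operatorGraph are assumed helpers, not part of this commit:

    // Collect the UnionOperators that make up one union-all and register
    // them on a single UnionWork vertex.
    UnionWork unionWork = new UnionWork("Union 2");
    Collection<UnionOperator> unions = findUnionOperators(operatorGraph);
    unionWork.addUnionOperators(unions);
    // A UnionWork carries no operator tree of its own:
    assert unionWork.getAllRootOperators().isEmpty();
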
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out Mon Feb 17 07:58:47 2014
@@ -13,14 +13,9 @@ INSERT OVERWRITE TABLE dest_j1 SELECT sr
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
+  Stage-2 depends on stages: Stage-1
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
-  Stage-4
-  Stage-6
-  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -68,15 +63,6 @@ STAGE PLANS:
                             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                             name: default.dest_j1
 
-  Stage: Stage-8
-    Conditional Operator
-
-  Stage: Stage-5
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
   Stage: Stage-2
     Dependency Collection
 
@@ -93,40 +79,6 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
-  Stage: Stage-4
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest_j1
-
-  Stage: Stage-6
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest_j1
-
-  Stage: Stage-7
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
 PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key)
 INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value
 PREHOOK: type: QUERY

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out Mon Feb 17 07:58:47 2014
@@ -21,15 +21,10 @@ POSTHOOK: query: explain create table nz
 POSTHOOK: type: CREATETABLE_AS_SELECT
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-9 depends on stages: Stage-2, Stage-0
-  Stage-3 depends on stages: Stage-9
-  Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-4
-  Stage-6
-  Stage-7 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-2, Stage-0
+  Stage-3 depends on stages: Stage-4
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
   Stage: Stage-1
@@ -80,19 +75,10 @@ STAGE PLANS:
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.nzhang_CTAS1
 
-  Stage: Stage-8
-    Conditional Operator
-
-  Stage: Stage-5
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
   Stage: Stage-2
     Dependency Collection
 
-  Stage: Stage-9
+  Stage: Stage-4
       Create Table Operator:
         Create Table
           columns: k string, value string
@@ -109,40 +95,6 @@ STAGE PLANS:
           hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-4
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.nzhang_CTAS1
-
-  Stage: Stage-6
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.nzhang_CTAS1
-
-  Stage: Stage-7
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
 PREHOOK: query: create table nzhang_CTAS1 as select key k, value from src sort by k, value limit 10
 PREHOOK: type: CREATETABLE_AS_SELECT
 PREHOOK: Input: default@src
@@ -208,15 +160,10 @@ POSTHOOK: query: explain create table nz
 POSTHOOK: type: CREATETABLE_AS_SELECT
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-9 depends on stages: Stage-2, Stage-0
-  Stage-3 depends on stages: Stage-9
-  Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-4
-  Stage-6
-  Stage-7 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-2, Stage-0
+  Stage-3 depends on stages: Stage-4
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
   Stage: Stage-1
@@ -267,19 +214,10 @@ STAGE PLANS:
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.nzhang_ctas2
 
-  Stage: Stage-8
-    Conditional Operator
-
-  Stage: Stage-5
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
   Stage: Stage-2
     Dependency Collection
 
-  Stage: Stage-9
+  Stage: Stage-4
       Create Table Operator:
         Create Table
           columns: key string, value string
@@ -296,40 +234,6 @@ STAGE PLANS:
           hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-4
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.nzhang_ctas2
-
-  Stage: Stage-6
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.nzhang_ctas2
-
-  Stage: Stage-7
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
 PREHOOK: query: create table nzhang_ctas2 as select * from src sort by key, value limit 10
 PREHOOK: type: CREATETABLE_AS_SELECT
 PREHOOK: Input: default@src
@@ -395,15 +299,10 @@ POSTHOOK: query: explain create table nz
 POSTHOOK: type: CREATETABLE_AS_SELECT
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-9 depends on stages: Stage-2, Stage-0
-  Stage-3 depends on stages: Stage-9
-  Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-4
-  Stage-6
-  Stage-7 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-2, Stage-0
+  Stage-3 depends on stages: Stage-4
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
   Stage: Stage-1
@@ -454,19 +353,10 @@ STAGE PLANS:
                         serde: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
                         name: default.nzhang_ctas3
 
-  Stage: Stage-8
-    Conditional Operator
-
-  Stage: Stage-5
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
   Stage: Stage-2
     Dependency Collection
 
-  Stage: Stage-9
+  Stage: Stage-4
       Create Table Operator:
         Create Table
           columns: half_key double, conb string
@@ -484,18 +374,6 @@ STAGE PLANS:
           hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-4
-    Block level merge
-
-  Stage: Stage-6
-    Block level merge
-
-  Stage: Stage-7
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
 PREHOOK: query: create table nzhang_ctas3 row format serde "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe" stored as RCFile as select key/2 half_key, concat(value, "_con") conb  from src sort by half_key, conb limit 10
 PREHOOK: type: CREATETABLE_AS_SELECT
 PREHOOK: Input: default@src
@@ -624,15 +502,10 @@ POSTHOOK: query: explain create table nz
 POSTHOOK: type: CREATETABLE_AS_SELECT
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-9 depends on stages: Stage-2, Stage-0
-  Stage-3 depends on stages: Stage-9
-  Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-4
-  Stage-6
-  Stage-7 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-2, Stage-0
+  Stage-3 depends on stages: Stage-4
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
   Stage: Stage-1
@@ -683,19 +556,10 @@ STAGE PLANS:
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.nzhang_ctas4
 
-  Stage: Stage-8
-    Conditional Operator
-
-  Stage: Stage-5
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
   Stage: Stage-2
     Dependency Collection
 
-  Stage: Stage-9
+  Stage: Stage-4
       Create Table Operator:
         Create Table
           columns: key string, value string
@@ -713,40 +577,6 @@ STAGE PLANS:
           hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-4
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.nzhang_ctas4
-
-  Stage: Stage-6
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.nzhang_ctas4
-
-  Stage: Stage-7
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
 PREHOOK: query: create table nzhang_ctas4 row format delimited fields terminated by ',' stored as textfile as select key, value from src sort by key, value limit 10
 PREHOOK: type: CREATETABLE_AS_SELECT
 PREHOOK: Input: default@src
@@ -853,15 +683,10 @@ TOK_CREATETABLE
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-9 depends on stages: Stage-2, Stage-0
-  Stage-3 depends on stages: Stage-9
-  Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-4
-  Stage-6
-  Stage-7 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-2, Stage-0
+  Stage-3 depends on stages: Stage-4
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
   Stage: Stage-1
@@ -979,19 +804,10 @@ STAGE PLANS:
                     GatherStats: true
                     MultiFileSpray: false
 
-  Stage: Stage-8
-    Conditional Operator
-
-  Stage: Stage-5
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
   Stage: Stage-2
     Dependency Collection
 
-  Stage: Stage-9
+  Stage: Stage-4
       Create Table Operator:
         Create Table
           columns: key string, value string
@@ -1012,140 +828,6 @@ STAGE PLANS:
           hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-4
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  GatherStats: false
-                  File Output Operator
-                    compressed: false
-                    GlobalTableId: 0
-#### A masked pattern was here ####
-                    NumFilesPerFileSink: 1
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        properties:
-                          columns _col0,_col1
-                          columns.types string:string
-                          field.delim ,
-                          line.delim 
-
-                          name default.nzhang_ctas5
-                          serialization.format ,
-                          serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.nzhang_ctas5
-                    TotalFiles: 1
-                    GatherStats: false
-                    MultiFileSpray: false
-            Path -> Alias:
-#### A masked pattern was here ####
-            Path -> Partition:
-#### A masked pattern was here ####
-                Partition
-                  input format: org.apache.hadoop.mapred.TextInputFormat
-                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                  properties:
-                    columns _col0,_col1
-                    columns.types string:string
-                    field.delim ,
-                    line.delim 
-
-                    name default.nzhang_ctas5
-                    serialization.format ,
-                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                
-                    input format: org.apache.hadoop.mapred.TextInputFormat
-                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                    properties:
-                      columns _col0,_col1
-                      columns.types string:string
-                      field.delim ,
-                      line.delim 
-
-                      name default.nzhang_ctas5
-                      serialization.format ,
-                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                    name: default.nzhang_ctas5
-                  name: default.nzhang_ctas5
-            Truncated Path -> Alias:
-#### A masked pattern was here ####
-
-  Stage: Stage-6
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  GatherStats: false
-                  File Output Operator
-                    compressed: false
-                    GlobalTableId: 0
-#### A masked pattern was here ####
-                    NumFilesPerFileSink: 1
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        properties:
-                          columns _col0,_col1
-                          columns.types string:string
-                          field.delim ,
-                          line.delim 
-
-                          name default.nzhang_ctas5
-                          serialization.format ,
-                          serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.nzhang_ctas5
-                    TotalFiles: 1
-                    GatherStats: false
-                    MultiFileSpray: false
-            Path -> Alias:
-#### A masked pattern was here ####
-            Path -> Partition:
-#### A masked pattern was here ####
-                Partition
-                  input format: org.apache.hadoop.mapred.TextInputFormat
-                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                  properties:
-                    columns _col0,_col1
-                    columns.types string:string
-                    field.delim ,
-                    line.delim 
-
-                    name default.nzhang_ctas5
-                    serialization.format ,
-                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                
-                    input format: org.apache.hadoop.mapred.TextInputFormat
-                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                    properties:
-                      columns _col0,_col1
-                      columns.types string:string
-                      field.delim ,
-                      line.delim 
-
-                      name default.nzhang_ctas5
-                      serialization.format ,
-                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                    name: default.nzhang_ctas5
-                  name: default.nzhang_ctas5
-            Truncated Path -> Alias:
-#### A masked pattern was here ####
-
-  Stage: Stage-7
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
 PREHOOK: query: create table nzhang_ctas5 row format delimited fields terminated by ',' lines terminated by '\012' stored as textfile as select key, value from src sort by key, value limit 10
 PREHOOK: type: CREATETABLE_AS_SELECT
 PREHOOK: Input: default@src

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out Mon Feb 17 07:58:47 2014
@@ -11,14 +11,9 @@ FROM src INSERT OVERWRITE TABLE dest_g1 
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
+  Stage-2 depends on stages: Stage-1
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
-  Stage-4
-  Stage-6
-  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -77,15 +72,6 @@ STAGE PLANS:
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest_g1
 
-  Stage: Stage-8
-    Conditional Operator
-
-  Stage: Stage-5
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
   Stage: Stage-2
     Dependency Collection
 
@@ -102,40 +88,6 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
-  Stage: Stage-4
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest_g1
-
-  Stage: Stage-6
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest_g1
-
-  Stage: Stage-7
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
 PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest_g1 SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out Mon Feb 17 07:58:47 2014
@@ -13,14 +13,9 @@ INSERT OVERWRITE TABLE dest_g2 SELECT su
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
+  Stage-2 depends on stages: Stage-1
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
-  Stage-4
-  Stage-6
-  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -63,15 +58,6 @@ STAGE PLANS:
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest_g2
 
-  Stage: Stage-8
-    Conditional Operator
-
-  Stage: Stage-5
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
   Stage: Stage-2
     Dependency Collection
 
@@ -88,40 +74,6 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
-  Stage: Stage-4
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest_g2
-
-  Stage: Stage-6
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest_g2
-
-  Stage: Stage-7
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
 PREHOOK: query: FROM src
 INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))) GROUP BY substr(src.key,1,1)
 PREHOOK: type: QUERY

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out Mon Feb 17 07:58:47 2014
@@ -31,14 +31,9 @@ INSERT OVERWRITE TABLE dest1 SELECT 
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
+  Stage-2 depends on stages: Stage-1
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
-  Stage-4
-  Stage-6
-  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -92,15 +87,6 @@ STAGE PLANS:
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest1
 
-  Stage: Stage-8
-    Conditional Operator
-
-  Stage: Stage-5
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
   Stage: Stage-2
     Dependency Collection
 
@@ -117,40 +103,6 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
-  Stage: Stage-4
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest1
-
-  Stage: Stage-6
-    Tez
-      Vertices:
-        Merge 
-            Map Operator Tree:
-                TableScan
-                  File Output Operator
-                    compressed: false
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest1
-
-  Stage: Stage-7
-    Move Operator
-      files:
-          hdfs directory: true
-#### A masked pattern was here ####
-
 PREHOOK: query: FROM src
 INSERT OVERWRITE TABLE dest1 SELECT 
   sum(substr(src.value,5)),