You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/02/17 08:58:48 UTC
svn commit: r1568894 [1/3] - in /hive/branches/tez: itests/qtest/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/
ql/src/java/org/apache/hadoop/hive/ql/pa...
Author: gunther
Date: Mon Feb 17 07:58:47 2014
New Revision: 1568894
URL: http://svn.apache.org/r1568894
Log:
HIVE-6362: Support union all on tez (Gunther Hagleitner)
Added:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
hive/branches/tez/ql/src/test/results/clientpositive/tez/union2.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/union3.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/union4.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/union5.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/union6.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/union7.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/union8.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/union9.q.out
Modified:
hive/branches/tez/itests/qtest/pom.xml
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/insert1.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/insert_into1.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/insert_into2.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/join1.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/load_dyn_part1.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/mapreduce1.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/mapreduce2.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/merge1.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/merge2.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/sample1.q.out
hive/branches/tez/ql/src/test/results/clientpositive/tez/tez_dml.q.out
Modified: hive/branches/tez/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/itests/qtest/pom.xml?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/itests/qtest/pom.xml (original)
+++ hive/branches/tez/itests/qtest/pom.xml Mon Feb 17 07:58:47 2014
@@ -39,7 +39,7 @@
<minimr.query.files>stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q</minimr.query.files>
<minimr.query.negative.files>cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q</minimr.query.negative.files>
<minitez.query.files>tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q</minitez.query.files>
- <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q</minitez.query.files.shared>
+ <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
<beeline.positive.exclude>add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rena
me.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_o
verwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q</beeline.positive.exclude>
</properties>
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Feb 17 07:58:47 2014
@@ -777,6 +777,14 @@ public final class Utilities {
}
}
+ public static Set<Operator<?>> cloneOperatorTree(Configuration conf, Set<Operator<?>> roots) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ serializePlan(roots, baos, conf, true);
+ Set<Operator<?>> result = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+ roots.getClass(), conf, true);
+ return result;
+ }
+
private static void serializePlan(Object plan, OutputStream out, Configuration conf, boolean cloningPlan) {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Feb 17 07:58:47 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.ErrorMs
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.exec.tez.tools.TezMergedLogicalInput;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -82,6 +83,7 @@ import org.apache.tez.dag.api.EdgeProper
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -90,6 +92,7 @@ import org.apache.tez.dag.api.VertexLoca
import org.apache.tez.dag.api.TezException;
import org.apache.tez.client.PreWarmContext;
import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -98,6 +101,8 @@ import org.apache.tez.mapreduce.hadoop.M
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@@ -189,9 +194,56 @@ public class DagUtils {
}
/**
+ * Given a VertexGroup and a vertex, createEdge will create a
+ * GroupInputEdge between them.
+ *
+ * @param group The parent VertexGroup
+ * @param wConf The job conf of the child vertex
+ * @param w The child vertex
+ * @param edgeType the type of connection between the two
+ * endpoints.
+ */
+ public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+ Vertex w, EdgeType edgeType)
+ throws IOException {
+
+ Class mergeInputClass;
+
+ LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
+ w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
+ switch (edgeType) {
+ case BROADCAST_EDGE:
+ mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+ break;
+
+ case SIMPLE_EDGE:
+ default:
+ mergeInputClass = TezMergedLogicalInput.class;
+ break;
+ }
+
+ return new GroupInputEdge(group, w, createEdgeProperty(edgeType),
+ new InputDescriptor(mergeInputClass.getName()));
+ }
+
+ /**
+ * Given two vertices v and w, update their configurations to be used in an Edge v-w
+ */
+ public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
+ throws IOException {
+
+ // Tez needs to set up output/subsequent-input pairs correctly
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+
+ // update payloads (configuration for the vertices might have changed)
+ v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
+ w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+ }
+
+ /**
* Given two vertices and their respective configuration objects createEdge
- * will create an Edge object that connects the two. Currently the edge will
- * always be a stable bi-partite edge.
+ * will create an Edge object that connects the two.
*
* @param vConf JobConf of the first vertex
* @param v The first vertex (source)
@@ -203,13 +255,15 @@ public class DagUtils {
EdgeType edgeType)
throws IOException {
- // Tez needs to setup output subsequent input pairs correctly
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+ updateConfigurationForEdge(vConf, v, wConf, w);
- // update payloads (configuration for the vertices might have changed)
- v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
- w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+ return new Edge(v, w, createEdgeProperty(edgeType));
+ }
+ /*
+ * Helper function to create an edge property from an edge type.
+ */
+ private EdgeProperty createEdgeProperty(EdgeType edgeType) {
DataMovementType dataMovementType;
Class logicalInputClass;
Class logicalOutputClass;
@@ -235,7 +289,8 @@ public class DagUtils {
SchedulingType.SEQUENTIAL,
new OutputDescriptor(logicalOutputClass.getName()),
new InputDescriptor(logicalInputClass.getName()));
- return new Edge(v, w, edgeProperty);
+
+ return edgeProperty;
}
/*
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Mon Feb 17 07:58:47 2014
@@ -55,7 +55,6 @@ import org.apache.hadoop.util.StringUtil
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
/**
* Process input from tez LogicalInput and write output - for a map plan
@@ -184,15 +183,19 @@ public class ReduceRecordProcessor exte
@Override
void run() throws IOException{
- List<ShuffledMergedInput> shuffleInputs = getShuffleInputs(inputs);
+ List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
KeyValuesReader kvsReader;
- if(shuffleInputs.size() == 1){
- //no merging of inputs required
- kvsReader = shuffleInputs.get(0).getReader();
- }else {
- //get a sort merged input
- kvsReader = new InputMerger(shuffleInputs);
+ try {
+ if(shuffleInputs.size() == 1){
+ //no merging of inputs required
+ kvsReader = (KeyValuesReader) shuffleInputs.get(0).getReader();
+ }else {
+ //get a sort merged input
+ kvsReader = new InputMerger(shuffleInputs);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
}
while(kvsReader.next()){
@@ -211,12 +214,12 @@ public class ReduceRecordProcessor exte
* @param inputs
* @return
*/
- private List<ShuffledMergedInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
+ private List<LogicalInput> getShuffleInputs(Map<String, LogicalInput> inputs) {
//the reduce plan inputs have tags, add all inputs that have tags
Map<Integer, String> tag2input = redWork.getTagToInput();
- ArrayList<ShuffledMergedInput> shuffleInputs = new ArrayList<ShuffledMergedInput>();
+ ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
for(String inpStr : tag2input.values()){
- shuffleInputs.add((ShuffledMergedInput)inputs.get(inpStr));
+ shuffleInputs.add((LogicalInput)inputs.get(inpStr));
}
return shuffleInputs;
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Feb 17 07:58:47 2014
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.log.Per
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -54,9 +56,11 @@ import org.apache.tez.common.counters.Te
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -97,9 +101,6 @@ public class TezTask extends Task<TezWor
DAGClient client = null;
TezSessionState session = null;
- // Tez requires us to use RPC for the query plan
- HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
-
try {
// Get or create Context object. If we create it we have to clean
// it later as well.
@@ -216,24 +217,68 @@ public class TezTask extends Task<TezWor
// translate work to vertex
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
- JobConf wxConf = utils.initializeVertexConf(conf, w);
- Vertex wx = utils.createVertex(wxConf, w, tezDir,
- appJarLr, additionalLr, fs, ctx, !isFinal);
- dag.addVertex(wx);
- utils.addCredentials(w, dag);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
- workToVertex.put(w, wx);
- workToConf.put(w, wxConf);
-
- // add all dependencies (i.e.: edges) to the graph
- for (BaseWork v: work.getChildren(w)) {
- assert workToVertex.containsKey(v);
- Edge e = null;
- EdgeType edgeType = work.getEdgeProperty(w, v);
+ if (w instanceof UnionWork) {
+ // Special case for unions. These items translate to VertexGroups
+
+ List<BaseWork> unionWorkItems = new LinkedList<BaseWork>();
+ List<BaseWork> children = new LinkedList<BaseWork>();
+
+ // split the children into vertices that make up the union and vertices that are
+ // proper children of the union
+ for (BaseWork v: work.getChildren(w)) {
+ EdgeType type = work.getEdgeProperty(w, v);
+ if (type == EdgeType.CONTAINS) {
+ unionWorkItems.add(v);
+ } else {
+ children.add(v);
+ }
+ }
+
+ // create VertexGroup
+ Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
- e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
- dag.addEdge(e);
+ int i = 0;
+ for (BaseWork v: unionWorkItems) {
+ vertexArray[i++] = workToVertex.get(v);
+ }
+ VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+
+ // now hook up the children
+ for (BaseWork v: children) {
+ // need to pairwise patch up the configuration of the vertices
+ for (BaseWork part: unionWorkItems) {
+ utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
+ workToConf.get(v), workToVertex.get(v));
+ }
+
+ // finally we can create the grouped edge
+ GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
+ workToVertex.get(v), work.getEdgeProperty(w, v));
+
+ dag.addEdge(e);
+ }
+ } else {
+ // Regular vertices
+ JobConf wxConf = utils.initializeVertexConf(conf, w);
+ Vertex wx = utils.createVertex(wxConf, w, tezDir, appJarLr,
+ additionalLr, fs, ctx, !isFinal);
+ dag.addVertex(wx);
+ utils.addCredentials(w, dag);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
+ workToVertex.put(w, wx);
+ workToConf.put(w, wxConf);
+
+ // add all dependencies (i.e.: edges) to the graph
+ for (BaseWork v: work.getChildren(w)) {
+ assert workToVertex.containsKey(v);
+ Edge e = null;
+
+ EdgeType edgeType = work.getEdgeProperty(w, v);
+
+ e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
+ dag.addEdge(e);
+ }
}
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java Mon Feb 17 07:58:47 2014
@@ -26,12 +26,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor;
import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
/**
* A KeyValuesReader implementation that returns a sorted stream of key-values
- * by doing a sorted merge of the key-value in ShuffledMergedInputs.
+ * by doing a sorted merge of the key-value in LogicalInputs.
* Tags are in the last byte of the key, so no special handling for tags is required.
* Uses a priority queue to pick the KeyValuesReader of the input that is next in
* sort order.
@@ -42,12 +43,12 @@ public class InputMerger implements KeyV
private PriorityQueue<KeyValuesReader> pQueue = null;
private KeyValuesReader nextKVReader = null;
- public InputMerger(List<ShuffledMergedInput> shuffleInputs) throws IOException {
- //get KeyValuesReaders from the ShuffledMergedInput and add them to priority queue
+ public InputMerger(List<? extends Input> shuffleInputs) throws Exception {
+ //get KeyValuesReaders from the LogicalInput and add them to priority queue
int initialCapacity = shuffleInputs.size();
pQueue = new PriorityQueue<KeyValuesReader>(initialCapacity, new KVReaderComparator());
- for(ShuffledMergedInput input : shuffleInputs){
- addToQueue(input.getReader());
+ for(Input input : shuffleInputs){
+ addToQueue((KeyValuesReader)input.getReader());
}
}
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1568894&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Mon Feb 17 07:58:47 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.tools;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * TezMergedLogicalInput is an adapter to make union input look like
+ * a single input in tez.
+ */
+public class TezMergedLogicalInput extends MergedLogicalInput {
+
+ @Override
+ public Reader getReader() throws Exception {
+ return new InputMerger(getInputs());
+ }
+}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Mon Feb 17 07:58:47 2014
@@ -51,28 +51,9 @@ public class FileSinkProcessor implement
GenTezProcContext context = (GenTezProcContext) procCtx;
FileSinkOperator fileSink = (FileSinkOperator) nd;
- ParseContext parseContext = context.parseContext;
-
-
- boolean isInsertTable = // is INSERT OVERWRITE TABLE
- GenMapRedUtils.isInsertInto(parseContext, fileSink);
- HiveConf hconf = parseContext.getConf();
-
- boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
- hconf, fileSink, context.currentTask, isInsertTable);
-
- Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
- chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
-
- if (chDir) {
- // Merge the files in the destination table/partitions by creating Map-only merge job
- // If underlying data is RCFile a RCFileBlockMerge task would be created.
- LOG.info("using CombineHiveInputformat for the merge job");
- GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
- context.dependencyTask, context.moveTask,
- hconf, context.currentTask);
- }
-
+
+ // just remember it for later processing
+ context.fileSinkSet.add(fileSink);
return true;
}
}
\ No newline at end of file
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Mon Feb 17 07:58:47 2014
@@ -20,14 +20,19 @@ package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -39,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.Ba
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
/**
@@ -105,6 +111,15 @@ public class GenTezProcContext implement
// used to group dependent tasks for multi table inserts
public final DependencyCollectionTask dependencyTask;
+ // used to hook up unions
+ public final Map<Operator<?>, BaseWork> unionWorkMap;
+ public final List<UnionOperator> currentUnionOperators;
+ public final Set<BaseWork> workWithUnionOperators;
+
+ // we link file sinks that will write to the same final location
+ public final Map<Path, List<FileSinkDesc>> linkedFileSinks;
+ public final Set<FileSinkOperator> fileSinkSet;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -126,6 +141,11 @@ public class GenTezProcContext implement
this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
this.dependencyTask = (DependencyCollectionTask)
TaskFactory.get(new DependencyCollectionWork(), conf);
+ this.unionWorkMap = new HashMap<Operator<?>, BaseWork>();
+ this.currentUnionOperators = new LinkedList<UnionOperator>();
+ this.workWithUnionOperators = new HashSet<BaseWork>();
+ this.linkedFileSinks = new HashMap<Path, List<FileSinkDesc>>();
+ this.fileSinkSet = new HashSet<FileSinkOperator>();
rootTasks.add(currentTask);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Mon Feb 17 07:58:47 2014
@@ -18,17 +18,35 @@
package org.apache.hadoop.hive.ql.parse;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
/**
@@ -59,6 +77,13 @@ public class GenTezUtils {
sequenceNumber = 0;
}
+ public UnionWork createUnionWork(GenTezProcContext context, Operator<?> operator, TezWork tezWork) {
+ UnionWork unionWork = new UnionWork("Union "+ (++sequenceNumber));
+ context.unionWorkMap.put(operator, unionWork);
+ tezWork.add(unionWork);
+ return unionWork;
+ }
+
public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root, TezWork tezWork) {
assert !root.getParentOperators().isEmpty();
ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
@@ -128,4 +153,111 @@ public class GenTezUtils {
GenMapRedUtils.setMapWork(mapWork, context.parseContext,
context.inputs, partitions, root, alias, context.conf, false);
}
+
+ // removes any union operator and clones the plan
+ public void removeUnionOperators(Configuration conf, GenTezProcContext context,
+ BaseWork work)
+ throws SemanticException {
+
+ Set<Operator<?>> roots = work.getAllRootOperators();
+
+ // need to clone the plan.
+ Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+
+ Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+
+ Iterator<Operator<?>> it = newRoots.iterator();
+ for (Operator<?> orig: roots) {
+ replacementMap.put(orig,it.next());
+ }
+
+ // now we remove all the unions. we throw away any branch that's not reachable from
+ // the current set of roots. The reason is that those branches will be handled in
+ // different tasks.
+ Deque<Operator<?>> operators = new LinkedList<Operator<?>>();
+ operators.addAll(newRoots);
+
+ Set<Operator<?>> seen = new HashSet<Operator<?>>();
+
+ while(!operators.isEmpty()) {
+ Operator<?> current = operators.pop();
+ seen.add(current);
+
+ if (current instanceof FileSinkOperator) {
+ FileSinkOperator fileSink = (FileSinkOperator)current;
+
+ // remember it for additional processing later
+ context.fileSinkSet.add(fileSink);
+
+ FileSinkDesc desc = fileSink.getConf();
+ Path path = desc.getDirName();
+ List<FileSinkDesc> linked;
+
+ if (!context.linkedFileSinks.containsKey(path)) {
+ linked = new ArrayList<FileSinkDesc>();
+ context.linkedFileSinks.put(path, linked);
+ }
+ linked = context.linkedFileSinks.get(path);
+ linked.add(desc);
+
+ desc.setDirName(new Path(path, ""+linked.size()));
+ desc.setLinkedFileSinkDesc(linked);
+ }
+
+ if (current instanceof UnionOperator) {
+ Operator<?> parent = null;
+ int count = 0;
+
+ for (Operator<?> op: current.getParentOperators()) {
+ if (seen.contains(op)) {
+ ++count;
+ parent = op;
+ }
+ }
+
+ // we should have been able to reach the union from only one side.
+ assert count <= 1;
+
+ if (parent == null) {
+ // root operator is union (can happen in reducers)
+ replacementMap.put(current, current.getChildOperators().get(0));
+ } else {
+ parent.removeChildAndAdoptItsChildren(current);
+ }
+ }
+
+ if (current instanceof FileSinkOperator
+ || current instanceof ReduceSinkOperator) {
+ current.setChildOperators(null);
+ } else {
+ operators.addAll(current.getChildOperators());
+ }
+ }
+ work.replaceRoots(replacementMap);
+ }
+
+ public void processFileSink(GenTezProcContext context, FileSinkOperator fileSink)
+ throws SemanticException {
+
+ ParseContext parseContext = context.parseContext;
+
+ boolean isInsertTable = // is INSERT OVERWRITE TABLE
+ GenMapRedUtils.isInsertInto(parseContext, fileSink);
+ HiveConf hconf = parseContext.getConf();
+
+ boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
+ hconf, fileSink, context.currentTask, isInsertTable);
+
+ Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+ chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
+
+ if (chDir) {
+ // Merge the files in the destination table/partitions by creating Map-only merge job
+ // If underlying data is RCFile a RCFileBlockMerge task would be created.
+ LOG.info("using CombineHiveInputformat for the merge job");
+ GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
+ context.dependencyTask, context.moveTask,
+ hconf, context.currentTask);
+ }
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Mon Feb 17 07:58:47 2014
@@ -19,15 +19,21 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -37,6 +43,7 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
/**
@@ -106,6 +113,41 @@ public class GenTezWork implements NodeP
context.rootToWorkMap.put(root, work);
}
+ // This is where we cut the tree as described above. We also remember that
+ // we might have to connect parent work with this work later.
+ for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
+ context.leafOperatorToFollowingWork.put(parent, work);
+ LOG.debug("Removing " + parent + " as parent from " + root);
+ root.removeParent(parent);
+ }
+
+ if (!context.currentUnionOperators.isEmpty()) {
+ // if we have seen union all operators on the way here, we need to attach
+ // this work to the corresponding union work object.
+
+ UnionWork unionWork;
+ if (context.unionWorkMap.containsKey(operator)) {
+ // we've seen this terminal before and have created a union work object.
+ // just need to add this work to it. There will be no children of this one
+ // since we've passed this operator before.
+ assert operator.getChildOperators().isEmpty();
+ unionWork = (UnionWork) context.unionWorkMap.get(operator);
+
+ } else {
+ // first time through. we need to create a union work object and add this
+ // work to it. Subsequent work should reference the union and not the actual
+ // work.
+ unionWork = utils.createUnionWork(context, operator, tezWork);
+ }
+
+ // finally hook everything up
+ tezWork.connect(unionWork, work, EdgeType.CONTAINS);
+ unionWork.addUnionOperators(context.currentUnionOperators);
+ context.currentUnionOperators.clear();
+ context.workWithUnionOperators.add(work);
+ work = unionWork;
+ }
+
// We're scanning a tree from roots to leaf (this is not technically
// correct, demux and mux operators might form a diamond shape, but
// we will only scan one path and ignore the others, because the
@@ -134,16 +176,10 @@ public class GenTezWork implements NodeP
// remember the output name of the reduce sink
rs.getConf().setOutputName(rWork.getName());
- // add dependency between the two work items
- tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
- }
-
- // This is where we cut the tree as described above. We also remember that
- // we might have to connect parent work with this work later.
- for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
- context.leafOperatorToFollowingWork.put(parent, work);
- LOG.debug("Removing " + parent + " as parent from " + root);
- root.removeParent(parent);
+ if (!context.unionWorkMap.containsKey(operator)) {
+ // add dependency between the two work items
+ tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
+ }
}
// No children means we're at the bottom. If there are more operators to scan
@@ -182,7 +218,7 @@ public class GenTezWork implements NodeP
for (BaseWork parentWork : linkWorkList) {
tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
- // need to set up output name for reduce sink not that we know the name
+ // need to set up output name for reduce sink now that we know the name
// of the downstream work
for (ReduceSinkOperator r:
context.linkWorkWithReduceSinkMap.get(parentWork)) {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Mon Feb 17 07:58:47 2014
@@ -31,6 +31,7 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
/**
* TezCompiler translates the operator plan into TezTasks.
@@ -75,6 +78,22 @@ public class TezCompiler extends TaskCom
}
@Override
+ public void init(HiveConf conf, LogHelper console, Hive db) {
+ super.init(conf, console, db);
+
+ // Tez requires us to use RPC for the query plan
+ HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
+
+ // We require the use of recursive input dirs for union processing
+ conf.setBoolean("mapred.input.dir.recursive", true);
+ HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
+
+ // Don't auto-merge files in tez
+ HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPFILES, false);
+ HiveConf.setBoolVar(conf, ConfVars.HIVEMERGEMAPREDFILES, false);
+ }
+
+ @Override
protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
Set<WriteEntity> outputs) throws SemanticException {
@@ -138,14 +157,18 @@ public class TezCompiler extends TaskCom
TableScanOperator.getOperatorName() + "%"),
new ProcessAnalyzeTable(GenTezUtils.getUtils()));
- opRules.put(new RuleRegExp("Bail on Union",
+ opRules.put(new RuleRegExp("Handle union",
UnionOperator.getOperatorName() + "%"), new NodeProcessor()
{
@Override
public Object process(Node n, Stack<Node> s,
NodeProcessorCtx procCtx, Object... os) throws SemanticException {
- throw new SemanticException("Unions not yet supported on Tez."
- +" Please use MR for this query");
+ GenTezProcContext context = (GenTezProcContext) procCtx;
+ UnionOperator union = (UnionOperator) n;
+
+ // simply need to remember that we've seen a union.
+ context.currentUnionOperators.add(union);
+ return null;
}
});
@@ -156,20 +179,31 @@ public class TezCompiler extends TaskCom
topNodes.addAll(pCtx.getTopOps().values());
GraphWalker ogw = new GenTezWorkWalker(disp, procCtx);
ogw.startWalking(topNodes, null);
+
+ // we still need to clone some operator plans and remove the union operators
+ for (BaseWork w: procCtx.workWithUnionOperators) {
+ GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+ }
+
+ // finally make sure the file sink operators are set up right
+ for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
+ GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
+ }
}
@Override
protected void setInputFormat(Task<? extends Serializable> task) {
if (task instanceof TezTask) {
TezWork work = ((TezTask)task).getWork();
- Set<BaseWork> roots = work.getRoots();
- for (BaseWork w: roots) {
- assert w instanceof MapWork;
- MapWork mapWork = (MapWork)w;
- HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
- if (!opMap.isEmpty()) {
- for (Operator<? extends OperatorDesc> op : opMap.values()) {
- setInputFormat(mapWork, op);
+ List<BaseWork> all = work.getAllWork();
+ for (BaseWork w: all) {
+ if (w instanceof MapWork) {
+ MapWork mapWork = (MapWork) w;
+ HashMap<String, Operator<? extends OperatorDesc>> opMap = mapWork.getAliasToWork();
+ if (!opMap.isEmpty()) {
+ for (Operator<? extends OperatorDesc> op : opMap.values()) {
+ setInputFormat(mapWork, op);
+ }
}
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Mon Feb 17 07:58:47 2014
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.Stack;
@@ -82,7 +83,9 @@ public abstract class BaseWork extends A
dummyOps.add(dummyOp);
}
- protected abstract Set<Operator<?>> getAllRootOperators();
+ public abstract void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap);
+
+ public abstract Set<Operator<?>> getAllRootOperators();
public Set<Operator<?>> getAllOperators() {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Mon Feb 17 07:58:47 2014
@@ -305,6 +305,17 @@ public class MapWork extends BaseWork {
}
@Override
+ public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+ LinkedHashMap<String, Operator<?>> newAliasToWork = new LinkedHashMap<String, Operator<?>>();
+
+ for (Map.Entry<String, Operator<?>> entry: aliasToWork.entrySet()) {
+ newAliasToWork.put(entry.getKey(), replacementMap.get(entry.getValue()));
+ }
+
+ setAliasToWork(newAliasToWork);
+ }
+
+ @Override
@Explain(displayName = "Map Operator Tree")
public Set<Operator<?>> getAllRootOperators() {
Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java Mon Feb 17 07:58:47 2014
@@ -129,7 +129,13 @@ public class ReduceWork extends BaseWork
}
@Override
- protected Set<Operator<?>> getAllRootOperators() {
+ public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+ assert replacementMap.size() == 1;
+ setReducer(replacementMap.get(getReducer()));
+ }
+
+ @Override
+ public Set<Operator<?>> getAllRootOperators() {
Set<Operator<?>> opSet = new LinkedHashSet<Operator<?>>();
opSet.add(getReducer());
return opSet;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Mon Feb 17 07:58:47 2014
@@ -45,7 +45,8 @@ public class TezWork extends AbstractOpe
public enum EdgeType {
SIMPLE_EDGE,
- BROADCAST_EDGE
+ BROADCAST_EDGE,
+ CONTAINS
}
private static transient final Log LOG = LogFactory.getLog(TezWork.class);
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java?rev=1568894&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java Mon Feb 17 07:58:47 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+
+/**
+ * Simple wrapper for union all cases. All contributing work for a union all
+ * is collected here. Downstream work will connect to the union, not to the individual
+ * work.
+ */
+public class UnionWork extends BaseWork {
+
+ private final Set<UnionOperator> unionOperators = new HashSet<UnionOperator>();
+
+ public UnionWork() {
+ super();
+ }
+
+ public UnionWork(String name) {
+ super(name);
+ }
+
+ @Explain(displayName = "Vertex")
+ @Override
+ public String getName() {
+ return super.getName();
+ }
+
+ @Override
+ public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
+ }
+
+ @Override
+ public Set<Operator<?>> getAllRootOperators() {
+ return new HashSet<Operator<?>>();
+ }
+
+ public void addUnionOperators(Collection<UnionOperator> unions) {
+ unionOperators.addAll(unions);
+ }
+
+ public Set<UnionOperator> getUnionOperators() {
+ return unionOperators;
+ }
+}
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/auto_join1.q.out Mon Feb 17 07:58:47 2014
@@ -13,14 +13,9 @@ INSERT OVERWRITE TABLE dest_j1 SELECT sr
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
+ Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
Stage-3 depends on stages: Stage-0
- Stage-4
- Stage-6
- Stage-7 depends on stages: Stage-6
STAGE PLANS:
Stage: Stage-1
@@ -68,15 +63,6 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest_j1
- Stage: Stage-8
- Conditional Operator
-
- Stage: Stage-5
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
Stage: Stage-2
Dependency Collection
@@ -93,40 +79,6 @@ STAGE PLANS:
Stage: Stage-3
Stats-Aggr Operator
- Stage: Stage-4
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest_j1
-
- Stage: Stage-6
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest_j1
-
- Stage: Stage-7
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key)
INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value
PREHOOK: type: QUERY
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/ctas.q.out Mon Feb 17 07:58:47 2014
@@ -21,15 +21,10 @@ POSTHOOK: query: explain create table nz
POSTHOOK: type: CREATETABLE_AS_SELECT
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-9 depends on stages: Stage-2, Stage-0
- Stage-3 depends on stages: Stage-9
- Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-4
- Stage-6
- Stage-7 depends on stages: Stage-6
+ Stage-2 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-2, Stage-0
+ Stage-3 depends on stages: Stage-4
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
@@ -80,19 +75,10 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.nzhang_CTAS1
- Stage: Stage-8
- Conditional Operator
-
- Stage: Stage-5
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
Stage: Stage-2
Dependency Collection
- Stage: Stage-9
+ Stage: Stage-4
Create Table Operator:
Create Table
columns: k string, value string
@@ -109,40 +95,6 @@ STAGE PLANS:
hdfs directory: true
#### A masked pattern was here ####
- Stage: Stage-4
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_CTAS1
-
- Stage: Stage-6
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_CTAS1
-
- Stage: Stage-7
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
PREHOOK: query: create table nzhang_CTAS1 as select key k, value from src sort by k, value limit 10
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@src
@@ -208,15 +160,10 @@ POSTHOOK: query: explain create table nz
POSTHOOK: type: CREATETABLE_AS_SELECT
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-9 depends on stages: Stage-2, Stage-0
- Stage-3 depends on stages: Stage-9
- Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-4
- Stage-6
- Stage-7 depends on stages: Stage-6
+ Stage-2 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-2, Stage-0
+ Stage-3 depends on stages: Stage-4
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
@@ -267,19 +214,10 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.nzhang_ctas2
- Stage: Stage-8
- Conditional Operator
-
- Stage: Stage-5
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
Stage: Stage-2
Dependency Collection
- Stage: Stage-9
+ Stage: Stage-4
Create Table Operator:
Create Table
columns: key string, value string
@@ -296,40 +234,6 @@ STAGE PLANS:
hdfs directory: true
#### A masked pattern was here ####
- Stage: Stage-4
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_ctas2
-
- Stage: Stage-6
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_ctas2
-
- Stage: Stage-7
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
PREHOOK: query: create table nzhang_ctas2 as select * from src sort by key, value limit 10
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@src
@@ -395,15 +299,10 @@ POSTHOOK: query: explain create table nz
POSTHOOK: type: CREATETABLE_AS_SELECT
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-9 depends on stages: Stage-2, Stage-0
- Stage-3 depends on stages: Stage-9
- Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-4
- Stage-6
- Stage-7 depends on stages: Stage-6
+ Stage-2 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-2, Stage-0
+ Stage-3 depends on stages: Stage-4
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
@@ -454,19 +353,10 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
name: default.nzhang_ctas3
- Stage: Stage-8
- Conditional Operator
-
- Stage: Stage-5
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
Stage: Stage-2
Dependency Collection
- Stage: Stage-9
+ Stage: Stage-4
Create Table Operator:
Create Table
columns: half_key double, conb string
@@ -484,18 +374,6 @@ STAGE PLANS:
hdfs directory: true
#### A masked pattern was here ####
- Stage: Stage-4
- Block level merge
-
- Stage: Stage-6
- Block level merge
-
- Stage: Stage-7
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
PREHOOK: query: create table nzhang_ctas3 row format serde "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe" stored as RCFile as select key/2 half_key, concat(value, "_con") conb from src sort by half_key, conb limit 10
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@src
@@ -624,15 +502,10 @@ POSTHOOK: query: explain create table nz
POSTHOOK: type: CREATETABLE_AS_SELECT
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-9 depends on stages: Stage-2, Stage-0
- Stage-3 depends on stages: Stage-9
- Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-4
- Stage-6
- Stage-7 depends on stages: Stage-6
+ Stage-2 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-2, Stage-0
+ Stage-3 depends on stages: Stage-4
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
@@ -683,19 +556,10 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.nzhang_ctas4
- Stage: Stage-8
- Conditional Operator
-
- Stage: Stage-5
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
Stage: Stage-2
Dependency Collection
- Stage: Stage-9
+ Stage: Stage-4
Create Table Operator:
Create Table
columns: key string, value string
@@ -713,40 +577,6 @@ STAGE PLANS:
hdfs directory: true
#### A masked pattern was here ####
- Stage: Stage-4
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_ctas4
-
- Stage: Stage-6
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_ctas4
-
- Stage: Stage-7
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
PREHOOK: query: create table nzhang_ctas4 row format delimited fields terminated by ',' stored as textfile as select key, value from src sort by key, value limit 10
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@src
@@ -853,15 +683,10 @@ TOK_CREATETABLE
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-9 depends on stages: Stage-2, Stage-0
- Stage-3 depends on stages: Stage-9
- Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-4
- Stage-6
- Stage-7 depends on stages: Stage-6
+ Stage-2 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-2, Stage-0
+ Stage-3 depends on stages: Stage-4
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
@@ -979,19 +804,10 @@ STAGE PLANS:
GatherStats: true
MultiFileSpray: false
- Stage: Stage-8
- Conditional Operator
-
- Stage: Stage-5
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
Stage: Stage-2
Dependency Collection
- Stage: Stage-9
+ Stage: Stage-4
Create Table Operator:
Create Table
columns: key string, value string
@@ -1012,140 +828,6 @@ STAGE PLANS:
hdfs directory: true
#### A masked pattern was here ####
- Stage: Stage-4
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- GatherStats: false
- File Output Operator
- compressed: false
- GlobalTableId: 0
-#### A masked pattern was here ####
- NumFilesPerFileSink: 1
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- properties:
- columns _col0,_col1
- columns.types string:string
- field.delim ,
- line.delim
-
- name default.nzhang_ctas5
- serialization.format ,
- serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_ctas5
- TotalFiles: 1
- GatherStats: false
- MultiFileSpray: false
- Path -> Alias:
-#### A masked pattern was here ####
- Path -> Partition:
-#### A masked pattern was here ####
- Partition
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- properties:
- columns _col0,_col1
- columns.types string:string
- field.delim ,
- line.delim
-
- name default.nzhang_ctas5
- serialization.format ,
- serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- properties:
- columns _col0,_col1
- columns.types string:string
- field.delim ,
- line.delim
-
- name default.nzhang_ctas5
- serialization.format ,
- serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_ctas5
- name: default.nzhang_ctas5
- Truncated Path -> Alias:
-#### A masked pattern was here ####
-
- Stage: Stage-6
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- GatherStats: false
- File Output Operator
- compressed: false
- GlobalTableId: 0
-#### A masked pattern was here ####
- NumFilesPerFileSink: 1
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- properties:
- columns _col0,_col1
- columns.types string:string
- field.delim ,
- line.delim
-
- name default.nzhang_ctas5
- serialization.format ,
- serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_ctas5
- TotalFiles: 1
- GatherStats: false
- MultiFileSpray: false
- Path -> Alias:
-#### A masked pattern was here ####
- Path -> Partition:
-#### A masked pattern was here ####
- Partition
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- properties:
- columns _col0,_col1
- columns.types string:string
- field.delim ,
- line.delim
-
- name default.nzhang_ctas5
- serialization.format ,
- serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- properties:
- columns _col0,_col1
- columns.types string:string
- field.delim ,
- line.delim
-
- name default.nzhang_ctas5
- serialization.format ,
- serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.nzhang_ctas5
- name: default.nzhang_ctas5
- Truncated Path -> Alias:
-#### A masked pattern was here ####
-
- Stage: Stage-7
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
PREHOOK: query: create table nzhang_ctas5 row format delimited fields terminated by ',' lines terminated by '\012' stored as textfile as select key, value from src sort by key, value limit 10
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@src
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby1.q.out Mon Feb 17 07:58:47 2014
@@ -11,14 +11,9 @@ FROM src INSERT OVERWRITE TABLE dest_g1
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
+ Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
Stage-3 depends on stages: Stage-0
- Stage-4
- Stage-6
- Stage-7 depends on stages: Stage-6
STAGE PLANS:
Stage: Stage-1
@@ -77,15 +72,6 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest_g1
- Stage: Stage-8
- Conditional Operator
-
- Stage: Stage-5
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
Stage: Stage-2
Dependency Collection
@@ -102,40 +88,6 @@ STAGE PLANS:
Stage: Stage-3
Stats-Aggr Operator
- Stage: Stage-4
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest_g1
-
- Stage: Stage-6
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest_g1
-
- Stage: Stage-7
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest_g1 SELECT src.key, sum(substr(src.value,5)) GROUP BY src.key
PREHOOK: type: QUERY
PREHOOK: Input: default@src
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby2.q.out Mon Feb 17 07:58:47 2014
@@ -13,14 +13,9 @@ INSERT OVERWRITE TABLE dest_g2 SELECT su
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
+ Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
Stage-3 depends on stages: Stage-0
- Stage-4
- Stage-6
- Stage-7 depends on stages: Stage-6
STAGE PLANS:
Stage: Stage-1
@@ -63,15 +58,6 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest_g2
- Stage: Stage-8
- Conditional Operator
-
- Stage: Stage-5
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
Stage: Stage-2
Dependency Collection
@@ -88,40 +74,6 @@ STAGE PLANS:
Stage: Stage-3
Stats-Aggr Operator
- Stage: Stage-4
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest_g2
-
- Stage: Stage-6
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest_g2
-
- Stage: Stage-7
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
PREHOOK: query: FROM src
INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))) GROUP BY substr(src.key,1,1)
PREHOOK: type: QUERY
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out?rev=1568894&r1=1568893&r2=1568894&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/groupby3.q.out Mon Feb 17 07:58:47 2014
@@ -31,14 +31,9 @@ INSERT OVERWRITE TABLE dest1 SELECT
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
+ Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
Stage-3 depends on stages: Stage-0
- Stage-4
- Stage-6
- Stage-7 depends on stages: Stage-6
STAGE PLANS:
Stage: Stage-1
@@ -92,15 +87,6 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
- Stage: Stage-8
- Conditional Operator
-
- Stage: Stage-5
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
Stage: Stage-2
Dependency Collection
@@ -117,40 +103,6 @@ STAGE PLANS:
Stage: Stage-3
Stats-Aggr Operator
- Stage: Stage-4
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest1
-
- Stage: Stage-6
- Tez
- Vertices:
- Merge
- Map Operator Tree:
- TableScan
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest1
-
- Stage: Stage-7
- Move Operator
- files:
- hdfs directory: true
-#### A masked pattern was here ####
-
PREHOOK: query: FROM src
INSERT OVERWRITE TABLE dest1 SELECT
sum(substr(src.value,5)),