Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/29 17:50:17 UTC
svn commit: r1508111 [1/27] - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/ java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/exec/mr/
java/org/apache/hadoop/hive/ql/index/compact/
java/org/apache/hadoop/hive/ql/io/ java/org/...
Author: hashutosh
Date: Mon Jul 29 15:50:12 2013
New Revision: 1508111
URL: http://svn.apache.org/r1508111
Log:
HIVE-4825 : Separate MapredWork into MapWork and ReduceWork (Gunther Hagleitner via Ashutosh Chauhan)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/Explain.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
hive/trunk/ql/src/test/results/clientpositive/alter_partition_coltype.q.out
hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_1.q.out
hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_11.q.out
hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_12.q.out
hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out
hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_3.q.out
hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_4.q.out
hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_5.q.out
hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_7.q.out
hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out
hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out
hive/trunk/ql/src/test/results/clientpositive/bucket2.q.out
hive/trunk/ql/src/test/results/clientpositive/bucket3.q.out
hive/trunk/ql/src/test/results/clientpositive/bucket4.q.out
hive/trunk/ql/src/test/results/clientpositive/bucket5.q.out
hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_1.q.out
hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_2.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketcontext_1.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketcontext_6.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketcontext_7.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketcontext_8.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin1.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin10.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin11.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin12.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin13.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin3.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin4.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin5.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin7.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin8.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin9.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative2.q.out
hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative3.q.out
hive/trunk/ql/src/test/results/clientpositive/columnstats_partlvl.q.out
hive/trunk/ql/src/test/results/clientpositive/columnstats_tbllvl.q.out
hive/trunk/ql/src/test/results/clientpositive/combine2.q.out
hive/trunk/ql/src/test/results/clientpositive/combine2_hadoop20.q.out
hive/trunk/ql/src/test/results/clientpositive/ctas.q.out
hive/trunk/ql/src/test/results/clientpositive/ctas_hadoop20.q.out
hive/trunk/ql/src/test/results/clientpositive/disable_merge_for_bucketing.q.out
hive/trunk/ql/src/test/results/clientpositive/dynamic_partition_skip_default.q.out
hive/trunk/ql/src/test/results/clientpositive/filter_join_breaktask.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_map_ppr.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_map_ppr_multi_distinct.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_ppr.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_ppr_multi_distinct.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_sort_1.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_sort_6.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_sort_skew_1.q.out
hive/trunk/ql/src/test/results/clientpositive/input23.q.out
hive/trunk/ql/src/test/results/clientpositive/input4.q.out
hive/trunk/ql/src/test/results/clientpositive/input42.q.out
hive/trunk/ql/src/test/results/clientpositive/input_part1.q.out
hive/trunk/ql/src/test/results/clientpositive/input_part2.q.out
hive/trunk/ql/src/test/results/clientpositive/input_part7.q.out
hive/trunk/ql/src/test/results/clientpositive/input_part9.q.out
hive/trunk/ql/src/test/results/clientpositive/join17.q.out
hive/trunk/ql/src/test/results/clientpositive/join26.q.out
hive/trunk/ql/src/test/results/clientpositive/join32.q.out
hive/trunk/ql/src/test/results/clientpositive/join32_lessSize.q.out
hive/trunk/ql/src/test/results/clientpositive/join33.q.out
hive/trunk/ql/src/test/results/clientpositive/join34.q.out
hive/trunk/ql/src/test/results/clientpositive/join35.q.out
hive/trunk/ql/src/test/results/clientpositive/join9.q.out
hive/trunk/ql/src/test/results/clientpositive/join_filters_overlap.q.out
hive/trunk/ql/src/test/results/clientpositive/join_map_ppr.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_1.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_11.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_12.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_13.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_2.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_3.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_5.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_8.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_1.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_2.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_3.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_oneskew_1.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_oneskew_2.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_oneskew_3.q.out
hive/trunk/ql/src/test/results/clientpositive/load_dyn_part8.q.out
hive/trunk/ql/src/test/results/clientpositive/louter_join_ppr.q.out
hive/trunk/ql/src/test/results/clientpositive/macro.q.out
hive/trunk/ql/src/test/results/clientpositive/merge3.q.out
hive/trunk/ql/src/test/results/clientpositive/metadataonly1.q.out
hive/trunk/ql/src/test/results/clientpositive/outer_join_ppr.q.out
hive/trunk/ql/src/test/results/clientpositive/parallel_orderby.q.out
hive/trunk/ql/src/test/results/clientpositive/pcr.q.out
hive/trunk/ql/src/test/results/clientpositive/plan_json.q.out
hive/trunk/ql/src/test/results/clientpositive/ppd_join_filter.q.out
hive/trunk/ql/src/test/results/clientpositive/ppd_union_view.q.out
hive/trunk/ql/src/test/results/clientpositive/ppr_allchildsarenull.q.out
hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner1.q.out
hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner2.q.out
hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner3.q.out
hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out
hive/trunk/ql/src/test/results/clientpositive/regexp_extract.q.out
hive/trunk/ql/src/test/results/clientpositive/router_join_ppr.q.out
hive/trunk/ql/src/test/results/clientpositive/sample1.q.out
hive/trunk/ql/src/test/results/clientpositive/sample10.q.out
hive/trunk/ql/src/test/results/clientpositive/sample2.q.out
hive/trunk/ql/src/test/results/clientpositive/sample4.q.out
hive/trunk/ql/src/test/results/clientpositive/sample5.q.out
hive/trunk/ql/src/test/results/clientpositive/sample6.q.out
hive/trunk/ql/src/test/results/clientpositive/sample7.q.out
hive/trunk/ql/src/test/results/clientpositive/sample8.q.out
hive/trunk/ql/src/test/results/clientpositive/sample9.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin9.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_11.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_12.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_13.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_15.q.out
hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_5.q.out
hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_6.q.out
hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_7.q.out
hive/trunk/ql/src/test/results/clientpositive/stats0.q.out
hive/trunk/ql/src/test/results/clientpositive/stats11.q.out
hive/trunk/ql/src/test/results/clientpositive/stats12.q.out
hive/trunk/ql/src/test/results/clientpositive/stats13.q.out
hive/trunk/ql/src/test/results/clientpositive/transform_ppr1.q.out
hive/trunk/ql/src/test/results/clientpositive/transform_ppr2.q.out
hive/trunk/ql/src/test/results/clientpositive/truncate_column_list_bucket.q.out
hive/trunk/ql/src/test/results/clientpositive/udf_explode.q.out
hive/trunk/ql/src/test/results/clientpositive/udf_java_method.q.out
hive/trunk/ql/src/test/results/clientpositive/udf_reflect.q.out
hive/trunk/ql/src/test/results/clientpositive/udf_reflect2.q.out
hive/trunk/ql/src/test/results/clientpositive/udtf_explode.q.out
hive/trunk/ql/src/test/results/clientpositive/union22.q.out
hive/trunk/ql/src/test/results/clientpositive/union24.q.out
hive/trunk/ql/src/test/results/clientpositive/union_ppr.q.out
hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
hive/trunk/ql/src/test/results/compiler/plan/union.q.xml
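
For orientation before the hunks below: HIVE-4825 splits the monolithic plan class into a map-side and a reduce-side fragment under a common parent, with MapredWork kept as a thin container. The following is a minimal sketch reconstructed only from the accessors visible in this diff; the stand-in Operator class and the field types are assumptions, not the actual source.

    import java.io.Serializable;
    import java.util.LinkedHashMap;

    // Stand-in for org.apache.hadoop.hive.ql.exec.Operator, so the sketch
    // compiles on its own.
    class Operator implements Serializable { }

    // Common parent for plan fragments; the Utilities hunk below caches
    // BaseWork instances keyed by their plan-file Path.
    abstract class BaseWork implements Serializable { }

    // Map-side half of the old MapredWork: alias -> operator tree, plus the
    // input paths, split sizes, sampling and stats settings seen below.
    class MapWork extends BaseWork {
      private LinkedHashMap<String, Operator> aliasToWork =
          new LinkedHashMap<String, Operator>();
      public LinkedHashMap<String, Operator> getAliasToWork() { return aliasToWork; }
    }

    // Reduce-side half: the reducer operator tree and the reducer count.
    class ReduceWork extends BaseWork {
      private Operator reducer;
      private Integer numReduceTasks;
      public Operator getReducer() { return reducer; }
      public Integer getNumReduceTasks() { return numReduceTasks; }
    }

    // MapredWork survives as a container; a map-only job simply carries a
    // null ReduceWork.
    class MapredWork implements Serializable {
      private MapWork mapWork = new MapWork();
      private ReduceWork reduceWork; // null for map-only jobs
      public MapWork getMapWork() { return mapWork; }
      public void setMapWork(MapWork mapWork) { this.mapWork = mapWork; }
      public ReduceWork getReduceWork() { return reduceWork; }
      public void setReduceWork(ReduceWork reduceWork) { this.reduceWork = reduceWork; }
    }
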
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Mon Jul 29 15:50:12 2013
@@ -460,12 +460,12 @@ public class Driver implements CommandPr
// serialize the queryPlan
FileOutputStream fos = new FileOutputStream(queryPlanFileName);
- Utilities.serializeQueryPlan(plan, fos);
+ Utilities.serializeObject(plan, fos);
fos.close();
// deserialize the queryPlan
FileInputStream fis = new FileInputStream(queryPlanFileName);
- QueryPlan newPlan = Utilities.deserializeQueryPlan(fis, conf);
+ QueryPlan newPlan = Utilities.deserializeObject(fis);
fis.close();
// Use the deserialized plan
@@ -878,14 +878,17 @@ public class Driver implements CommandPr
public CommandProcessorResponse run(String command) throws CommandNeedRetryException {
CommandProcessorResponse cpr = runInternal(command);
- if(cpr.getResponseCode() == 0)
+ if(cpr.getResponseCode() == 0) {
return cpr;
+ }
SessionState ss = SessionState.get();
- if(ss == null)
+ if(ss == null) {
return cpr;
+ }
MetaDataFormatter mdf = MetaDataFormatUtils.getFormatter(ss.getConf());
- if(!(mdf instanceof JsonMetaDataFormatter))
+ if(!(mdf instanceof JsonMetaDataFormatter)) {
return cpr;
+ }
/*Here we want to encode the error in machine readable way (e.g. JSON)
* Ideally, errorCode would always be set to a canonical error defined in ErrorMsg.
* In practice that is rarely the case, so the messy logic below tries to tease
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Mon Jul 29 15:50:12 2013
@@ -233,7 +233,7 @@ public class QueryPlan implements Serial
mapTask.setTaskId(stage.getStageId() + "_MAP");
mapTask.setTaskType(TaskType.MAP);
stage.addToTaskList(mapTask);
- populateOperatorGraph(mapTask, mrTask.getWork().getAliasToWork()
+ populateOperatorGraph(mapTask, mrTask.getWork().getMapWork().getAliasToWork()
.values());
// populate reduce task
@@ -245,7 +245,7 @@ public class QueryPlan implements Serial
stage.addToTaskList(reduceTask);
Collection<Operator<? extends OperatorDesc>> reducerTopOps =
new ArrayList<Operator<? extends OperatorDesc>>();
- reducerTopOps.add(mrTask.getWork().getReducer());
+ reducerTopOps.add(mrTask.getWork().getReduceWork().getReducer());
populateOperatorGraph(reduceTask, reducerTopOps);
}
} else {
@@ -382,7 +382,7 @@ public class QueryPlan implements Serial
}
if (task instanceof ExecDriver) {
ExecDriver mrTask = (ExecDriver) task;
- extractOperatorCounters(mrTask.getWork().getAliasToWork().values(),
+ extractOperatorCounters(mrTask.getWork().getMapWork().getAliasToWork().values(),
task.getId() + "_MAP");
if (mrTask.mapStarted()) {
started.add(task.getId() + "_MAP");
@@ -393,7 +393,7 @@ public class QueryPlan implements Serial
if (mrTask.hasReduce()) {
Collection<Operator<? extends OperatorDesc>> reducerTopOps =
new ArrayList<Operator<? extends OperatorDesc>>();
- reducerTopOps.add(mrTask.getWork().getReducer());
+ reducerTopOps.add(mrTask.getWork().getReduceWork().getReducer());
extractOperatorCounters(reducerTopOps, task.getId() + "_REDUCE");
if (mrTask.reduceStarted()) {
started.add(task.getId() + "_REDUCE");
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Mon Jul 29 15:50:12 2013
@@ -121,8 +121,9 @@ public class ExplainTask extends Task<Ex
}
if (work.getParseContext() != null) {
- JSONObject jsonPlan = outputMap(work.getParseContext().getTopOps(),
- "LOGICAL PLAN", out, jsonOutput, work.getExtended(), 0);
+ out.print("LOGICAL PLAN");
+ JSONObject jsonPlan = outputMap(work.getParseContext().getTopOps(), true,
+ out, jsonOutput, work.getExtended(), 0);
if (out != null) {
out.println();
}
@@ -228,19 +229,16 @@ public class ExplainTask extends Task<Ex
return sb.toString();
}
- private JSONObject outputMap(Map<?, ?> mp, String header, PrintStream out,
+ private JSONObject outputMap(Map<?, ?> mp, boolean hasHeader, PrintStream out,
boolean extended, boolean jsonOutput, int indent) throws Exception {
- boolean first_el = true;
TreeMap<Object, Object> tree = new TreeMap<Object, Object>();
tree.putAll(mp);
JSONObject json = jsonOutput ? new JSONObject() : null;
+ if (out != null && hasHeader && !mp.isEmpty()) {
+ out.println();
+ }
for (Entry<?, ?> ent : tree.entrySet()) {
- if (first_el && (out != null)) {
- out.println(header);
- }
- first_el = false;
-
// Print the key
if (out != null) {
out.print(indentString(indent));
@@ -286,7 +284,7 @@ public class ExplainTask extends Task<Ex
return jsonOutput ? json : null;
}
- private JSONArray outputList(List<?> l, String header, PrintStream out,
+ private JSONArray outputList(List<?> l, PrintStream out, boolean hasHeader,
boolean extended, boolean jsonOutput, int indent) throws Exception {
boolean first_el = true;
@@ -294,10 +292,6 @@ public class ExplainTask extends Task<Ex
JSONArray outputArray = new JSONArray();
for (Object o : l) {
- if (first_el && (out != null)) {
- out.print(header);
- }
-
if (isPrintable(o)) {
String delim = first_el ? " " : ", ";
if (out != null) {
@@ -311,11 +305,11 @@ public class ExplainTask extends Task<Ex
nl = true;
}
else if (o instanceof Serializable) {
- if (first_el && (out != null)) {
+ if (first_el && (out != null) && hasHeader) {
out.println();
}
JSONObject jsonOut = outputPlan((Serializable) o, out, extended,
- jsonOutput, jsonOutput ? 0 : indent + 2);
+ jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent));
if (jsonOutput) {
outputArray.put(jsonOut);
}
@@ -439,10 +433,14 @@ public class ExplainTask extends Task<Ex
}
String header = null;
+ boolean skipHeader = xpl_note.skipHeader();
+ boolean emptyHeader = false;
+
if (!xpl_note.displayName().equals("")) {
header = indentString(prop_indents) + xpl_note.displayName() + ":";
}
else {
+ emptyHeader = true;
prop_indents = indent;
header = indentString(prop_indents);
}
@@ -450,7 +448,9 @@ public class ExplainTask extends Task<Ex
// Try the output as a primitive object
if (isPrintable(val)) {
if (out != null && shouldPrint(xpl_note, val)) {
- out.printf("%s ", header);
+ if (!skipHeader) {
+ out.printf("%s ", header);
+ }
out.println(val);
}
if (jsonOutput) {
@@ -458,12 +458,26 @@ public class ExplainTask extends Task<Ex
}
continue;
}
+
+ int ind = 0;
+ if (!jsonOutput) {
+ if (!skipHeader) {
+ ind = prop_indents + 2;
+ } else {
+ ind = indent;
+ }
+ }
+
// Try this as a map
try {
// Go through the map and print out the stuff
Map<?, ?> mp = (Map<?, ?>) val;
- JSONObject jsonOut = outputMap(mp, header, out, extended, jsonOutput,
- jsonOutput ? 0 : prop_indents + 2);
+
+ if (out != null && !skipHeader && mp != null && !mp.isEmpty()) {
+ out.print(header);
+ }
+
+ JSONObject jsonOut = outputMap(mp, !skipHeader && !emptyHeader, out, extended, jsonOutput, ind);
if (jsonOutput) {
json.put(header, jsonOut);
}
@@ -476,8 +490,12 @@ public class ExplainTask extends Task<Ex
// Try this as a list
try {
List<?> l = (List<?>) val;
- JSONArray jsonOut = outputList(l, header, out, extended, jsonOutput,
- jsonOutput ? 0 : prop_indents + 2);
+
+ if (out != null && !skipHeader && l != null && !l.isEmpty()) {
+ out.print(header);
+ }
+
+ JSONArray jsonOut = outputList(l, out, !skipHeader && !emptyHeader, extended, jsonOutput, ind);
if (jsonOutput) {
json.put(header, jsonOut);
@@ -492,11 +510,11 @@ public class ExplainTask extends Task<Ex
// Finally check if it is serializable
try {
Serializable s = (Serializable) val;
- if (out != null) {
+
+ if (!skipHeader && out != null) {
out.println(header);
}
- JSONObject jsonOut = outputPlan(s, out, extended, jsonOutput,
- jsonOutput ? 0 : prop_indents + 2);
+ JSONObject jsonOut = outputPlan(s, out, extended, jsonOutput, ind);
if (jsonOutput) {
json.put(header, jsonOut);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Mon Jul 29 15:50:12 2013
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -63,7 +63,7 @@ import org.apache.hadoop.util.StringUtil
* different from regular operators in that it starts off by processing a
* Writable data structure from a Table (instead of a Hive Object).
**/
-public class MapOperator extends Operator<MapredWork> implements Serializable, Cloneable {
+public class MapOperator extends Operator<MapWork> implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
@@ -229,14 +229,14 @@ public class MapOperator extends Operato
* @param mrwork
* @throws HiveException
*/
- public void initializeAsRoot(Configuration hconf, MapredWork mrwork)
+ public void initializeAsRoot(Configuration hconf, MapWork mapWork)
throws HiveException {
- setConf(mrwork);
+ setConf(mapWork);
setChildren(hconf);
initialize(hconf, null);
}
- private MapOpCtx initObjectInspector(MapredWork conf,
+ private MapOpCtx initObjectInspector(MapWork conf,
Configuration hconf, String onefile, Map<TableDesc, StructObjectInspector> convertedOI)
throws HiveException,
ClassNotFoundException, InstantiationException, IllegalAccessException,
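
The signature changes above mean a MapOperator is now seeded with just the map-side plan rather than the whole MapredWork. A hedged usage sketch, assuming the Configuration already carries a plan path written by Utilities.setMapWork() (the wrapper class and method name are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.MapOperator;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.MapWork;

    public class MapOperatorSetup {
      static MapOperator buildRoot(Configuration hconf) throws HiveException {
        MapWork mapWork = Utilities.getMapWork(hconf); // deserializes map.xml (or hits the cache)
        MapOperator mo = new MapOperator();
        mo.initializeAsRoot(hconf, mapWork);           // setConf + setChildren + initialize, as above
        return mo;
      }
    }
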
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Jul 29 15:50:12 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.plan.Dy
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -306,9 +307,13 @@ public class MoveTask extends Task<MoveW
// the directory this move task is moving
if (task instanceof MapRedTask) {
MapredWork work = (MapredWork)task.getWork();
- bucketCols = work.getBucketedColsByDirectory().get(path);
- sortCols = work.getSortedColsByDirectory().get(path);
- numBuckets = work.getNumReduceTasks();
+ MapWork mapWork = work.getMapWork();
+ bucketCols = mapWork.getBucketedColsByDirectory().get(path);
+ sortCols = mapWork.getSortedColsByDirectory().get(path);
+ if (work.getReduceWork() != null) {
+ numBuckets = work.getReduceWork().getNumReduceTasks();
+ }
+
if (bucketCols != null || sortCols != null) {
// This must be a final map reduce task (the task containing the file sink
// operator that writes the final output)
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Jul 29 15:50:12 2013
@@ -114,6 +114,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -121,11 +122,13 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.Adjacency;
import org.apache.hadoop.hive.ql.plan.api.Graph;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -169,6 +172,8 @@ public final class Utilities {
*/
public static String HADOOP_LOCAL_FS = "file:///";
+ public static String MAP_PLAN_NAME = "map.xml";
+ public static String REDUCE_PLAN_NAME = "reduce.xml";
/**
* ReduceField:
@@ -191,56 +196,85 @@ public final class Utilities {
// prevent instantiation
}
- private static Map<String, MapredWork> gWorkMap = Collections
- .synchronizedMap(new HashMap<String, MapredWork>());
+ private static Map<Path, BaseWork> gWorkMap = Collections
+ .synchronizedMap(new HashMap<Path, BaseWork>());
private static final Log LOG = LogFactory.getLog(Utilities.class.getName());
- public static void clearMapRedWork(Configuration job) {
+ public static void clearWork(Configuration conf) {
+ Path mapPath = getPlanPath(conf, MAP_PLAN_NAME);
+ Path reducePath = getPlanPath(conf, REDUCE_PLAN_NAME);
+
+ // if the plan path hasn't been initialized just return, nothing to clean.
+ if (mapPath == null || reducePath == null) {
+ return;
+ }
+
try {
- Path planPath = new Path(HiveConf.getVar(job, HiveConf.ConfVars.PLAN));
- FileSystem fs = planPath.getFileSystem(job);
- if (fs.exists(planPath)) {
- try {
- fs.delete(planPath, true);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ FileSystem fs = mapPath.getFileSystem(conf);
+ if (fs.exists(mapPath)) {
+ fs.delete(mapPath, true);
+ }
+ if (fs.exists(reducePath)) {
+ fs.delete(reducePath, true);
}
+
} catch (Exception e) {
+ LOG.warn("Failed to clean-up tmp directories.", e);
} finally {
// where a single process works with multiple plans - we must clear
// the cache before working with the next plan.
- String jobID = getHiveJobID(job);
- if (jobID != null) {
- gWorkMap.remove(jobID);
+ if (mapPath != null) {
+ gWorkMap.remove(mapPath);
+ }
+ if (reducePath != null) {
+ gWorkMap.remove(reducePath);
}
}
}
- public static MapredWork getMapRedWork(Configuration job) {
- MapredWork gWork = null;
+ public static MapredWork getMapRedWork(Configuration conf) {
+ MapredWork w = new MapredWork();
+ w.setMapWork(getMapWork(conf));
+ w.setReduceWork(getReduceWork(conf));
+ return w;
+ }
+
+ public static MapWork getMapWork(Configuration conf) {
+ return (MapWork) getBaseWork(conf, MAP_PLAN_NAME);
+ }
+
+ public static ReduceWork getReduceWork(Configuration conf) {
+ return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
+ }
+
+ public static BaseWork getBaseWork(Configuration conf, String name) {
+ BaseWork gWork = null;
+ Path path = null;
try {
- String jobID = getHiveJobID(job);
- assert jobID != null;
- gWork = gWorkMap.get(jobID);
+ path = getPlanPath(conf, name);
+ assert path != null;
+ gWork = gWorkMap.get(path);
if (gWork == null) {
- String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job);
- String path;
+ String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf);
+ Path localPath;
if (jtConf.equals("local")) {
- String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
- path = new Path(planPath).toUri().getPath();
+ localPath = path;
} else {
- path = "HIVE_PLAN" + jobID;
+ localPath = new Path(name);
}
- InputStream in = new FileInputStream(path);
- MapredWork ret = deserializeMapRedWork(in, job);
+ InputStream in = new FileInputStream(localPath.toUri().getPath());
+ BaseWork ret = deserializeObject(in);
gWork = ret;
- gWork.initialize();
- gWorkMap.put(jobID, gWork);
+ gWorkMap.put(path, gWork);
}
- return (gWork);
+ return gWork;
+ } catch (FileNotFoundException fnf) {
+ // happens. e.g.: no reduce work.
+ LOG.debug("No plan file found: "+path);
+ return null;
} catch (Exception e) {
e.printStackTrace();
+ LOG.error("Failed to load plan: "+path, e);
throw new RuntimeException(e);
}
}
@@ -414,47 +448,80 @@ public final class Utilities {
}
}
- public static void setMapRedWork(Configuration job, MapredWork w, String hiveScratchDir) {
+ public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) {
+ setMapWork(conf, w.getMapWork(), hiveScratchDir);
+ if (w.getReduceWork() != null) {
+ setReduceWork(conf, w.getReduceWork(), hiveScratchDir);
+ }
+ }
+
+ public static void setMapWork(Configuration conf, MapWork w, String hiveScratchDir) {
+ setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME);
+ }
+
+ public static void setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir) {
+ setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME);
+ }
+
+ private static void setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name) {
try {
+ setPlanPath(conf, hiveScratchDir);
- // this is the unique job ID, which is kept in JobConf as part of the plan file name
- String jobID = UUID.randomUUID().toString();
- Path planPath = new Path(hiveScratchDir, jobID);
- HiveConf.setVar(job, HiveConf.ConfVars.PLAN, planPath.toUri().toString());
+ Path planPath = getPlanPath(conf, name);
- // use the default file system of the job
- FileSystem fs = planPath.getFileSystem(job);
+ // use the default file system of the conf
+ FileSystem fs = planPath.getFileSystem(conf);
FSDataOutputStream out = fs.create(planPath);
- serializeMapRedWork(w, out);
+ serializeObject(w, out);
// Serialize the plan to the default hdfs instance
// Except for hadoop local mode execution where we should be
// able to get the plan directly from the cache
- if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
+ if (!ShimLoader.getHadoopShims().isLocalMode(conf)) {
// Set up distributed cache
- DistributedCache.createSymlink(job);
- String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN" + jobID;
- DistributedCache.addCacheFile(new URI(uriWithLink), job);
+ if (!DistributedCache.getSymlink(conf)) {
+ DistributedCache.createSymlink(conf);
+ }
+ String uriWithLink = planPath.toUri().toString() + "#" + name;
+ DistributedCache.addCacheFile(new URI(uriWithLink), conf);
// set replication of the plan file to a high number. we use the same
// replication factor as used by the hadoop jobclient for job.xml etc.
- short replication = (short) job.getInt("mapred.submit.replication", 10);
+ short replication = (short) conf.getInt("mapred.submit.replication", 10);
fs.setReplication(planPath, replication);
}
// Cache the plan in this process
- w.initialize();
- gWorkMap.put(jobID, w);
+ gWorkMap.put(planPath, w);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
- public static String getHiveJobID(Configuration job) {
- String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
- if (planPath != null && !planPath.isEmpty()) {
- return (new Path(planPath)).getName();
+ private static Path getPlanPath(Configuration conf, String name) {
+ Path planPath = getPlanPath(conf);
+ if (planPath == null) {
+ return null;
+ }
+ return new Path(planPath, name);
+ }
+
+ private static void setPlanPath(Configuration conf, String hiveScratchDir) throws IOException {
+ if (getPlanPath(conf) == null) {
+ // this is the unique conf ID, which is kept in JobConf as part of the plan file name
+ String jobID = UUID.randomUUID().toString();
+ Path planPath = new Path(hiveScratchDir, jobID);
+ FileSystem fs = planPath.getFileSystem(conf);
+ fs.mkdirs(planPath);
+ HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, planPath.toUri().toString());
+ }
+ }
+
+ private static Path getPlanPath(Configuration conf) {
+ String plan = HiveConf.getVar(conf, HiveConf.ConfVars.PLAN);
+ if (plan != null && !plan.isEmpty()) {
+ return new Path(plan);
}
return null;
}
@@ -495,27 +562,6 @@ public final class Utilities {
}
}
- /**
- * Serialize a single Task.
- */
- public static void serializeTasks(Task<? extends Serializable> t, OutputStream out) {
- XMLEncoder e = null;
- try {
- e = new XMLEncoder(out);
- // workaround for java 1.5
- e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
- e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
- e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
- e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
- e.writeObject(t);
- } finally {
- if (null != e) {
- e.close();
- }
- }
- }
-
public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
@Override
protected Expression instantiate(Object oldInstance, Encoder out) {
@@ -532,14 +578,15 @@ public final class Utilities {
}
/**
- * Serialize the whole query plan.
+ * Serialize the object. This helper function mainly makes sure that enums,
+ * counters, etc are handled properly.
*/
- public static void serializeQueryPlan(QueryPlan plan, OutputStream out) {
+ public static void serializeObject(Object plan, OutputStream out) {
XMLEncoder e = new XMLEncoder(out);
e.setExceptionListener(new ExceptionListener() {
public void exceptionThrown(Exception e) {
LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new RuntimeException("Cannot serialize the query plan", e);
+ throw new RuntimeException("Cannot serialize object", e);
}
});
// workaround for java 1.5
@@ -557,83 +604,14 @@ public final class Utilities {
}
/**
- * Deserialize the whole query plan.
- */
- public static QueryPlan deserializeQueryPlan(InputStream in, Configuration conf) {
- XMLDecoder d = null;
- try {
- d = new XMLDecoder(in, null, null);
- QueryPlan ret = (QueryPlan) d.readObject();
- return (ret);
- } finally {
- if (null != d) {
- d.close();
- }
- }
- }
-
- /**
- * Serialize the mapredWork object to an output stream. DO NOT use this to write to standard
- * output since it closes the output stream. DO USE mapredWork.toXML() instead.
- */
- public static void serializeMapRedWork(MapredWork w, OutputStream out) {
- XMLEncoder e = null;
- try {
- e = new XMLEncoder(out);
- // workaround for java 1.5
- e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
- e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
- e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
- e.writeObject(w);
- } finally {
- if (null != e) {
- e.close();
- }
- }
-
- }
-
- public static MapredWork deserializeMapRedWork(InputStream in, Configuration conf) {
- XMLDecoder d = null;
- try {
- d = new XMLDecoder(in, null, null);
- MapredWork ret = (MapredWork) d.readObject();
- return (ret);
- } finally {
- if (null != d) {
- d.close();
- }
- }
- }
-
- /**
- * Serialize the mapredLocalWork object to an output stream. DO NOT use this to write to standard
- * output since it closes the output stream. DO USE mapredWork.toXML() instead.
+ * De-serialize an object. This helper function mainly makes sure that enums,
+ * counters, etc are handled properly.
*/
- public static void serializeMapRedLocalWork(MapredLocalWork w, OutputStream out) {
- XMLEncoder e = null;
- try {
- e = new XMLEncoder(out);
- // workaround for java 1.5
- e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
- e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
- e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
- e.writeObject(w);
- } finally {
- if (null != e) {
- e.close();
- }
- }
- }
-
- public static MapredLocalWork deserializeMapRedLocalWork(InputStream in, Configuration conf) {
+ public static <T> T deserializeObject(InputStream in) {
XMLDecoder d = null;
try {
d = new XMLDecoder(in, null, null);
- MapredLocalWork ret = (MapredLocalWork) d.readObject();
- return (ret);
+ return (T) d.readObject();
} finally {
if (null != d) {
d.close();
@@ -1812,7 +1790,7 @@ public final class Utilities {
* @return the summary of all the input paths.
* @throws IOException
*/
- public static ContentSummary getInputSummary(Context ctx, MapredWork work, PathFilter filter)
+ public static ContentSummary getInputSummary(Context ctx, MapWork work, PathFilter filter)
throws IOException {
long[] summary = {0, 0, 0};
@@ -2273,7 +2251,7 @@ public final class Utilities {
try {
MapredWork mapredWork = ((MapRedTask) task).getWork();
Set<Class<? extends InputFormat>> reworkInputFormats = new HashSet<Class<? extends InputFormat>>();
- for (PartitionDesc part : mapredWork.getPathToPartitionInfo().values()) {
+ for (PartitionDesc part : mapredWork.getMapWork().getPathToPartitionInfo().values()) {
Class<? extends InputFormat> inputFormatCls = part
.getInputFileFormatClass();
if (ReworkMapredInputFormat.class.isAssignableFrom(inputFormatCls)) {
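
Taken together, the Utilities changes above replace the four plan-specific XMLEncoder helpers with one generic serializeObject/deserializeObject pair, and turn the single plan file into a per-query plan directory holding map.xml and, when a reduce phase exists, reduce.xml. A minimal round-trip sketch using only methods from this hunk; the scratch directory is illustrative:

    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.plan.MapredWork;
    import org.apache.hadoop.hive.ql.plan.ReduceWork;
    import org.apache.hadoop.mapred.JobConf;

    public class PlanRoundTrip {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        MapredWork w = new MapredWork();
        w.setMapWork(new MapWork());   // map-only: no ReduceWork attached
        // Writes <scratch>/<uuid>/map.xml and records the directory in
        // HiveConf.ConfVars.PLAN; reduce.xml is skipped for map-only plans.
        Utilities.setMapRedWork(job, w, "/tmp/hive-scratch");
        MapWork m = Utilities.getMapWork(job);        // read back from map.xml (or cache)
        ReduceWork r = Utilities.getReduceWork(job);  // null: reduce.xml was never written
        Utilities.clearWork(job);                     // delete plan files, evict the cache
      }
    }
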
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Mon Jul 29 15:50:12 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.ErrorMs
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -63,7 +64,6 @@ import org.apache.hadoop.hive.ql.exec.Pa
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -73,10 +73,12 @@ import org.apache.hadoop.hive.ql.io.OneN
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -102,7 +104,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.varia.NullAppender;
/**
- * ExecDriver is the central class in co-ordinating execution of any map-reduce task.
+ * ExecDriver is the central class in co-ordinating execution of any map-reduce task.
 * Its main responsibilities are:
*
* - Converting the plan (MapredWork) into a MR Job (JobConf)
@@ -196,13 +198,13 @@ public class ExecDriver extends Task<Map
* @return true if fatal errors happened during job execution, false otherwise.
*/
public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
- for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+ for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
if (op.checkFatalErrors(ctrs, errMsg)) {
return true;
}
}
- if (work.getReducer() != null) {
- if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
+ if (work.getReduceWork() != null) {
+ if (work.getReduceWork().getReducer().checkFatalErrors(ctrs, errMsg)) {
return true;
}
}
@@ -211,18 +213,18 @@ public class ExecDriver extends Task<Map
protected void createTmpDirs() throws IOException {
// fix up outputs
- Map<String, ArrayList<String>> pa = work.getPathToAliases();
+ Map<String, ArrayList<String>> pa = work.getMapWork().getPathToAliases();
if (pa != null) {
List<Operator<? extends OperatorDesc>> opList =
new ArrayList<Operator<? extends OperatorDesc>>();
- if (work.getReducer() != null) {
- opList.add(work.getReducer());
+ if (work.getReduceWork() != null) {
+ opList.add(work.getReduceWork().getReducer());
}
for (List<String> ls : pa.values()) {
for (String a : ls) {
- opList.add(work.getAliasToWork().get(a));
+ opList.add(work.getMapWork().getAliasToWork().get(a));
while (!opList.isEmpty()) {
Operator<? extends OperatorDesc> op = opList.remove(0);
@@ -251,6 +253,7 @@ public class ExecDriver extends Task<Map
/**
* Execute a query plan using Hadoop.
*/
+ @SuppressWarnings("deprecation")
@Override
public int execute(DriverContext driverContext) {
@@ -259,16 +262,14 @@ public class ExecDriver extends Task<Map
boolean success = true;
- String invalidReason = work.isInvalid();
- if (invalidReason != null) {
- throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
- }
-
Context ctx = driverContext.getCtx();
boolean ctxCreated = false;
String emptyScratchDirStr;
Path emptyScratchDir;
+ MapWork mWork = work.getMapWork();
+ ReduceWork rWork = work.getReduceWork();
+
try {
if (ctx == null) {
ctx = new Context(job);
@@ -301,27 +302,27 @@ public class ExecDriver extends Task<Map
throw new RuntimeException(e.getMessage());
}
- if (work.getNumMapTasks() != null) {
- job.setNumMapTasks(work.getNumMapTasks().intValue());
+ if (mWork.getNumMapTasks() != null) {
+ job.setNumMapTasks(mWork.getNumMapTasks().intValue());
}
- if (work.getMaxSplitSize() != null) {
- HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, work.getMaxSplitSize().longValue());
+ if (mWork.getMaxSplitSize() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, mWork.getMaxSplitSize().longValue());
}
- if (work.getMinSplitSize() != null) {
- HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
+ if (mWork.getMinSplitSize() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, mWork.getMinSplitSize().longValue());
}
- if (work.getMinSplitSizePerNode() != null) {
- HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, work.getMinSplitSizePerNode().longValue());
+ if (mWork.getMinSplitSizePerNode() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, mWork.getMinSplitSizePerNode().longValue());
}
- if (work.getMinSplitSizePerRack() != null) {
- HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, work.getMinSplitSizePerRack().longValue());
+ if (mWork.getMinSplitSizePerRack() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, mWork.getMinSplitSizePerRack().longValue());
}
- job.setNumReduceTasks(work.getNumReduceTasks().intValue());
+ job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
job.setReducerClass(ExecReducer.class);
// set input format information if necessary
@@ -338,7 +339,7 @@ public class ExecDriver extends Task<Map
inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
}
- if (getWork().isUseBucketizedHiveInputFormat()) {
+ if (mWork.isUseBucketizedHiveInputFormat()) {
inpFormat = BucketizedHiveInputFormat.class.getName();
}
@@ -387,11 +388,11 @@ public class ExecDriver extends Task<Map
}
try{
- MapredLocalWork localwork = work.getMapLocalWork();
+ MapredLocalWork localwork = mWork.getMapLocalWork();
if (localwork != null) {
if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
Path localPath = new Path(localwork.getTmpFileURI());
- Path hdfsPath = new Path(work.getTmpHDFSFileURI());
+ Path hdfsPath = new Path(mWork.getTmpHDFSFileURI());
FileSystem hdfs = hdfsPath.getFileSystem(job);
FileSystem localFS = localPath.getFileSystem(job);
@@ -429,17 +430,17 @@ public class ExecDriver extends Task<Map
}
}
work.configureJobConf(job);
- addInputPaths(job, work, emptyScratchDirStr, ctx);
+ addInputPaths(job, mWork, emptyScratchDirStr, ctx);
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
- if (work.getSamplingType() > 0 && work.getNumReduceTasks() > 1) {
+ if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
try {
- handleSampling(driverContext, work, job, new HiveConf(conf));
+ handleSampling(driverContext, mWork, job, conf);
job.setPartitionerClass(HiveTotalOrderPartitioner.class);
} catch (Exception e) {
console.printInfo("Not enough sampling data.. Rolling back to single reducer task");
- work.setNumReduceTasks(1);
+ rWork.setNumReduceTasks(1);
job.setNumReduceTasks(1);
}
}
@@ -454,7 +455,7 @@ public class ExecDriver extends Task<Map
// make this client wait if job tracker is not behaving well.
Throttle.checkJobTracker(job, LOG);
- if (work.isGatheringStats()) {
+ if (mWork.isGatheringStats() || (rWork != null && rWork.isGatheringStats())) {
// initialize stats publishing table
StatsPublisher statsPublisher;
String statsImplementationClass = HiveConf.getVar(job, HiveConf.ConfVars.HIVESTATSDBCLASS);
@@ -496,7 +497,7 @@ public class ExecDriver extends Task<Map
success = false;
returnVal = 1;
} finally {
- Utilities.clearMapRedWork(job);
+ Utilities.clearWork(job);
try {
if (ctxCreated) {
ctx.clear();
@@ -517,13 +518,13 @@ public class ExecDriver extends Task<Map
try {
if (rj != null) {
JobCloseFeedBack feedBack = new JobCloseFeedBack();
- if (work.getAliasToWork() != null) {
- for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+ if (mWork.getAliasToWork() != null) {
+ for (Operator<? extends OperatorDesc> op : mWork.getAliasToWork().values()) {
op.jobClose(job, success, feedBack);
}
}
- if (work.getReducer() != null) {
- work.getReducer().jobClose(job, success, feedBack);
+ if (rWork != null) {
+ rWork.getReducer().jobClose(job, success, feedBack);
}
}
} catch (Exception e) {
@@ -539,16 +540,16 @@ public class ExecDriver extends Task<Map
return (returnVal);
}
- private void handleSampling(DriverContext context, MapredWork work, JobConf job, HiveConf conf)
+ private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf)
throws Exception {
- assert work.getAliasToWork().keySet().size() == 1;
+ assert mWork.getAliasToWork().keySet().size() == 1;
- String alias = work.getAliases().get(0);
- Operator<?> topOp = work.getAliasToWork().get(alias);
- PartitionDesc partDesc = work.getAliasToPartnInfo().get(alias);
+ String alias = mWork.getAliases().get(0);
+ Operator<?> topOp = mWork.getAliasToWork().get(alias);
+ PartitionDesc partDesc = mWork.getAliasToPartnInfo().get(alias);
- ArrayList<String> paths = work.getPaths();
- ArrayList<PartitionDesc> parts = work.getPartitionDescs();
+ ArrayList<String> paths = mWork.getPaths();
+ ArrayList<PartitionDesc> parts = mWork.getPartitionDescs();
Path onePath = new Path(paths.get(0));
String tmpPath = context.getCtx().getExternalTmpFileURI(onePath.toUri());
@@ -558,7 +559,7 @@ public class ExecDriver extends Task<Map
PartitionKeySampler sampler = new PartitionKeySampler();
- if (work.getSamplingType() == MapredWork.SAMPLING_ON_PREV_MR) {
+ if (mWork.getSamplingType() == MapWork.SAMPLING_ON_PREV_MR) {
console.printInfo("Use sampling data created in previous MR");
// merges sampling data from previous MR and make paritition keys for total sort
for (String path : paths) {
@@ -568,7 +569,7 @@ public class ExecDriver extends Task<Map
sampler.addSampleFile(status.getPath(), job);
}
}
- } else if (work.getSamplingType() == MapredWork.SAMPLING_ON_START) {
+ } else if (mWork.getSamplingType() == MapWork.SAMPLING_ON_START) {
console.printInfo("Creating sampling data..");
assert topOp instanceof TableScanOperator;
TableScanOperator ts = (TableScanOperator) topOp;
@@ -592,7 +593,7 @@ public class ExecDriver extends Task<Map
fetcher.clearFetchContext();
}
} else {
- throw new IllegalArgumentException("Invalid sampling type " + work.getSamplingType());
+ throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
}
sampler.writePartitionKeys(partitionFile, job);
}
@@ -601,16 +602,17 @@ public class ExecDriver extends Task<Map
* Set hive input format, and input format file if necessary.
*/
protected void setInputAttributes(Configuration conf) {
- if (work.getInputformat() != null) {
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, work.getInputformat());
- }
- if (work.getIndexIntermediateFile() != null) {
- conf.set("hive.index.compact.file", work.getIndexIntermediateFile());
- conf.set("hive.index.blockfilter.file", work.getIndexIntermediateFile());
+ MapWork mWork = work.getMapWork();
+ if (mWork.getInputformat() != null) {
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+ }
+ if (mWork.getIndexIntermediateFile() != null) {
+ conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());
+ conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile());
}
// Intentionally overwrites anything the user may have put here
- conf.setBoolean("hive.input.format.sorted", work.isInputFormatSorted());
+ conf.setBoolean("hive.input.format.sorted", mWork.isInputFormatSorted());
}
public boolean mapStarted() {
@@ -757,12 +759,12 @@ public class ExecDriver extends Task<Map
int ret;
if (localtask) {
memoryMXBean = ManagementFactory.getMemoryMXBean();
- MapredLocalWork plan = Utilities.deserializeMapRedLocalWork(pathData, conf);
+ MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
ret = ed.executeFromChildJVM(new DriverContext());
} else {
- MapredWork plan = Utilities.deserializeMapRedWork(pathData, conf);
+ MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
ret = ed.execute(new DriverContext());
}
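The move from deserializeMapRedLocalWork/deserializeMapRedWork to a single deserializeObject mirrors the plan split: one generic codec now covers MapredWork, MapWork, ReduceWork and MapredLocalWork alike. A plausible shape for the pair, assuming the java.beans XML plan serialization Hive used in this era (the committed Utilities may add buffering or compression):

    import java.beans.XMLDecoder;
    import java.beans.XMLEncoder;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Sketch: one generic codec for every plan fragment.
    public static void serializeObject(Object plan, OutputStream out) {
      XMLEncoder e = new XMLEncoder(out);
      e.writeObject(plan);
      e.close();
    }

    public static Object deserializeObject(InputStream in) {
      XMLDecoder d = new XMLDecoder(in);
      try {
        return d.readObject();
      } finally {
        d.close();
      }
    }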
@@ -823,19 +825,19 @@ public class ExecDriver extends Task<Map
@Override
public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
- return getWork().getAliasToWork().values();
+ return getWork().getMapWork().getAliasToWork().values();
}
@Override
public boolean hasReduce() {
MapredWork w = getWork();
- return w.getReducer() != null;
+ return w.getReduceWork() != null;
}
/**
* Handle an empty/null path for a given alias.
*/
- private static int addInputPath(String path, JobConf job, MapredWork work, String hiveScratchDir,
+ private static int addInputPath(String path, JobConf job, MapWork work, String hiveScratchDir,
int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
// either the directory does not exist or it is empty
assert path == null || isEmptyPath;
@@ -919,7 +921,7 @@ public class ExecDriver extends Task<Map
return numEmptyPaths;
}
- public static void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir, Context ctx)
+ public static void addInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
throws Exception {
int numEmptyPaths = 0;
@@ -1002,11 +1004,11 @@ public class ExecDriver extends Task<Map
@Override
public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
- for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+ for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
op.updateCounters(ctrs);
}
- if (work.getReducer() != null) {
- work.getReducer().updateCounters(ctrs);
+ if (work.getReduceWork() != null) {
+ work.getReduceWork().getReducer().updateCounters(ctrs);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Mon Jul 29 15:50:12 2013
@@ -30,11 +30,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
@@ -45,10 +45,10 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.StringUtils;
/**
- * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is
+ * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is
* the bridge between the map-reduce framework and the Hive operator pipeline at
* execution time. Its main responsibilities are:
- *
+ *
* - Load and setup the operator pipeline from XML
* - Run the pipeline by transforming key value pairs to records and forwarding them to the operators
* - Stop execution when the "limit" is reached
@@ -96,7 +96,7 @@ public class ExecMapper extends MapReduc
jc = job;
execContext.setJc(jc);
// create map and fetch operators
- MapredWork mrwork = Utilities.getMapRedWork(job);
+ MapWork mrwork = Utilities.getMapWork(job);
mo = new MapOperator();
mo.setConf(mrwork);
// initialize map operator
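ExecMapper now pulls only the map half of the plan; ExecReducer, in the next file, pulls only the reduce half. One plausible layering of the new accessors over the combined plan object; hedged, since the committed Utilities may fetch and cache each half independently:

    // Sketch: split accessors on top of the combined plan.
    public static MapWork getMapWork(Configuration job) {
      return getMapRedWork(job).getMapWork();
    }

    public static ReduceWork getReduceWork(Configuration job) {
      return getMapRedWork(job).getReduceWork();
    }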
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Mon Jul 29 15:50:12 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDe;
@@ -112,7 +112,7 @@ public class ExecReducer extends MapRedu
l4j.info("cannot get classpath: " + e.getMessage());
}
jc = job;
- MapredWork gWork = Utilities.getMapRedWork(job);
+ ReduceWork gWork = Utilities.getReduceWork(job);
reducer = gWork.getReducer();
reducer.setParentOperators(null); // clear out any parents as reducer is the
// root
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Mon Jul 29 15:50:12 2013
@@ -40,8 +40,10 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
@@ -101,7 +103,7 @@ public class MapRedTask extends ExecDriv
conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
if (inputSummary == null) {
- inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+ inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
}
// set the values of totalInputFileSize and totalInputNumFiles, estimating them
@@ -109,7 +111,7 @@ public class MapRedTask extends ExecDriv
estimateInputSize();
// at this point the number of reducers is precisely defined in the plan
- int numReducers = work.getNumReduceTasks();
+ int numReducers = work.getReduceWork() == null ? 0 : work.getReduceWork().getNumReduceTasks();
if (LOG.isDebugEnabled()) {
LOG.debug("Task: " + getId() + ", Summary: " +
@@ -177,7 +179,7 @@ public class MapRedTask extends ExecDriv
OutputStream out = FileSystem.getLocal(conf).create(planPath);
MapredWork plan = getWork();
LOG.info("Generating plan file " + planPath.toString());
- Utilities.serializeMapRedWork(plan, out);
+ Utilities.serializeObject(plan, out);
String isSilent = "true".equalsIgnoreCase(System
.getProperty("test.silent")) ? "-nolog" : "";
@@ -383,26 +385,26 @@ public class MapRedTask extends ExecDriv
* Set the number of reducers for the mapred work.
*/
private void setNumberOfReducers() throws IOException {
+ ReduceWork rWork = work.getReduceWork();
// this is a temporary hack to fix things that are not fixed in the compiler
- Integer numReducersFromWork = work.getNumReduceTasks();
+ Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();
- if (work.getReducer() == null) {
+ if (rWork == null) {
console
.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
- work.setNumReduceTasks(Integer.valueOf(0));
} else {
if (numReducersFromWork >= 0) {
console.printInfo("Number of reduce tasks determined at compile time: "
- + work.getNumReduceTasks());
+ + rWork.getNumReduceTasks());
} else if (job.getNumReduceTasks() > 0) {
int reducers = job.getNumReduceTasks();
- work.setNumReduceTasks(reducers);
+ rWork.setNumReduceTasks(reducers);
console
.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+ reducers);
} else {
int reducers = estimateNumberOfReducers();
- work.setNumReduceTasks(reducers);
+ rWork.setNumReduceTasks(reducers);
console
.printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+ reducers);
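Note the deleted work.setNumReduceTasks(Integer.valueOf(0)): with the reduce side factored out, zero reducers is expressed by the absence of a ReduceWork rather than by a stored count. The null-guard idiom at the top of the hunk is the pattern callers now follow:

    // The new idiom for "how many reducers?", taken from this hunk.
    ReduceWork rWork = work.getReduceWork();
    int numReducers = (rWork == null) ? 0 : rWork.getNumReduceTasks();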
@@ -437,7 +439,7 @@ public class MapRedTask extends ExecDriv
if(inputSummary == null) {
// compute the summary and stash it away
- inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+ inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
}
// if all inputs are sampled, we should shrink the size of reducers accordingly.
@@ -459,7 +461,7 @@ public class MapRedTask extends ExecDriv
// and the user has configured Hive to do this, make sure the number of reducers is a
// power of two
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
- work.isFinalMapRed() && !work.getBucketedColsByDirectory().isEmpty()) {
+ work.isFinalMapRed() && !work.getMapWork().getBucketedColsByDirectory().isEmpty()) {
int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
int reducersPowerTwo = (int)Math.pow(2, reducersLog);
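A worked example of the rounding above: with reducers = 5, Math.log(5) / Math.log(2) is roughly 2.32, which truncates to 2, so reducersLog = 3 and reducersPowerTwo = 2^3 = 8; the bucketed final map-reduce job is nudged from 5 reducers up to 8.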
@@ -497,11 +499,13 @@ public class MapRedTask extends ExecDriv
return;
}
+ MapWork mWork = work.getMapWork();
+
// Initialize the values to be those taken from the input summary
totalInputFileSize = inputSummary.getLength();
totalInputNumFiles = inputSummary.getFileCount();
- if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+ if (mWork.getNameToSplitSample() == null || mWork.getNameToSplitSample().isEmpty()) {
// If percentage block sampling wasn't used, we don't need to do any estimation
inputSizeEstimated = true;
return;
@@ -510,10 +514,10 @@ public class MapRedTask extends ExecDriv
// if all inputs are sampled, we should shrink the size of the input accordingly
double highestSamplePercentage = 0;
boolean allSample = false;
- for (String alias : work.getAliasToWork().keySet()) {
- if (work.getNameToSplitSample().containsKey(alias)) {
+ for (String alias : mWork.getAliasToWork().keySet()) {
+ if (mWork.getNameToSplitSample().containsKey(alias)) {
allSample = true;
- Double rate = work.getNameToSplitSample().get(alias).getPercent();
+ Double rate = mWork.getNameToSplitSample().get(alias).getPercent();
if (rate != null && rate > highestSamplePercentage) {
highestSamplePercentage = rate;
}
@@ -580,7 +584,7 @@ public class MapRedTask extends ExecDriv
@Override
public Operator<? extends OperatorDesc> getReducer() {
- return getWork().getReducer();
+ return getWork().getReduceWork() == null ? null : getWork().getReduceWork().getReducer();
}
@Override
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Mon Jul 29 15:50:12 2013
@@ -141,7 +141,7 @@ public class MapredLocalTask extends Tas
OutputStream out = FileSystem.getLocal(conf).create(planPath);
MapredLocalWork plan = getWork();
LOG.info("Generating plan file " + planPath.toString());
- Utilities.serializeMapRedLocalWork(plan, out);
+ Utilities.serializeObject(plan, out);
String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java Mon Jul 29 15:50:12 2013
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -188,7 +189,7 @@ public class CompactIndexHandler extends
if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
// For now, only works if the predicate is a single condition
- MapredWork work = null;
+ MapWork work = null;
String originalInputFormat = null;
for (Task task : driver.getPlan().getRootTasks()) {
// The index query should have one and only one map reduce task in the root tasks
@@ -202,7 +203,9 @@ public class CompactIndexHandler extends
work.setInputFormatSorted(false);
break;
}
- work = (MapredWork)task.getWork();
+ if (task.getWork() != null) {
+ work = ((MapredWork)task.getWork()).getMapWork();
+ }
String inputFormat = work.getInputformat();
originalInputFormat = inputFormat;
if (inputFormat == null) {
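The extraction above assumes the root task carries a MapredWork; a more defensive rendering of the same pattern guards the cast explicitly. This is a sketch, not the committed code, which leans on the surrounding task-type checks instead:

    // Sketch: type-guarded extraction of the map-side plan inside the
    // root-task loop; the bare cast in the patch assumes a MapredWork.
    Object taskWork = task.getWork();
    if (taskWork instanceof MapredWork) {
      work = ((MapredWork) taskWork).getMapWork();
    }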
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Mon Jul 29 15:50:12 2013
@@ -95,7 +95,7 @@ public class CombineHiveInputFormat<K ex
this.inputSplitShim = inputSplitShim;
if (job != null) {
Map<String, PartitionDesc> pathToPartitionInfo = Utilities
- .getMapRedWork(job).getPathToPartitionInfo();
+ .getMapWork(job).getPathToPartitionInfo();
// extract all the inputFormatClass names for each chunk in the
// CombinedSplit.
@@ -200,7 +200,7 @@ public class CombineHiveInputFormat<K ex
if (inputFormatClassName == null) {
Map<String, PartitionDesc> pathToPartitionInfo = Utilities
- .getMapRedWork(getJob()).getPathToPartitionInfo();
+ .getMapWork(getJob()).getPathToPartitionInfo();
// extract all the inputFormatClass names for each chunk in the
// CombinedSplit.
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Mon Jul 29 15:50:12 2013
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -249,10 +249,10 @@ public class HiveInputFormat<K extends W
}
protected Map<String, PartitionDesc> pathToPartitionInfo;
- MapredWork mrwork = null;
+ MapWork mrwork = null;
protected void init(JobConf job) {
- mrwork = Utilities.getMapRedWork(job);
+ mrwork = Utilities.getMapWork(job);
pathToPartitionInfo = mrwork.getPathToPartitionInfo();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java Mon Jul 29 15:50:12 2013
@@ -37,10 +37,10 @@ import org.apache.hadoop.mapred.TextInpu
public class SymbolicInputFormat implements ReworkMapredInputFormat {
public void rework(HiveConf job, MapredWork work) throws IOException {
- Map<String, PartitionDesc> pathToParts = work.getPathToPartitionInfo();
+ Map<String, PartitionDesc> pathToParts = work.getMapWork().getPathToPartitionInfo();
List<String> toRemovePaths = new ArrayList<String>();
Map<String, PartitionDesc> toAddPathToPart = new HashMap<String, PartitionDesc>();
- Map<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+ Map<String, ArrayList<String>> pathToAliases = work.getMapWork().getPathToAliases();
for (Map.Entry<String, PartitionDesc> pathPartEntry : pathToParts
.entrySet()) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java Mon Jul 29 15:50:12 2013
@@ -18,6 +18,10 @@
package org.apache.hadoop.hive.ql.io.avro;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
@@ -29,18 +33,17 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.*;
-
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
/**
* RecordReader optimized against Avro GenericRecords that returns to record
@@ -67,7 +70,9 @@ public class AvroGenericRecordReader imp
GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>();
- if(latest != null) gdr.setExpected(latest);
+ if(latest != null) {
+ gdr.setExpected(latest);
+ }
this.reader = new DataFileReader<GenericRecord>(new FsInput(split.getPath(), job), gdr);
this.reader.sync(split.getStart());
@@ -86,11 +91,11 @@ public class AvroGenericRecordReader imp
FileSystem fs = split.getPath().getFileSystem(job);
// Inside an MR job, we can pull out the actual properties
if(AvroSerdeUtils.insideMRJob(job)) {
- MapredWork mapRedWork = Utilities.getMapRedWork(job);
+ MapWork mapWork = Utilities.getMapWork(job);
// Iterate over the Path -> Partition descriptions to find the partition
// that matches our input split.
- for (Map.Entry<String,PartitionDesc> pathsAndParts: mapRedWork.getPathToPartitionInfo().entrySet()){
+ for (Map.Entry<String,PartitionDesc> pathsAndParts: mapWork.getPathToPartitionInfo().entrySet()){
String partitionPath = pathsAndParts.getKey();
if(pathIsInPartition(split.getPath(), partitionPath)) {
if(LOG.isInfoEnabled()) {
@@ -101,11 +106,15 @@ public class AvroGenericRecordReader imp
Properties props = pathsAndParts.getValue().getProperties();
if(props.containsKey(AvroSerdeUtils.SCHEMA_LITERAL) || props.containsKey(AvroSerdeUtils.SCHEMA_URL)) {
return AvroSerdeUtils.determineSchemaOrThrowException(props);
- } else
+ }
+ else {
return null; // If it's not in this property, it won't be in any others
+ }
}
}
- if(LOG.isInfoEnabled()) LOG.info("Unable to match filesplit " + split + " with a partition.");
+ if(LOG.isInfoEnabled()) {
+ LOG.info("Unable to match filesplit " + split + " with a partition.");
+ }
}
// In "select * from table" situations (non-MR), we can add things to the job
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Mon Jul 29 15:50:12 2013
@@ -192,7 +192,7 @@ public class BlockMergeTask extends Task
try {
addInputPaths(job, work);
- Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+ Utilities.setMapWork(job, work, ctx.getMRTmpFileURI());
// remove the pwd from conf file so that job tracker doesn't show this
// logs
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java Mon Jul 29 15:50:12 2013
@@ -33,14 +33,14 @@ import org.apache.hadoop.hive.ql.io.Comb
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.Mapper;
@Explain(displayName = "Block level merge")
-public class MergeWork extends MapredWork implements Serializable {
+public class MergeWork extends MapWork implements Serializable {
private static final long serialVersionUID = 1L;
@@ -70,9 +70,6 @@ public class MergeWork extends MapredWor
if(this.getPathToPartitionInfo() == null) {
this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
}
- if(this.getNumReduceTasks() == null) {
- this.setNumReduceTasks(0);
- }
for(String path: this.inputPaths) {
this.getPathToPartitionInfo().put(path, partDesc);
}
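With MergeWork re-parented onto MapWork, the zero-reducer default deleted above becomes meaningless: a map-only work simply has no reduce side. Where a full plan object is still needed for submission, the work is wrapped on the way in, as the PartialScanTask hunk below does; the pattern in isolation (method names come from this patch, the variable is hypothetical):

    // Sketch: packaging a map-only MapWork for job submission.
    MapredWork mrWork = new MapredWork();
    mrWork.setMapWork(mergeWork); // reduce side intentionally stays null
    Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpFileURI());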
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java Mon Jul 29 15:50:12 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -181,7 +182,9 @@ public class PartialScanTask extends Tas
try {
addInputPaths(job, work);
- Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+ MapredWork mrWork = new MapredWork();
+ mrWork.setMapWork(work);
+ Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpFileURI());
// remove the pwd from conf file so that job tracker doesn't show this
// logs