You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/29 17:50:17 UTC

svn commit: r1508111 [1/27] - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/ java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mr/ java/org/apache/hadoop/hive/ql/index/compact/ java/org/apache/hadoop/hive/ql/io/ java/org/...

Author: hashutosh
Date: Mon Jul 29 15:50:12 2013
New Revision: 1508111

URL: http://svn.apache.org/r1508111
Log:
HIVE-4825 : Separate MapredWork into MapWork and ReduceWork (Gunther Hagleitner via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/Explain.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
    hive/trunk/ql/src/test/results/clientpositive/alter_partition_coltype.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_11.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_12.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
    hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket3.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket4.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket5.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_8.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin1.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin10.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin11.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin12.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin13.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin3.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin4.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin5.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin7.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin8.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin9.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative3.q.out
    hive/trunk/ql/src/test/results/clientpositive/columnstats_partlvl.q.out
    hive/trunk/ql/src/test/results/clientpositive/columnstats_tbllvl.q.out
    hive/trunk/ql/src/test/results/clientpositive/combine2.q.out
    hive/trunk/ql/src/test/results/clientpositive/combine2_hadoop20.q.out
    hive/trunk/ql/src/test/results/clientpositive/ctas.q.out
    hive/trunk/ql/src/test/results/clientpositive/ctas_hadoop20.q.out
    hive/trunk/ql/src/test/results/clientpositive/disable_merge_for_bucketing.q.out
    hive/trunk/ql/src/test/results/clientpositive/dynamic_partition_skip_default.q.out
    hive/trunk/ql/src/test/results/clientpositive/filter_join_breaktask.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_map_ppr.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_map_ppr_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_ppr.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_ppr_multi_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_sort_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_sort_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_sort_skew_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/input23.q.out
    hive/trunk/ql/src/test/results/clientpositive/input4.q.out
    hive/trunk/ql/src/test/results/clientpositive/input42.q.out
    hive/trunk/ql/src/test/results/clientpositive/input_part1.q.out
    hive/trunk/ql/src/test/results/clientpositive/input_part2.q.out
    hive/trunk/ql/src/test/results/clientpositive/input_part7.q.out
    hive/trunk/ql/src/test/results/clientpositive/input_part9.q.out
    hive/trunk/ql/src/test/results/clientpositive/join17.q.out
    hive/trunk/ql/src/test/results/clientpositive/join26.q.out
    hive/trunk/ql/src/test/results/clientpositive/join32.q.out
    hive/trunk/ql/src/test/results/clientpositive/join32_lessSize.q.out
    hive/trunk/ql/src/test/results/clientpositive/join33.q.out
    hive/trunk/ql/src/test/results/clientpositive/join34.q.out
    hive/trunk/ql/src/test/results/clientpositive/join35.q.out
    hive/trunk/ql/src/test/results/clientpositive/join9.q.out
    hive/trunk/ql/src/test/results/clientpositive/join_filters_overlap.q.out
    hive/trunk/ql/src/test/results/clientpositive/join_map_ppr.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_11.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_12.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_13.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_8.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_oneskew_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_oneskew_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_oneskew_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/load_dyn_part8.q.out
    hive/trunk/ql/src/test/results/clientpositive/louter_join_ppr.q.out
    hive/trunk/ql/src/test/results/clientpositive/macro.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge3.q.out
    hive/trunk/ql/src/test/results/clientpositive/metadataonly1.q.out
    hive/trunk/ql/src/test/results/clientpositive/outer_join_ppr.q.out
    hive/trunk/ql/src/test/results/clientpositive/parallel_orderby.q.out
    hive/trunk/ql/src/test/results/clientpositive/pcr.q.out
    hive/trunk/ql/src/test/results/clientpositive/plan_json.q.out
    hive/trunk/ql/src/test/results/clientpositive/ppd_join_filter.q.out
    hive/trunk/ql/src/test/results/clientpositive/ppd_union_view.q.out
    hive/trunk/ql/src/test/results/clientpositive/ppr_allchildsarenull.q.out
    hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner1.q.out
    hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner2.q.out
    hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner3.q.out
    hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out
    hive/trunk/ql/src/test/results/clientpositive/regexp_extract.q.out
    hive/trunk/ql/src/test/results/clientpositive/router_join_ppr.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample1.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample10.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample2.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample4.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample5.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample6.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample7.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample8.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample9.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin9.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_11.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_12.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_13.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_15.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/stats0.q.out
    hive/trunk/ql/src/test/results/clientpositive/stats11.q.out
    hive/trunk/ql/src/test/results/clientpositive/stats12.q.out
    hive/trunk/ql/src/test/results/clientpositive/stats13.q.out
    hive/trunk/ql/src/test/results/clientpositive/transform_ppr1.q.out
    hive/trunk/ql/src/test/results/clientpositive/transform_ppr2.q.out
    hive/trunk/ql/src/test/results/clientpositive/truncate_column_list_bucket.q.out
    hive/trunk/ql/src/test/results/clientpositive/udf_explode.q.out
    hive/trunk/ql/src/test/results/clientpositive/udf_java_method.q.out
    hive/trunk/ql/src/test/results/clientpositive/udf_reflect.q.out
    hive/trunk/ql/src/test/results/clientpositive/udf_reflect2.q.out
    hive/trunk/ql/src/test/results/clientpositive/udtf_explode.q.out
    hive/trunk/ql/src/test/results/clientpositive/union22.q.out
    hive/trunk/ql/src/test/results/clientpositive/union24.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_ppr.q.out
    hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/union.q.xml

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Mon Jul 29 15:50:12 2013
@@ -460,12 +460,12 @@ public class Driver implements CommandPr
 
         // serialize the queryPlan
         FileOutputStream fos = new FileOutputStream(queryPlanFileName);
-        Utilities.serializeQueryPlan(plan, fos);
+        Utilities.serializeObject(plan, fos);
         fos.close();
 
         // deserialize the queryPlan
         FileInputStream fis = new FileInputStream(queryPlanFileName);
-        QueryPlan newPlan = Utilities.deserializeQueryPlan(fis, conf);
+        QueryPlan newPlan = Utilities.deserializeObject(fis);
         fis.close();
 
         // Use the deserialized plan
@@ -878,14 +878,17 @@ public class Driver implements CommandPr
 
   public CommandProcessorResponse run(String command) throws CommandNeedRetryException {
     CommandProcessorResponse cpr = runInternal(command);
-    if(cpr.getResponseCode() == 0) 
+    if(cpr.getResponseCode() == 0) {
       return cpr;
+    }
     SessionState ss = SessionState.get();
-    if(ss == null) 
+    if(ss == null) {
       return cpr;
+    }
     MetaDataFormatter mdf = MetaDataFormatUtils.getFormatter(ss.getConf());
-    if(!(mdf instanceof JsonMetaDataFormatter)) 
+    if(!(mdf instanceof JsonMetaDataFormatter)) {
       return cpr;
+    }
     /*Here we want to encode the error in machine readable way (e.g. JSON)
      * Ideally, errorCode would always be set to a canonical error defined in ErrorMsg.
      * In practice that is rarely the case, so the messy logic below tries to tease

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Mon Jul 29 15:50:12 2013
@@ -233,7 +233,7 @@ public class QueryPlan implements Serial
         mapTask.setTaskId(stage.getStageId() + "_MAP");
         mapTask.setTaskType(TaskType.MAP);
         stage.addToTaskList(mapTask);
-        populateOperatorGraph(mapTask, mrTask.getWork().getAliasToWork()
+        populateOperatorGraph(mapTask, mrTask.getWork().getMapWork().getAliasToWork()
             .values());
 
         // populate reduce task
@@ -245,7 +245,7 @@ public class QueryPlan implements Serial
           stage.addToTaskList(reduceTask);
           Collection<Operator<? extends OperatorDesc>> reducerTopOps =
             new ArrayList<Operator<? extends OperatorDesc>>();
-          reducerTopOps.add(mrTask.getWork().getReducer());
+          reducerTopOps.add(mrTask.getWork().getReduceWork().getReducer());
           populateOperatorGraph(reduceTask, reducerTopOps);
         }
       } else {
@@ -382,7 +382,7 @@ public class QueryPlan implements Serial
       }
       if (task instanceof ExecDriver) {
         ExecDriver mrTask = (ExecDriver) task;
-        extractOperatorCounters(mrTask.getWork().getAliasToWork().values(),
+        extractOperatorCounters(mrTask.getWork().getMapWork().getAliasToWork().values(),
             task.getId() + "_MAP");
         if (mrTask.mapStarted()) {
           started.add(task.getId() + "_MAP");
@@ -393,7 +393,7 @@ public class QueryPlan implements Serial
         if (mrTask.hasReduce()) {
           Collection<Operator<? extends OperatorDesc>> reducerTopOps =
             new ArrayList<Operator<? extends OperatorDesc>>();
-          reducerTopOps.add(mrTask.getWork().getReducer());
+          reducerTopOps.add(mrTask.getWork().getReduceWork().getReducer());
           extractOperatorCounters(reducerTopOps, task.getId() + "_REDUCE");
           if (mrTask.reduceStarted()) {
             started.add(task.getId() + "_REDUCE");

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Mon Jul 29 15:50:12 2013
@@ -121,8 +121,9 @@ public class ExplainTask extends Task<Ex
     }
 
     if (work.getParseContext() != null) {
-      JSONObject jsonPlan = outputMap(work.getParseContext().getTopOps(),
-          "LOGICAL PLAN", out, jsonOutput, work.getExtended(), 0);
+      out.print("LOGICAL PLAN");
+      JSONObject jsonPlan = outputMap(work.getParseContext().getTopOps(), true,
+                                      out, jsonOutput, work.getExtended(), 0);
       if (out != null) {
         out.println();
       }
@@ -228,19 +229,16 @@ public class ExplainTask extends Task<Ex
     return sb.toString();
   }
 
-  private JSONObject outputMap(Map<?, ?> mp, String header, PrintStream out,
+  private JSONObject outputMap(Map<?, ?> mp, boolean hasHeader, PrintStream out,
       boolean extended, boolean jsonOutput, int indent) throws Exception {
 
-    boolean first_el = true;
     TreeMap<Object, Object> tree = new TreeMap<Object, Object>();
     tree.putAll(mp);
     JSONObject json = jsonOutput ? new JSONObject() : null;
+    if (out != null && hasHeader && !mp.isEmpty()) {
+      out.println();
+    }
     for (Entry<?, ?> ent : tree.entrySet()) {
-      if (first_el && (out != null)) {
-        out.println(header);
-      }
-      first_el = false;
-
       // Print the key
       if (out != null) {
         out.print(indentString(indent));
@@ -286,7 +284,7 @@ public class ExplainTask extends Task<Ex
     return jsonOutput ? json : null;
   }
 
-  private JSONArray outputList(List<?> l, String header, PrintStream out,
+  private JSONArray outputList(List<?> l, PrintStream out, boolean hasHeader,
       boolean extended, boolean jsonOutput, int indent) throws Exception {
 
     boolean first_el = true;
@@ -294,10 +292,6 @@ public class ExplainTask extends Task<Ex
     JSONArray outputArray = new JSONArray();
 
     for (Object o : l) {
-      if (first_el && (out != null)) {
-        out.print(header);
-      }
-
       if (isPrintable(o)) {
         String delim = first_el ? " " : ", ";
         if (out != null) {
@@ -311,11 +305,11 @@ public class ExplainTask extends Task<Ex
         nl = true;
       }
       else if (o instanceof Serializable) {
-        if (first_el && (out != null)) {
+        if (first_el && (out != null) && hasHeader) {
           out.println();
         }
         JSONObject jsonOut = outputPlan((Serializable) o, out, extended,
-            jsonOutput, jsonOutput ? 0 : indent + 2);
+            jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent));
         if (jsonOutput) {
           outputArray.put(jsonOut);
         }
@@ -439,10 +433,14 @@ public class ExplainTask extends Task<Ex
           }
 
           String header = null;
+          boolean skipHeader = xpl_note.skipHeader();
+          boolean emptyHeader = false;
+
           if (!xpl_note.displayName().equals("")) {
             header = indentString(prop_indents) + xpl_note.displayName() + ":";
           }
           else {
+            emptyHeader = true;
             prop_indents = indent;
             header = indentString(prop_indents);
           }
@@ -450,7 +448,9 @@ public class ExplainTask extends Task<Ex
           // Try the output as a primitive object
           if (isPrintable(val)) {
             if (out != null && shouldPrint(xpl_note, val)) {
-              out.printf("%s ", header);
+              if (!skipHeader) {
+                out.printf("%s ", header);
+              }
               out.println(val);
             }
             if (jsonOutput) {
@@ -458,12 +458,26 @@ public class ExplainTask extends Task<Ex
             }
             continue;
           }
+
+          int ind = 0;
+          if (!jsonOutput) {
+            if (!skipHeader) {
+              ind = prop_indents + 2;
+            } else {
+              ind = indent;
+            }
+          }
+
           // Try this as a map
           try {
             // Go through the map and print out the stuff
             Map<?, ?> mp = (Map<?, ?>) val;
-            JSONObject jsonOut = outputMap(mp, header, out, extended, jsonOutput,
-                jsonOutput ? 0 : prop_indents + 2);
+
+            if (out != null && !skipHeader && mp != null && !mp.isEmpty()) {
+              out.print(header);
+            }
+
+            JSONObject jsonOut = outputMap(mp, !skipHeader && !emptyHeader, out, extended, jsonOutput, ind);
             if (jsonOutput) {
               json.put(header, jsonOut);
             }
@@ -476,8 +490,12 @@ public class ExplainTask extends Task<Ex
           // Try this as a list
           try {
             List<?> l = (List<?>) val;
-            JSONArray jsonOut = outputList(l, header, out, extended, jsonOutput,
-                jsonOutput ? 0 : prop_indents + 2);
+
+            if (out != null && !skipHeader && l != null && !l.isEmpty()) {
+              out.print(header);
+            }
+
+            JSONArray jsonOut = outputList(l, out, !skipHeader && !emptyHeader, extended, jsonOutput, ind);
 
             if (jsonOutput) {
               json.put(header, jsonOut);
@@ -492,11 +510,11 @@ public class ExplainTask extends Task<Ex
           // Finally check if it is serializable
           try {
             Serializable s = (Serializable) val;
-            if (out != null) {
+
+            if (!skipHeader && out != null) {
               out.println(header);
             }
-            JSONObject jsonOut = outputPlan(s, out, extended, jsonOutput,
-                jsonOutput ? 0 : prop_indents + 2);
+            JSONObject jsonOut = outputPlan(s, out, extended, jsonOutput, ind);
             if (jsonOutput) {
               json.put(header, jsonOut);
             }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Mon Jul 29 15:50:12 2013
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -63,7 +63,7 @@ import org.apache.hadoop.util.StringUtil
  * different from regular operators in that it starts off by processing a
  * Writable data structure from a Table (instead of a Hive Object).
  **/
-public class MapOperator extends Operator<MapredWork> implements Serializable, Cloneable {
+public class MapOperator extends Operator<MapWork> implements Serializable, Cloneable {
 
   private static final long serialVersionUID = 1L;
 
@@ -229,14 +229,14 @@ public class MapOperator extends Operato
    * @param mrwork
    * @throws HiveException
    */
-  public void initializeAsRoot(Configuration hconf, MapredWork mrwork)
+  public void initializeAsRoot(Configuration hconf, MapWork mapWork)
       throws HiveException {
-    setConf(mrwork);
+    setConf(mapWork);
     setChildren(hconf);
     initialize(hconf, null);
   }
 
-  private MapOpCtx initObjectInspector(MapredWork conf,
+  private MapOpCtx initObjectInspector(MapWork conf,
       Configuration hconf, String onefile, Map<TableDesc, StructObjectInspector> convertedOI)
           throws HiveException,
       ClassNotFoundException, InstantiationException, IllegalAccessException,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Jul 29 15:50:12 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.plan.Dy
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -306,9 +307,13 @@ public class MoveTask extends Task<MoveW
             // the directory this move task is moving
             if (task instanceof MapRedTask) {
               MapredWork work = (MapredWork)task.getWork();
-              bucketCols = work.getBucketedColsByDirectory().get(path);
-              sortCols = work.getSortedColsByDirectory().get(path);
-              numBuckets = work.getNumReduceTasks();
+              MapWork mapWork = work.getMapWork();
+              bucketCols = mapWork.getBucketedColsByDirectory().get(path);
+              sortCols = mapWork.getSortedColsByDirectory().get(path);
+              if (work.getReduceWork() != null) {
+                numBuckets = work.getReduceWork().getNumReduceTasks();
+              }
+
               if (bucketCols != null || sortCols != null) {
                 // This must be a final map reduce task (the task containing the file sink
                 // operator that writes the final output)

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Jul 29 15:50:12 2013
@@ -114,6 +114,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -121,11 +122,13 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.Adjacency;
 import org.apache.hadoop.hive.ql.plan.api.Graph;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -169,6 +172,8 @@ public final class Utilities {
    */
 
   public static String HADOOP_LOCAL_FS = "file:///";
+  public static String MAP_PLAN_NAME = "map.xml";
+  public static String REDUCE_PLAN_NAME = "reduce.xml";
 
   /**
    * ReduceField:
@@ -191,56 +196,85 @@ public final class Utilities {
     // prevent instantiation
   }
 
-  private static Map<String, MapredWork> gWorkMap = Collections
-      .synchronizedMap(new HashMap<String, MapredWork>());
+  private static Map<Path, BaseWork> gWorkMap = Collections
+      .synchronizedMap(new HashMap<Path, BaseWork>());
   private static final Log LOG = LogFactory.getLog(Utilities.class.getName());
 
-  public static void clearMapRedWork(Configuration job) {
+  public static void clearWork(Configuration conf) {
+    Path mapPath = getPlanPath(conf, MAP_PLAN_NAME);
+    Path reducePath = getPlanPath(conf, REDUCE_PLAN_NAME);
+
+    // if the plan path hasn't been initialized just return, nothing to clean.
+    if (mapPath == null || reducePath == null) {
+      return;
+    }
+
     try {
-      Path planPath = new Path(HiveConf.getVar(job, HiveConf.ConfVars.PLAN));
-      FileSystem fs = planPath.getFileSystem(job);
-      if (fs.exists(planPath)) {
-        try {
-          fs.delete(planPath, true);
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
+      FileSystem fs = mapPath.getFileSystem(conf);
+      if (fs.exists(mapPath)) {
+        fs.delete(mapPath, true);
+      }
+      if (fs.exists(reducePath)) {
+        fs.delete(reducePath, true);
       }
+
     } catch (Exception e) {
+      LOG.warn("Failed to clean-up tmp directories.", e);
     } finally {
       // where a single process works with multiple plans - we must clear
       // the cache before working with the next plan.
-      String jobID = getHiveJobID(job);
-      if (jobID != null) {
-        gWorkMap.remove(jobID);
+      if (mapPath != null) {
+        gWorkMap.remove(mapPath);
+      }
+      if (reducePath != null) {
+        gWorkMap.remove(reducePath);
       }
     }
   }
 
-  public static MapredWork getMapRedWork(Configuration job) {
-    MapredWork gWork = null;
+  public static MapredWork getMapRedWork(Configuration conf) {
+    MapredWork w = new MapredWork();
+    w.setMapWork(getMapWork(conf));
+    w.setReduceWork(getReduceWork(conf));
+    return w;
+  }
+
+  public static MapWork getMapWork(Configuration conf) {
+    return (MapWork) getBaseWork(conf, MAP_PLAN_NAME);
+  }
+
+  public static ReduceWork getReduceWork(Configuration conf) {
+    return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
+  }
+
+  public static BaseWork getBaseWork(Configuration conf, String name) {
+    BaseWork gWork = null;
+    Path path = null;
     try {
-      String jobID = getHiveJobID(job);
-      assert jobID != null;
-      gWork = gWorkMap.get(jobID);
+      path = getPlanPath(conf, name);
+      assert path != null;
+      gWork = gWorkMap.get(path);
       if (gWork == null) {
-        String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job);
-        String path;
+        String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf);
+        Path localPath;
         if (jtConf.equals("local")) {
-          String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
-          path = new Path(planPath).toUri().getPath();
+          localPath = path;
         } else {
-          path = "HIVE_PLAN" + jobID;
+          localPath = new Path(name);
         }
-        InputStream in = new FileInputStream(path);
-        MapredWork ret = deserializeMapRedWork(in, job);
+        InputStream in = new FileInputStream(localPath.toUri().getPath());
+        BaseWork ret = deserializeObject(in);
         gWork = ret;
-        gWork.initialize();
-        gWorkMap.put(jobID, gWork);
+        gWorkMap.put(path, gWork);
       }
-      return (gWork);
+      return gWork;
+    } catch (FileNotFoundException fnf) {
+      // happens. e.g.: no reduce work.
+      LOG.debug("No plan file found: "+path);
+      return null;
     } catch (Exception e) {
       e.printStackTrace();
+      LOG.error("Failed to load plan: "+path, e);
       throw new RuntimeException(e);
     }
   }
@@ -414,47 +448,80 @@ public final class Utilities {
     }
   }
 
-  public static void setMapRedWork(Configuration job, MapredWork w, String hiveScratchDir) {
+  public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) {
+    setMapWork(conf, w.getMapWork(), hiveScratchDir);
+    if (w.getReduceWork() != null) {
+      setReduceWork(conf, w.getReduceWork(), hiveScratchDir);
+    }
+  }
+
+  public static void setMapWork(Configuration conf, MapWork w, String hiveScratchDir) {
+    setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME);
+  }
+
+  public static void setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir) {
+    setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME);
+  }
+
+  private static void setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name) {
     try {
+      setPlanPath(conf, hiveScratchDir);
 
-      // this is the unique job ID, which is kept in JobConf as part of the plan file name
-      String jobID = UUID.randomUUID().toString();
-      Path planPath = new Path(hiveScratchDir, jobID);
-      HiveConf.setVar(job, HiveConf.ConfVars.PLAN, planPath.toUri().toString());
+      Path planPath = getPlanPath(conf, name);
 
-      // use the default file system of the job
-      FileSystem fs = planPath.getFileSystem(job);
+      // use the default file system of the conf
+      FileSystem fs = planPath.getFileSystem(conf);
       FSDataOutputStream out = fs.create(planPath);
-      serializeMapRedWork(w, out);
+      serializeObject(w, out);
 
       // Serialize the plan to the default hdfs instance
       // Except for hadoop local mode execution where we should be
       // able to get the plan directly from the cache
-      if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
+      if (!ShimLoader.getHadoopShims().isLocalMode(conf)) {
         // Set up distributed cache
-        DistributedCache.createSymlink(job);
-        String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN" + jobID;
-        DistributedCache.addCacheFile(new URI(uriWithLink), job);
+        if (!DistributedCache.getSymlink(conf)) {
+          DistributedCache.createSymlink(conf);
+        }
+        String uriWithLink = planPath.toUri().toString() + "#" + name;
+        DistributedCache.addCacheFile(new URI(uriWithLink), conf);
 
         // set replication of the plan file to a high number. we use the same
         // replication factor as used by the hadoop jobclient for job.xml etc.
-        short replication = (short) job.getInt("mapred.submit.replication", 10);
+        short replication = (short) conf.getInt("mapred.submit.replication", 10);
         fs.setReplication(planPath, replication);
       }
 
       // Cache the plan in this process
-      w.initialize();
-      gWorkMap.put(jobID, w);
+      gWorkMap.put(planPath, w);
     } catch (Exception e) {
       e.printStackTrace();
       throw new RuntimeException(e);
     }
   }
 
-  public static String getHiveJobID(Configuration job) {
-    String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
-    if (planPath != null && !planPath.isEmpty()) {
-      return (new Path(planPath)).getName();
+  private static Path getPlanPath(Configuration conf, String name) {
+    Path planPath = getPlanPath(conf);
+    if (planPath == null) {
+      return null;
+    }
+    return new Path(planPath, name);
+  }
+
+  private static void setPlanPath(Configuration conf, String hiveScratchDir) throws IOException {
+    if (getPlanPath(conf) == null) {
+      // this is the unique conf ID, which is kept in JobConf as part of the plan file name
+      String jobID = UUID.randomUUID().toString();
+      Path planPath = new Path(hiveScratchDir, jobID);
+      FileSystem fs = planPath.getFileSystem(conf);
+      fs.mkdirs(planPath);
+      HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, planPath.toUri().toString());
+    }
+  }
+
+  private static Path getPlanPath(Configuration conf) {
+    String plan = HiveConf.getVar(conf, HiveConf.ConfVars.PLAN);
+    if (plan != null && !plan.isEmpty()) {
+      return new Path(plan);
     }
     return null;
   }
@@ -495,27 +562,6 @@ public final class Utilities {
     }
   }
 
-  /**
-   * Serialize a single Task.
-   */
-  public static void serializeTasks(Task<? extends Serializable> t, OutputStream out) {
-    XMLEncoder e = null;
-    try {
-      e = new XMLEncoder(out);
-      // workaround for java 1.5
-      e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
-      e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-      e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
-      e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
-      e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
-      e.writeObject(t);
-    } finally {
-      if (null != e) {
-        e.close();
-      }
-    }
-  }
-
   public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
     @Override
     protected Expression instantiate(Object oldInstance, Encoder out) {
@@ -532,14 +578,15 @@ public final class Utilities {
   }
 
   /**
-   * Serialize the whole query plan.
+   * Serialize the object. This helper function mainly makes sure that enums,
+   * counters, etc are handled properly.
    */
-  public static void serializeQueryPlan(QueryPlan plan, OutputStream out) {
+  public static void serializeObject(Object plan, OutputStream out) {
     XMLEncoder e = new XMLEncoder(out);
     e.setExceptionListener(new ExceptionListener() {
       public void exceptionThrown(Exception e) {
         LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
-        throw new RuntimeException("Cannot serialize the query plan", e);
+        throw new RuntimeException("Cannot serialize object", e);
       }
     });
     // workaround for java 1.5
@@ -557,83 +604,14 @@ public final class Utilities {
   }
 
   /**
-   * Deserialize the whole query plan.
-   */
-  public static QueryPlan deserializeQueryPlan(InputStream in, Configuration conf) {
-    XMLDecoder d = null;
-    try {
-      d = new XMLDecoder(in, null, null);
-      QueryPlan ret = (QueryPlan) d.readObject();
-      return (ret);
-    } finally {
-      if (null != d) {
-        d.close();
-      }
-    }
-  }
-
-  /**
-   * Serialize the mapredWork object to an output stream. DO NOT use this to write to standard
-   * output since it closes the output stream. DO USE mapredWork.toXML() instead.
-   */
-  public static void serializeMapRedWork(MapredWork w, OutputStream out) {
-    XMLEncoder e = null;
-    try {
-      e = new XMLEncoder(out);
-      // workaround for java 1.5
-      e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
-      e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-      e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
-      e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
-      e.writeObject(w);
-    } finally {
-      if (null != e) {
-        e.close();
-      }
-    }
-
-  }
-
-  public static MapredWork deserializeMapRedWork(InputStream in, Configuration conf) {
-    XMLDecoder d = null;
-    try {
-      d = new XMLDecoder(in, null, null);
-      MapredWork ret = (MapredWork) d.readObject();
-      return (ret);
-    } finally {
-      if (null != d) {
-        d.close();
-      }
-    }
-  }
-
-  /**
-   * Serialize the mapredLocalWork object to an output stream. DO NOT use this to write to standard
-   * output since it closes the output stream. DO USE mapredWork.toXML() instead.
+   * De-serialize an object. This helper function mainly makes sure that enums,
+   * counters, etc are handled properly.
    */
-  public static void serializeMapRedLocalWork(MapredLocalWork w, OutputStream out) {
-    XMLEncoder e = null;
-    try {
-      e = new XMLEncoder(out);
-      // workaround for java 1.5
-      e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
-      e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-      e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
-      e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
-      e.writeObject(w);
-    } finally {
-      if (null != e) {
-        e.close();
-      }
-    }
-  }
-
-  public static MapredLocalWork deserializeMapRedLocalWork(InputStream in, Configuration conf) {
+  public static <T> T deserializeObject(InputStream in) {
     XMLDecoder d = null;
     try {
       d = new XMLDecoder(in, null, null);
-      MapredLocalWork ret = (MapredLocalWork) d.readObject();
-      return (ret);
+      return (T) d.readObject();
     } finally {
       if (null != d) {
         d.close();
@@ -1812,7 +1790,7 @@ public final class Utilities {
    * @return the summary of all the input paths.
    * @throws IOException
    */
-  public static ContentSummary getInputSummary(Context ctx, MapredWork work, PathFilter filter)
+  public static ContentSummary getInputSummary(Context ctx, MapWork work, PathFilter filter)
       throws IOException {
 
     long[] summary = {0, 0, 0};
@@ -2273,7 +2251,7 @@ public final class Utilities {
       try {
         MapredWork mapredWork = ((MapRedTask) task).getWork();
         Set<Class<? extends InputFormat>> reworkInputFormats = new HashSet<Class<? extends InputFormat>>();
-        for (PartitionDesc part : mapredWork.getPathToPartitionInfo().values()) {
+        for (PartitionDesc part : mapredWork.getMapWork().getPathToPartitionInfo().values()) {
           Class<? extends InputFormat> inputFormatCls = part
               .getInputFileFormatClass();
           if (ReworkMapredInputFormat.class.isAssignableFrom(inputFormatCls)) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Mon Jul 29 15:50:12 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.ErrorMs
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
 import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -63,7 +64,6 @@ import org.apache.hadoop.hive.ql.exec.Pa
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -73,10 +73,12 @@ import org.apache.hadoop.hive.ql.io.OneN
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -102,7 +104,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.varia.NullAppender;
 
 /**
- * ExecDriver is the central class in co-ordinating execution of any map-reduce task. 
+ * ExecDriver is the central class in co-ordinating execution of any map-reduce task.
  * It's main responsabilities are:
  *
  * - Converting the plan (MapredWork) into a MR Job (JobConf)
@@ -196,13 +198,13 @@ public class ExecDriver extends Task<Map
    * @return true if fatal errors happened during job execution, false otherwise.
    */
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
-    for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+    for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
       if (op.checkFatalErrors(ctrs, errMsg)) {
         return true;
       }
     }
-    if (work.getReducer() != null) {
-      if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
+    if (work.getReduceWork() != null) {
+      if (work.getReduceWork().getReducer().checkFatalErrors(ctrs, errMsg)) {
         return true;
       }
     }
@@ -211,18 +213,18 @@ public class ExecDriver extends Task<Map
 
   protected void createTmpDirs() throws IOException {
     // fix up outputs
-    Map<String, ArrayList<String>> pa = work.getPathToAliases();
+    Map<String, ArrayList<String>> pa = work.getMapWork().getPathToAliases();
     if (pa != null) {
       List<Operator<? extends OperatorDesc>> opList =
         new ArrayList<Operator<? extends OperatorDesc>>();
 
-      if (work.getReducer() != null) {
-        opList.add(work.getReducer());
+      if (work.getReduceWork() != null) {
+        opList.add(work.getReduceWork().getReducer());
       }
 
       for (List<String> ls : pa.values()) {
         for (String a : ls) {
-          opList.add(work.getAliasToWork().get(a));
+          opList.add(work.getMapWork().getAliasToWork().get(a));
 
           while (!opList.isEmpty()) {
             Operator<? extends OperatorDesc> op = opList.remove(0);
@@ -251,6 +253,7 @@ public class ExecDriver extends Task<Map
    /**
    * Execute a query plan using Hadoop.
    */
+  @SuppressWarnings("deprecation")
   @Override
   public int execute(DriverContext driverContext) {
 
@@ -259,16 +262,14 @@ public class ExecDriver extends Task<Map
 
     boolean success = true;
 
-    String invalidReason = work.isInvalid();
-    if (invalidReason != null) {
-      throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
-    }
-
     Context ctx = driverContext.getCtx();
     boolean ctxCreated = false;
     String emptyScratchDirStr;
     Path emptyScratchDir;
 
+    MapWork mWork = work.getMapWork();
+    ReduceWork rWork = work.getReduceWork();
+
     try {
       if (ctx == null) {
         ctx = new Context(job);
@@ -301,27 +302,27 @@ public class ExecDriver extends Task<Map
       throw new RuntimeException(e.getMessage());
     }
 
-    if (work.getNumMapTasks() != null) {
-      job.setNumMapTasks(work.getNumMapTasks().intValue());
+    if (mWork.getNumMapTasks() != null) {
+      job.setNumMapTasks(mWork.getNumMapTasks().intValue());
     }
 
-    if (work.getMaxSplitSize() != null) {
-      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, work.getMaxSplitSize().longValue());
+    if (mWork.getMaxSplitSize() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, mWork.getMaxSplitSize().longValue());
     }
 
-    if (work.getMinSplitSize() != null) {
-      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
+    if (mWork.getMinSplitSize() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, mWork.getMinSplitSize().longValue());
     }
 
-    if (work.getMinSplitSizePerNode() != null) {
-      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, work.getMinSplitSizePerNode().longValue());
+    if (mWork.getMinSplitSizePerNode() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, mWork.getMinSplitSizePerNode().longValue());
     }
 
-    if (work.getMinSplitSizePerRack() != null) {
-      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, work.getMinSplitSizePerRack().longValue());
+    if (mWork.getMinSplitSizePerRack() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, mWork.getMinSplitSizePerRack().longValue());
     }
 
-    job.setNumReduceTasks(work.getNumReduceTasks().intValue());
+    job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
     job.setReducerClass(ExecReducer.class);
 
     // set input format information if necessary
@@ -338,7 +339,7 @@ public class ExecDriver extends Task<Map
       inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
     }
 
-    if (getWork().isUseBucketizedHiveInputFormat()) {
+    if (mWork.isUseBucketizedHiveInputFormat()) {
       inpFormat = BucketizedHiveInputFormat.class.getName();
     }
 
@@ -387,11 +388,11 @@ public class ExecDriver extends Task<Map
     }
 
     try{
-      MapredLocalWork localwork = work.getMapLocalWork();
+      MapredLocalWork localwork = mWork.getMapLocalWork();
       if (localwork != null) {
         if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
           Path localPath = new Path(localwork.getTmpFileURI());
-          Path hdfsPath = new Path(work.getTmpHDFSFileURI());
+          Path hdfsPath = new Path(mWork.getTmpHDFSFileURI());
 
           FileSystem hdfs = hdfsPath.getFileSystem(job);
           FileSystem localFS = localPath.getFileSystem(job);
@@ -429,17 +430,17 @@ public class ExecDriver extends Task<Map
         }
       }
       work.configureJobConf(job);
-      addInputPaths(job, work, emptyScratchDirStr, ctx);
+      addInputPaths(job, mWork, emptyScratchDirStr, ctx);
 
       Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
 
-      if (work.getSamplingType() > 0 && work.getNumReduceTasks() > 1) {
+      if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
         try {
-          handleSampling(driverContext, work, job, new HiveConf(conf));
+          handleSampling(driverContext, mWork, job, conf);
           job.setPartitionerClass(HiveTotalOrderPartitioner.class);
         } catch (Exception e) {
           console.printInfo("Not enough sampling data.. Rolling back to single reducer task");
-          work.setNumReduceTasks(1);
+          rWork.setNumReduceTasks(1);
           job.setNumReduceTasks(1);
         }
       }
@@ -454,7 +455,7 @@ public class ExecDriver extends Task<Map
       // make this client wait if job trcker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
 
-      if (work.isGatheringStats()) {
+      if (mWork.isGatheringStats() || (rWork != null && rWork.isGatheringStats())) {
         // initialize stats publishing table
         StatsPublisher statsPublisher;
         String statsImplementationClass = HiveConf.getVar(job, HiveConf.ConfVars.HIVESTATSDBCLASS);
@@ -496,7 +497,7 @@ public class ExecDriver extends Task<Map
       success = false;
       returnVal = 1;
     } finally {
-      Utilities.clearMapRedWork(job);
+      Utilities.clearWork(job);
       try {
         if (ctxCreated) {
           ctx.clear();
@@ -517,13 +518,13 @@ public class ExecDriver extends Task<Map
     try {
       if (rj != null) {
         JobCloseFeedBack feedBack = new JobCloseFeedBack();
-        if (work.getAliasToWork() != null) {
-          for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+        if (mWork.getAliasToWork() != null) {
+          for (Operator<? extends OperatorDesc> op : mWork.getAliasToWork().values()) {
             op.jobClose(job, success, feedBack);
           }
         }
-        if (work.getReducer() != null) {
-          work.getReducer().jobClose(job, success, feedBack);
+        if (rWork != null) {
+          rWork.getReducer().jobClose(job, success, feedBack);
         }
       }
     } catch (Exception e) {
@@ -539,16 +540,16 @@ public class ExecDriver extends Task<Map
     return (returnVal);
   }
 
-  private void handleSampling(DriverContext context, MapredWork work, JobConf job, HiveConf conf)
+  private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf)
       throws Exception {
-    assert work.getAliasToWork().keySet().size() == 1;
+    assert mWork.getAliasToWork().keySet().size() == 1;
 
-    String alias = work.getAliases().get(0);
-    Operator<?> topOp = work.getAliasToWork().get(alias);
-    PartitionDesc partDesc = work.getAliasToPartnInfo().get(alias);
+    String alias = mWork.getAliases().get(0);
+    Operator<?> topOp = mWork.getAliasToWork().get(alias);
+    PartitionDesc partDesc = mWork.getAliasToPartnInfo().get(alias);
 
-    ArrayList<String> paths = work.getPaths();
-    ArrayList<PartitionDesc> parts = work.getPartitionDescs();
+    ArrayList<String> paths = mWork.getPaths();
+    ArrayList<PartitionDesc> parts = mWork.getPartitionDescs();
 
     Path onePath = new Path(paths.get(0));
     String tmpPath = context.getCtx().getExternalTmpFileURI(onePath.toUri());
@@ -558,7 +559,7 @@ public class ExecDriver extends Task<Map
 
     PartitionKeySampler sampler = new PartitionKeySampler();
 
-    if (work.getSamplingType() == MapredWork.SAMPLING_ON_PREV_MR) {
+    if (mWork.getSamplingType() == MapWork.SAMPLING_ON_PREV_MR) {
       console.printInfo("Use sampling data created in previous MR");
       // merges sampling data from previous MR and make paritition keys for total sort
       for (String path : paths) {
@@ -568,7 +569,7 @@ public class ExecDriver extends Task<Map
           sampler.addSampleFile(status.getPath(), job);
         }
       }
-    } else if (work.getSamplingType() == MapredWork.SAMPLING_ON_START) {
+    } else if (mWork.getSamplingType() == MapWork.SAMPLING_ON_START) {
       console.printInfo("Creating sampling data..");
       assert topOp instanceof TableScanOperator;
       TableScanOperator ts = (TableScanOperator) topOp;
@@ -592,7 +593,7 @@ public class ExecDriver extends Task<Map
         fetcher.clearFetchContext();
       }
     } else {
-      throw new IllegalArgumentException("Invalid sampling type " + work.getSamplingType());
+      throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
     }
     sampler.writePartitionKeys(partitionFile, job);
   }
@@ -601,16 +602,17 @@ public class ExecDriver extends Task<Map
    * Set hive input format, and input format file if necessary.
    */
   protected void setInputAttributes(Configuration conf) {
-    if (work.getInputformat() != null) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, work.getInputformat());
-    }
-    if (work.getIndexIntermediateFile() != null) {
-      conf.set("hive.index.compact.file", work.getIndexIntermediateFile());
-      conf.set("hive.index.blockfilter.file", work.getIndexIntermediateFile());
+    MapWork mWork = work.getMapWork();
+    if (mWork.getInputformat() != null) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+    }
+    if (mWork.getIndexIntermediateFile() != null) {
+      conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());
+      conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile());
     }
 
     // Intentionally overwrites anything the user may have put here
-    conf.setBoolean("hive.input.format.sorted", work.isInputFormatSorted());
+    conf.setBoolean("hive.input.format.sorted", mWork.isInputFormatSorted());
   }
 
   public boolean mapStarted() {
@@ -757,12 +759,12 @@ public class ExecDriver extends Task<Map
     int ret;
     if (localtask) {
       memoryMXBean = ManagementFactory.getMemoryMXBean();
-      MapredLocalWork plan = Utilities.deserializeMapRedLocalWork(pathData, conf);
+      MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
       MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
       ret = ed.executeFromChildJVM(new DriverContext());
 
     } else {
-      MapredWork plan = Utilities.deserializeMapRedWork(pathData, conf);
+      MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
       ExecDriver ed = new ExecDriver(plan, conf, isSilent);
       ret = ed.execute(new DriverContext());
     }
@@ -823,19 +825,19 @@ public class ExecDriver extends Task<Map
 
   @Override
   public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
-    return getWork().getAliasToWork().values();
+    return getWork().getMapWork().getAliasToWork().values();
   }
 
   @Override
   public boolean hasReduce() {
     MapredWork w = getWork();
-    return w.getReducer() != null;
+    return w.getReduceWork() != null;
   }
 
   /**
    * Handle a empty/null path for a given alias.
    */
-  private static int addInputPath(String path, JobConf job, MapredWork work, String hiveScratchDir,
+  private static int addInputPath(String path, JobConf job, MapWork work, String hiveScratchDir,
       int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
     // either the directory does not exist or it is empty
     assert path == null || isEmptyPath;
@@ -919,7 +921,7 @@ public class ExecDriver extends Task<Map
     return numEmptyPaths;
   }
 
-  public static void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir, Context ctx)
+  public static void addInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
       throws Exception {
     int numEmptyPaths = 0;
 
@@ -1002,11 +1004,11 @@ public class ExecDriver extends Task<Map
 
   @Override
   public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
-    for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+    for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
       op.updateCounters(ctrs);
     }
-    if (work.getReducer() != null) {
-      work.getReducer().updateCounters(ctrs);
+    if (work.getReduceWork() != null) {
+      work.getReduceWork().getReducer().updateCounters(ctrs);
     }
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Mon Jul 29 15:50:12 2013
@@ -30,11 +30,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -45,10 +45,10 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.StringUtils;
 
 /**
- * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is 
+ * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is
  * the bridge between the map-reduce framework and the Hive operator pipeline at
  * execution time. It's main responsabilities are:
- * 
+ *
  * - Load and setup the operator pipeline from XML
  * - Run the pipeline by transforming key value pairs to records and forwarding them to the operators
  * - Stop execution when the "limit" is reached
@@ -96,7 +96,7 @@ public class ExecMapper extends MapReduc
       jc = job;
       execContext.setJc(jc);
       // create map and fetch operators
-      MapredWork mrwork = Utilities.getMapRedWork(job);
+      MapWork mrwork = Utilities.getMapWork(job);
       mo = new MapOperator();
       mo.setConf(mrwork);
       // initialize map operator

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Mon Jul 29 15:50:12 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
@@ -112,7 +112,7 @@ public class ExecReducer extends MapRedu
       l4j.info("cannot get classpath: " + e.getMessage());
     }
     jc = job;
-    MapredWork gWork = Utilities.getMapRedWork(job);
+    ReduceWork gWork = Utilities.getReduceWork(job);
     reducer = gWork.getReducer();
     reducer.setParentOperators(null); // clear out any parents as reducer is the
     // root

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Mon Jul 29 15:50:12 2013
@@ -40,8 +40,10 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
@@ -101,7 +103,7 @@ public class MapRedTask extends ExecDriv
           conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
 
         if (inputSummary == null) {
-          inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+          inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
         }
 
         // set the values of totalInputFileSize and totalInputNumFiles, estimating them
@@ -109,7 +111,7 @@ public class MapRedTask extends ExecDriv
         estimateInputSize();
 
         // at this point the number of reducers is precisely defined in the plan
-        int numReducers = work.getNumReduceTasks();
+        int numReducers = work.getReduceWork() == null ? 0 : work.getReduceWork().getNumReduceTasks();
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Task: " + getId() + ", Summary: " +
@@ -177,7 +179,7 @@ public class MapRedTask extends ExecDriv
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
-      Utilities.serializeMapRedWork(plan, out);
+      Utilities.serializeObject(plan, out);
 
       String isSilent = "true".equalsIgnoreCase(System
           .getProperty("test.silent")) ? "-nolog" : "";
@@ -383,26 +385,26 @@ public class MapRedTask extends ExecDriv
    * Set the number of reducers for the mapred work.
    */
   private void setNumberOfReducers() throws IOException {
+    ReduceWork rWork = work.getReduceWork();
     // this is a temporary hack to fix things that are not fixed in the compiler
-    Integer numReducersFromWork = work.getNumReduceTasks();
+    Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();
 
-    if (work.getReducer() == null) {
+    if (rWork == null) {
       console
           .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
-      work.setNumReduceTasks(Integer.valueOf(0));
     } else {
       if (numReducersFromWork >= 0) {
         console.printInfo("Number of reduce tasks determined at compile time: "
-            + work.getNumReduceTasks());
+            + rWork.getNumReduceTasks());
       } else if (job.getNumReduceTasks() > 0) {
         int reducers = job.getNumReduceTasks();
-        work.setNumReduceTasks(reducers);
+        rWork.setNumReduceTasks(reducers);
         console
             .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
             + reducers);
       } else {
         int reducers = estimateNumberOfReducers();
-        work.setNumReduceTasks(reducers);
+        rWork.setNumReduceTasks(reducers);
         console
             .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
             + reducers);
@@ -437,7 +439,7 @@ public class MapRedTask extends ExecDriv
 
     if(inputSummary == null) {
       // compute the summary and stash it away
-      inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work, null);
+      inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
     }
 
     // if all inputs are sampled, we should shrink the size of reducers accordingly.
@@ -459,7 +461,7 @@ public class MapRedTask extends ExecDriv
     // and the user has configured Hive to do this, make sure the number of reducers is a
     // power of two
     if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
-        work.isFinalMapRed() && !work.getBucketedColsByDirectory().isEmpty()) {
+        work.isFinalMapRed() && !work.getMapWork().getBucketedColsByDirectory().isEmpty()) {
 
       int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
       int reducersPowerTwo = (int)Math.pow(2, reducersLog);
@@ -497,11 +499,13 @@ public class MapRedTask extends ExecDriv
       return;
     }
 
+    MapWork mWork = work.getMapWork();
+
     // Initialize the values to be those taken from the input summary
     totalInputFileSize = inputSummary.getLength();
     totalInputNumFiles = inputSummary.getFileCount();
 
-    if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+    if (mWork.getNameToSplitSample() == null || mWork.getNameToSplitSample().isEmpty()) {
       // If percentage block sampling wasn't used, we don't need to do any estimation
       inputSizeEstimated = true;
       return;
@@ -510,10 +514,10 @@ public class MapRedTask extends ExecDriv
     // if all inputs are sampled, we should shrink the size of the input accordingly
     double highestSamplePercentage = 0;
     boolean allSample = false;
-    for (String alias : work.getAliasToWork().keySet()) {
-      if (work.getNameToSplitSample().containsKey(alias)) {
+    for (String alias : mWork.getAliasToWork().keySet()) {
+      if (mWork.getNameToSplitSample().containsKey(alias)) {
         allSample = true;
-        Double rate = work.getNameToSplitSample().get(alias).getPercent();
+        Double rate = mWork.getNameToSplitSample().get(alias).getPercent();
         if (rate != null && rate > highestSamplePercentage) {
           highestSamplePercentage = rate;
         }
@@ -580,7 +584,7 @@ public class MapRedTask extends ExecDriv
 
   @Override
   public Operator<? extends OperatorDesc> getReducer() {
-    return getWork().getReducer();
+    return getWork().getReduceWork() == null ? null : getWork().getReduceWork().getReducer();
   }
 
   @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Mon Jul 29 15:50:12 2013
@@ -141,7 +141,7 @@ public class MapredLocalTask extends Tas
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredLocalWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
-      Utilities.serializeMapRedLocalWork(plan, out);
+      Utilities.serializeObject(plan, out);
 
       String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java Mon Jul 29 15:50:12 2013
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -188,7 +189,7 @@ public class CompactIndexHandler extends
 
     if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
       // For now, only works if the predicate is a single condition
-      MapredWork work = null;
+      MapWork work = null;
       String originalInputFormat = null;
       for (Task task : driver.getPlan().getRootTasks()) {
         // The index query should have one and only one map reduce task in the root tasks
@@ -202,7 +203,9 @@ public class CompactIndexHandler extends
             work.setInputFormatSorted(false);
             break;
           }
-          work = (MapredWork)task.getWork();
+          if (task.getWork() != null) {
+            work = ((MapredWork)task.getWork()).getMapWork();
+          }
           String inputFormat = work.getInputformat();
           originalInputFormat = inputFormat;
           if (inputFormat == null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Mon Jul 29 15:50:12 2013
@@ -95,7 +95,7 @@ public class CombineHiveInputFormat<K ex
       this.inputSplitShim = inputSplitShim;
       if (job != null) {
         Map<String, PartitionDesc> pathToPartitionInfo = Utilities
-            .getMapRedWork(job).getPathToPartitionInfo();
+            .getMapWork(job).getPathToPartitionInfo();
 
         // extract all the inputFormatClass names for each chunk in the
         // CombinedSplit.
@@ -200,7 +200,7 @@ public class CombineHiveInputFormat<K ex
 
       if (inputFormatClassName == null) {
         Map<String, PartitionDesc> pathToPartitionInfo = Utilities
-            .getMapRedWork(getJob()).getPathToPartitionInfo();
+            .getMapWork(getJob()).getPathToPartitionInfo();
 
         // extract all the inputFormatClass names for each chunk in the
         // CombinedSplit.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Mon Jul 29 15:50:12 2013
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -249,10 +249,10 @@ public class HiveInputFormat<K extends W
   }
 
   protected Map<String, PartitionDesc> pathToPartitionInfo;
-  MapredWork mrwork = null;
+  MapWork mrwork = null;
 
   protected void init(JobConf job) {
-    mrwork = Utilities.getMapRedWork(job);
+    mrwork = Utilities.getMapWork(job);
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java Mon Jul 29 15:50:12 2013
@@ -37,10 +37,10 @@ import org.apache.hadoop.mapred.TextInpu
 public class SymbolicInputFormat implements ReworkMapredInputFormat {
 
   public void rework(HiveConf job, MapredWork work) throws IOException {
-    Map<String, PartitionDesc> pathToParts = work.getPathToPartitionInfo();
+    Map<String, PartitionDesc> pathToParts = work.getMapWork().getPathToPartitionInfo();
     List<String> toRemovePaths = new ArrayList<String>();
     Map<String, PartitionDesc> toAddPathToPart = new HashMap<String, PartitionDesc>();
-    Map<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+    Map<String, ArrayList<String>> pathToAliases = work.getMapWork().getPathToAliases();
 
     for (Map.Entry<String, PartitionDesc> pathPartEntry : pathToParts
         .entrySet()) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java Mon Jul 29 15:50:12 2013
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hive.ql.io.avro;
 
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
@@ -29,18 +33,17 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.*;
-
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 
 /**
  * RecordReader optimized against Avro GenericRecords that returns to record
@@ -67,7 +70,9 @@ public class AvroGenericRecordReader imp
 
     GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>();
 
-    if(latest != null) gdr.setExpected(latest);
+    if(latest != null) {
+      gdr.setExpected(latest);
+    }
 
     this.reader = new DataFileReader<GenericRecord>(new FsInput(split.getPath(), job), gdr);
     this.reader.sync(split.getStart());
@@ -86,11 +91,11 @@ public class AvroGenericRecordReader imp
     FileSystem fs = split.getPath().getFileSystem(job);
     // Inside of a MR job, we can pull out the actual properties
     if(AvroSerdeUtils.insideMRJob(job)) {
-      MapredWork mapRedWork = Utilities.getMapRedWork(job);
+      MapWork mapWork = Utilities.getMapWork(job);
 
       // Iterate over the Path -> Partition descriptions to find the partition
       // that matches our input split.
-      for (Map.Entry<String,PartitionDesc> pathsAndParts: mapRedWork.getPathToPartitionInfo().entrySet()){
+      for (Map.Entry<String,PartitionDesc> pathsAndParts: mapWork.getPathToPartitionInfo().entrySet()){
         String partitionPath = pathsAndParts.getKey();
         if(pathIsInPartition(split.getPath(), partitionPath)) {
           if(LOG.isInfoEnabled()) {
@@ -101,11 +106,15 @@ public class AvroGenericRecordReader imp
           Properties props = pathsAndParts.getValue().getProperties();
           if(props.containsKey(AvroSerdeUtils.SCHEMA_LITERAL) || props.containsKey(AvroSerdeUtils.SCHEMA_URL)) {
             return AvroSerdeUtils.determineSchemaOrThrowException(props);
-          } else
+          }
+          else {
             return null; // If it's not in this property, it won't be in any others
+          }
         }
       }
-      if(LOG.isInfoEnabled()) LOG.info("Unable to match filesplit " + split + " with a partition.");
+      if(LOG.isInfoEnabled()) {
+        LOG.info("Unable to match filesplit " + split + " with a partition.");
+      }
     }
 
     // In "select * from table" situations (non-MR), we can add things to the job

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Mon Jul 29 15:50:12 2013
@@ -192,7 +192,7 @@ public class BlockMergeTask extends Task
     try {
       addInputPaths(job, work);
 
-      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+      Utilities.setMapWork(job, work, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java Mon Jul 29 15:50:12 2013
@@ -33,14 +33,14 @@ import org.apache.hadoop.hive.ql.io.Comb
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.Mapper;
 
 @Explain(displayName = "Block level merge")
-public class MergeWork extends MapredWork implements Serializable {
+public class MergeWork extends MapWork implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -70,9 +70,6 @@ public class MergeWork extends MapredWor
     if(this.getPathToPartitionInfo() == null) {
       this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     }
-    if(this.getNumReduceTasks() == null) {
-      this.setNumReduceTasks(0);
-    }
     for(String path: this.inputPaths) {
       this.getPathToPartitionInfo().put(path, partDesc);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java Mon Jul 29 15:50:12 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -181,7 +182,9 @@ public class PartialScanTask extends Tas
     try {
       addInputPaths(job, work);
 
-      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+      MapredWork mrWork = new MapredWork();
+      mrWork.setMapWork(work);
+      Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs