Posted to commits@hive.apache.org by na...@apache.org on 2010/01/16 07:46:07 UTC

svn commit: r899891 [1/31] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/...

Author: namit
Date: Sat Jan 16 06:44:01 2010
New Revision: 899891

URL: http://svn.apache.org/viewvc?rev=899891&view=rev
Log:
HIVE-964. Handle skew join
(Yongqiang He via namit)


Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/skewjoin.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/build-common.xml
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java
    hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/cluster.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input11.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input12.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input13.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input23.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input25.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input26.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input34.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input35.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input36.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input38.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input39.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input6.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input7.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input9.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_dynamicserde.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_testsequencefile.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_testxpath.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_testxpath2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join0.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join10.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join11.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join12.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join13.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join14.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join15.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join16.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join17.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join18.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join19.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join20.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join21.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join22.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join23.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join25.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join26.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join27.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join28.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join29.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join30.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join31.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join32.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join33.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join34.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join35.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join36.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join37.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join38.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join40.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join6.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join7.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join9.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_hive_626.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_map_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_rc.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_reorder.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_reorder2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_reorder3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_thrift.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/louter_join_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/merge1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/multi_insert.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/no_hooks.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/outer_join_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_clusterby.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_constant_expr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_gby_join.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_join.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_join2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_join3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_multi_insert.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_outer_join1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_outer_join2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_outer_join3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_outer_join4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_random.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_udf_case.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/quote1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/rcfile_null_value.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/regex_col.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/router_join_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample6.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample7.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/semijoin.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/subq.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_10_trims.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_case_column_pruning.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_length.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_reverse.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union10.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union11.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union12.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union14.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union15.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union17.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union18.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union19.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union20.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union22.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union6.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union7.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/union.q.xml

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sat Jan 16 06:44:01 2010
@@ -6,6 +6,9 @@
 
   NEW FEATURES
 
+    HIVE-964. Handle skew join
+    (Yongqiang He via namit)
+
   IMPROVEMENTS
 
     HIVE-983. Function from_unixtime takes long.

Modified: hadoop/hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build-common.xml?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/build-common.xml (original)
+++ hadoop/hive/trunk/build-common.xml Sat Jan 16 06:44:01 2010
@@ -54,7 +54,7 @@
   <property name="test.include" value="Test*"/>
   <property name="test.classpath.id" value="test.classpath"/>
   <property name="test.output" value="true"/>
-  <property name="test.timeout" value="5400000"/>
+  <property name="test.timeout" value="6600000"/>
   <property name="test.junit.output.format" value="xml"/>
   <property name="test.junit.output.usefile" value="true"/>
   <property name="minimr.query.files" value="join1.q,groupby1.q"/>
@@ -209,7 +209,7 @@
       <classpath refid="test.classpath"/>
     </javac>
   </target>
-	
+
   <target name="test-jar" depends="compile-test">
     <delete file="${test.build.dir}/test-udfs.jar"/>
     <jar jarfile="${test.build.dir}/test-udfs.jar">
@@ -219,14 +219,14 @@
   </target>
 
   <target name="test-conditions">
-        
+
     <condition property="qfile" value="${minimr.query.files}">
       <and>
         <not>
           <isset property="qfile"/>
         </not>
-      
-        <equals arg1="${clustermode}" arg2="miniMR" /> 
+
+        <equals arg1="${clustermode}" arg2="miniMR" />
       </and>
     </condition>
 

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Jan 16 06:44:01 2010
@@ -78,6 +78,7 @@
     HADOOPCONF("hadoop.config.dir", System.getenv("HADOOP_HOME") + "/conf"),
     HADOOPFS("fs.default.name", "file:///"),
     HADOOPMAPFILENAME("map.input.file", null),
+    HADOOPMAPREDINPUTDIR("mapred.input.dir", null),
     HADOOPJT("mapred.job.tracker", "local"),
     HADOOPNUMREDUCERS("mapred.reduce.tasks", 1),
     HADOOPJOBNAME("mapred.job.name", null),
@@ -168,6 +169,9 @@
     HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long)(256*1000*1000)),
     HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long)(16*1000*1000)),
 
+    HIVESKEWJOIN("hive.optimize.skewjoin", false),
+    HIVESKEWJOINKEY("hive.skewjoin.key", 500000),
+    
     HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000),
     HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000),
 

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Sat Jan 16 06:44:01 2010
@@ -280,6 +280,20 @@
 </property>
 
 <property>
+  <name>hive.optimize.skewjoin</name>
+  <value>false</value>
+  <description>Whether to enable skew join optimization. </description>
+</property>
+
+<property>
+  <name>hive.skewjoin.key</name>
+  <value>100000</value>
+  <description>Determine if we get a skew key in join. If we see more
+	than the specified number of rows with the same key in join operator,
+	we think the key as a skew join key. </description>
+</property>
+
+<property>
   <name>hive.mapred.mode</name>
   <value>nonstrict</value>
   <description>The mode in which the hive operations are being performed. In strict mode, some risky queries are not allowed to run</description>
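
For reference, a minimal sketch (not part of this patch) of reading the two new settings back through HiveConf; it assumes the existing getBoolVar/getIntVar accessors and the HiveConf(Class) constructor:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class SkewJoinConfCheck {
  public static void main(String[] args) {
    // hive-default.xml / hive-site.xml on the classpath supply the values
    HiveConf conf = new HiveConf(SkewJoinConfCheck.class);
    // whether the user opted in to the skew join optimization
    boolean skewJoinEnabled = conf.getBoolVar(ConfVars.HIVESKEWJOIN);
    // rows seen with the same join key before that key is treated as skewed
    int skewKeyThreshold = conf.getIntVar(ConfVars.HIVESKEWJOINKEY);
    System.out.println("hive.optimize.skewjoin=" + skewJoinEnabled
        + ", hive.skewjoin.key=" + skewKeyThreshold);
  }
}

Queries would normally just set the two properties in the session rather than touch HiveConf directly (the new skewjoin.q test presumably enables them with plain set commands).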

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Sat Jan 16 06:44:01 2010
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.ql.parse.ErrorMsg;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -97,9 +98,13 @@
     for (Task<? extends Serializable> task : tasks) {
       if (!seenTasks.contains(task)) {
         seenTasks.add(task);
-        if (task.isMapRedTask()) {
+        
+        if(task instanceof ConditionalTask)
+          jobs +=countJobs(((ConditionalTask)task).getListTasks(), seenTasks);
+        else if (task.isMapRedTask()) { //this may be true for conditional task, but we will not inc the counter 
           jobs++;
         }
+        
         jobs += countJobs(task.getChildTasks(), seenTasks);
       }
     }
@@ -135,7 +140,7 @@
 
         if (!sem.getFetchTaskInit()) {
           sem.setFetchTaskInit(true);
-          sem.getFetchTask().initialize(conf, plan);
+          sem.getFetchTask().initialize(conf, plan, null);
         }
         FetchTask ft = (FetchTask) sem.getFetchTask();
 
@@ -447,10 +452,12 @@
       Queue<Task<? extends Serializable>> runnable = new LinkedList<Task<? extends Serializable>>();
       Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner> ();
 
+      DriverContext driverCxt = new DriverContext(runnable); 
+
       //Add root Tasks to runnable
 
       for (Task<? extends Serializable> tsk : sem.getRootTasks()) {
-        addToRunnable(runnable,tsk);
+        driverCxt.addToRunnable(tsk);
       }
 
       // Loop while you either have tasks running, or tasks queued up
@@ -459,7 +466,7 @@
         // Launch upto maxthreads tasks
         while(runnable.peek() != null && running.size() < maxthreads) {
           Task<? extends Serializable> tsk = runnable.remove();
-          curJobNo = launchTask(tsk, queryId, noName,running, jobname, jobs, curJobNo);
+          curJobNo = launchTask(tsk, queryId, noName,running, jobname, jobs, curJobNo, driverCxt);
         }
 
         // poll the Tasks to see which one completed
@@ -488,8 +495,8 @@
 
         if (tsk.getChildTasks() != null) {
           for (Task<? extends Serializable> child : tsk.getChildTasks()) {
-            if(isLaunchable(child)) { 
-              addToRunnable(runnable,child);
+            if(DriverContext.isLaunchable(child)) {
+              driverCxt.addToRunnable(child);
             }
           }
         }
@@ -554,13 +561,13 @@
 
   public int launchTask(Task<? extends Serializable> tsk, String queryId, 
     boolean noName, Map<TaskResult,TaskRunner> running, String jobname, 
-    int jobs, int curJobNo) {
+    int jobs, int curJobNo, DriverContext cxt) {
     
     if (SessionState.get() != null) {
       SessionState.get().getHiveHistory().startTask(queryId, tsk,
         tsk.getClass().getName());
     }
-    if (tsk.isMapRedTask()) {
+    if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
       if (noName) {
         conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" 
           + tsk.getId() + ")");
@@ -568,7 +575,7 @@
       curJobNo++;
       console.printInfo("Launching Job " + curJobNo + " out of "+jobs);
     }
-    tsk.initialize(conf, plan);
+    tsk.initialize(conf, plan, cxt);
     TaskResult tskRes = new TaskResult();
     TaskRunner tskRun = new TaskRunner(tsk,tskRes);
 
@@ -627,30 +634,12 @@
     }
   }
 
-  /**
-   * Checks if a task can be launched
-   * 
-   * @param tsk the task to be checked 
-   * @return    true if the task is launchable, false otherwise
-   */
-
-  public boolean isLaunchable(Task<? extends Serializable> tsk) {
-    // A launchable task is one that hasn't been queued, hasn't been initialized, and is runnable.
-    return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable();
-  }
-
-  public void addToRunnable(Queue<Task<? extends Serializable>> runnable,
-    Task<? extends Serializable> tsk) {
-    runnable.add(tsk);
-    tsk.setQueued();
- }
-
   public boolean getResults(Vector<String> res) throws IOException {
     if (plan != null && plan.getPlan().getFetchTask() != null) {
       BaseSemanticAnalyzer sem = plan.getPlan();
       if (!sem.getFetchTaskInit()) {
         sem.setFetchTaskInit(true);
-        sem.getFetchTask().initialize(conf, plan);
+        sem.getFetchTask().initialize(conf, plan, null);
       }
       FetchTask ft = (FetchTask) sem.getFetchTask();
       ft.setMaxRows(maxRows);

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+
+public class DriverContext {
+  
+  Queue<Task<? extends Serializable>> runnable = new LinkedList<Task<? extends Serializable>>();
+  
+  public DriverContext( Queue<Task<? extends Serializable>> runnable) {
+    this.runnable = runnable;
+  }
+  
+  public Queue<Task<? extends Serializable>> getRunnable() {
+    return this.runnable;
+  }
+  
+  /**
+   * Checks if a task can be launched
+   * 
+   * @param tsk the task to be checked 
+   * @return    true if the task is launchable, false otherwise
+   */
+
+  public static boolean isLaunchable(Task<? extends Serializable> tsk) {
+    // A launchable task is one that hasn't been queued, hasn't been initialized, and is runnable.
+    return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable();
+  }
+
+  public void addToRunnable(Task<? extends Serializable> tsk) {
+    runnable.add(tsk);
+    tsk.setQueued();
+ }
+
+}
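
A condensed sketch of how the new DriverContext is driven by the scheduling loop in Driver.java above; task execution, the running-task map and error handling are omitted, so this is illustrative only:

import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Task;

public class DriverContextSketch {
  public static void schedule(List<Task<? extends Serializable>> rootTasks) {
    Queue<Task<? extends Serializable>> runnable =
        new LinkedList<Task<? extends Serializable>>();
    DriverContext driverCxt = new DriverContext(runnable);

    // root tasks go straight onto the runnable queue (and are marked queued)
    for (Task<? extends Serializable> tsk : rootTasks) {
      driverCxt.addToRunnable(tsk);
    }

    while (runnable.peek() != null) {
      Task<? extends Serializable> tsk = runnable.remove();
      // the real Driver launches tsk here and waits for it to finish;
      // children that have become launchable are then queued
      if (tsk.getChildTasks() != null) {
        for (Task<? extends Serializable> child : tsk.getChildTasks()) {
          if (DriverContext.isLaunchable(child)) {
            driverCxt.addToRunnable(child);
          }
        }
      }
    }
  }
}

Routing all queueing through DriverContext is what lets ConditionalTask (below) add or drop sibling branches on the same queue at runtime, which the skew join conditional plan relies on.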

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Sat Jan 16 06:44:01 2010
@@ -121,6 +121,8 @@
   int joinCacheSize = 0;
   int nextSz = 0;
   transient Byte lastAlias = null;
+  
+  transient boolean handleSkewJoin = false;
 
   protected int populateJoinKeyValue(Map<Byte, List<ExprNodeEvaluator>> outMap,
       Map<Byte, List<exprNodeDesc>> inputMap) {
@@ -193,7 +195,10 @@
     return joinOutputObjectInspector;
   }
 
+  Configuration hconf;
   protected void initializeOp(Configuration hconf) throws HiveException {
+    this.handleSkewJoin = conf.getHandleSkewJoin();
+    this.hconf = hconf;
     LOG.info("COMMONJOIN " + ((StructObjectInspector)inputObjInspectors[0]).getTypeName());
     totalSz = 0;
     // Map that contains the rows for each alias
@@ -207,7 +212,7 @@
       order = conf.getTagOrder();
     }
     condn = conf.getConds();
-    noOuterJoin = conf.getNoOuterJoin();
+    noOuterJoin = conf.isNoOuterJoin();
 
     totalSz = populateJoinKeyValue(joinValues, conf.getExprs());
 
@@ -232,30 +237,15 @@
         nr.add(null);
       dummyObj[pos] = nr;
       // there should be only 1 dummy object in the RowContainer
-      RowContainer<ArrayList<Object>> values = new RowContainer<ArrayList<Object>>(1);
+      RowContainer<ArrayList<Object>> values = getRowContainer(hconf, pos, alias, 1);
       values.add((ArrayList<Object>) dummyObj[pos]);
       dummyObjVectors[pos] = values;
 
       // if serde is null, the input doesn't need to be spilled out 
       // e.g., the output columns does not contains the input table
-      SerDe serde = getSpillSerDe(pos);
-      RowContainer rc = new RowContainer(joinCacheSize);
-      if ( serde != null ) {
-        
-        // arbitrary column names used internally for serializing to spill table
-        List<String> colList = new ArrayList<String>();
-        for ( int i = 0; i < sz; ++i )
-          colList.add(alias + "_VAL_" + i);
-        
-        // object inspector for serializing input tuples
-        StructObjectInspector rcOI = 
-          ObjectInspectorFactory.getStandardStructObjectInspector(
-                            colList,
-                            joinValuesStandardObjectInspectors.get(pos));
-        
-        rc.setSerDe(serde, rcOI);
-      }
+      RowContainer rc = getRowContainer(hconf, pos, alias, joinCacheSize);
       storage.put(pos, rc);
+
       pos++;
     }
 
@@ -265,9 +255,32 @@
     LOG.info("JOIN " + ((StructObjectInspector)outputObjInspector).getTypeName() + " totalsz = " + totalSz);
     
   }
+
+  RowContainer getRowContainer(Configuration hconf, byte pos, Byte alias, int containerSize)
+      throws HiveException {
+    tableDesc tblDesc = getSpillTableDesc(alias);
+    SerDe serde = getSpillSerDe(alias);
+    
+    if ( serde == null )
+      containerSize = 1;
+      
+    RowContainer rc = new RowContainer(containerSize, hconf);
+    StructObjectInspector rcOI = null;
+    if(tblDesc != null) {
+   // arbitrary column names used internally for serializing to spill table
+      List<String> colNames = Utilities.getColumnNames(tblDesc.getProperties());
+      // object inspector for serializing input tuples
+      rcOI = ObjectInspectorFactory.getStandardStructObjectInspector(colNames,
+              joinValuesStandardObjectInspectors.get(pos));
+    }
+
+    rc.setSerDe(serde, rcOI);
+    rc.setTableDesc(tblDesc);
+    return rc;
+  }
   
-  private SerDe getSpillSerDe(byte pos) {
-    tableDesc desc = getSpillTableDesc(pos);
+  private SerDe getSpillSerDe(byte alias) {
+    tableDesc desc = getSpillTableDesc(alias);
     if ( desc == null )
       return null;
     SerDe sd = (SerDe) ReflectionUtils.newInstance(desc.getDeserializerClass(),  null);
@@ -280,6 +293,20 @@
     return sd;
   }
   
+  transient boolean newGroupStarted = false;
+  
+  public tableDesc getSpillTableDesc(Byte alias) {
+    if(spillTableDesc == null || spillTableDesc.size() == 0)
+      initSpillTables();
+    return spillTableDesc.get(alias);
+  }
+  
+  public Map<Byte, tableDesc> getSpillTableDesc() {
+    if(spillTableDesc == null)
+      initSpillTables();
+    return spillTableDesc;
+  }
+  
   private void initSpillTables() {
     Map<Byte, List<exprNodeDesc>> exprs = conf.getExprs();
     spillTableDesc = new HashMap<Byte, tableDesc>(exprs.size());
@@ -311,14 +338,9 @@
     }
   }
   
-  public tableDesc getSpillTableDesc(Byte alias) {
-    if(spillTableDesc == null || spillTableDesc.size() == 0)
-      initSpillTables();
-    return spillTableDesc.get(alias);
-  }
-  
   public void startGroup() throws HiveException {
     LOG.trace("Join: Starting new group");
+    newGroupStarted = true;
     for (RowContainer<ArrayList<Object>> alw: storage.values()) {
       alw.clear();
     }
@@ -644,7 +666,7 @@
         }
 
         intObj.pushObj(newObj);
-
+        
         // execute the actual join algorithm
         ArrayList<boolean[]> newNulls =  joinObjects(inputNulls, newObj, intObj,
                                                      aliasNum, childFirstRow);
@@ -765,8 +787,10 @@
    */
   public void closeOp(boolean abort) throws HiveException {
     LOG.trace("Join Op close");
-    for ( RowContainer<ArrayList<Object>> alw: storage.values() )
-      alw.clear(); // clean up the temp files
+    for ( RowContainer<ArrayList<Object>> alw: storage.values() ) {
+      if(alw != null) //it maybe null for mapjoins
+        alw.clear(); // clean up the temp files      
+    }
     storage.clear();
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Sat Jan 16 06:44:01 2010
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
@@ -35,7 +36,9 @@
 
   private static final long serialVersionUID = 1L;
   private List<Task<? extends Serializable>> listTasks;
-  private Task<? extends Serializable>       resTask;
+  
+  private boolean resolved = false;
+  private List<Task<? extends Serializable>> resTasks;
   
   private ConditionalResolver resolver;
   private Object              resolverCtx;
@@ -60,15 +63,32 @@
     return false;
   }
   
-  public void initialize (HiveConf conf, QueryPlan queryPlan) {
-    super.initialize(conf, queryPlan);
-    resTask = listTasks.get(resolver.getTaskId(conf, resolverCtx));
-    resTask.initialize(conf, queryPlan);
+  public void initialize (HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+    super.initialize(conf, queryPlan, driverContext);
   }
   
   @Override
   public int execute() {
-    return resTask.executeTask();
+    resTasks = resolver.getTasks(conf, resolverCtx);
+    resolved = true;
+    for(Task<? extends Serializable> tsk: getListTasks()) {
+      if(!resTasks.contains(tsk)) {
+        this.driverContext.getRunnable().remove(tsk);
+        console.printInfo(ExecDriver.getJobEndMsg(""
+            + Utilities.randGen.nextInt())
+            + ", job is filtered out (removed at runtime).");
+        if(tsk.getChildTasks() != null) {
+          for(Task<? extends Serializable> child : tsk.getChildTasks()) {
+            child.parentTasks.remove(tsk);
+            if(DriverContext.isLaunchable(child))
+              this.driverContext.addToRunnable(child);
+          }
+        }
+      } else if(!this.driverContext.getRunnable().contains(tsk)){
+        this.driverContext.addToRunnable(tsk);
+      }
+    }
+    return 0;
   }
 
   /**
@@ -91,6 +111,26 @@
   public Object getResolverCtx() {
     return resolverCtx;
   }
+  
+  // used to determine whether child tasks can be run. 
+  public boolean done() {
+    boolean ret = true;
+    List<Task<? extends Serializable>> parentTasks = this.getParentTasks();
+    if (parentTasks != null) {
+      for(Task<? extends Serializable> par: parentTasks)
+        ret = ret && par.done();
+    }
+    List<Task<? extends Serializable>> retTasks;
+    if(resolved)
+      retTasks = this.resTasks;
+    else
+      retTasks = getListTasks();
+    if (ret &&  retTasks != null) {
+      for (Task<? extends Serializable> tsk : retTasks)
+        ret = ret && tsk.done();
+    }
+    return ret;
+  }
 
   /**
    * @param resolverCtx the resolverCtx to set
@@ -116,4 +156,26 @@
   public int getType() {
     return StageType.CONDITIONAL;
   }
+
+  @Override
+  public String getName() {
+    return "CONDITION";
+  }
+
+  /**
+   * Add a dependent task on the current conditional task. The task will not be
+   * a direct child of conditional task. Actually it will be added as child task
+   * of associated tasks.
+   * 
+   * @return true if the task got added false if it already existed
+   */
+  public boolean addDependentTask(Task<? extends Serializable> dependent) {
+    boolean ret = false;
+    if(this.getListTasks() != null) {
+      for(Task<? extends Serializable> tsk: this.getListTasks()) {
+        ret = ret & tsk.addDependentTask(dependent);
+      }
+    }
+    return ret;
+  }
 }
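
The runtime pruning above relies on the (also modified) ConditionalResolver contract returning the tasks that should actually run. A minimal illustrative resolver, assuming getTasks(HiveConf, Object) is now the resolver's single method as used in execute() above:

import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.plan.ConditionalResolver;

// Hypothetical resolver for illustration: keeps every candidate task, so
// ConditionalTask queues all of getListTasks(); real resolvers (e.g. the new
// ConditionalResolverSkewJoin) inspect the context to pick a subset.
public class KeepAllResolver implements ConditionalResolver, Serializable {
  private static final long serialVersionUID = 1L;

  @SuppressWarnings("unchecked")
  public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object ctx) {
    // here the candidate list itself is assumed to be passed as the context
    return (List<Task<? extends Serializable>>) ctx;
  }
}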

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java Sat Jan 16 06:44:01 2010
@@ -86,4 +86,9 @@
   public int getType() {
     return StageType.COPY;
   }
+
+  @Override
+  public String getName() {
+    return "COPY";
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Sat Jan 16 06:44:01 2010
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -102,8 +103,8 @@
     super();
   }
 
-  public void initialize(HiveConf conf, QueryPlan queryPlan) {
-    super.initialize(conf, queryPlan);
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
+    super.initialize(conf, queryPlan, ctx);
     this.conf = conf;
   }
 
@@ -1315,4 +1316,9 @@
     return StageType.DDL;
   }
 
+  @Override
+  public String getName() {
+    return "DDL";
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Sat Jan 16 06:44:01 2010
@@ -108,7 +108,7 @@
    * Initialization when invoked from QL
    */
   public void initialize(HiveConf conf, QueryPlan queryPlan) {
-    super.initialize(conf, queryPlan);
+    super.initialize(conf, queryPlan, null);
     job = new JobConf(conf, ExecDriver.class);
     // NOTE: initialize is only called if it is in non-local mode.
     // In case it's in non-local mode, we need to move the SessionState files
@@ -201,7 +201,7 @@
             SessionState.get().getQueryId(), getId(),
             Keys.TASK_HADOOP_ID, rj.getJobID());
       }
-      console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = "
+      console.printInfo(ExecDriver.getJobEndMsg(rj.getJobID()) + ", Tracking URL = "
           + rj.getTrackingURL());
       console.printInfo("Kill Command = "
           + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
@@ -453,6 +453,9 @@
    * Execute a query plan using Hadoop
    */
   public int execute() {
+    
+    success = true;
+    
     try {
       setNumberOfReducers();
     } catch(IOException e) {
@@ -591,7 +594,7 @@
         success = false;
       } 
 
-      String statusMesg = "Ended Job = " + rj.getJobID();
+      String statusMesg = getJobEndMsg(rj.getJobID());
       if (!success) {
         statusMesg += " with errors";
         returnVal = 2;
@@ -654,6 +657,24 @@
 
     return (returnVal);
   }
+  
+  /**
+   * this msg pattern is used to track when a job is started
+   * @param jobId
+   * @return
+   */
+  public static String getJobStartMsg(String jobId) {
+    return "Starting Job = " + jobId;
+  }
+  
+  /**
+   * this msg pattern is used to track when a job is successfully done.
+   * @param jobId
+   * @return
+   */
+  public static String getJobEndMsg(String jobId) {
+    return "Ended Job = " + jobId;
+  }
 
   private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
 
@@ -1032,4 +1053,9 @@
   public int getType() {
     return StageType.MAPRED;
   }
+
+  @Override
+  public String getName() {
+    return "EXEC";
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Sat Jan 16 06:44:01 2010
@@ -170,24 +170,27 @@
       
       if (!keyWritable.equals(groupKey)) {
         // If a operator wants to do some work at the beginning of a group
-        if (groupKey == null) {
+        if (groupKey == null) { //the first group
           groupKey = new BytesWritable();
         } else {
           // If a operator wants to do some work at the end of a group
           l4j.trace("End Group");
           reducer.endGroup();
         }
+        
+        try {
+          keyObject = inputKeyDeserializer.deserialize(keyWritable);
+        } catch (Exception e) {
+          throw new HiveException("Unable to deserialize reduce input key from " + 
+              Utilities.formatBinaryString(keyWritable.get(), 0, keyWritable.getSize())
+              + " with properties " + keyTableDesc.getProperties(),
+              e);
+        }
+        
         groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
         l4j.trace("Start Group");
         reducer.startGroup();
-      }
-      try {
-        keyObject = inputKeyDeserializer.deserialize(keyWritable);
-      } catch (Exception e) {
-        throw new HiveException("Unable to deserialize reduce input key from " + 
-            Utilities.formatBinaryString(keyWritable.get(), 0, keyWritable.getSize())
-            + " with properties " + keyTableDesc.getProperties(),
-            e);
+        reducer.setGroupKeyObject(keyObject);
       }
       // System.err.print(keyObject.toString());
       while (values.hasNext()) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Sat Jan 16 06:44:01 2010
@@ -28,6 +28,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.hive.ql.plan.explain;
@@ -277,6 +278,11 @@
     // Start by getting the work part of the task and call the output plan for the work
     outputPlan(task.getWork(), out, extended, indent+2);
     out.println();
+    if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
+      for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+        outputPlan(con, out, extended, displayedSet, indent);
+      }
+    }
     if (task.getChildTasks() != null) {
       for(Task<? extends Serializable> child: task.getChildTasks()) {
         outputPlan(child, out, extended, displayedSet, indent);
@@ -284,13 +290,19 @@
     }
   }
 
-  private void outputDependencies(Task<? extends Serializable> task, PrintStream out, int indent) 
+  private Set<Task<? extends Serializable>> dependeciesTaskSet = new HashSet<Task<? extends Serializable>>();
+  private void outputDependencies(Task<? extends Serializable> task, PrintStream out, int indent, boolean rootTskCandidate) 
     throws Exception {
     
+    if(dependeciesTaskSet.contains(task))
+      return;
+    dependeciesTaskSet.add(task);
+    
     out.print(indentString(indent));
     out.printf("%s", task.getId());
-    if (task.getParentTasks() == null || task.getParentTasks().isEmpty()) {
-      out.print(" is a root stage");
+    if ((task.getParentTasks() == null || task.getParentTasks().isEmpty())) {
+      if(rootTskCandidate)
+        out.print(" is a root stage");
     }
     else {
       out.print(" depends on stages: ");
@@ -302,14 +314,34 @@
         first = false;
         out.print(parent.getId());
       }
+      
+      if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
+        out.print(" , consists of ");
+        first = true;
+        for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+          if (!first) {
+            out.print(", ");
+          }
+          first = false;
+          out.print(con.getId());
+        }
+      }
+      
     }
     out.println();
     
+    if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
+      for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+        outputDependencies(con, out, indent, false);
+      }
+    }
+    
     if (task.getChildTasks() != null) {
       for(Task<? extends Serializable> child: task.getChildTasks()) {
-        outputDependencies(child, out, indent);
+        outputDependencies(child, out, indent, true);
       }
     }
+    
   }
 
   public void outputAST(String treeString, PrintStream out, int indent) {
@@ -326,7 +358,7 @@
     out.print(indentString(indent));
     out.println("STAGE DEPENDENCIES:");
     for(Task<? extends Serializable> rootTask: rootTasks) {
-      outputDependencies(rootTask, out, indent+2);
+      outputDependencies(rootTask, out, indent+2, true);
     }
   }
 
@@ -354,4 +386,9 @@
   public int getType() {
     return StageType.EXPLAIN;
   }
+
+  @Override
+  public String getName() {
+    return "EXPLAIN";
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Sat Jan 16 06:44:01 2010
@@ -24,6 +24,7 @@
 import java.util.Vector;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.plan.fetchWork;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
@@ -50,8 +51,8 @@
  	  super();
  	}
  	
-  public void initialize (HiveConf conf, QueryPlan queryPlan) {
-    super.initialize(conf, queryPlan);
+  public void initialize (HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
+    super.initialize(conf, queryPlan, ctx);
     
    	try {
        // Create a file system handle  
@@ -135,4 +136,9 @@
   public int getType() {
     return StageType.FETCH;
   }
+
+  @Override
+  public String getName() {
+    return "FETCH";
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Jan 16 06:44:01 2010
@@ -20,8 +20,8 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Properties;
 
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -29,7 +29,6 @@
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
@@ -37,11 +36,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 
 /**
  * File Sink operator implementation
@@ -108,7 +103,6 @@
       LOG.info("Writing to temp file: FS " + outPath);
 
       HiveOutputFormat<?, ?> hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
-      final Class<? extends Writable> outputClass = serializer.getSerializedClass();
       boolean isCompressed = conf.getCompressed();
 
       // The reason to keep these instead of using
@@ -117,22 +111,8 @@
       // we create.
       Path parent = Utilities.toTempPath(specPath);
       finalPath = HiveFileFormatUtils.getOutputFormatFinalPath(parent, jc, hiveOutputFormat, isCompressed, finalPath);
-      tableDesc tableInfo = conf.getTableInfo();
-      JobConf jc_output = jc;
-      if (isCompressed) {
-        jc_output = new JobConf(jc);
-        String codecStr = conf.getCompressCodec();
-        if (codecStr != null && !codecStr.trim().equals("")) {
-          Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(codecStr);
-          FileOutputFormat.setOutputCompressorClass(jc_output, codec);
-        }
-        String type = conf.getCompressType();
-        if(type !=null && !type.trim().equals("")) {
-          CompressionType style = CompressionType.valueOf(type);
-          SequenceFileOutputFormat.setOutputCompressionType(jc, style);
-        }
-      }
-      outWriter = getRecordWriter(jc_output, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath);
+      final Class<? extends Writable> outputClass = serializer.getSerializedClass();
+      outWriter = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(), outputClass, conf, outPath);
 
       // in recent hadoop versions, use deleteOnExit to clean tmp files.
       autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPath);
@@ -146,15 +126,6 @@
     }
   }
 
-  public static RecordWriter getRecordWriter(JobConf jc, HiveOutputFormat<?, ?> hiveOutputFormat,
-      final Class<? extends Writable> valueClass, boolean isCompressed,
-      Properties tableProp, Path outPath) throws IOException, HiveException {
-    if (hiveOutputFormat != null) {
-      return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass, isCompressed, tableProp, null);
-    }
-    return null;
-  }
-
   Writable recordValue; 
   public void processOp(Object row, int tag) throws HiveException {
     // Since File Sink is a terminal operator, forward is not called - so, maintain the number of output rows explicitly
@@ -220,26 +191,7 @@
     try {
       if(conf != null) {
         String specPath = conf.getDirName();
-        fs = (new Path(specPath)).getFileSystem(hconf);
-        Path tmpPath = Utilities.toTempPath(specPath);
-        Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName() + ".intermediate");
-        Path finalPath = new Path(specPath);
-        if(success) {
-          if(fs.exists(tmpPath)) {
-            // Step1: rename tmp output folder to intermediate path. After this
-            // point, updates from speculative tasks still writing to tmpPath 
-            // will not appear in finalPath.
-            LOG.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
-            Utilities.rename(fs, tmpPath, intermediatePath);
-            // Step2: remove any tmp file or double-committed output files
-            Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
-            // Step3: move to the file destination
-            LOG.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
-            Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
-          }
-        } else {
-          fs.delete(tmpPath, true);
-        }
+        FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
       }
     } catch (IOException e) {
       throw new HiveException (e);
@@ -247,6 +199,31 @@
     super.jobClose(hconf, success);
   }
   
+  public static void mvFileToFinalPath(String specPath, Configuration hconf, boolean success, Log LOG) throws IOException, HiveException{
+    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+    Path tmpPath = Utilities.toTempPath(specPath);
+    Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
+        + ".intermediate");
+    Path finalPath = new Path(specPath);
+    if (success) {
+      if (fs.exists(tmpPath)) {
+        // Step1: rename tmp output folder to intermediate path. After this
+        // point, updates from speculative tasks still writing to tmpPath
+        // will not appear in finalPath.
+        LOG.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
+        Utilities.rename(fs, tmpPath, intermediatePath);
+        // Step2: remove any tmp file or double-committed output files
+        Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+        // Step3: move to the file destination
+        LOG.info("Moving tmp dir: " + intermediatePath + " to: "
+            + finalPath);
+        Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+      }
+    } else {
+      fs.delete(tmpPath, true);
+    }
+  }
+  
   public int getType() {
     return OperatorType.FILESINK;
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java Sat Jan 16 06:44:01 2010
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FunctionWork;
@@ -44,8 +45,8 @@
     super();
   }
   
-  public void initialize(HiveConf conf, QueryPlan queryPlan) {
-    super.initialize(conf, queryPlan);
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
+    super.initialize(conf, queryPlan, ctx);
     this.conf = conf;
   }
   
@@ -114,4 +115,9 @@
   public int getType() {
     return StageType.FUNC;
   }
+
+  @Override
+  public String getName() {
+    return "FUNCTION";
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Sat Jan 16 06:44:01 2010
@@ -18,14 +18,18 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.joinDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
@@ -36,15 +40,22 @@
 public class JoinOperator extends CommonJoinOperator<joinDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
   
+  private transient SkewJoinHandler skewJoinKeyContext = null;
+  
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     initializeChildren(hconf);
+    if(this.handleSkewJoin) {
+      skewJoinKeyContext = new SkewJoinHandler(this);
+      skewJoinKeyContext.initiliaze(hconf);
+    }
   }
   
   public void processOp(Object row, int tag)
       throws HiveException {
     try {
+      
       // get alias
       alias = (byte)tag;
     
@@ -53,6 +64,9 @@
       
       ArrayList<Object> nr = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
       
+      if(this.handleSkewJoin)
+        skewJoinKeyContext.handleSkew(tag);
+      
       // number of rows for the key in the given table
       int sz = storage.get(alias).size();
     
@@ -92,6 +106,87 @@
     return OperatorType.JOIN;
   }
 
+  /**
+   * Close the skew-join handler, if enabled, before the standard close logic runs.
+   *
+   */
+  public void closeOp(boolean abort) throws HiveException {
+    if (this.handleSkewJoin) {
+      skewJoinKeyContext.close(abort);
+    }
+    super.closeOp(abort);
+  }
+  
+  @Override
+  public void jobClose(Configuration hconf, boolean success) throws HiveException {
+    if(this.handleSkewJoin) {
+      try {
+        for (int i = 0; i < numAliases; i++) {
+          String specPath = this.conf.getBigKeysDirMap().get((byte)i);
+          FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
+          for (int j = 0; j < numAliases; j++) {
+            if(j == i) continue;
+            specPath = getConf().getSmallKeysDirMap().get((byte)i).get((byte)j);
+            FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
+          }
+        }
+        
+        if(success) {
+          // move each task's output files up into the final directories
+          for (int i = 0; i < numAliases; i++) {
+            String specPath = this.conf.getBigKeysDirMap().get((byte)i);
+            moveUpFiles(specPath, hconf, LOG);
+            for (int j = 0; j < numAliases; j++) {
+              if(j == i) continue;
+              specPath = getConf().getSmallKeysDirMap().get((byte)i).get((byte)j);
+              moveUpFiles(specPath, hconf, LOG);
+            }
+          }
+        }
+      } catch (IOException e) {
+        throw new HiveException (e);
+      }
+    }
+    super.jobClose(hconf, success);
+  }
+  
+  
+  
+  private void moveUpFiles(String specPath, Configuration hconf, Log log) throws IOException, HiveException {
+    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+    Path finalPath = new Path(specPath);
+    
+    if(fs.exists(finalPath)) {
+      FileStatus[] taskOutputDirs = fs.listStatus(finalPath);
+      if(taskOutputDirs != null ) {
+        for (FileStatus dir : taskOutputDirs) {
+          Utilities.renameOrMoveFiles(fs, dir.getPath(), finalPath);
+          fs.delete(dir.getPath(), true);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Forward the join results for the current group, or hand the group off to the skew-join handler if it contains a skew key.
+   *
+   * @throws HiveException
+   */
+  public void endGroup() throws HiveException {
+    //if this is a skew key, we need to handle it in a separate map reduce job.
+    if(this.handleSkewJoin && skewJoinKeyContext.currBigKeyTag >=0) {
+      try {
+        skewJoinKeyContext.endGroup();
+      } catch (IOException e) {
+        LOG.error(e.getMessage(),e);
+        throw new HiveException(e);
+      }
+      return;
+    }
+    else {
+      checkAndGenObject();
+    }
+  }
   
 }
 

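The loops in jobClose above visit one big-key directory per table plus one small-key directory for every other table. To make that layout concrete, here is a tiny illustrative helper, not part of the patch: the directory name strings are invented, since the real paths come from joinDesc.getBigKeysDirMap() and joinDesc.getSmallKeysDirMap().

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class SkewDirLayoutSketch {
  private SkewDirLayoutSketch() {}

  /**
   * Returns, for each table index that may hold a big (skewed) key, the list of
   * directories the join would write: one "bigkeys" dir for that table and one
   * "keys" dir for every other table's matching rows.
   */
  public static Map<Integer, List<String>> layout(String baseDir, int numTables) {
    Map<Integer, List<String>> dirs = new HashMap<Integer, List<String>>();
    for (int big = 0; big < numTables; big++) {
      List<String> perBigKey = new ArrayList<String>();
      perBigKey.add(baseDir + "/T" + big + "-bigkeys");                 // rows of the skewed table
      for (int other = 0; other < numTables; other++) {
        if (other == big) {
          continue;
        }
        perBigKey.add(baseDir + "/T" + other + "-keys-big-in-T" + big); // matching rows of the others
      }
      dirs.put(Integer.valueOf(big), perBigKey);
    }
    return dirs;
  }
}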
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Sat Jan 16 06:44:01 2010
@@ -72,6 +72,8 @@
 
   transient protected Map<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> mapJoinTables;
   
+  transient protected RowContainer<ArrayList<Object>> emptyList = null;
+  
   transient static final private String[] fatalErrMsg = { 
     null,  // counter value 0 means no error
     "Mapside join size exceeds hive.mapjoin.maxsize. Please increase that or remove the mapjoin hint." // counter value 1
@@ -80,14 +82,18 @@
   public static class MapJoinObjectCtx {
     ObjectInspector standardOI;
     SerDe      serde;
+    tableDesc tblDesc;
+    Configuration conf;
 
     /**
      * @param standardOI
      * @param serde
      */
-    public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde) {
+    public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde, tableDesc tblDesc, Configuration conf) {
       this.standardOI = standardOI;
       this.serde = serde;
+      this.tblDesc = tblDesc;
+      this.conf = conf;
     }
 
     /**
@@ -104,6 +110,14 @@
       return serde;
     }
 
+    public tableDesc getTblDesc() {
+      return tblDesc;
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+
   }
 
   transient static Map<Integer, MapJoinObjectCtx> mapMetadata = new HashMap<Integer, MapJoinObjectCtx>();
@@ -159,7 +173,9 @@
       mapJoinTables.put(Byte.valueOf((byte)pos), hashTable);
     }
     
-    storage.put((byte)posBigTable, new RowContainer());
+    emptyList = new RowContainer<ArrayList<Object>>(1, hconf);
+    RowContainer bigPosRC = getRowContainer(hconf, (byte)posBigTable, order[posBigTable], joinCacheSize);
+    storage.put((byte)posBigTable, bigPosRC);
     
     mapJoinRowsKey = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
     
@@ -214,7 +230,7 @@
               new MapJoinObjectCtx(
                   ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
                       ObjectInspectorCopyOption.WRITABLE),
-                  keySerializer));
+                  keySerializer, keyTableDesc, hconf));
 
           firstRow = false;
         }
@@ -240,7 +256,7 @@
 
         boolean needNewKey = true;
         if (o == null) {
-          res = new RowContainer();
+          res = getRowContainer(this.hconf, (byte)tag, order[tag], joinCacheSize);
         	res.add(value);
         } else {
           res = o.getObj();
@@ -266,14 +282,14 @@
                           new MapJoinObjectCtx(
                                 ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
                                   ObjectInspectorCopyOption.WRITABLE),
-                                valueSerDe));
+                                valueSerDe, valueTableDesc, hconf));
         }
         
         // Construct externalizable objects for key and value
         if ( needNewKey ) {
           MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
 	        MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
-          
+	        valueObj.setConf(this.hconf); 
           // This may potentially increase the size of the hashmap on the mapper
   	      if (res.size() > mapJoinRowsKey) {
             if ( res.size() % 100 == 0 ) {
@@ -295,7 +311,10 @@
           MapJoinObjectValue o = (MapJoinObjectValue)mapJoinTables.get(pos).get(keyMap);
 
           if (o == null) {
-            storage.put(pos, dummyObjVectors[pos.intValue()]);
+            if(noOuterJoin)
+              storage.put(pos, emptyList);
+            else
+              storage.put(pos, dummyObjVectors[pos.intValue()]);
           }
           else {
             storage.put(pos, o.getObj());
@@ -323,6 +342,7 @@
     for (HashMapWrapper hashTable: mapJoinTables.values()) {
       hashTable.close();
     }
+    super.closeOp(abort);
   }
   /**
    * Implements the getName function for the Node Interface.

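With this change the map-join hash table values become RowContainers that carry a SerDe and a tableDesc, so rows that do not fit in memory can spill to disk and the container can be rebuilt after deserialization. A hedged sketch of that wiring follows; it only combines calls that appear elsewhere in this patch, and the column names and block size are made up for illustration.

import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.util.ReflectionUtils;

public final class SpillableContainerSketch {
  private SpillableContainerSketch() {}

  /** Build a RowContainer that can spill to disk; rowInspector describes the stored rows. */
  public static RowContainer<ArrayList<Object>> build(Configuration hconf,
      ObjectInspector rowInspector) throws HiveException, SerDeException {
    // Column names/types are illustrative; both arguments are comma-delimited.
    tableDesc spillDesc = Utilities.getTableDesc("c0,c1", "string,string");
    SerDe serde = (SerDe) ReflectionUtils.newInstance(spillDesc.getDeserializerClass(), null);
    serde.initialize(null, spillDesc.getProperties());

    // 100 is an arbitrary in-memory block size for this sketch.
    RowContainer<ArrayList<Object>> rc = new RowContainer<ArrayList<Object>>(100, hconf);
    rc.setSerDe(serde, rowInspector);   // how rows are written if the container spills
    rc.setTableDesc(spillDesc);
    rc.add(new ArrayList<Object>(Arrays.asList((Object) "k", (Object) "v")));
    return rc;
  }
}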
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Sat Jan 16 06:44:01 2010
@@ -194,4 +194,9 @@
   public int getType() {
     return StageType.MAPREDLOCAL;
   }
+
+  @Override
+  public String getName() {
+    return "MAPRED";
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Sat Jan 16 06:44:01 2010
@@ -180,4 +180,9 @@
   public int getType() {
     return StageType.MOVE;
   }
+
+  @Override
+  public String getName() {
+    return "MOVE";
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sat Jan 16 06:44:01 2010
@@ -799,6 +799,8 @@
   transient protected long beginTime = 0;
   transient protected long totalTime = 0;
 
+  transient protected Object groupKeyObject;
+
   /**
    * this is called before operator process to buffer some counters
    */
@@ -1010,4 +1012,12 @@
      assert false;
      return -1;
    }
+
+  public void setGroupKeyObject(Object keyObject) {
+    this.groupKeyObject = keyObject;
+  }
+
+  public Object getGroupKeyObject() {
+    return groupKeyObject;
+  }
 }

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.joinDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * At runtime, the join writes the big keys of each table into their own
+ * directory, and the matching keys from every other table into separate
+ * directories (one per table). The directories look like:
+ * <ul>
+ * <li>
+ * dir-T1-bigkeys (big keys in T1), dir-T2-keys (keys of T2 that are big in
+ * T1), dir-T3-keys (keys of T3 that are big in T1), ...
+ * <li>
+ * dir-T1-keys (keys of T1 that are big in T2), dir-T2-bigkeys (big keys in
+ * T2), dir-T3-keys (keys of T3 that are big in T2), ...
+ * <li>
+ * dir-T1-keys (keys of T1 that are big in T3), dir-T2-keys (keys of T2 that
+ * are big in T3), dir-T3-bigkeys (big keys in T3), ...
+ * </ul>
+ * 
+ * <p>
+ * For each skew key, we first write all values to a local tmp file. When the
+ * current group ends, the local tmp file is uploaded to HDFS. Right now, we
+ * use one file per skew key.
+ * 
+ * <p>
+ * For more info, please see
+ * https://issues.apache.org/jira/browse/HIVE-964.
+ * 
+ */
+public class SkewJoinHandler {
+  
+  static final protected Log LOG = LogFactory.getLog(SkewJoinHandler.class.getName());
+  
+  public int currBigKeyTag = -1;
+  
+  private int rowNumber = 0;
+  private int currTag = -1;
+  
+  private int skewKeyDefinition = -1;
+  private Map<Byte,StructObjectInspector> skewKeysTableObjectInspector = null;
+  private Map<Byte,SerDe> tblSerializers = null;
+  private Map<Byte, tableDesc> tblDesc = null;
+  
+  private Map<Byte, Boolean> bigKeysExistingMap = null;
+  
+  Configuration hconf = null;
+  List<Object> dummyKey = null;
+  String taskId;
+  
+  private CommonJoinOperator<? extends Serializable> joinOp;
+  private int numAliases;
+  private joinDesc conf;
+  
+  public SkewJoinHandler(CommonJoinOperator<? extends Serializable> joinOp) {
+    this.joinOp = joinOp;
+    this.numAliases = joinOp.numAliases;
+    this.conf = joinOp.getConf();
+  }
+  
+  public void initiliaze(Configuration hconf) {
+    this.hconf = hconf;
+    joinDesc desc = joinOp.getConf();
+    skewKeyDefinition = desc.getSkewKeyDefinition();
+    skewKeysTableObjectInspector = new HashMap<Byte, StructObjectInspector>(
+        numAliases);
+    tblDesc = desc.getSkewKeysValuesTables();
+    tblSerializers = new HashMap<Byte, SerDe>(numAliases);
+    bigKeysExistingMap = new HashMap<Byte, Boolean>(numAliases);
+    taskId = Utilities.getTaskId(hconf);
+
+    for (int i = 0; i < numAliases; i++) {
+      Byte alias = conf.getTagOrder()[i];
+      List<ObjectInspector> skewTableKeyInspectors = new ArrayList<ObjectInspector>();
+      StructObjectInspector soi = (StructObjectInspector) this.joinOp.inputObjInspectors[alias];
+      StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
+          .toString());
+      List<? extends StructField> keyFields = ((StructObjectInspector) sf
+          .getFieldObjectInspector()).getAllStructFieldRefs();
+      int keyFieldSize = keyFields.size();
+      for (int k = 0; k < keyFieldSize; k++) {
+        skewTableKeyInspectors.add(keyFields.get(k).getFieldObjectInspector());
+      }
+      tableDesc joinKeyDesc = desc.getKeyTableDesc();
+      List<String> keyColNames = Utilities.getColumnNames(joinKeyDesc.getProperties());
+      StructObjectInspector structTblKeyInpector = ObjectInspectorFactory
+          .getStandardStructObjectInspector(keyColNames, skewTableKeyInspectors);
+
+      try {
+        SerDe serializer = (SerDe) ReflectionUtils.newInstance(tblDesc.get(
+            alias).getDeserializerClass(), null);
+        serializer.initialize(null, tblDesc.get(alias).getProperties());
+        tblSerializers.put((byte) i, serializer);
+      } catch (SerDeException e) {
+        LOG.error("Skewjoin will be disabled due to " + e.getMessage(), e);
+        this.joinOp.handleSkewJoin = false;
+        break;
+      }
+
+      tableDesc valTblDesc = this.joinOp.getSpillTableDesc(alias);
+      List<String> valColNames = new ArrayList<String>();
+      if (valTblDesc != null)
+        valColNames = Utilities.getColumnNames(valTblDesc.getProperties());
+      StructObjectInspector structTblValInpector = ObjectInspectorFactory
+          .getStandardStructObjectInspector(valColNames,
+              this.joinOp.joinValuesStandardObjectInspectors.get((byte) i));
+
+      StructObjectInspector structTblInpector = ObjectInspectorFactory
+          .getUnionStructObjectInspector(Arrays.asList(new StructObjectInspector[] {
+                  structTblValInpector,structTblKeyInpector }));
+      skewKeysTableObjectInspector.put((byte) i, structTblInpector);
+    }
+
+    // reset rowcontainer's serde, objectinspector, and tableDesc.
+    for (int i = 0; i < numAliases; i++) {
+      Byte alias = conf.getTagOrder()[i];
+      RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte
+          .valueOf((byte) i));
+      if (rc != null) {
+        rc.setSerDe(tblSerializers.get((byte) i), skewKeysTableObjectInspector
+            .get((byte) i));
+        rc.setTableDesc(tblDesc.get(alias));
+      }
+    }
+  }
+  
+  void endGroup() throws IOException, HiveException {
+    if(skewKeyInCurrentGroup) {
+      
+      String specPath = conf.getBigKeysDirMap().get((byte)currBigKeyTag);
+      RowContainer<ArrayList<Object>> bigKey =  joinOp.storage.get(Byte.valueOf((byte)currBigKeyTag));
+      Path outputPath =  getOperatorOutputPath(specPath);
+      FileSystem destFs = outputPath.getFileSystem(hconf);
+      bigKey.copyToDFSDirecory(destFs, outputPath);
+      
+      for (int i = 0; i < numAliases; i++) {
+        if (((byte)i) == currBigKeyTag) continue;
+        RowContainer<ArrayList<Object>> values =  joinOp.storage.get(Byte.valueOf((byte)i));
+        if(values != null) {
+          specPath = conf.getSmallKeysDirMap().get((byte)currBigKeyTag).get((byte)i);
+          values.copyToDFSDirecory(destFs, getOperatorOutputPath(specPath));
+        }
+      }
+    }
+    skewKeyInCurrentGroup = false;
+  }
+  
+  boolean skewKeyInCurrentGroup = false;
+  public void handleSkew(int tag) throws HiveException {
+
+    if(joinOp.newGroupStarted || tag != currTag) {
+      rowNumber = 0;
+      currTag = tag;
+    }
+    
+    if(joinOp.newGroupStarted) {
+      currBigKeyTag = -1;
+      joinOp.newGroupStarted = false;
+      dummyKey = (List<Object>)joinOp.getGroupKeyObject();
+      skewKeyInCurrentGroup = false;
+      
+      for (int i = 0; i < numAliases; i++) {
+        RowContainer<ArrayList<Object>> rc =  joinOp.storage.get(Byte.valueOf((byte)i));
+        if(rc != null) {
+          rc.setKeyObject(dummyKey);
+        }
+      }
+    }
+    
+    rowNumber++;
+    if (currBigKeyTag == -1 && (tag < numAliases - 1)
+        && rowNumber >= skewKeyDefinition) {
+      // This is the first time the current key has crossed the threshold. If
+      // the key does not come from the last table (the last table can always
+      // be streamed), treat it as a skew key from here on.
+      currBigKeyTag = tag;
+      
+      // Right now we assume the group key is a List object. This may
+      // change in the future.
+      if(! (dummyKey instanceof List)) 
+        throw new RuntimeException("Bug in handling skew keys in a separate job: group key is not a List.");
+      
+      skewKeyInCurrentGroup = true;
+      bigKeysExistingMap.put(Byte.valueOf((byte)currBigKeyTag), Boolean.TRUE);
+    }
+  }
+
+  public void close(boolean abort) throws HiveException {
+    if (!abort) {
+      try {
+        endGroup();
+        commit();
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
+    } else {
+      for (int bigKeyTbl = 0; bigKeyTbl < numAliases; bigKeyTbl++) {
+
+        // If we did not see a skew key in this table, continue to the next
+        // table.
+        if (!bigKeysExistingMap.get((byte) bigKeyTbl))
+          continue;
+
+        try {
+          String specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl);
+          Path bigKeyPath = getOperatorOutputPath(specPath);
+          FileSystem fs = bigKeyPath.getFileSystem(hconf);
+          delete(bigKeyPath, fs);
+          for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
+            if (((byte) smallKeyTbl) == bigKeyTbl)
+              continue;
+            specPath = conf.getSmallKeysDirMap().get((byte) bigKeyTbl).get(
+                (byte) smallKeyTbl);
+            delete(getOperatorOutputPath(specPath), fs);
+          }
+        } catch (IOException e) {
+          throw new HiveException(e);
+        }
+      }
+    }
+  }
+
+  private void delete(Path operatorOutputPath, FileSystem fs) {
+    try {
+      fs.delete(operatorOutputPath, true);
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  private void commit() throws IOException {
+    for (int bigKeyTbl = 0; bigKeyTbl < numAliases; bigKeyTbl++) {
+      
+      // If we did not see a skew key in this table, continue to the next table;
+      // this avoids an extra call to FileSystem.exists().
+      Boolean existing = bigKeysExistingMap.get(Byte.valueOf((byte)bigKeyTbl));
+      if (existing == null || !existing)
+        continue;
+      
+      String specPath = conf.getBigKeysDirMap().get(Byte.valueOf((byte) bigKeyTbl));
+      commitOutputPathToFinalPath(specPath, false);
+      for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
+        if ( smallKeyTbl == bigKeyTbl)
+          continue;
+        specPath = conf.getSmallKeysDirMap().get(Byte.valueOf((byte) bigKeyTbl)).get(
+            Byte.valueOf((byte) smallKeyTbl));
+        // the file may not exist, and we just ignore this
+        commitOutputPathToFinalPath(specPath, true);
+      }
+    }
+  }
+  
+  private void commitOutputPathToFinalPath(String specPath,
+      boolean ignoreNonExisting) throws IOException {
+    Path outPath = getOperatorOutputPath(specPath);
+    Path finalPath = getOperatorFinalPath(specPath);
+    FileSystem fs = outPath.getFileSystem(hconf);
+    // The local file system in Hadoop 0.17.2.1 throws an IOException when the
+    // file does not exist.
+    try {
+      if (!fs.rename(outPath, finalPath)) {
+        throw new IOException("Unable to rename output to: " + finalPath);
+      }
+    } catch (FileNotFoundException e) {
+      if (!ignoreNonExisting)
+        throw e;
+    } catch (IOException e) {
+      if (!fs.exists(outPath) && ignoreNonExisting)
+        return;
+      throw e;
+    }
+  }
+  
+  private Path getOperatorOutputPath(String specPath) throws IOException {
+    Path tmpPath = Utilities.toTempPath(specPath);
+    return new Path(tmpPath, Utilities.toTempPath(taskId));
+  }
+  
+  private Path getOperatorFinalPath(String specPath) throws IOException {
+    Path tmpPath = Utilities.toTempPath(specPath);
+    return new Path(tmpPath, taskId);
+  }
+  
+}

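The per-group bookkeeping in handleSkew above boils down to counting rows per (group, tag) and flagging the group once any table other than the last one exceeds the threshold. Below is a stripped-down standalone sketch of just that counting logic; the class and names are invented, and the threshold corresponds to joinDesc.getSkewKeyDefinition() in the real code.

public final class SkewDetectorSketch {
  private final int skewKeyThreshold;  // rows per key before the key is treated as skewed
  private final int numAliases;        // number of tables in the join
  private int currTag = -1;
  private int rowsForCurrentTag = 0;
  private int bigKeyTag = -1;

  public SkewDetectorSketch(int skewKeyThreshold, int numAliases) {
    this.skewKeyThreshold = skewKeyThreshold;
    this.numAliases = numAliases;
  }

  /** Call once per reduce row; returns the tag flagged as big for this group, or -1. */
  public int onRow(boolean newGroupStarted, int tag) {
    if (newGroupStarted) {
      bigKeyTag = -1;          // a new key: forget any earlier skew decision
      rowsForCurrentTag = 0;
      currTag = tag;
    } else if (tag != currTag) {
      rowsForCurrentTag = 0;   // rows arrive grouped by tag within a key
      currTag = tag;
    }
    rowsForCurrentTag++;
    // The last table is always streamed, so only earlier tables can be flagged.
    if (bigKeyTag == -1 && tag < numAliases - 1 && rowsForCurrentTag >= skewKeyThreshold) {
      bigKeyTag = tag;
    }
    return bigKeyTag;
  }
}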
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Sat Jan 16 06:44:01 2010
@@ -20,8 +20,11 @@
 
 import java.io.*;
 import java.util.*;
+
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -36,7 +39,7 @@
  * Task implementation
  **/
 
-public abstract class Task <T extends Serializable> implements Serializable {
+public abstract class Task <T extends Serializable> implements Serializable, Node {
 
   private static final long serialVersionUID = 1L;
   transient protected boolean started;
@@ -50,7 +53,7 @@
   transient protected QueryPlan queryPlan;
   transient protected TaskHandle taskHandle;
   transient protected Map<String, Long> taskCounters;
-
+  transient protected DriverContext driverContext;
   // Bean methods
 
   protected List<Task<? extends Serializable>> childTasks;
@@ -65,7 +68,7 @@
     this.taskCounters = new HashMap<String, Long>();
   }
 
-  public void initialize (HiveConf conf, QueryPlan queryPlan) {
+  public void initialize (HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
     this.queryPlan = queryPlan;
     isdone = false;
     started = false;
@@ -80,7 +83,8 @@
       LOG.error(StringUtils.stringifyException(e));
       throw new RuntimeException (e);
     }
-
+    this.driverContext = driverContext;
+    
     console = new LogHelper(LOG);
   }
 
@@ -133,6 +137,10 @@
   public void setChildTasks(List<Task<? extends Serializable>> childTasks) {
     this.childTasks = childTasks;
   }
+  
+  public List<? extends Node> getChildren() {
+    return getChildTasks();
+  }
 
   public List<Task<? extends Serializable>> getChildTasks() {
     return childTasks;

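Having Task implement Node (with getChildren() returning the child tasks and each concrete task supplying getName()) lets generic graph-walking code traverse a task tree, which is presumably what the new physical optimizer relies on. A minimal illustrative walker, not taken from the patch:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.lib.Node;

public final class TaskTreeWalkSketch {
  private TaskTreeWalkSketch() {}

  /** Collect node names reachable from the roots in pre-order (shared children are not deduped). */
  public static List<String> collectNames(List<? extends Node> roots) {
    List<String> names = new ArrayList<String>();
    walk(roots, names);
    return names;
  }

  private static void walk(List<? extends Node> nodes, List<String> names) {
    if (nodes == null) {
      return;
    }
    for (Node n : nodes) {
      names.add(n.getName());      // e.g. "MAPRED", "MOVE", "FUNCTION" for the tasks above
      walk(n.getChildren(), names);
    }
  }
}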
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Jan 16 06:44:01 2010
@@ -54,12 +54,16 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.commons.logging.LogFactory;
@@ -71,7 +75,9 @@
   /**
    * The object in the reducer are composed of these top level fields
    */
-
+  
+  public static String HADOOP_LOCAL_FS = "file:///";
+  
   public static enum ReduceField { KEY, VALUE, ALIAS };
   private static Map<String, mapredWork> gWorkMap=
     Collections.synchronizedMap(new HashMap<String, mapredWork>());
@@ -337,6 +343,15 @@
   public static tableDesc getTableDesc(Table tbl) {
     return (new tableDesc (tbl.getDeserializer().getClass(), tbl.getInputFormatClass(), tbl.getOutputFormatClass(), tbl.getSchema()));
   }
+  
+  // Column names and column types are each comma-delimited.
+  public static tableDesc getTableDesc(String cols, String colTypes) {
+    return (new tableDesc(LazySimpleSerDe.class, SequenceFileInputFormat.class,
+        HiveSequenceFileOutputFormat.class, Utilities.makeProperties(
+            org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
+            org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
+            org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
+  }
 
 
   public static partitionDesc getPartitionDesc(Partition part) throws HiveException {
@@ -835,6 +850,32 @@
     }
     return names;
   }
+  
+  public static List<String> getColumnNames(Properties props) {
+    List<String> names = new ArrayList<String>();
+    String colNames = props.getProperty(Constants.LIST_COLUMNS);
+    String[] cols = colNames.trim().split(",");
+    if (cols != null) {
+      for(String col : cols) {
+        if(col!=null && !col.trim().equals(""))
+          names.add(col);
+      }
+    }
+    return names;
+  }
+  
+  public static List<String> getColumnTypes(Properties props) {
+    List<String> names = new ArrayList<String>();
+    String colTypes = props.getProperty(Constants.LIST_COLUMN_TYPES);
+    String[] cols = colTypes.trim().split(",");
+    if (cols != null) {
+      for(String col : cols) {
+        if(col!=null && !col.trim().equals(""))
+          names.add(col);
+      }
+    }
+    return names;
+  }
 
   public static void validateColumnNames(List<String> colNames,
       List<String> checkCols) throws SemanticException {

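The two helpers above read the comma-delimited column lists back out of the properties that getTableDesc(cols, colTypes) writes. A small usage sketch, with made-up column names:

import java.util.List;

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.tableDesc;

public final class IntermediateTableDescSketch {
  public static void main(String[] args) {
    // Both arguments are comma-delimited; the names and types here are illustrative.
    tableDesc desc = Utilities.getTableDesc("joinkey0,joinvalue0", "string,string");
    List<String> names = Utilities.getColumnNames(desc.getProperties());  // [joinkey0, joinvalue0]
    List<String> types = Utilities.getColumnTypes(desc.getProperties());  // [string, string]
    System.out.println(names + " : " + types);
  }
}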
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java Sat Jan 16 06:44:01 2010
@@ -24,8 +24,10 @@
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -39,6 +41,7 @@
 
   transient protected int     metadataTag;
   transient protected RowContainer obj;
+  transient protected Configuration conf;
 
   public MapJoinObjectValue() {
   }
@@ -80,7 +83,9 @@
       MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(Integer.valueOf(metadataTag));
       int sz = in.readInt();
 
-      RowContainer res = new RowContainer();
+      RowContainer res = new RowContainer(ctx.getConf());
+      res.setSerDe(ctx.getSerDe(), ctx.getStandardOI());
+      res.setTableDesc(ctx.getTblDesc());
       for (int pos = 0; pos < sz; pos++) {
         Writable val = ctx.getSerDe().getSerializedClass().newInstance();
         val.readFields(in);
@@ -122,6 +127,9 @@
     catch (SerDeException e) {
       throw new IOException(e);
     }
+    catch (HiveException e) {
+      throw new IOException(e);
+    }
   }
 
   /**
@@ -152,4 +160,8 @@
     this.obj = obj;
   }
 
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
 }