Posted to commits@hive.apache.org by na...@apache.org on 2010/01/16 07:46:07 UTC
svn commit: r899891 [1/31] - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/...
Author: namit
Date: Sat Jan 16 06:44:01 2010
New Revision: 899891
URL: http://svn.apache.org/viewvc?rev=899891&view=rev
Log:
HIVE-964. Handle skew join
(Yongqiang He via namit)
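
For orientation: the files added below implement skew handling at runtime. The join operator counts the rows seen for each join key (SkewJoinHandler), spills keys that cross the hive.skewjoin.key threshold into per-table "big key" directories, and a follow-up conditional job (ConditionalResolverSkewJoin, GenMRSkewJoinProcessor) joins those keys with a map join. A minimal, hypothetical sketch of the counting idea only (not the committed SkewJoinHandler API):

    import org.apache.hadoop.conf.Configuration;

    // Simplified sketch of runtime skew detection: count rows per
    // reduce-side key group; once a group crosses the configured
    // threshold, flag its remaining rows for spilling so a follow-up
    // map-join job can process them.
    public class SkewDetectSketch {
      private final long skewKeyThreshold;
      private long rowsForCurrentKey = 0;
      private boolean currentKeySkewed = false;

      public SkewDetectSketch(Configuration conf) {
        // 100000 mirrors the hive-default.xml entry added in this commit
        this.skewKeyThreshold = conf.getLong("hive.skewjoin.key", 100000);
      }

      // called when a new key group starts
      public void startGroup() {
        rowsForCurrentKey = 0;
        currentKeySkewed = false;
      }

      // called once per row; returns true when the row should be spilled
      public boolean onRow() {
        if (++rowsForCurrentKey >= skewKeyThreshold) {
          currentKeySkewed = true;
        }
        return currentKeySkewed;
      }
    }
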
Added:
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalPlanResolver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
hadoop/hive/trunk/ql/src/test/queries/clientpositive/skewjoin.q
hadoop/hive/trunk/ql/src/test/results/clientpositive/skewjoin.q.out
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/build-common.xml
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalWork.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/joinDesc.java
hadoop/hive/trunk/ql/src/test/results/clientpositive/case_sensitivity.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/cast1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/cluster.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input11.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input12.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input13.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input23.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input25.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input26.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input34.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input35.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input36.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input38.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input39.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input6.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input7.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input8.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input9.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_dynamicserde.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part5.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_testsequencefile.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_testxpath.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_testxpath2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join0.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join10.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join11.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join12.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join13.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join14.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join15.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join16.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join17.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join18.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join19.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join20.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join21.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join22.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join23.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join25.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join26.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join27.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join28.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join29.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join30.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join31.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join32.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join33.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join34.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join35.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join36.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join37.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join38.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join4.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join40.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join5.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join6.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join7.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join8.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join9.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join_hive_626.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join_map_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join_rc.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join_reorder.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join_reorder2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join_reorder3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join_thrift.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/louter_join_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/merge1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/multi_insert.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/no_hooks.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/outer_join_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_clusterby.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_constant_expr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_gby_join.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_join.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_join2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_join3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_multi_insert.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_outer_join1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_outer_join2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_outer_join3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_outer_join4.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_random.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ppd_udf_case.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/quote1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/rcfile_null_value.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/regex_col.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/router_join_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample4.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample5.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample6.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample7.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample8.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/semijoin.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/subq.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/udf1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_10_trims.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_case_column_pruning.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_length.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_reverse.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union10.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union11.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union12.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union14.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union15.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union17.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union18.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union19.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union20.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union22.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union4.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union5.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union6.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union7.q.out
hadoop/hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/union.q.xml
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sat Jan 16 06:44:01 2010
@@ -6,6 +6,9 @@
NEW FEATURES
+ HIVE-964. Handle skew join
+ (Yongqiang He via namit)
+
IMPROVEMENTS
HIVE-983. Function from_unixtime takes long.
Modified: hadoop/hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build-common.xml?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/build-common.xml (original)
+++ hadoop/hive/trunk/build-common.xml Sat Jan 16 06:44:01 2010
@@ -54,7 +54,7 @@
<property name="test.include" value="Test*"/>
<property name="test.classpath.id" value="test.classpath"/>
<property name="test.output" value="true"/>
- <property name="test.timeout" value="5400000"/>
+ <property name="test.timeout" value="6600000"/>
<property name="test.junit.output.format" value="xml"/>
<property name="test.junit.output.usefile" value="true"/>
<property name="minimr.query.files" value="join1.q,groupby1.q"/>
@@ -209,7 +209,7 @@
<classpath refid="test.classpath"/>
</javac>
</target>
-
+
<target name="test-jar" depends="compile-test">
<delete file="${test.build.dir}/test-udfs.jar"/>
<jar jarfile="${test.build.dir}/test-udfs.jar">
@@ -219,14 +219,14 @@
</target>
<target name="test-conditions">
-
+
<condition property="qfile" value="${minimr.query.files}">
<and>
<not>
<isset property="qfile"/>
</not>
-
- <equals arg1="${clustermode}" arg2="miniMR" />
+
+ <equals arg1="${clustermode}" arg2="miniMR" />
</and>
</condition>
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Jan 16 06:44:01 2010
@@ -78,6 +78,7 @@
HADOOPCONF("hadoop.config.dir", System.getenv("HADOOP_HOME") + "/conf"),
HADOOPFS("fs.default.name", "file:///"),
HADOOPMAPFILENAME("map.input.file", null),
+ HADOOPMAPREDINPUTDIR("mapred.input.dir", null),
HADOOPJT("mapred.job.tracker", "local"),
HADOOPNUMREDUCERS("mapred.reduce.tasks", 1),
HADOOPJOBNAME("mapred.job.name", null),
@@ -168,6 +169,9 @@
HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long)(256*1000*1000)),
HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long)(16*1000*1000)),
+ HIVESKEWJOIN("hive.optimize.skewjoin", false),
+ HIVESKEWJOINKEY("hive.skewjoin.key", 500000),
+
HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000),
HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000),
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Sat Jan 16 06:44:01 2010
@@ -280,6 +280,20 @@
</property>
<property>
+ <name>hive.optimize.skewjoin</name>
+ <value>false</value>
+ <description>Whether to enable skew join optimization. </description>
+</property>
+
+<property>
+ <name>hive.skewjoin.key</name>
+ <value>100000</value>
+  <description>Determines whether a join key is skewed. If more than the
+  specified number of rows with the same key are seen in the join operator,
+  the key is treated as a skew join key.</description>
+</property>
+
+<property>
<name>hive.mapred.mode</name>
<value>nonstrict</value>
<description>The mode in which the hive operations are being performed. In strict mode, some risky queries are not allowed to run</description>
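
The optimization is off by default. A hedged sketch of enabling the two new settings through the Hadoop Configuration API; from the Hive CLI the equivalent is "set hive.optimize.skewjoin=true;" and "set hive.skewjoin.key=100000;":

    import org.apache.hadoop.conf.Configuration;

    public class EnableSkewJoin {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("hive.optimize.skewjoin", true); // enable the rewrite
        conf.setInt("hive.skewjoin.key", 100000);        // rows-per-key threshold
        System.out.println(conf.get("hive.optimize.skewjoin")); // prints: true
      }
    }
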
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Sat Jan 16 06:44:01 2010
@@ -38,6 +38,7 @@
import org.apache.hadoop.hive.ql.parse.ErrorMsg;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -97,9 +98,13 @@
for (Task<? extends Serializable> task : tasks) {
if (!seenTasks.contains(task)) {
seenTasks.add(task);
- if (task.isMapRedTask()) {
+
+ if(task instanceof ConditionalTask)
+ jobs +=countJobs(((ConditionalTask)task).getListTasks(), seenTasks);
+ else if (task.isMapRedTask()) { // this may also be true for a conditional task, but we do not increment the counter for it
jobs++;
}
+
jobs += countJobs(task.getChildTasks(), seenTasks);
}
}
@@ -135,7 +140,7 @@
if (!sem.getFetchTaskInit()) {
sem.setFetchTaskInit(true);
- sem.getFetchTask().initialize(conf, plan);
+ sem.getFetchTask().initialize(conf, plan, null);
}
FetchTask ft = (FetchTask) sem.getFetchTask();
@@ -447,10 +452,12 @@
Queue<Task<? extends Serializable>> runnable = new LinkedList<Task<? extends Serializable>>();
Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner> ();
+ DriverContext driverCxt = new DriverContext(runnable);
+
//Add root Tasks to runnable
for (Task<? extends Serializable> tsk : sem.getRootTasks()) {
- addToRunnable(runnable,tsk);
+ driverCxt.addToRunnable(tsk);
}
// Loop while you either have tasks running, or tasks queued up
@@ -459,7 +466,7 @@
// Launch upto maxthreads tasks
while(runnable.peek() != null && running.size() < maxthreads) {
Task<? extends Serializable> tsk = runnable.remove();
- curJobNo = launchTask(tsk, queryId, noName,running, jobname, jobs, curJobNo);
+ curJobNo = launchTask(tsk, queryId, noName,running, jobname, jobs, curJobNo, driverCxt);
}
// poll the Tasks to see which one completed
@@ -488,8 +495,8 @@
if (tsk.getChildTasks() != null) {
for (Task<? extends Serializable> child : tsk.getChildTasks()) {
- if(isLaunchable(child)) {
- addToRunnable(runnable,child);
+ if(DriverContext.isLaunchable(child)) {
+ driverCxt.addToRunnable(child);
}
}
}
@@ -554,13 +561,13 @@
public int launchTask(Task<? extends Serializable> tsk, String queryId,
boolean noName, Map<TaskResult,TaskRunner> running, String jobname,
- int jobs, int curJobNo) {
+ int jobs, int curJobNo, DriverContext cxt) {
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startTask(queryId, tsk,
tsk.getClass().getName());
}
- if (tsk.isMapRedTask()) {
+ if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
if (noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "("
+ tsk.getId() + ")");
@@ -568,7 +575,7 @@
curJobNo++;
console.printInfo("Launching Job " + curJobNo + " out of "+jobs);
}
- tsk.initialize(conf, plan);
+ tsk.initialize(conf, plan, cxt);
TaskResult tskRes = new TaskResult();
TaskRunner tskRun = new TaskRunner(tsk,tskRes);
@@ -627,30 +634,12 @@
}
}
- /**
- * Checks if a task can be launched
- *
- * @param tsk the task to be checked
- * @return true if the task is launchable, false otherwise
- */
-
- public boolean isLaunchable(Task<? extends Serializable> tsk) {
- // A launchable task is one that hasn't been queued, hasn't been initialized, and is runnable.
- return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable();
- }
-
- public void addToRunnable(Queue<Task<? extends Serializable>> runnable,
- Task<? extends Serializable> tsk) {
- runnable.add(tsk);
- tsk.setQueued();
- }
-
public boolean getResults(Vector<String> res) throws IOException {
if (plan != null && plan.getPlan().getFetchTask() != null) {
BaseSemanticAnalyzer sem = plan.getPlan();
if (!sem.getFetchTaskInit()) {
sem.setFetchTaskInit(true);
- sem.getFetchTask().initialize(conf, plan);
+ sem.getFetchTask().initialize(conf, plan, null);
}
FetchTask ft = (FetchTask) sem.getFetchTask();
ft.setMaxRows(maxRows);
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+
+public class DriverContext {
+
+ Queue<Task<? extends Serializable>> runnable = new LinkedList<Task<? extends Serializable>>();
+
+ public DriverContext( Queue<Task<? extends Serializable>> runnable) {
+ this.runnable = runnable;
+ }
+
+ public Queue<Task<? extends Serializable>> getRunnable() {
+ return this.runnable;
+ }
+
+ /**
+ * Checks if a task can be launched
+ *
+ * @param tsk the task to be checked
+ * @return true if the task is launchable, false otherwise
+ */
+
+ public static boolean isLaunchable(Task<? extends Serializable> tsk) {
+ // A launchable task is one that hasn't been queued, hasn't been initialized, and is runnable.
+ return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable();
+ }
+
+ public void addToRunnable(Task<? extends Serializable> tsk) {
+ runnable.add(tsk);
+ tsk.setQueued();
+ }
+
+}
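
For illustration, a condensed sketch of how Driver.execute() above now uses this context: seed the queue with the root tasks, then keep launching whatever DriverContext admits. launch() is a hypothetical stand-in for Driver.launchTask():

    import java.io.Serializable;
    import java.util.LinkedList;
    import java.util.Queue;
    import org.apache.hadoop.hive.ql.DriverContext;
    import org.apache.hadoop.hive.ql.exec.Task;

    public class SchedulingSketch {
      // condensed from the Driver.execute() hunk above
      static void run(Iterable<Task<? extends Serializable>> rootTasks) {
        Queue<Task<? extends Serializable>> runnable =
            new LinkedList<Task<? extends Serializable>>();
        DriverContext driverCxt = new DriverContext(runnable);
        for (Task<? extends Serializable> tsk : rootTasks) {
          driverCxt.addToRunnable(tsk); // also marks the task as queued
        }
        while (driverCxt.getRunnable().peek() != null) {
          launch(driverCxt.getRunnable().remove());
        }
      }

      static void launch(Task<? extends Serializable> tsk) {
        // hypothetical stand-in: initialize and execute the task
      }
    }

This also shows why ConditionalTask.execute() below can defer its decision: it simply adds the resolver-chosen tasks back into the same queue at runtime.
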
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Sat Jan 16 06:44:01 2010
@@ -121,6 +121,8 @@
int joinCacheSize = 0;
int nextSz = 0;
transient Byte lastAlias = null;
+
+ transient boolean handleSkewJoin = false;
protected int populateJoinKeyValue(Map<Byte, List<ExprNodeEvaluator>> outMap,
Map<Byte, List<exprNodeDesc>> inputMap) {
@@ -193,7 +195,10 @@
return joinOutputObjectInspector;
}
+ Configuration hconf;
protected void initializeOp(Configuration hconf) throws HiveException {
+ this.handleSkewJoin = conf.getHandleSkewJoin();
+ this.hconf = hconf;
LOG.info("COMMONJOIN " + ((StructObjectInspector)inputObjInspectors[0]).getTypeName());
totalSz = 0;
// Map that contains the rows for each alias
@@ -207,7 +212,7 @@
order = conf.getTagOrder();
}
condn = conf.getConds();
- noOuterJoin = conf.getNoOuterJoin();
+ noOuterJoin = conf.isNoOuterJoin();
totalSz = populateJoinKeyValue(joinValues, conf.getExprs());
@@ -232,30 +237,15 @@
nr.add(null);
dummyObj[pos] = nr;
// there should be only 1 dummy object in the RowContainer
- RowContainer<ArrayList<Object>> values = new RowContainer<ArrayList<Object>>(1);
+ RowContainer<ArrayList<Object>> values = getRowContainer(hconf, pos, alias, 1);
values.add((ArrayList<Object>) dummyObj[pos]);
dummyObjVectors[pos] = values;
// if serde is null, the input doesn't need to be spilled out
// e.g., the output columns do not contain any columns of the input table
- SerDe serde = getSpillSerDe(pos);
- RowContainer rc = new RowContainer(joinCacheSize);
- if ( serde != null ) {
-
- // arbitrary column names used internally for serializing to spill table
- List<String> colList = new ArrayList<String>();
- for ( int i = 0; i < sz; ++i )
- colList.add(alias + "_VAL_" + i);
-
- // object inspector for serializing input tuples
- StructObjectInspector rcOI =
- ObjectInspectorFactory.getStandardStructObjectInspector(
- colList,
- joinValuesStandardObjectInspectors.get(pos));
-
- rc.setSerDe(serde, rcOI);
- }
+ RowContainer rc = getRowContainer(hconf, pos, alias, joinCacheSize);
storage.put(pos, rc);
+
pos++;
}
@@ -265,9 +255,32 @@
LOG.info("JOIN " + ((StructObjectInspector)outputObjInspector).getTypeName() + " totalsz = " + totalSz);
}
+
+ RowContainer getRowContainer(Configuration hconf, byte pos, Byte alias, int containerSize)
+ throws HiveException {
+ tableDesc tblDesc = getSpillTableDesc(alias);
+ SerDe serde = getSpillSerDe(alias);
+
+ if ( serde == null )
+ containerSize = 1;
+
+ RowContainer rc = new RowContainer(containerSize, hconf);
+ StructObjectInspector rcOI = null;
+ if(tblDesc != null) {
+ // arbitrary column names used internally for serializing to spill table
+ List<String> colNames = Utilities.getColumnNames(tblDesc.getProperties());
+ // object inspector for serializing input tuples
+ rcOI = ObjectInspectorFactory.getStandardStructObjectInspector(colNames,
+ joinValuesStandardObjectInspectors.get(pos));
+ }
+
+ rc.setSerDe(serde, rcOI);
+ rc.setTableDesc(tblDesc);
+ return rc;
+ }
- private SerDe getSpillSerDe(byte pos) {
- tableDesc desc = getSpillTableDesc(pos);
+ private SerDe getSpillSerDe(byte alias) {
+ tableDesc desc = getSpillTableDesc(alias);
if ( desc == null )
return null;
SerDe sd = (SerDe) ReflectionUtils.newInstance(desc.getDeserializerClass(), null);
@@ -280,6 +293,20 @@
return sd;
}
+ transient boolean newGroupStarted = false;
+
+ public tableDesc getSpillTableDesc(Byte alias) {
+ if(spillTableDesc == null || spillTableDesc.size() == 0)
+ initSpillTables();
+ return spillTableDesc.get(alias);
+ }
+
+ public Map<Byte, tableDesc> getSpillTableDesc() {
+ if(spillTableDesc == null)
+ initSpillTables();
+ return spillTableDesc;
+ }
+
private void initSpillTables() {
Map<Byte, List<exprNodeDesc>> exprs = conf.getExprs();
spillTableDesc = new HashMap<Byte, tableDesc>(exprs.size());
@@ -311,14 +338,9 @@
}
}
- public tableDesc getSpillTableDesc(Byte alias) {
- if(spillTableDesc == null || spillTableDesc.size() == 0)
- initSpillTables();
- return spillTableDesc.get(alias);
- }
-
public void startGroup() throws HiveException {
LOG.trace("Join: Starting new group");
+ newGroupStarted = true;
for (RowContainer<ArrayList<Object>> alw: storage.values()) {
alw.clear();
}
@@ -644,7 +666,7 @@
}
intObj.pushObj(newObj);
-
+
// execute the actual join algorithm
ArrayList<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj,
aliasNum, childFirstRow);
@@ -765,8 +787,10 @@
*/
public void closeOp(boolean abort) throws HiveException {
LOG.trace("Join Op close");
- for ( RowContainer<ArrayList<Object>> alw: storage.values() )
- alw.clear(); // clean up the temp files
+ for ( RowContainer<ArrayList<Object>> alw: storage.values() ) {
+ if(alw != null) // it may be null for map joins
+ alw.clear(); // clean up the temp files
+ }
storage.clear();
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Sat Jan 16 06:44:01 2010
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
@@ -35,7 +36,9 @@
private static final long serialVersionUID = 1L;
private List<Task<? extends Serializable>> listTasks;
- private Task<? extends Serializable> resTask;
+
+ private boolean resolved = false;
+ private List<Task<? extends Serializable>> resTasks;
private ConditionalResolver resolver;
private Object resolverCtx;
@@ -60,15 +63,32 @@
return false;
}
- public void initialize (HiveConf conf, QueryPlan queryPlan) {
- super.initialize(conf, queryPlan);
- resTask = listTasks.get(resolver.getTaskId(conf, resolverCtx));
- resTask.initialize(conf, queryPlan);
+ public void initialize (HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+ super.initialize(conf, queryPlan, driverContext);
}
@Override
public int execute() {
- return resTask.executeTask();
+ resTasks = resolver.getTasks(conf, resolverCtx);
+ resolved = true;
+ for(Task<? extends Serializable> tsk: getListTasks()) {
+ if(!resTasks.contains(tsk)) {
+ this.driverContext.getRunnable().remove(tsk);
+ console.printInfo(ExecDriver.getJobEndMsg(""
+ + Utilities.randGen.nextInt())
+ + ", job is filtered out (removed at runtime).");
+ if(tsk.getChildTasks() != null) {
+ for(Task<? extends Serializable> child : tsk.getChildTasks()) {
+ child.parentTasks.remove(tsk);
+ if(DriverContext.isLaunchable(child))
+ this.driverContext.addToRunnable(child);
+ }
+ }
+ } else if(!this.driverContext.getRunnable().contains(tsk)){
+ this.driverContext.addToRunnable(tsk);
+ }
+ }
+ return 0;
}
/**
@@ -91,6 +111,26 @@
public Object getResolverCtx() {
return resolverCtx;
}
+
+ // used to determine whether child tasks can be run.
+ public boolean done() {
+ boolean ret = true;
+ List<Task<? extends Serializable>> parentTasks = this.getParentTasks();
+ if (parentTasks != null) {
+ for(Task<? extends Serializable> par: parentTasks)
+ ret = ret && par.done();
+ }
+ List<Task<? extends Serializable>> retTasks;
+ if(resolved)
+ retTasks = this.resTasks;
+ else
+ retTasks = getListTasks();
+ if (ret && retTasks != null) {
+ for (Task<? extends Serializable> tsk : retTasks)
+ ret = ret && tsk.done();
+ }
+ return ret;
+ }
/**
* @param resolverCtx the resolverCtx to set
@@ -116,4 +156,26 @@
public int getType() {
return StageType.CONDITIONAL;
}
+
+ @Override
+ public String getName() {
+ return "CONDITION";
+ }
+
+ /**
+ * Add a dependent task to the current conditional task. The task does not
+ * become a direct child of the conditional task; instead, it is added as a
+ * child of each task in the conditional task's list.
+ *
+ * @return true if the task was added, false if it already existed
+ */
+ public boolean addDependentTask(Task<? extends Serializable> dependent) {
+ boolean ret = false;
+ if(this.getListTasks() != null) {
+ for(Task<? extends Serializable> tsk: this.getListTasks()) {
+ ret = ret & tsk.addDependentTask(dependent);
+ }
+ }
+ return ret;
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java Sat Jan 16 06:44:01 2010
@@ -86,4 +86,9 @@
public int getType() {
return StageType.COPY;
}
+
+ @Override
+ public String getName() {
+ return "COPY";
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Sat Jan 16 06:44:01 2010
@@ -49,6 +49,7 @@
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -102,8 +103,8 @@
super();
}
- public void initialize(HiveConf conf, QueryPlan queryPlan) {
- super.initialize(conf, queryPlan);
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
+ super.initialize(conf, queryPlan, ctx);
this.conf = conf;
}
@@ -1315,4 +1316,9 @@
return StageType.DDL;
}
+ @Override
+ public String getName() {
+ return "DDL";
+ }
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Sat Jan 16 06:44:01 2010
@@ -108,7 +108,7 @@
* Initialization when invoked from QL
*/
public void initialize(HiveConf conf, QueryPlan queryPlan) {
- super.initialize(conf, queryPlan);
+ super.initialize(conf, queryPlan, null);
job = new JobConf(conf, ExecDriver.class);
// NOTE: initialize is only called if it is in non-local mode.
// In case it's in non-local mode, we need to move the SessionState files
@@ -201,7 +201,7 @@
SessionState.get().getQueryId(), getId(),
Keys.TASK_HADOOP_ID, rj.getJobID());
}
- console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = "
+ console.printInfo(ExecDriver.getJobEndMsg(rj.getJobID()) + ", Tracking URL = "
+ rj.getTrackingURL());
console.printInfo("Kill Command = "
+ HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
@@ -453,6 +453,9 @@
* Execute a query plan using Hadoop
*/
public int execute() {
+
+ success = true;
+
try {
setNumberOfReducers();
} catch(IOException e) {
@@ -591,7 +594,7 @@
success = false;
}
- String statusMesg = "Ended Job = " + rj.getJobID();
+ String statusMesg = getJobEndMsg(rj.getJobID());
if (!success) {
statusMesg += " with errors";
returnVal = 2;
@@ -654,6 +657,24 @@
return (returnVal);
}
+
+ /**
+ * This message pattern is used to track when a job is started.
+ * @param jobId
+ * @return
+ */
+ public static String getJobStartMsg(String jobId) {
+ return "Starting Job = " + jobId;
+ }
+
+ /**
+ * This message pattern is used to track when a job has finished successfully.
+ * @param jobId
+ * @return
+ */
+ public static String getJobEndMsg(String jobId) {
+ return "Ended Job = " + jobId;
+ }
private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
@@ -1032,4 +1053,9 @@
public int getType() {
return StageType.MAPRED;
}
+
+ @Override
+ public String getName() {
+ return "EXEC";
+ }
}
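
These two helpers keep the job-lifecycle log lines in one place; ConditionalTask above already reuses getJobEndMsg() for jobs it filters out at runtime. A small usage sketch (the job id is hypothetical):

    import org.apache.hadoop.hive.ql.exec.ExecDriver;

    public class JobMsgSketch {
      public static void main(String[] args) {
        String jobId = "job_201001160644_0001";               // hypothetical id
        System.out.println(ExecDriver.getJobStartMsg(jobId)); // Starting Job = ...
        System.out.println(ExecDriver.getJobEndMsg(jobId));   // Ended Job = ...
      }
    }
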
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Sat Jan 16 06:44:01 2010
@@ -170,24 +170,27 @@
if (!keyWritable.equals(groupKey)) {
// If an operator wants to do some work at the beginning of a group
- if (groupKey == null) {
+ if (groupKey == null) { //the first group
groupKey = new BytesWritable();
} else {
// If an operator wants to do some work at the end of a group
l4j.trace("End Group");
reducer.endGroup();
}
+
+ try {
+ keyObject = inputKeyDeserializer.deserialize(keyWritable);
+ } catch (Exception e) {
+ throw new HiveException("Unable to deserialize reduce input key from " +
+ Utilities.formatBinaryString(keyWritable.get(), 0, keyWritable.getSize())
+ + " with properties " + keyTableDesc.getProperties(),
+ e);
+ }
+
groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
l4j.trace("Start Group");
reducer.startGroup();
- }
- try {
- keyObject = inputKeyDeserializer.deserialize(keyWritable);
- } catch (Exception e) {
- throw new HiveException("Unable to deserialize reduce input key from " +
- Utilities.formatBinaryString(keyWritable.get(), 0, keyWritable.getSize())
- + " with properties " + keyTableDesc.getProperties(),
- e);
+ reducer.setGroupKeyObject(keyObject);
}
// System.err.print(keyObject.toString());
while (values.hasNext()) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Sat Jan 16 06:44:01 2010
@@ -28,6 +28,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Map.Entry;
import org.apache.hadoop.hive.ql.plan.explain;
@@ -277,6 +278,11 @@
// Start by getting the work part of the task and call the output plan for the work
outputPlan(task.getWork(), out, extended, indent+2);
out.println();
+ if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
+ for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+ outputPlan(con, out, extended, displayedSet, indent);
+ }
+ }
if (task.getChildTasks() != null) {
for(Task<? extends Serializable> child: task.getChildTasks()) {
outputPlan(child, out, extended, displayedSet, indent);
@@ -284,13 +290,19 @@
}
}
- private void outputDependencies(Task<? extends Serializable> task, PrintStream out, int indent)
+ private Set<Task<? extends Serializable>> dependeciesTaskSet = new HashSet<Task<? extends Serializable>>();
+ private void outputDependencies(Task<? extends Serializable> task, PrintStream out, int indent, boolean rootTskCandidate)
throws Exception {
+ if(dependeciesTaskSet.contains(task))
+ return;
+ dependeciesTaskSet.add(task);
+
out.print(indentString(indent));
out.printf("%s", task.getId());
- if (task.getParentTasks() == null || task.getParentTasks().isEmpty()) {
- out.print(" is a root stage");
+ if ((task.getParentTasks() == null || task.getParentTasks().isEmpty())) {
+ if(rootTskCandidate)
+ out.print(" is a root stage");
}
else {
out.print(" depends on stages: ");
@@ -302,14 +314,34 @@
first = false;
out.print(parent.getId());
}
+
+ if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
+ out.print(" , consists of ");
+ first = true;
+ for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+ if (!first) {
+ out.print(", ");
+ }
+ first = false;
+ out.print(con.getId());
+ }
+ }
+
}
out.println();
+ if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
+ for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+ outputDependencies(con, out, indent, false);
+ }
+ }
+
if (task.getChildTasks() != null) {
for(Task<? extends Serializable> child: task.getChildTasks()) {
- outputDependencies(child, out, indent);
+ outputDependencies(child, out, indent, true);
}
}
+
}
public void outputAST(String treeString, PrintStream out, int indent) {
@@ -326,7 +358,7 @@
out.print(indentString(indent));
out.println("STAGE DEPENDENCIES:");
for(Task<? extends Serializable> rootTask: rootTasks) {
- outputDependencies(rootTask, out, indent+2);
+ outputDependencies(rootTask, out, indent+2, true);
}
}
@@ -354,4 +386,9 @@
public int getType() {
return StageType.EXPLAIN;
}
+
+ @Override
+ public String getName() {
+ return "EXPLAIN";
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Sat Jan 16 06:44:01 2010
@@ -24,6 +24,7 @@
import java.util.Vector;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.plan.fetchWork;
import org.apache.hadoop.hive.ql.plan.tableDesc;
@@ -50,8 +51,8 @@
super();
}
- public void initialize (HiveConf conf, QueryPlan queryPlan) {
- super.initialize(conf, queryPlan);
+ public void initialize (HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
+ super.initialize(conf, queryPlan, ctx);
try {
// Create a file system handle
@@ -135,4 +136,9 @@
public int getType() {
return StageType.FETCH;
}
+
+ @Override
+ public String getName() {
+ return "FETCH";
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Jan 16 06:44:01 2010
@@ -20,8 +20,8 @@
import java.io.IOException;
import java.io.Serializable;
-import java.util.Properties;
+import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -29,7 +29,6 @@
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
@@ -37,11 +36,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
/**
* File Sink operator implementation
@@ -108,7 +103,6 @@
LOG.info("Writing to temp file: FS " + outPath);
HiveOutputFormat<?, ?> hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
- final Class<? extends Writable> outputClass = serializer.getSerializedClass();
boolean isCompressed = conf.getCompressed();
// The reason to keep these instead of using
@@ -117,22 +111,8 @@
// we create.
Path parent = Utilities.toTempPath(specPath);
finalPath = HiveFileFormatUtils.getOutputFormatFinalPath(parent, jc, hiveOutputFormat, isCompressed, finalPath);
- tableDesc tableInfo = conf.getTableInfo();
- JobConf jc_output = jc;
- if (isCompressed) {
- jc_output = new JobConf(jc);
- String codecStr = conf.getCompressCodec();
- if (codecStr != null && !codecStr.trim().equals("")) {
- Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(codecStr);
- FileOutputFormat.setOutputCompressorClass(jc_output, codec);
- }
- String type = conf.getCompressType();
- if(type !=null && !type.trim().equals("")) {
- CompressionType style = CompressionType.valueOf(type);
- SequenceFileOutputFormat.setOutputCompressionType(jc, style);
- }
- }
- outWriter = getRecordWriter(jc_output, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath);
+ final Class<? extends Writable> outputClass = serializer.getSerializedClass();
+ outWriter = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(), outputClass, conf, outPath);
// in recent hadoop versions, use deleteOnExit to clean tmp files.
autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPath);
@@ -146,15 +126,6 @@
}
}
- public static RecordWriter getRecordWriter(JobConf jc, HiveOutputFormat<?, ?> hiveOutputFormat,
- final Class<? extends Writable> valueClass, boolean isCompressed,
- Properties tableProp, Path outPath) throws IOException, HiveException {
- if (hiveOutputFormat != null) {
- return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass, isCompressed, tableProp, null);
- }
- return null;
- }
-
Writable recordValue;
public void processOp(Object row, int tag) throws HiveException {
// Since File Sink is a terminal operator, forward is not called - so, maintain the number of output rows explicitly
@@ -220,26 +191,7 @@
try {
if(conf != null) {
String specPath = conf.getDirName();
- fs = (new Path(specPath)).getFileSystem(hconf);
- Path tmpPath = Utilities.toTempPath(specPath);
- Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName() + ".intermediate");
- Path finalPath = new Path(specPath);
- if(success) {
- if(fs.exists(tmpPath)) {
- // Step1: rename tmp output folder to intermediate path. After this
- // point, updates from speculative tasks still writing to tmpPath
- // will not appear in finalPath.
- LOG.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
- Utilities.rename(fs, tmpPath, intermediatePath);
- // Step2: remove any tmp file or double-committed output files
- Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
- // Step3: move to the file destination
- LOG.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
- Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
- }
- } else {
- fs.delete(tmpPath, true);
- }
+ FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
}
} catch (IOException e) {
throw new HiveException (e);
@@ -247,6 +199,31 @@
super.jobClose(hconf, success);
}
+ public static void mvFileToFinalPath(String specPath, Configuration hconf, boolean success, Log LOG) throws IOException, HiveException{
+ FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+ Path tmpPath = Utilities.toTempPath(specPath);
+ Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
+ + ".intermediate");
+ Path finalPath = new Path(specPath);
+ if (success) {
+ if (fs.exists(tmpPath)) {
+ // Step1: rename tmp output folder to intermediate path. After this
+ // point, updates from speculative tasks still writing to tmpPath
+ // will not appear in finalPath.
+ LOG.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
+ Utilities.rename(fs, tmpPath, intermediatePath);
+ // Step2: remove any tmp file or double-committed output files
+ Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+ // Step3: move to the file destination
+ LOG.info("Moving tmp dir: " + intermediatePath + " to: "
+ + finalPath);
+ Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+ }
+ } else {
+ fs.delete(tmpPath, true);
+ }
+ }
+
public int getType() {
return OperatorType.FILESINK;
}
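
Factoring the three-step commit (tmp dir, then ".intermediate" dir, then final dir) into the static mvFileToFinalPath() lets JoinOperator.jobClose() below reuse it for the skew-key directories. A hedged usage sketch with a hypothetical destination path:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.FileSinkOperator;

    public class CommitSketch {
      private static final Log LOG = LogFactory.getLog(CommitSketch.class);

      public static void main(String[] args) throws Exception {
        Configuration hconf = new Configuration();
        // the temp dir derived via Utilities.toTempPath(specPath) is renamed
        // to an ".intermediate" dir, stripped of duplicate task outputs, and
        // then moved to specPath itself
        String specPath = "/tmp/hive-out/dest"; // hypothetical
        FileSinkOperator.mvFileToFinalPath(specPath, hconf, true, LOG);
      }
    }
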
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java Sat Jan 16 06:44:01 2010
@@ -22,6 +22,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FunctionWork;
@@ -44,8 +45,8 @@
super();
}
- public void initialize(HiveConf conf, QueryPlan queryPlan) {
- super.initialize(conf, queryPlan);
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
+ super.initialize(conf, queryPlan, ctx);
this.conf = conf;
}
@@ -114,4 +115,9 @@
public int getType() {
return StageType.FUNC;
}
+
+ @Override
+ public String getName() {
+ return "FUNCTION";
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Sat Jan 16 06:44:01 2010
@@ -18,14 +18,18 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.joinDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -36,15 +40,22 @@
public class JoinOperator extends CommonJoinOperator<joinDesc> implements Serializable {
private static final long serialVersionUID = 1L;
+ private transient SkewJoinHandler skewJoinKeyContext = null;
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
initializeChildren(hconf);
+ if(this.handleSkewJoin) {
+ skewJoinKeyContext = new SkewJoinHandler(this);
+ skewJoinKeyContext.initiliaze(hconf);
+ }
}
public void processOp(Object row, int tag)
throws HiveException {
try {
+
// get alias
alias = (byte)tag;
@@ -53,6 +64,9 @@
ArrayList<Object> nr = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
+ if(this.handleSkewJoin)
+ skewJoinKeyContext.handleSkew(tag);
+
// number of rows for the key in the given table
int sz = storage.get(alias).size();
@@ -92,6 +106,87 @@
return OperatorType.JOIN;
}
+ /**
+ * All done; close the skew-join handler (if enabled) before the normal close.
+ *
+ */
+ public void closeOp(boolean abort) throws HiveException {
+ if (this.handleSkewJoin) {
+ skewJoinKeyContext.close(abort);
+ }
+ super.closeOp(abort);
+ }
+
+ @Override
+ public void jobClose(Configuration hconf, boolean success) throws HiveException {
+ if(this.handleSkewJoin) {
+ try {
+ for (int i = 0; i < numAliases; i++) {
+ String specPath = this.conf.getBigKeysDirMap().get((byte)i);
+ FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
+ for (int j = 0; j < numAliases; j++) {
+ if(j == i) continue;
+ specPath = getConf().getSmallKeysDirMap().get((byte)i).get((byte)j);
+ FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
+ }
+ }
+
+ if(success) {
+ //move up files
+ for (int i = 0; i < numAliases; i++) {
+ String specPath = this.conf.getBigKeysDirMap().get((byte)i);
+ moveUpFiles(specPath, hconf, LOG);
+ for (int j = 0; j < numAliases; j++) {
+ if(j == i) continue;
+ specPath = getConf().getSmallKeysDirMap().get((byte)i).get((byte)j);
+ moveUpFiles(specPath, hconf, LOG);
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException (e);
+ }
+ }
+ super.jobClose(hconf, success);
+ }
+
+
+
+ private void moveUpFiles(String specPath, Configuration hconf, Log log) throws IOException, HiveException {
+ FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+ Path finalPath = new Path(specPath);
+
+ if(fs.exists(finalPath)) {
+ FileStatus[] taskOutputDirs = fs.listStatus(finalPath);
+ if(taskOutputDirs != null ) {
+ for (FileStatus dir : taskOutputDirs) {
+ Utilities.renameOrMoveFiles(fs, dir.getPath(), finalPath);
+ fs.delete(dir.getPath(), true);
+ }
+ }
+ }
+ }
+
+ /**
+ * Forward a record of join results.
+ *
+ * @throws HiveException
+ */
+ public void endGroup() throws HiveException {
+ //if this is a skew key, we need to handle it in a separate map reduce job.
+ if(this.handleSkewJoin && skewJoinKeyContext.currBigKeyTag >=0) {
+ try {
+ skewJoinKeyContext.endGroup();
+ } catch (IOException e) {
+ LOG.error(e.getMessage(),e);
+ throw new HiveException(e);
+ }
+ return;
+ }
+ else {
+ checkAndGenObject();
+ }
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Sat Jan 16 06:44:01 2010
@@ -72,6 +72,8 @@
transient protected Map<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> mapJoinTables;
+ transient protected RowContainer<ArrayList<Object>> emptyList = null;
+
transient static final private String[] fatalErrMsg = {
null, // counter value 0 means no error
"Mapside join size exceeds hive.mapjoin.maxsize. Please increase that or remove the mapjoin hint." // counter value 1
@@ -80,14 +82,18 @@
public static class MapJoinObjectCtx {
ObjectInspector standardOI;
SerDe serde;
+ tableDesc tblDesc;
+ Configuration conf;
/**
* @param standardOI
* @param serde
*/
- public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde) {
+ public MapJoinObjectCtx(ObjectInspector standardOI, SerDe serde, tableDesc tblDesc, Configuration conf) {
this.standardOI = standardOI;
this.serde = serde;
+ this.tblDesc = tblDesc;
+ this.conf = conf;
}
/**
@@ -104,6 +110,14 @@
return serde;
}
+ public tableDesc getTblDesc() {
+ return tblDesc;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
}
transient static Map<Integer, MapJoinObjectCtx> mapMetadata = new HashMap<Integer, MapJoinObjectCtx>();
@@ -159,7 +173,9 @@
mapJoinTables.put(Byte.valueOf((byte)pos), hashTable);
}
- storage.put((byte)posBigTable, new RowContainer());
+ emptyList = new RowContainer<ArrayList<Object>>(1, hconf);
+ RowContainer bigPosRC = getRowContainer(hconf, (byte)posBigTable, order[posBigTable], joinCacheSize);
+ storage.put((byte)posBigTable, bigPosRC);
mapJoinRowsKey = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
@@ -214,7 +230,7 @@
new MapJoinObjectCtx(
ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
ObjectInspectorCopyOption.WRITABLE),
- keySerializer));
+ keySerializer, keyTableDesc, hconf));
firstRow = false;
}
@@ -240,7 +256,7 @@
boolean needNewKey = true;
if (o == null) {
- res = new RowContainer();
+ res = getRowContainer(this.hconf, (byte)tag, order[tag], joinCacheSize);
res.add(value);
} else {
res = o.getObj();
@@ -266,14 +282,14 @@
new MapJoinObjectCtx(
ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
ObjectInspectorCopyOption.WRITABLE),
- valueSerDe));
+ valueSerDe, valueTableDesc, hconf));
}
// Construct externalizable objects for key and value
if ( needNewKey ) {
MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
-
+ valueObj.setConf(this.hconf);
// This may potentially increase the size of the hashmap on the mapper
if (res.size() > mapJoinRowsKey) {
if ( res.size() % 100 == 0 ) {
@@ -295,7 +311,10 @@
MapJoinObjectValue o = (MapJoinObjectValue)mapJoinTables.get(pos).get(keyMap);
if (o == null) {
- storage.put(pos, dummyObjVectors[pos.intValue()]);
+ if (noOuterJoin)
+ storage.put(pos, emptyList);
+ else
+ storage.put(pos, dummyObjVectors[pos.intValue()]);
}
else {
storage.put(pos, o.getObj());
@@ -323,6 +342,7 @@
for (HashMapWrapper hashTable: mapJoinTables.values()) {
hashTable.close();
}
+ super.closeOp(abort);
}
/**
* Implements the getName function for the Node Interface.
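
One behavioral note on the processOp hunk above: on a hash-table miss, inner joins now share a single empty RowContainer, while outer joins still need the per-alias dummy vector (one null-padded row, so the unmatched side is emitted). A simplified, hypothetical model of that decision, with plain collections standing in for RowContainer:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MapJoinMissHandling {
  public static void main(String[] args) {
    boolean noOuterJoin = true; // true when the query has no outer joins
    List<List<Object>> emptyList = Collections.emptyList();
    // one null-padded row, standing in for dummyObjVectors[pos]
    List<List<Object>> dummyVector =
        Collections.<List<Object>>singletonList(new ArrayList<Object>());
    List<List<Object>> onMiss = noOuterJoin ? emptyList : dummyVector;
    System.out.println(onMiss.size()); // 0 for inner joins, 1 for outer joins
  }
}
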
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Sat Jan 16 06:44:01 2010
@@ -194,4 +194,9 @@
public int getType() {
return StageType.MAPREDLOCAL;
}
+
+ @Override
+ public String getName() {
+ return "MAPRED";
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Sat Jan 16 06:44:01 2010
@@ -180,4 +180,9 @@
public int getType() {
return StageType.MOVE;
}
+
+ @Override
+ public String getName() {
+ return "MOVE";
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sat Jan 16 06:44:01 2010
@@ -799,6 +799,8 @@
transient protected long beginTime = 0;
transient protected long totalTime = 0;
+ transient protected Object groupKeyObject;
+
/**
* this is called before operator process to buffer some counters
*/
@@ -1010,4 +1012,12 @@
assert false;
return -1;
}
+
+ public void setGroupKeyObject(Object keyObject) {
+ this.groupKeyObject = keyObject;
+ }
+
+ public Object getGroupKeyObject() {
+ return groupKeyObject;
+ }
}
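
The new accessors are deliberately trivial; what matters is the contract: whoever drives the reduce-side operator tree is expected to record the current reduce key at each new group, and consumers such as SkewJoinHandler.handleSkew() below read it back via getGroupKeyObject() (an assumption based on how handleSkew() consumes it). A toy, self-contained illustration:

import java.util.Arrays;
import java.util.List;

public class GroupKeyContract {
  private Object groupKeyObject;

  public void setGroupKeyObject(Object keyObject) {
    this.groupKeyObject = keyObject;
  }

  public Object getGroupKeyObject() {
    return groupKeyObject;
  }

  public static void main(String[] args) {
    GroupKeyContract op = new GroupKeyContract();
    List<Object> key = Arrays.<Object>asList("k1");
    op.setGroupKeyObject(key); // caller: a new reduce group starts
    System.out.println(op.getGroupKeyObject()); // consumer reads it back
  }
}
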
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=899891&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Sat Jan 16 06:44:01 2010
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.joinDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * At runtime in Join, we output the big keys of one table into one
+ * corresponding directory, and the matching keys of all other tables into
+ * separate directories (one per table). The directories will look like:
+ * <ul>
+ * <li>
+ * dir-T1-bigkeys (containing the big keys in T1), dir-T2-keys (containing the
+ * keys that are big in T1), dir-T3-keys (containing the keys that are big in
+ * T1), ...
+ * <li>
+ * dir-T1-keys (containing the keys that are big in T2), dir-T2-bigkeys
+ * (containing the big keys in T2), dir-T3-keys (containing the keys that are
+ * big in T2), ...
+ * <li>
+ * dir-T1-keys (containing the keys that are big in T3), dir-T2-keys
+ * (containing the keys that are big in T3), dir-T3-bigkeys (containing the
+ * big keys in T3), ...
+ * </ul>
+ *
+ * <p>
+ * For each skew key, we first write all values to a local tmp file. When the
+ * current group ends, the local tmp file is uploaded to HDFS. Right now, we
+ * use one file per skew key.
+ *
+ * <p>
+ * For more info, please see
+ * https://issues.apache.org/jira/browse/HIVE-964.
+ *
+ */
+public class SkewJoinHandler {
+
+ protected static final Log LOG = LogFactory.getLog(SkewJoinHandler.class.getName());
+
+ public int currBigKeyTag = -1;
+
+ private int rowNumber = 0;
+ private int currTag = -1;
+
+ private int skewKeyDefinition = -1;
+ private Map<Byte,StructObjectInspector> skewKeysTableObjectInspector = null;
+ private Map<Byte,SerDe> tblSerializers = null;
+ private Map<Byte, tableDesc> tblDesc = null;
+
+ private Map<Byte, Boolean> bigKeysExistingMap = null;
+
+ Configuration hconf = null;
+ List<Object> dummyKey = null;
+ String taskId;
+
+ private CommonJoinOperator<? extends Serializable> joinOp;
+ private int numAliases;
+ private joinDesc conf;
+
+ public SkewJoinHandler(CommonJoinOperator<? extends Serializable> joinOp) {
+ this.joinOp = joinOp;
+ this.numAliases = joinOp.numAliases;
+ this.conf = joinOp.getConf();
+ }
+
+ public void initiliaze(Configuration hconf) {
+ this.hconf = hconf;
+ joinDesc desc = joinOp.getConf();
+ skewKeyDefinition = desc.getSkewKeyDefinition();
+ skewKeysTableObjectInspector = new HashMap<Byte, StructObjectInspector>(
+ numAliases);
+ tblDesc = desc.getSkewKeysValuesTables();
+ tblSerializers = new HashMap<Byte, SerDe>(numAliases);
+ bigKeysExistingMap = new HashMap<Byte, Boolean>(numAliases);
+ taskId = Utilities.getTaskId(hconf);
+
+ for (int i = 0; i < numAliases; i++) {
+ Byte alias = conf.getTagOrder()[i];
+ List<ObjectInspector> skewTableKeyInspectors = new ArrayList<ObjectInspector>();
+ StructObjectInspector soi = (StructObjectInspector) this.joinOp.inputObjInspectors[alias];
+ StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
+ .toString());
+ List<? extends StructField> keyFields = ((StructObjectInspector) sf
+ .getFieldObjectInspector()).getAllStructFieldRefs();
+ int keyFieldSize = keyFields.size();
+ for (int k = 0; k < keyFieldSize; k++) {
+ skewTableKeyInspectors.add(keyFields.get(k).getFieldObjectInspector());
+ }
+ tableDesc joinKeyDesc = desc.getKeyTableDesc();
+ List<String> keyColNames = Utilities.getColumnNames(joinKeyDesc.getProperties());
+ StructObjectInspector structTblKeyInpector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(keyColNames, skewTableKeyInspectors);
+
+ try {
+ SerDe serializer = (SerDe) ReflectionUtils.newInstance(tblDesc.get(
+ alias).getDeserializerClass(), null);
+ serializer.initialize(null, tblDesc.get(alias).getProperties());
+ tblSerializers.put((byte) i, serializer);
+ } catch (SerDeException e) {
+ LOG.error("Skewjoin will be disabled due to " + e.getMessage(), e);
+ this.joinOp.handleSkewJoin = false;
+ break;
+ }
+
+ tableDesc valTblDesc = this.joinOp.getSpillTableDesc(alias);
+ List<String> valColNames = new ArrayList<String>();
+ if (valTblDesc != null)
+ valColNames = Utilities.getColumnNames(valTblDesc.getProperties());
+ StructObjectInspector structTblValInpector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(valColNames,
+ this.joinOp.joinValuesStandardObjectInspectors.get((byte) i));
+
+ StructObjectInspector structTblInpector = ObjectInspectorFactory
+ .getUnionStructObjectInspector(Arrays.asList(new StructObjectInspector[] {
+ structTblValInpector,structTblKeyInpector }));
+ skewKeysTableObjectInspector.put((byte) i, structTblInpector);
+ }
+
+ // reset rowcontainer's serde, objectinspector, and tableDesc.
+ for (int i = 0; i < numAliases; i++) {
+ Byte alias = conf.getTagOrder()[i];
+ RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte
+ .valueOf((byte) i));
+ if (rc != null) {
+ rc.setSerDe(tblSerializers.get((byte) i), skewKeysTableObjectInspector
+ .get((byte) i));
+ rc.setTableDesc(tblDesc.get(alias));
+ }
+ }
+ }
+
+ void endGroup() throws IOException, HiveException {
+ if (skewKeyInCurrentGroup) {
+
+ String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
+ RowContainer<ArrayList<Object>> bigKey = joinOp.storage.get(Byte.valueOf((byte) currBigKeyTag));
+ Path outputPath = getOperatorOutputPath(specPath);
+ FileSystem destFs = outputPath.getFileSystem(hconf);
+ bigKey.copyToDFSDirecory(destFs, outputPath);
+
+ for (int i = 0; i < numAliases; i++) {
+ if (((byte) i) == currBigKeyTag)
+ continue;
+ RowContainer<ArrayList<Object>> values = joinOp.storage.get(Byte.valueOf((byte) i));
+ if (values != null) {
+ specPath = conf.getSmallKeysDirMap().get((byte) currBigKeyTag).get((byte) i);
+ values.copyToDFSDirecory(destFs, getOperatorOutputPath(specPath));
+ }
+ }
+ }
+ skewKeyInCurrentGroup = false;
+ }
+
+ boolean skewKeyInCurrentGroup = false;
+ public void handleSkew(int tag) throws HiveException {
+
+ if (joinOp.newGroupStarted || tag != currTag) {
+ rowNumber = 0;
+ currTag = tag;
+ }
+
+ if (joinOp.newGroupStarted) {
+ currBigKeyTag = -1;
+ joinOp.newGroupStarted = false;
+ dummyKey = (List<Object>) joinOp.getGroupKeyObject();
+ skewKeyInCurrentGroup = false;
+
+ for (int i = 0; i < numAliases; i++) {
+ RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte.valueOf((byte) i));
+ if (rc != null) {
+ rc.setKeyObject(dummyKey);
+ }
+ }
+ }
+
+ rowNumber++;
+ if (currBigKeyTag == -1 && (tag < numAliases - 1)
+ && rowNumber >= skewKeyDefinition) {
+ // This is the first time we see a big key. If the key is not in the
+ // last table (the last table can always be streamed), we declare a
+ // skew key now.
+ currBigKeyTag = tag;
+
+ // Right now we assume that the group-by key is a List object. This
+ // may change in the future.
+ if (!(dummyKey instanceof List))
+ throw new RuntimeException("Bug in handling skew keys in a separate job.");
+
+ skewKeyInCurrentGroup = true;
+ bigKeysExistingMap.put(Byte.valueOf((byte) currBigKeyTag), Boolean.TRUE);
+ }
+ }
+
+ public void close(boolean abort) throws HiveException {
+ if (!abort) {
+ try {
+ endGroup();
+ commit();
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ } else {
+ for (int bigKeyTbl = 0; bigKeyTbl < numAliases; bigKeyTbl++) {
+
+ // if we did not see a skew key in this table, continue to the next
+ // table; the map entry may be absent, so guard against a null unboxing
+ Boolean existing = bigKeysExistingMap.get(Byte.valueOf((byte) bigKeyTbl));
+ if (existing == null || !existing)
+ continue;
+
+ try {
+ String specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl);
+ Path bigKeyPath = getOperatorOutputPath(specPath);
+ FileSystem fs = bigKeyPath.getFileSystem(hconf);
+ delete(bigKeyPath, fs);
+ for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
+ if (((byte) smallKeyTbl) == bigKeyTbl)
+ continue;
+ specPath = conf.getSmallKeysDirMap().get((byte) bigKeyTbl).get(
+ (byte) smallKeyTbl);
+ delete(getOperatorOutputPath(specPath), fs);
+ }
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
+ }
+ }
+
+ private void delete(Path operatorOutputPath, FileSystem fs) {
+ try {
+ fs.delete(operatorOutputPath, true);
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+
+ private void commit() throws IOException {
+ for (int bigKeyTbl = 0; bigKeyTbl < numAliases; bigKeyTbl++) {
+
+ // if we did not see a skew key in this table, continue to next table
+ // we are trying to avoid an extra call of FileSystem.exists()
+ Boolean existing = bigKeysExistingMap.get(Byte.valueOf((byte)bigKeyTbl));
+ if (existing == null || !existing)
+ continue;
+
+ String specPath = conf.getBigKeysDirMap().get(Byte.valueOf((byte) bigKeyTbl));
+ commitOutputPathToFinalPath(specPath, false);
+ for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
+ if (smallKeyTbl == bigKeyTbl)
+ continue;
+ specPath = conf.getSmallKeysDirMap().get(Byte.valueOf((byte) bigKeyTbl)).get(
+ Byte.valueOf((byte) smallKeyTbl));
+ // the file may not exist, and we just ignore this
+ commitOutputPathToFinalPath(specPath, true);
+ }
+ }
+ }
+
+ private void commitOutputPathToFinalPath(String specPath,
+ boolean ignoreNonExisting) throws IOException {
+ Path outPath = getOperatorOutputPath(specPath);
+ Path finalPath = getOperatorFinalPath(specPath);
+ FileSystem fs = outPath.getFileSystem(hconf);
+ // For the local file system in Hadoop 0.17.2.1, rename() throws an
+ // IOException when the source file does not exist.
+ try {
+ if (!fs.rename(outPath, finalPath)) {
+ throw new IOException("Unable to rename output to: " + finalPath);
+ }
+ } catch (FileNotFoundException e) {
+ if (!ignoreNonExisting)
+ throw e;
+ } catch (IOException e) {
+ if (!fs.exists(outPath) && ignoreNonExisting)
+ return;
+ throw e;
+ }
+ }
+
+ private Path getOperatorOutputPath(String specPath) throws IOException {
+ Path tmpPath = Utilities.toTempPath(specPath);
+ return new Path(tmpPath, Utilities.toTempPath(taskId));
+ }
+
+ private Path getOperatorFinalPath(String specPath) throws IOException {
+ Path tmpPath = Utilities.toTempPath(specPath);
+ return new Path(tmpPath, taskId);
+ }
+
+}
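
The heart of handleSkew() is a counting rule: a key becomes "big" for a table once that table has contributed at least skewKeyDefinition rows for it, unless the table is the last one in the join order, which can always be streamed. A minimal, runnable model of just that rule (the threshold constant is illustrative; the real value comes from joinDesc.getSkewKeyDefinition()):

public class SkewRuleDemo {
  static final int SKEW_KEY_DEFINITION = 100000; // illustrative threshold

  // A key from table `tag` becomes "big" once the table has contributed at
  // least SKEW_KEY_DEFINITION rows for it, unless it is the last table in
  // the join order, which can always be streamed.
  static boolean isBigKey(int tag, int numAliases, int rowNumber) {
    return tag < numAliases - 1 && rowNumber >= SKEW_KEY_DEFINITION;
  }

  public static void main(String[] args) {
    System.out.println(isBigKey(0, 2, 100000)); // true: big key in table 0
    System.out.println(isBigKey(1, 2, 100000)); // false: the last table streams
  }
}
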
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Sat Jan 16 06:44:01 2010
@@ -20,8 +20,11 @@
import java.io.*;
import java.util.*;
+
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -36,7 +39,7 @@
* Task implementation
**/
-public abstract class Task <T extends Serializable> implements Serializable {
+public abstract class Task <T extends Serializable> implements Serializable, Node {
private static final long serialVersionUID = 1L;
transient protected boolean started;
@@ -50,7 +53,7 @@
transient protected QueryPlan queryPlan;
transient protected TaskHandle taskHandle;
transient protected Map<String, Long> taskCounters;
-
+ transient protected DriverContext driverContext;
// Bean methods
protected List<Task<? extends Serializable>> childTasks;
@@ -65,7 +68,7 @@
this.taskCounters = new HashMap<String, Long>();
}
- public void initialize (HiveConf conf, QueryPlan queryPlan) {
+ public void initialize (HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
this.queryPlan = queryPlan;
isdone = false;
started = false;
@@ -80,7 +83,8 @@
LOG.error(StringUtils.stringifyException(e));
throw new RuntimeException (e);
}
-
+ this.driverContext = driverContext;
+
console = new LogHelper(LOG);
}
@@ -133,6 +137,10 @@
public void setChildTasks(List<Task<? extends Serializable>> childTasks) {
this.childTasks = childTasks;
}
+
+ public List<? extends Node> getChildren() {
+ return getChildTasks();
+ }
public List<Task<? extends Serializable>> getChildTasks() {
return childTasks;
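
Implementing Node is what lets the new physical optimizer walk a task DAG the same way operator DAGs are walked. A hypothetical sketch of such a traversal (walk() is our own helper, not a Hive API; only the Task.getChildren() added in this hunk is assumed):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.lib.Node;

public class TaskTreeWalk {
  // plain depth-first walk; no duplicate handling, whereas Hive's real
  // graph walkers track visited nodes
  static void walk(Node n, List<Node> visited) {
    visited.add(n);
    if (n.getChildren() != null) {
      for (Node child : n.getChildren()) {
        walk(child, visited);
      }
    }
  }

  public static List<Node> walkAll(List<Task<?>> rootTasks) {
    List<Node> visited = new ArrayList<Node>();
    for (Task<?> t : rootTasks) {
      walk(t, visited);
    }
    return visited; // every task reachable from the roots, in DFS order
  }
}
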
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Jan 16 06:44:01 2010
@@ -54,12 +54,16 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.*;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.commons.logging.LogFactory;
@@ -71,7 +75,9 @@
/**
* The object in the reducer are composed of these top level fields
*/
-
+
+ public static final String HADOOP_LOCAL_FS = "file:///";
+
public static enum ReduceField { KEY, VALUE, ALIAS };
private static Map<String, mapredWork> gWorkMap=
Collections.synchronizedMap(new HashMap<String, mapredWork>());
@@ -337,6 +343,15 @@
public static tableDesc getTableDesc(Table tbl) {
return (new tableDesc (tbl.getDeserializer().getClass(), tbl.getInputFormatClass(), tbl.getOutputFormatClass(), tbl.getSchema()));
}
+
+ // column names and column types are comma-delimited strings, aligned by position
+ public static tableDesc getTableDesc(String cols, String colTypes) {
+ return (new tableDesc(LazySimpleSerDe.class, SequenceFileInputFormat.class,
+ HiveSequenceFileOutputFormat.class, Utilities.makeProperties(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
+ }
public static partitionDesc getPartitionDesc(Partition part) throws HiveException {
@@ -835,6 +850,32 @@
}
return names;
}
+
+ public static List<String> getColumnNames(Properties props) {
+ List<String> names = new ArrayList<String>();
+ String colNames = props.getProperty(Constants.LIST_COLUMNS);
+ String[] cols = colNames.trim().split(",");
+ for (String col : cols) {
+ if (col != null && !col.trim().equals(""))
+ names.add(col);
+ }
+ return names;
+ }
+
+ public static List<String> getColumnTypes(Properties props) {
+ List<String> types = new ArrayList<String>();
+ String colTypes = props.getProperty(Constants.LIST_COLUMN_TYPES);
+ String[] cols = colTypes.trim().split(",");
+ for (String col : cols) {
+ if (col != null && !col.trim().equals(""))
+ types.add(col);
+ }
+ return types;
+ }
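
And a similarly hypothetical sketch of the two property readers:

import java.util.Properties;

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.serde.Constants;

public class ColumnPropsExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(Constants.LIST_COLUMNS, "joinkey,cnt");
    props.setProperty(Constants.LIST_COLUMN_TYPES, "string,int");
    System.out.println(Utilities.getColumnNames(props)); // [joinkey, cnt]
    System.out.println(Utilities.getColumnTypes(props)); // [string, int]
  }
}
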
public static void validateColumnNames(List<String> colNames,
List<String> checkCols) throws SemanticException {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java?rev=899891&r1=899890&r2=899891&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java Sat Jan 16 06:44:01 2010
@@ -24,8 +24,10 @@
import java.io.ObjectOutput;
import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -39,6 +41,7 @@
transient protected int metadataTag;
transient protected RowContainer obj;
+ transient protected Configuration conf;
public MapJoinObjectValue() {
}
@@ -80,7 +83,9 @@
MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(Integer.valueOf(metadataTag));
int sz = in.readInt();
- RowContainer res = new RowContainer();
+ RowContainer res = new RowContainer(ctx.getConf());
+ res.setSerDe(ctx.getSerDe(), ctx.getStandardOI());
+ res.setTableDesc(ctx.getTblDesc());
for (int pos = 0; pos < sz; pos++) {
Writable val = ctx.getSerDe().getSerializedClass().newInstance();
val.readFields(in);
@@ -122,6 +127,9 @@
catch (SerDeException e) {
throw new IOException(e);
}
+ catch (HiveException e) {
+ throw new IOException(e);
+ }
}
/**
@@ -152,4 +160,8 @@
this.obj = obj;
}
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
}
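
The readExternal() change matters because RowContainer is now disk-backed: a container rebuilt during deserialization needs its configuration, serde, and table descriptor before rows are appended, since it may spill while the hash table is being loaded. A schematic stand-in for that ordering (the real types are RowContainer and MapJoinObjectCtx):

public class RebuildOnRead {
  static class Container {
    Container(Object conf) { /* conf locates local scratch space for spills */ }
    void setSerDe(Object serde, Object oi) { /* needed to serialize spilled rows */ }
    void setTableDesc(Object desc) { /* fixes the on-disk format of spill files */ }
    void add(Object row) { /* may spill once the in-memory block fills up */ }
  }

  // Mirrors the order of calls in readExternal() above: configure the
  // container fully before the first add(), because add() may spill.
  static Container readValue(Object conf, Object serde, Object oi, Object desc,
      Object[] rows) {
    Container res = new Container(conf);
    res.setSerDe(serde, oi);
    res.setTableDesc(desc);
    for (Object row : rows) {
      res.add(row);
    }
    return res;
  }
}
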