You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2009/12/30 19:33:25 UTC
svn commit: r894691 - in /hadoop/hive/trunk: CHANGES.txt
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
conf/hive-default.xml
ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
Author: heyongqiang
Date: Wed Dec 30 18:33:24 2009
New Revision: 894691
URL: http://svn.apache.org/viewvc?rev=894691&view=rev
Log:
HIVE-1020 create a configuration variable to control speculative execution for reducers in hive
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=894691&r1=894690&r2=894691&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Dec 30 18:33:24 2009
@@ -92,6 +92,9 @@
HIVE-571 Add rename column in a table
(He Yongqiang via namit)
+ HIVE-1020 Create a configuration variable to control speculative execution for reducers in hive
+ (namit via He Yongqiang)
+
IMPROVEMENTS
HIVE-760. Add version info to META-INF/MANIFEST.MF.
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=894691&r1=894690&r2=894691&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Dec 30 18:33:24 2009
@@ -71,7 +71,8 @@
PREEXECHOOKS("hive.exec.pre.hooks", ""),
POSTEXECHOOKS("hive.exec.post.hooks", ""),
EXECPARALLEL("hive.exec.parallel",false), // parallel query launching
-
+ HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution",true),
+
// hadoop stuff
HADOOPBIN("hadoop.bin.path", System.getenv("HADOOP_HOME") + "/bin/hadoop"),
HADOOPCONF("hadoop.config.dir", System.getenv("HADOOP_HOME") + "/conf"),
@@ -80,6 +81,7 @@
HADOOPJT("mapred.job.tracker", "local"),
HADOOPNUMREDUCERS("mapred.reduce.tasks", 1),
HADOOPJOBNAME("mapred.job.name", null),
+ HADOOPSPECULATIVEEXECREDUCERS("mapred.reduce.tasks.speculative.execution", false),
// MetaStore stuff.
METASTOREDIRECTORY("hive.metastore.metadb.dir", ""),
@@ -131,10 +133,10 @@
HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),
HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float)0.5),
HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float)0.5),
-
+
// for hive udtf operator
HIVEUDTFAUTOPROGRESS("hive.udtf.auto.progress", false),
-
+
// Default file format for CREATE TABLE statement
// Options: TextFile, SequenceFile
HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"),
@@ -165,7 +167,7 @@
HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false),
HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long)(256*1000*1000)),
HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long)(16*1000*1000)),
-
+
HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000),
HIVEJOBPROGRESS("hive.task.progress", false),
@@ -176,7 +178,7 @@
HIVEOPTCP("hive.optimize.cp", true), // column pruner
HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
HIVEOPTGROUPBY("hive.optimize.groupby", true); // optimize group by
-
+
public final String varname;
public final String defaultVal;
public final int defaultIntVal;
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=894691&r1=894690&r2=894691&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Wed Dec 30 18:33:24 2009
@@ -401,4 +401,10 @@
<description>Whether Hive should automatically send progress information to TaskTracker when using UDTF's to prevent the task getting killed because of inactivity. Users should be cautious because this may prevent TaskTracker from killing tasks with infinite loops. </description>
</property>
+<property>
+ <name>hive.mapred.reduce.tasks.speculative.execution</name>
+ <value>true</value>
+ <description>Whether speculative execution for reducers should be turned on. </description>
+</property>
+
</configuration>
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=894691&r1=894690&r2=894691&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Dec 30 18:33:24 2009
@@ -67,7 +67,7 @@
transient protected JobConf job;
transient protected int mapProgress = 0;
transient protected int reduceProgress = 0;
-
+
public static Random randGen = new Random();
/**
* Constructor when invoked from QL
@@ -146,7 +146,7 @@
* used to kill all running jobs in the event of an unexpected shutdown -
* i.e., the JVM shuts down while there are still jobs running.
*/
- public static Map<String, String> runningJobKillURIs
+ public static Map<String, String> runningJobKillURIs
= Collections.synchronizedMap(new HashMap<String, String>());
/**
@@ -168,7 +168,7 @@
String uri = elems.next();
try {
System.err.println("killing job with: " + uri);
- java.net.HttpURLConnection conn = (java.net.HttpURLConnection)
+ java.net.HttpURLConnection conn = (java.net.HttpURLConnection)
new java.net.URL(uri).openConnection();
conn.setRequestMethod("POST");
int retCode = conn.getResponseCode();
@@ -450,6 +450,10 @@
job.setNumReduceTasks(work.getNumReduceTasks().intValue());
job.setReducerClass(ExecReducer.class);
+ // Turn on speculative execution for reducers
+ HiveConf.setVar(job,HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
+ HiveConf.getVar(job, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS));
+
String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat)))
inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
@@ -608,12 +612,12 @@
Map<String, Integer> failures = new HashMap<String, Integer>();
Set<String> successes = new HashSet<String> ();
Map<String, String> taskToJob = new HashMap<String,String>();
-
+
int startIndex = 0;
-
+
while(true) {
TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
-
+
if(taskCompletions == null || taskCompletions.length == 0) {
break;
}
@@ -629,11 +633,11 @@
more = false;
break;
}
-
+
String taskId = taskJobIds[0];
String jobId = taskJobIds[1];
taskToJob.put(taskId, jobId);
-
+
if(t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
Integer failAttempts = failures.get(taskId);
if(failAttempts == null) {
@@ -648,28 +652,28 @@
if(!more) {
break;
}
- startIndex += taskCompletions.length;
+ startIndex += taskCompletions.length;
}
// Remove failures for tasks that succeeded
for(String task : successes) {
failures.remove(task);
}
-
+
if(failures.keySet().size() == 0) {
return;
}
-
+
// Find the highest failure count
int maxFailures = 0;
for(Integer failCount : failures.values()) {
if(maxFailures < failCount.intValue())
maxFailures = failCount.intValue();
}
-
+
// Display Error Message for tasks with the highest failure count
console.printError("\nFailed tasks with most" + "(" + maxFailures + ")" + " failures " + ": ");
String jtUrl = JobTrackerURLResolver.getURL(conf);
-
+
for(String task : failures.keySet()) {
if(failures.get(task).intValue() == maxFailures) {
String jobId = taskToJob.get(task);