Posted to commits@hive.apache.org by he...@apache.org on 2009/12/30 19:33:25 UTC

svn commit: r894691 - in /hadoop/hive/trunk: CHANGES.txt common/src/java/org/apache/hadoop/hive/conf/HiveConf.java conf/hive-default.xml ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java

Author: heyongqiang
Date: Wed Dec 30 18:33:24 2009
New Revision: 894691

URL: http://svn.apache.org/viewvc?rev=894691&view=rev
Log:
HIVE-1020 create a configuration variable to control speculative execution for reducers in hive
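
In short: Hive now exposes its own hive.mapred.reduce.tasks.speculative.execution
variable (default true) and, when it builds the reduce side of a job, copies that
value onto Hadoop's mapred.reduce.tasks.speculative.execution. A minimal sketch of
that hand-off, assuming only Hadoop's Configuration class on the classpath (the
class name here is illustrative; the real code goes through HiveConf.setVar/getVar,
as the ExecDriver diff below shows):

    import org.apache.hadoop.conf.Configuration;

    public class SpeculativePropagationSketch {
        public static void main(String[] args) {
            Configuration job = new Configuration(false);
            // What a user would set, e.g. per session with
            // "set hive.mapred.reduce.tasks.speculative.execution=false;".
            job.setBoolean("hive.mapred.reduce.tasks.speculative.execution", false);
            // Mirror ExecDriver: copy the Hive-level value (default true) onto the
            // Hadoop property that the MapReduce framework consults for reducers.
            job.setBoolean("mapred.reduce.tasks.speculative.execution",
                job.getBoolean("hive.mapred.reduce.tasks.speculative.execution", true));
            System.out.println("reducer speculation: "
                + job.get("mapred.reduce.tasks.speculative.execution"));
        }
    }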

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=894691&r1=894690&r2=894691&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Dec 30 18:33:24 2009
@@ -92,6 +92,9 @@
     HIVE-571 Add rename column in a table
     (He Yongqiang via namit)
 
+    HIVE-1020 Create a configuration variable to control speculative execution for reducers in hive
+    (namit via He Yongqiang)
+
   IMPROVEMENTS
 
     HIVE-760. Add version info to META-INF/MANIFEST.MF.

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=894691&r1=894690&r2=894691&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Dec 30 18:33:24 2009
@@ -71,7 +71,8 @@
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
     EXECPARALLEL("hive.exec.parallel",false), // parallel query launching
-    
+    HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true),
+
     // hadoop stuff
     HADOOPBIN("hadoop.bin.path", System.getenv("HADOOP_HOME") + "/bin/hadoop"),
     HADOOPCONF("hadoop.config.dir", System.getenv("HADOOP_HOME") + "/conf"),
@@ -80,6 +81,7 @@
     HADOOPJT("mapred.job.tracker", "local"),
     HADOOPNUMREDUCERS("mapred.reduce.tasks", 1),
     HADOOPJOBNAME("mapred.job.name", null),
+    HADOOPSPECULATIVEEXECREDUCERS("mapred.reduce.tasks.speculative.execution", false),
 
     // MetaStore stuff.
     METASTOREDIRECTORY("hive.metastore.metadb.dir", ""),
@@ -131,10 +133,10 @@
     HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),
     HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float)0.5),
     HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float)0.5),
-    
+
     // for hive udtf operator
     HIVEUDTFAUTOPROGRESS("hive.udtf.auto.progress", false),
-    
+
     // Default file format for CREATE TABLE statement
     // Options: TextFile, SequenceFile
     HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"),
@@ -165,7 +167,7 @@
     HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false),
     HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long)(256*1000*1000)),
     HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long)(16*1000*1000)),
-    
+
     HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000),
 
     HIVEJOBPROGRESS("hive.task.progress", false),
@@ -176,7 +178,7 @@
     HIVEOPTCP("hive.optimize.cp", true), // column pruner
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
     HIVEOPTGROUPBY("hive.optimize.groupby", true); // optimize group by
-    
+
     public final String varname;
     public final String defaultVal;
     public final int defaultIntVal;
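
For context, each ConfVars constant above pairs a property name with a typed
default; overloaded constructors record which default type was supplied. A
self-contained sketch of that pattern, simplified to the boolean case (this is
not the actual HiveConf source):

    public enum ConfVarsSketch {
        HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true),
        HADOOPSPECULATIVEEXECREDUCERS("mapred.reduce.tasks.speculative.execution", false);

        // Property name as it appears in XML config files, plus its default.
        public final String varname;
        public final boolean defaultBoolVal;

        ConfVarsSketch(String varname, boolean defaultBoolVal) {
            this.varname = varname;
            this.defaultBoolVal = defaultBoolVal;
        }

        public static void main(String[] args) {
            for (ConfVarsSketch v : values()) {
                System.out.println(v.varname + " = " + v.defaultBoolVal);
            }
        }
    }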

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=894691&r1=894690&r2=894691&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Wed Dec 30 18:33:24 2009
@@ -401,4 +401,10 @@
   <description>Whether Hive should automatically send progress information to TaskTracker when using UDTFs to prevent the task from getting killed because of inactivity.  Users should be cautious because this may prevent TaskTracker from killing tasks with infinite loops.  </description>
 </property>
 
+<property>
+  <name>hive.mapred.reduce.tasks.speculative.execution</name>
+  <value>true</value>
+  <description>Whether speculative execution for reducers should be turned on. </description>
+</property>
+
 </configuration>
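
Because this is an ordinary Hive property, the shipped default above can be
overridden site-wide in hive-site.xml or per session with
"set hive.mapred.reduce.tasks.speculative.execution=false;" at the CLI. A small
sketch of the fallback behavior when no override is present (plain Hadoop
Configuration with no config files loaded; names are illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class DefaultLookupSketch {
        public static void main(String[] args) {
            // No site/default files loaded, so the lookup falls through to the
            // supplied default -- the same value hive-default.xml documents.
            Configuration conf = new Configuration(false);
            boolean speculative = conf.getBoolean(
                "hive.mapred.reduce.tasks.speculative.execution", true);
            System.out.println("speculative execution for reducers: " + speculative);
        }
    }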

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=894691&r1=894690&r2=894691&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Dec 30 18:33:24 2009
@@ -67,7 +67,7 @@
   transient protected JobConf job;
   transient protected int mapProgress = 0;
   transient protected int reduceProgress = 0;
-  
+
   public static Random randGen = new Random();
   /**
    * Constructor when invoked from QL
@@ -146,7 +146,7 @@
    * used to kill all running jobs in the event of an unexpected shutdown -
    * i.e., the JVM shuts down while there are still jobs running.
    */
-  public static Map<String, String> runningJobKillURIs 
+  public static Map<String, String> runningJobKillURIs
     = Collections.synchronizedMap(new HashMap<String, String>());
 
   /**
@@ -168,7 +168,7 @@
               String uri = elems.next();
               try {
                 System.err.println("killing job with: " + uri);
-                java.net.HttpURLConnection conn = (java.net.HttpURLConnection) 
+                java.net.HttpURLConnection conn = (java.net.HttpURLConnection)
                   new java.net.URL(uri).openConnection();
                 conn.setRequestMethod("POST");
                 int retCode = conn.getResponseCode();
@@ -450,6 +450,10 @@
     job.setNumReduceTasks(work.getNumReduceTasks().intValue());
     job.setReducerClass(ExecReducer.class);
 
+    // Propagate the Hive-level setting for reducer speculative execution to Hadoop
+    HiveConf.setVar(job, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
+                    HiveConf.getVar(job, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS));
+
     String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
     if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat)))
       inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
@@ -608,12 +612,12 @@
     Map<String, Integer> failures = new HashMap<String, Integer>();
     Set<String> successes = new HashSet<String> ();
     Map<String, String> taskToJob = new HashMap<String,String>();
-    
+
     int startIndex = 0;
-    
+
     while(true) {
       TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
-      
+
       if(taskCompletions == null || taskCompletions.length == 0) {
         break;
       }
@@ -629,11 +633,11 @@
           more = false;
           break;
         }
-        
+
         String taskId = taskJobIds[0];
         String jobId = taskJobIds[1];
         taskToJob.put(taskId, jobId);
-        
+
         if(t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
           Integer failAttempts = failures.get(taskId);
           if(failAttempts == null) {
@@ -648,28 +652,28 @@
       if(!more) {
         break;
       }
-      startIndex += taskCompletions.length;      
+      startIndex += taskCompletions.length;
     }
     // Remove failures for tasks that succeeded
     for(String task : successes) {
       failures.remove(task);
     }
- 
+
     if(failures.keySet().size() == 0) {
       return;
     }
-    
+
     // Find the highest failure count
     int maxFailures = 0;
     for(Integer failCount : failures.values()) {
       if(maxFailures < failCount.intValue())
         maxFailures = failCount.intValue();
     }
-    
+
     // Display Error Message for tasks with the highest failure count
     console.printError("\nFailed tasks with most" + "(" + maxFailures + ")" + " failures " + ": ");
     String jtUrl = JobTrackerURLResolver.getURL(conf);
-    
+
     for(String task : failures.keySet()) {
       if(failures.get(task).intValue() == maxFailures) {
         String jobId = taskToJob.get(task);
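
Aside from the whitespace cleanup, the later hunks show the failure-analysis
loop this file already had: it scans TaskCompletionEvents, counts failed
attempts per task, drops tasks that eventually succeeded, and reports the tasks
with the highest failure count. A self-contained sketch of those last two
passes over the failure map (sample task ids are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class MaxFailureReportSketch {
        public static void main(String[] args) {
            // Failure counts per task id, as accumulated by the event loop above.
            Map<String, Integer> failures = new HashMap<String, Integer>();
            failures.put("task_200912300001_0001_r_000000", 3);
            failures.put("task_200912300001_0001_r_000001", 1);
            failures.put("task_200912300001_0001_r_000002", 3);

            // First pass: find the highest failure count.
            int maxFailures = 0;
            for (Integer failCount : failures.values()) {
                if (maxFailures < failCount.intValue()) {
                    maxFailures = failCount.intValue();
                }
            }

            // Second pass: report every task that hit that count.
            for (Map.Entry<String, Integer> e : failures.entrySet()) {
                if (e.getValue().intValue() == maxFailures) {
                    System.out.println("Failed task with most (" + maxFailures
                        + ") failures: " + e.getKey());
                }
            }
        }
    }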