Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC

svn commit: r901644 [4/37] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jan 21 10:37:58 2010
@@ -18,48 +18,62 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.*;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
-import java.net.URLEncoder;
 import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
 
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.lang.StringUtils;
-
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.plan.partitionDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
-import org.apache.hadoop.hive.ql.io.*;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.varia.NullAppender;
-import java.lang.ClassNotFoundException;
-import org.apache.hadoop.hive.common.FileUtils;
 
 public class ExecDriver extends Task<mapredWork> implements Serializable {
 
@@ -69,8 +83,9 @@
   transient protected int mapProgress = 0;
   transient protected int reduceProgress = 0;
   transient protected boolean success = false; // if job execution is successful
-  
+
   public static Random randGen = new Random();
+
   /**
    * Constructor when invoked from QL
    */
@@ -78,7 +93,8 @@
     super();
   }
 
-  public static String getResourceFiles(Configuration conf, SessionState.ResourceType t) {
+  public static String getResourceFiles(Configuration conf,
+      SessionState.ResourceType t) {
     // fill in local files to be added to the task environment
     SessionState ss = SessionState.get();
     Set<String> files = (ss == null) ? null : ss.list_resource(t, null);
@@ -109,7 +125,8 @@
    * Initialization when invoked from QL
    */
   @Override
-  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+  public void initialize(HiveConf conf, QueryPlan queryPlan,
+      DriverContext driverContext) {
     super.initialize(conf, queryPlan, driverContext);
     job = new JobConf(conf, ExecDriver.class);
     // NOTE: initialize is only called if it is in non-local mode.
@@ -127,7 +144,8 @@
     if (StringUtils.isNotBlank(addedJars)) {
       HiveConf.setVar(job, ConfVars.HIVEADDEDJARS, addedJars);
     }
-    String addedArchives = getResourceFiles(job, SessionState.ResourceType.ARCHIVE);
+    String addedArchives = getResourceFiles(job,
+        SessionState.ResourceType.ARCHIVE);
     if (StringUtils.isNotBlank(addedArchives)) {
       HiveConf.setVar(job, ConfVars.HIVEADDEDARCHIVES, addedArchives);
     }
@@ -149,35 +167,35 @@
    * used to kill all running jobs in the event of an unexpected shutdown -
    * i.e., the JVM shuts down while there are still jobs running.
    */
-  public static Map<String, String> runningJobKillURIs
-    = Collections.synchronizedMap(new HashMap<String, String>());
+  public static Map<String, String> runningJobKillURIs = Collections
+      .synchronizedMap(new HashMap<String, String>());
 
   /**
    * In Hive, when the user control-c's the command line, any running jobs
    * spawned from that command line are best-effort killed.
-   *
+   * 
    * This static constructor registers a shutdown thread to iterate over all the
    * running job kill URLs and do a get on them.
-   *
+   * 
    */
   static {
     if (new org.apache.hadoop.conf.Configuration().getBoolean(
         "webinterface.private.actions", false)) {
       Runtime.getRuntime().addShutdownHook(new Thread() {
+        @Override
         public void run() {
-          synchronized(runningJobKillURIs) {
-            for (Iterator<String> elems = runningJobKillURIs.values().iterator(); elems
-                .hasNext();) {
-              String uri = elems.next();
+          synchronized (runningJobKillURIs) {
+            for (String uri : runningJobKillURIs.values()) {
               try {
                 System.err.println("killing job with: " + uri);
-                java.net.HttpURLConnection conn = (java.net.HttpURLConnection)
-                  new java.net.URL(uri).openConnection();
+                java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(
+                    uri).openConnection();
                 conn.setRequestMethod("POST");
                 int retCode = conn.getResponseCode();
                 if (retCode != 200) {
-                  System.err.println("Got an error trying to kill job with URI: "
-                      + uri + " = " + retCode);
+                  System.err
+                      .println("Got an error trying to kill job with URI: "
+                          + uri + " = " + retCode);
                 }
               } catch (Exception e) {
                 System.err.println("trying to kill job, caught: " + e);
@@ -200,11 +218,11 @@
       String hp = job.get("mapred.job.tracker");
       if (SessionState.get() != null) {
         SessionState.get().getHiveHistory().setTaskProperty(
-            SessionState.get().getQueryId(), getId(),
-            Keys.TASK_HADOOP_ID, rj.getJobID());
+            SessionState.get().getQueryId(), getId(), Keys.TASK_HADOOP_ID,
+            rj.getJobID());
       }
-      console.printInfo(ExecDriver.getJobEndMsg(rj.getJobID()) + ", Tracking URL = "
-          + rj.getTrackingURL());
+      console.printInfo(ExecDriver.getJobEndMsg(rj.getJobID())
+          + ", Tracking URL = " + rj.getTrackingURL());
       console.printInfo("Kill Command = "
           + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
           + " job  -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
@@ -212,66 +230,73 @@
   }
 
   /**
-   * This class contains the state of the running task
-   * Going forward, we will return this handle from execute
-   * and Driver can split execute into start, monitorProgess and postProcess
+   * This class contains the state of the running task Going forward, we will
+   * return this handle from execute and Driver can split execute into start,
+   * monitorProgess and postProcess
    */
   public static class ExecDriverTaskHandle extends TaskHandle {
     JobClient jc;
     RunningJob rj;
+
     JobClient getJobClient() {
       return jc;
     }
+
     RunningJob getRunningJob() {
       return rj;
     }
+
     public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
       this.jc = jc;
       this.rj = rj;
     }
+
     public void setRunningJob(RunningJob job) {
-      this.rj = job;
+      rj = job;
     }
+
     public Counters getCounters() throws IOException {
       return rj.getCounters();
     }
   }
-  
+
   /**
    * Fatal errors are those errors that cannot be recovered by retries. These
-   * are application dependent. Examples of fatal errors include:
-   *  - the small table in the map-side joins is too large to be feasible to be
-   *    handled by one mapper. The job should fail and the user should be warned
-   *    to use regular joins rather than map-side joins.
-   * Fatal errors are indicated by counters that are set at execution time. 
-   * If the counter is non-zero, a fatal error occurred. The value of the counter
-   * indicates the error type. 
-   * @return true if fatal errors happened during job execution, false otherwise.
+   * are application dependent. Examples of fatal errors include: - the small
+   * table in the map-side joins is too large to be feasible to be handled by
+   * one mapper. The job should fail and the user should be warned to use
+   * regular joins rather than map-side joins. Fatal errors are indicated by
+   * counters that are set at execution time. If the counter is non-zero, a
+   * fatal error occurred. The value of the counter indicates the error type.
+   * 
+   * @return true if fatal errors happened during job execution, false
+   *         otherwise.
    */
   protected boolean checkFatalErrors(TaskHandle t, StringBuffer errMsg) {
     ExecDriverTaskHandle th = (ExecDriverTaskHandle) t;
     RunningJob rj = th.getRunningJob();
     try {
       Counters ctrs = th.getCounters();
-      for (Operator<? extends Serializable> op: work.getAliasToWork().values()) {
-        if (op.checkFatalErrors(ctrs, errMsg))
+      for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+        if (op.checkFatalErrors(ctrs, errMsg)) {
           return true;
+        }
       }
       return false;
-     } catch (IOException e) {
+    } catch (IOException e) {
       // this exception can be tolerated
       e.printStackTrace();
       return false;
     }
   }
-  
+
   public void progress(TaskHandle taskHandle) throws IOException {
-    ExecDriverTaskHandle th = (ExecDriverTaskHandle)taskHandle;
+    ExecDriverTaskHandle th = (ExecDriverTaskHandle) taskHandle;
     JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     String lastReport = "";
-    SimpleDateFormat dateFormat
-        = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+    SimpleDateFormat dateFormat = new SimpleDateFormat(
+        "yyyy-MM-dd HH:mm:ss,SSS");
     long reportTime = System.currentTimeMillis();
     long maxReportInterval = 60 * 1000; // One minute
     boolean fatal = false;
@@ -282,38 +307,41 @@
       } catch (InterruptedException e) {
       }
       th.setRunningJob(jc.getJob(rj.getJobID()));
-      
+
       // If fatal errors happen we should kill the job immediately rather than
       // let the job retry several times, which eventually lead to failure.
-      if (fatal)
-        continue;  // wait until rj.isComplete
-      if ( fatal = checkFatalErrors(th, errMsg)) {
+      if (fatal) {
+        continue; // wait until rj.isComplete
+      }
+      if (fatal = checkFatalErrors(th, errMsg)) {
         success = false;
-        console.printError("[Fatal Error] " + errMsg.toString() + ". Killing the job.");
+        console.printError("[Fatal Error] " + errMsg.toString()
+            + ". Killing the job.");
         rj.killJob();
         continue;
       }
       errMsg.setLength(0);
-      
+
       updateCounters(th);
 
-      String report = " "+getId()+" map = " + this.mapProgress + "%,  reduce = " + this.reduceProgress + "%";
-      
+      String report = " " + getId() + " map = " + mapProgress + "%,  reduce = "
+          + reduceProgress + "%";
+
       if (!report.equals(lastReport)
           || System.currentTimeMillis() >= reportTime + maxReportInterval) {
 
         // write out serialized plan with counters to log file
         // LOG.info(queryPlan);
-        String output = dateFormat.format(Calendar.getInstance().getTime()) + report;
+        String output = dateFormat.format(Calendar.getInstance().getTime())
+            + report;
         SessionState ss = SessionState.get();
         if (ss != null) {
-          ss.getHiveHistory().setTaskCounters(
-              SessionState.get().getQueryId(), getId(), rj);
-          ss.getHiveHistory().setTaskProperty(
-              SessionState.get().getQueryId(), getId(),
-              Keys.TASK_HADOOP_PROGRESS, output);
-          ss.getHiveHistory().progressTask(
-              SessionState.get().getQueryId(), this);
+          ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(),
+              getId(), rj);
+          ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
+              getId(), Keys.TASK_HADOOP_PROGRESS, output);
+          ss.getHiveHistory().progressTask(SessionState.get().getQueryId(),
+              this);
           ss.getHiveHistory().logPlanProgress(queryPlan);
         }
         console.printInfo(output);
@@ -321,13 +349,15 @@
         reportTime = System.currentTimeMillis();
       }
     }
-    // check for fatal error again in case it occurred after the last check before the job is completed
-    if ( !fatal && (fatal = checkFatalErrors(th, errMsg))) {
+    // check for fatal error again in case it occurred after the last check
+    // before the job is completed
+    if (!fatal && (fatal = checkFatalErrors(th, errMsg))) {
       console.printError("[Fatal Error] " + errMsg.toString());
       success = false;
-    } else 
+    } else {
       success = rj.isSuccessful();
- 
+    }
+
     setDone();
     th.setRunningJob(jc.getJob(rj.getJobID()));
     updateCounters(th);
@@ -335,15 +365,17 @@
     if (ss != null) {
       ss.getHiveHistory().logPlanProgress(queryPlan);
     }
-    //LOG.info(queryPlan);
+    // LOG.info(queryPlan);
   }
 
   /**
    * Estimate the number of reducers needed for this job, based on job input,
    * and configuration parameters.
+   * 
    * @return the number of reducers.
    */
-  public int estimateNumberOfReducers(HiveConf hive, JobConf job, mapredWork work) throws IOException {
+  public int estimateNumberOfReducers(HiveConf hive, JobConf job,
+      mapredWork work) throws IOException {
     if (hive == null) {
       hive = new HiveConf();
     }
@@ -351,10 +383,10 @@
     int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
     long totalInputFileSize = getTotalInputFileSize(job, work);
 
-    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
-        + " totalInputFileSize=" + totalInputFileSize);
+    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
 
-    int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
     reducers = Math.max(1, reducers);
     reducers = Math.min(maxReducers, reducers);
     return reducers;
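
(Aside, an illustrative sketch rather than code from this commit: the estimate above is a ceiling division of the total input size by the per-reducer byte target, clamped to the range [1, maxReducers]. The 1 GB-per-reducer figure and the cap of 999 below are example values, not necessarily the configured Hive defaults.)

public class ReducerEstimateSketch {
  // Same arithmetic as estimateNumberOfReducers above:
  // ceil(totalInputFileSize / bytesPerReducer), bounded to [1, maxReducers].
  static int estimate(long totalInputFileSize, long bytesPerReducer, int maxReducers) {
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
    reducers = Math.max(1, reducers);
    return Math.min(maxReducers, reducers);
  }

  public static void main(String[] args) {
    // Hypothetical example: 10 GB of input at 1 GB per reducer, capped at 999 -> 10 reducers.
    System.out.println(estimate(10L << 30, 1L << 30, 999));
    // Empty input still yields one reducer because of the lower clamp.
    System.out.println(estimate(0L, 1L << 30, 999));
  }
}
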
@@ -367,41 +399,54 @@
     // this is a temporary hack to fix things that are not fixed in the compiler
     Integer numReducersFromWork = work.getNumReduceTasks();
 
-    if(work.getReducer() == null) {
-      console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
+    if (work.getReducer() == null) {
+      console
+          .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
       work.setNumReduceTasks(Integer.valueOf(0));
     } else {
       if (numReducersFromWork >= 0) {
-        console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks());
+        console.printInfo("Number of reduce tasks determined at compile time: "
+            + work.getNumReduceTasks());
       } else if (job.getNumReduceTasks() > 0) {
         int reducers = job.getNumReduceTasks();
         work.setNumReduceTasks(reducers);
-        console.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: " + reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+                + reducers);
       } else {
         int reducers = estimateNumberOfReducers(conf, job, work);
         work.setNumReduceTasks(reducers);
-        console.printInfo("Number of reduce tasks not specified. Estimated from input data size: " + reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+                + reducers);
 
       }
-      console.printInfo("In order to change the average load for a reducer (in bytes):");
-      console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
+      console
+          .printInfo("In order to change the average load for a reducer (in bytes):");
+      console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
+          + "=<number>");
       console.printInfo("In order to limit the maximum number of reducers:");
-      console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
+      console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname
+          + "=<number>");
       console.printInfo("In order to set a constant number of reducers:");
-      console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
+      console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
+          + "=<number>");
     }
   }
 
   /**
    * Calculate the total size of input files.
-   * @param job the hadoop job conf.
+   * 
+   * @param job
+   *          the hadoop job conf.
    * @return the total size in bytes.
    * @throws IOException
    */
-  public long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException {
+  public long getTotalInputFileSize(JobConf job, mapredWork work)
+      throws IOException {
     long r = 0;
     // For each input path, calculate the total size.
-    for (String path: work.getPathToAliases().keySet()) {
+    for (String path : work.getPathToAliases().keySet()) {
       try {
         Path p = new Path(path);
         FileSystem fs = p.getFileSystem(job);
@@ -419,14 +464,16 @@
    */
   @Override
   public void updateCounters(TaskHandle t) throws IOException {
-    ExecDriverTaskHandle th = (ExecDriverTaskHandle)t;
+    ExecDriverTaskHandle th = (ExecDriverTaskHandle) t;
     RunningJob rj = th.getRunningJob();
-    this.mapProgress = Math.round(rj.mapProgress() * 100);
-    this.reduceProgress = Math.round(rj.reduceProgress() * 100);
-    taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long.valueOf(this.mapProgress));
-    taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long.valueOf(this.reduceProgress));
+    mapProgress = Math.round(rj.mapProgress() * 100);
+    reduceProgress = Math.round(rj.reduceProgress() * 100);
+    taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long
+        .valueOf(mapProgress));
+    taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long
+        .valueOf(reduceProgress));
     Counters ctrs = th.getCounters();
-    for (Operator<? extends Serializable> op: work.getAliasToWork().values()) {
+    for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
       op.updateCounters(ctrs);
     }
     if (work.getReducer() != null) {
@@ -450,21 +497,20 @@
     return reduceProgress == 100;
   }
 
-
   /**
    * Execute a query plan using Hadoop
    */
   public int execute() {
-    
+
     success = true;
-    
+
     try {
       setNumberOfReducers();
-    } catch(IOException e) {
+    } catch (IOException e) {
       String statusMesg = "IOException while accessing HDFS to estimate the number of reducers: "
-        + e.getMessage();
+          + e.getMessage();
       console.printError(statusMesg, "\n"
-                         + org.apache.hadoop.util.StringUtils.stringifyException(e));
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return 1;
     }
 
@@ -473,16 +519,17 @@
       throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
     }
 
-
     String hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
-    String jobScratchDirStr = hiveScratchDir + File.separator+ Utilities.randGen.nextInt();
-    Path   jobScratchDir = new Path(jobScratchDirStr);
+    String jobScratchDirStr = hiveScratchDir + File.separator
+        + Utilities.randGen.nextInt();
+    Path jobScratchDir = new Path(jobScratchDirStr);
     String emptyScratchDirStr = null;
-    Path   emptyScratchDir    = null;
+    Path emptyScratchDir = null;
 
     int numTries = 3;
     while (numTries > 0) {
-      emptyScratchDirStr = hiveScratchDir + File.separator + Utilities.randGen.nextInt();
+      emptyScratchDirStr = hiveScratchDir + File.separator
+          + Utilities.randGen.nextInt();
       emptyScratchDir = new Path(emptyScratchDirStr);
 
       try {
@@ -490,10 +537,12 @@
         fs.mkdirs(emptyScratchDir);
         break;
       } catch (Exception e) {
-        if (numTries > 0)
+        if (numTries > 0) {
           numTries--;
-        else
-          throw new RuntimeException("Failed to make dir " + emptyScratchDir.toString() + " : " + e.getMessage());
+        } else {
+          throw new RuntimeException("Failed to make dir "
+              + emptyScratchDir.toString() + " : " + e.getMessage());
+        }
       }
     }
 
@@ -507,17 +556,19 @@
     job.setReducerClass(ExecReducer.class);
 
     // Turn on speculative execution for reducers
-    HiveConf.setVar(job,HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
-                    HiveConf.getVar(job, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS));
+    HiveConf.setVar(job, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
+        HiveConf.getVar(job, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS));
 
     String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
-    if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat)))
+    if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
       inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+    }
 
     LOG.info("Using " + inpFormat);
 
     try {
-      job.setInputFormat((Class<? extends InputFormat>)(Class.forName(inpFormat)));
+      job.setInputFormat((Class<? extends InputFormat>) (Class
+          .forName(inpFormat)));
     } catch (ClassNotFoundException e) {
       throw new RuntimeException(e.getMessage());
     }
@@ -526,14 +577,14 @@
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
 
-    // Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands it
+    // Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands
+    // it
     String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
     String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS);
     if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) {
-      String allJars =
-        StringUtils.isNotBlank(auxJars)
-        ? (StringUtils.isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
-        : addedJars;
+      String allJars = StringUtils.isNotBlank(auxJars) ? (StringUtils
+          .isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
+          : addedJars;
       LOG.info("adding libjars: " + allJars);
       initializeFiles("tmpjars", allJars);
     }
@@ -544,7 +595,8 @@
       initializeFiles("tmpfiles", addedFiles);
     }
     // Transfer HIVEADDEDARCHIVES to "tmparchives" so hadoop understands it
-    String addedArchives = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDARCHIVES);
+    String addedArchives = HiveConf.getVar(job,
+        HiveConf.ConfVars.HIVEADDEDARCHIVES);
     if (StringUtils.isNotBlank(addedArchives)) {
       initializeFiles("tmparchives", addedArchives);
     }
@@ -552,12 +604,13 @@
     int returnVal = 0;
     RunningJob rj = null, orig_rj = null;
 
-    boolean noName = StringUtils.isEmpty(HiveConf.
-      getVar(job,HiveConf.ConfVars.HADOOPJOBNAME));
+    boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
+        HiveConf.ConfVars.HADOOPJOBNAME));
 
-    if(noName) {
+    if (noName) {
       // This is for a special case to ensure unit tests pass
-      HiveConf.setVar(job,HiveConf.ConfVars.HADOOPJOBNAME, "JOB"+randGen.nextInt());
+      HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, "JOB"
+          + randGen.nextInt());
     }
 
     try {
@@ -565,20 +618,22 @@
 
       Utilities.setMapRedWork(job, work);
 
-      // remove the pwd from conf file so that job tracker doesn't show this logs
+      // remove the pwd from conf file so that job tracker doesn't show this
+      // logs
       String pwd = job.get(HiveConf.ConfVars.METASTOREPWD.varname);
-      if (pwd != null)
+      if (pwd != null) {
         job.set(HiveConf.ConfVars.METASTOREPWD.varname, "HIVE");
+      }
       JobClient jc = new JobClient(job);
 
-
       // make this client wait if job trcker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
 
       orig_rj = rj = jc.submitJob(job);
       // replace it back
-      if (pwd != null)
+      if (pwd != null) {
         job.set(HiveConf.ConfVars.METASTOREPWD.varname, pwd);
+      }
 
       // add to list of running jobs so in case of abnormal shutdown can kill
       // it.
@@ -590,11 +645,12 @@
       progress(th); // success status will be setup inside progress
 
       if (rj == null) {
-        // in the corner case where the running job has disappeared from JT memory
+        // in the corner case where the running job has disappeared from JT
+        // memory
         // remember that we did actually submit the job.
         rj = orig_rj;
         success = false;
-      } 
+      }
 
       String statusMesg = getJobEndMsg(rj.getJobID());
       if (!success) {
@@ -636,41 +692,44 @@
 
     try {
       if (rj != null) {
-        if(work.getAliasToWork() != null) {
-          for(Operator<? extends Serializable> op:
-                work.getAliasToWork().values()) {
+        if (work.getAliasToWork() != null) {
+          for (Operator<? extends Serializable> op : work.getAliasToWork()
+              .values()) {
             op.jobClose(job, success);
           }
         }
-        if(work.getReducer() != null) {
+        if (work.getReducer() != null) {
           work.getReducer().jobClose(job, success);
         }
       }
     } catch (Exception e) {
       // jobClose needs to execute successfully otherwise fail task
-      if(success) {
+      if (success) {
         success = false;
         returnVal = 3;
-        String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'";
+        String mesg = "Job Commit failed with exception '"
+            + Utilities.getNameMessage(e) + "'";
         console.printError(mesg, "\n"
-                           + org.apache.hadoop.util.StringUtils.stringifyException(e));
+            + org.apache.hadoop.util.StringUtils.stringifyException(e));
       }
     }
 
     return (returnVal);
   }
-  
+
   /**
    * this msg pattern is used to track when a job is started
+   * 
    * @param jobId
    * @return
    */
   public static String getJobStartMsg(String jobId) {
     return "Starting Job = " + jobId;
   }
-  
+
   /**
    * this msg pattern is used to track when a job is successfully done.
+   * 
    * @param jobId
    * @return
    */
@@ -678,29 +737,33 @@
     return "Ended Job = " + jobId;
   }
 
-  private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
+  private void showJobFailDebugInfo(JobConf conf, RunningJob rj)
+      throws IOException {
 
     Map<String, Integer> failures = new HashMap<String, Integer>();
-    Set<String> successes = new HashSet<String> ();
-    Map<String, String> taskToJob = new HashMap<String,String>();
+    Set<String> successes = new HashSet<String>();
+    Map<String, String> taskToJob = new HashMap<String, String>();
 
     int startIndex = 0;
 
-    while(true) {
-      TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
+    while (true) {
+      TaskCompletionEvent[] taskCompletions = rj
+          .getTaskCompletionEvents(startIndex);
 
-      if(taskCompletions == null || taskCompletions.length == 0) {
+      if (taskCompletions == null || taskCompletions.length == 0) {
         break;
       }
 
       boolean more = true;
-      for(TaskCompletionEvent t : taskCompletions) {
-        // getTaskJobIDs return Strings for compatibility with Hadoop version without
+      for (TaskCompletionEvent t : taskCompletions) {
+        // getTaskJobIDs return Strings for compatibility with Hadoop version
+        // without
         // TaskID or TaskAttemptID
-        String [] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
+        String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
 
-        if(taskJobIds == null) {
-          console.printError("Task attempt info is unavailable in this Hadoop version");
+        if (taskJobIds == null) {
+          console
+              .printError("Task attempt info is unavailable in this Hadoop version");
           more = false;
           break;
         }
@@ -709,9 +772,9 @@
         String jobId = taskJobIds[1];
         taskToJob.put(taskId, jobId);
 
-        if(t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
+        if (t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
           Integer failAttempts = failures.get(taskId);
-          if(failAttempts == null) {
+          if (failAttempts == null) {
             failAttempts = Integer.valueOf(0);
           }
           failAttempts = Integer.valueOf(failAttempts.intValue() + 1);
@@ -720,36 +783,39 @@
           successes.add(taskId);
         }
       }
-      if(!more) {
+      if (!more) {
         break;
       }
       startIndex += taskCompletions.length;
     }
     // Remove failures for tasks that succeeded
-    for(String task : successes) {
+    for (String task : successes) {
       failures.remove(task);
     }
 
-    if(failures.keySet().size() == 0) {
+    if (failures.keySet().size() == 0) {
       return;
     }
 
     // Find the highest failure count
     int maxFailures = 0;
-    for(Integer failCount : failures.values()) {
-      if(maxFailures < failCount.intValue())
+    for (Integer failCount : failures.values()) {
+      if (maxFailures < failCount.intValue()) {
         maxFailures = failCount.intValue();
+      }
     }
 
     // Display Error Message for tasks with the highest failure count
-    console.printError("\nFailed tasks with most" + "(" + maxFailures + ")" + " failures " + ": ");
+    console.printError("\nFailed tasks with most" + "(" + maxFailures + ")"
+        + " failures " + ": ");
     String jtUrl = JobTrackerURLResolver.getURL(conf);
 
-    for(String task : failures.keySet()) {
-      if(failures.get(task).intValue() == maxFailures) {
+    for (String task : failures.keySet()) {
+      if (failures.get(task).intValue() == maxFailures) {
         String jobId = taskToJob.get(task);
-        String taskUrl = jtUrl + "/taskdetails.jsp?jobid=" + jobId + "&tipid=" + task.toString();
-        console.printError("Task URL: " + taskUrl +"\n");
+        String taskUrl = jtUrl + "/taskdetails.jsp?jobid=" + jobId + "&tipid="
+            + task.toString();
+        console.printError("Task URL: " + taskUrl + "\n");
         // Only print out one task because that's good enough for debugging.
         break;
       }
@@ -845,14 +911,18 @@
         // see also - code in CliDriver.java
         ClassLoader loader = conf.getClassLoader();
         if (StringUtils.isNotBlank(auxJars)) {
-          loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
+          loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars,
+              ","));
         }
         if (StringUtils.isNotBlank(addedJars)) {
-          loader = Utilities.addToClassPath(loader, StringUtils.split(addedJars, ","));
+          loader = Utilities.addToClassPath(loader, StringUtils.split(
+              addedJars, ","));
         }
         conf.setClassLoader(loader);
-        // Also set this to the Thread ContextClassLoader, so new threads will inherit
-        // this class loader, and propagate into newly created Configurations by those
+        // Also set this to the Thread ContextClassLoader, so new threads will
+        // inherit
+        // this class loader, and propagate into newly created Configurations by
+        // those
         // new threads.
         Thread.currentThread().setContextClassLoader(loader);
       } catch (Exception e) {
@@ -887,8 +957,9 @@
         String oneProp = (String) one;
 
         if (localMode
-            && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir)))
+            && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) {
           continue;
+        }
 
         String oneValue = deltaP.getProperty(oneProp);
 
@@ -941,8 +1012,9 @@
 
     if (inpFs.exists(dirPath)) {
       FileStatus[] fStats = inpFs.listStatus(dirPath);
-      if (fStats.length > 0)
+      if (fStats.length > 0) {
         return false;
+      }
     }
     return true;
   }
@@ -950,18 +1022,22 @@
   /**
    * Handle a empty/null path for a given alias
    */
-  private int addInputPath(String path, JobConf job, mapredWork work, String hiveScratchDir, int numEmptyPaths,
-                           boolean isEmptyPath, String alias) throws Exception {
+  private int addInputPath(String path, JobConf job, mapredWork work,
+      String hiveScratchDir, int numEmptyPaths, boolean isEmptyPath,
+      String alias) throws Exception {
     // either the directory does not exist or it is empty
     assert path == null || isEmptyPath;
 
     // The input file does not exist, replace it by a empty file
     Class<? extends HiveOutputFormat> outFileFormat = null;
 
-    if (isEmptyPath)
-      outFileFormat = work.getPathToPartitionInfo().get(path).getTableDesc().getOutputFileFormatClass();
-    else
-      outFileFormat = work.getAliasToPartnInfo().get(alias).getTableDesc().getOutputFileFormatClass();
+    if (isEmptyPath) {
+      outFileFormat = work.getPathToPartitionInfo().get(path).getTableDesc()
+          .getOutputFileFormatClass();
+    } else {
+      outFileFormat = work.getAliasToPartnInfo().get(alias).getTableDesc()
+          .getOutputFileFormatClass();
+    }
 
     // create a dummy empty file in a new directory
     String newDir = hiveScratchDir + File.separator + (++numEmptyPaths);
@@ -974,13 +1050,13 @@
     LOG.info("Changed input file to " + newPath.toString());
 
     // toggle the work
-    LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+    LinkedHashMap<String, ArrayList<String>> pathToAliases = work
+        .getPathToAliases();
     if (isEmptyPath) {
       assert path != null;
       pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path));
       pathToAliases.remove(path);
-    }
-    else {
+    } else {
       assert path == null;
       ArrayList<String> newList = new ArrayList<String>();
       newList.add(alias);
@@ -989,25 +1065,28 @@
 
     work.setPathToAliases(pathToAliases);
 
-    LinkedHashMap<String,partitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+    LinkedHashMap<String, partitionDesc> pathToPartitionInfo = work
+        .getPathToPartitionInfo();
     if (isEmptyPath) {
-      pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
+      pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo
+          .get(path));
       pathToPartitionInfo.remove(path);
-    }
-    else {
+    } else {
       partitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
       pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
     }
     work.setPathToPartitionInfo(pathToPartitionInfo);
 
     String onefile = newPath.toString();
-    RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath, Text.class, false, new Properties(), null);
+    RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(
+        job, newFilePath, Text.class, false, new Properties(), null);
     recWriter.close(false);
     FileInputFormat.addInputPaths(job, onefile);
     return numEmptyPaths;
   }
 
-  private void addInputPaths(JobConf job, mapredWork work, String hiveScratchDir) throws Exception {
+  private void addInputPaths(JobConf job, mapredWork work, String hiveScratchDir)
+      throws Exception {
     int numEmptyPaths = 0;
 
     List<String> pathsProcessed = new ArrayList<String>();
@@ -1015,7 +1094,7 @@
     // AliasToWork contains all the aliases
     for (String oneAlias : work.getAliasToWork().keySet()) {
       LOG.info("Processing alias " + oneAlias);
-      List<String> emptyPaths     = new ArrayList<String>();
+      List<String> emptyPaths = new ArrayList<String>();
 
       // The alias may not have any path
       String path = null;
@@ -1024,31 +1103,41 @@
         if (aliases.contains(oneAlias)) {
           path = onefile;
 
-          // Multiple aliases can point to the same path - it should be processed only once
-          if (pathsProcessed.contains(path))
+          // Multiple aliases can point to the same path - it should be
+          // processed only once
+          if (pathsProcessed.contains(path)) {
             continue;
+          }
           pathsProcessed.add(path);
 
           LOG.info("Adding input file " + path);
 
-          if (!isEmptyPath(job, path))
+          if (!isEmptyPath(job, path)) {
             FileInputFormat.addInputPaths(job, path);
-          else
+          } else {
             emptyPaths.add(path);
+          }
         }
       }
 
       // Create a empty file if the directory is empty
-      for (String emptyPath : emptyPaths)
-        numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true, oneAlias);
+      for (String emptyPath : emptyPaths) {
+        numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir,
+            numEmptyPaths, true, oneAlias);
+      }
 
       // If the query references non-existent partitions
-      // We need to add a empty file, it is not acceptable to change the operator tree
+      // We need to add a empty file, it is not acceptable to change the
+      // operator tree
       // Consider the query:
-      //  select * from (select count(1) from T union all select count(1) from T2) x;
-      // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 rows)
-      if (path == null)
-        numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, oneAlias);
+      // select * from (select count(1) from T union all select count(1) from
+      // T2) x;
+      // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
+      // rows)
+      if (path == null) {
+        numEmptyPaths = addInputPath(null, job, work, hiveScratchDir,
+            numEmptyPaths, false, oneAlias);
+      }
     }
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Thu Jan 21 10:37:58 2010
@@ -56,17 +56,19 @@
   private MemoryMXBean memoryMXBean;
   private long numRows = 0;
   private long nextCntr = 1;
-  
+
+  @Override
   public void configure(JobConf job) {
-    // Allocate the bean at the beginning - 
+    // Allocate the bean at the beginning -
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-      
+
     try {
-      l4j.info("conf classpath = " 
-          + Arrays.asList(((URLClassLoader)job.getClassLoader()).getURLs()));
-      l4j.info("thread classpath = " 
-          + Arrays.asList(((URLClassLoader)Thread.currentThread().getContextClassLoader()).getURLs()));
+      l4j.info("conf classpath = "
+          + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+      l4j.info("thread classpath = "
+          + Arrays.asList(((URLClassLoader) Thread.currentThread()
+              .getContextClassLoader()).getURLs()));
     } catch (Exception e) {
       l4j.info("cannot get classpath: " + e.getMessage());
     }
@@ -88,35 +90,39 @@
       }
       fetchOperators = new HashMap<String, FetchOperator>();
       // create map local operators
-      for (Map.Entry<String, fetchWork> entry : localWork.getAliasToFetchWork().entrySet()) {
-        fetchOperators.put(entry.getKey(), new FetchOperator(entry.getValue(), job));
+      for (Map.Entry<String, fetchWork> entry : localWork.getAliasToFetchWork()
+          .entrySet()) {
+        fetchOperators.put(entry.getKey(), new FetchOperator(entry.getValue(),
+            job));
         l4j.info("fetchoperator for " + entry.getKey() + " created");
       }
       // initialize map local operators
       for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-        Operator<? extends Serializable> forwardOp = localWork.getAliasToWork().get(entry.getKey()); 
+        Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
+            .get(entry.getKey());
         // All the operators need to be initialized before process
-        forwardOp.initialize(jc, new ObjectInspector[]{entry.getValue().getOutputObjectInspector()});
+        forwardOp.initialize(jc, new ObjectInspector[] { entry.getValue()
+            .getOutputObjectInspector() });
         l4j.info("fetchoperator for " + entry.getKey() + " initialized");
       }
-      // defer processing of map local operators to first row if in case there is no input (??)
+      // defer processing of map local operators to first row if in case there
+      // is no input (??)
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
         // will this be true here?
-        // Don't create a new object if we are already out of memory 
-        throw (OutOfMemoryError) e; 
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
       } else {
-        throw new RuntimeException ("Map operator initialization failed", e);
+        throw new RuntimeException("Map operator initialization failed", e);
       }
     }
 
   }
 
-  public void map(Object key, Object value,
-                  OutputCollector output,
-                  Reporter reporter) throws IOException {
-    if(oc == null) {
+  public void map(Object key, Object value, OutputCollector output,
+      Reporter reporter) throws IOException {
+    if (oc == null) {
       oc = output;
       rp = reporter;
       mo.setOutputCollector(oc);
@@ -126,11 +132,13 @@
         try {
           mapredLocalWork localWork = mo.getConf().getMapLocalWork();
           int fetchOpNum = 0;
-          for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+          for (Map.Entry<String, FetchOperator> entry : fetchOperators
+              .entrySet()) {
             int fetchOpRows = 0;
             String alias = entry.getKey();
             FetchOperator fetchOp = entry.getValue();
-            Operator<? extends Serializable> forwardOp = localWork.getAliasToWork().get(alias); 
+            Operator<? extends Serializable> forwardOp = localWork
+                .getAliasToWork().get(alias);
 
             while (true) {
               InspectableObject row = fetchOp.getNextRow();
@@ -140,40 +148,46 @@
               }
               fetchOpRows++;
               forwardOp.process(row.o, 0);
-              // check if any operator had a fatal error or early exit during execution
-              if ( forwardOp.getDone() ) {
+              // check if any operator had a fatal error or early exit during
+              // execution
+              if (forwardOp.getDone()) {
                 done = true;
                 break;
               }
             }
-            
+
             if (l4j.isInfoEnabled()) {
-              l4j.info("fetch " + fetchOpNum++ + " processed " + fetchOpRows + " used mem: " + memoryMXBean.getHeapMemoryUsage().getUsed());
+              l4j
+                  .info("fetch " + fetchOpNum++ + " processed " + fetchOpRows
+                      + " used mem: "
+                      + memoryMXBean.getHeapMemoryUsage().getUsed());
             }
           }
         } catch (Throwable e) {
           abort = true;
           if (e instanceof OutOfMemoryError) {
-            // Don't create a new object if we are already out of memory 
-            throw (OutOfMemoryError) e; 
+            // Don't create a new object if we are already out of memory
+            throw (OutOfMemoryError) e;
           } else {
-            throw new RuntimeException ("Map local work failed", e);
+            throw new RuntimeException("Map local work failed", e);
           }
         }
       }
     }
 
     try {
-      if (mo.getDone())
+      if (mo.getDone()) {
         done = true;
-      else {
-        // Since there is no concept of a group, we don't invoke startGroup/endGroup for a mapper
-        mo.process((Writable)value);
+      } else {
+        // Since there is no concept of a group, we don't invoke
+        // startGroup/endGroup for a mapper
+        mo.process((Writable) value);
         if (l4j.isInfoEnabled()) {
           numRows++;
           if (numRows == nextCntr) {
             long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            l4j.info("ExecMapper: processing " + numRows + " rows: used memory = " + used_memory);
+            l4j.info("ExecMapper: processing " + numRows
+                + " rows: used memory = " + used_memory);
             nextCntr = getNextCntr(numRows);
           }
         }
@@ -182,26 +196,29 @@
       abort = true;
       e.printStackTrace();
       if (e instanceof OutOfMemoryError) {
-        // Don't create a new object if we are already out of memory 
-        throw (OutOfMemoryError) e; 
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
       } else {
-        throw new RuntimeException (e.getMessage(), e);
+        throw new RuntimeException(e.getMessage(), e);
       }
     }
   }
 
   private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the reducer. It dumps
+    // A very simple counter to keep track of number of rows processed by the
+    // reducer. It dumps
     // every 1 million times, and quickly before that
-    if (cntr >= 1000000)
+    if (cntr >= 1000000) {
       return cntr + 1000000;
-    
+    }
+
     return 10 * cntr;
   }
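  // Side note (illustrative sketch, not code from this commit): the logging
  // threshold produced by getNextCntr above grows by a factor of ten until it
  // reaches one million rows and then advances in fixed one-million steps, so
  // progress is logged often at first and roughly once per million rows later.
  // A minimal standalone sketch of that progression:
  //
  // class RowLogThresholdSketch {
  //   static long next(long cntr) {
  //     // Same arithmetic as getNextCntr: 1, 10, 100, ..., 1_000_000, 2_000_000, ...
  //     if (cntr >= 1000000) {
  //       return cntr + 1000000;
  //     }
  //     return 10 * cntr;
  //   }
  //
  //   public static void main(String[] args) {
  //     for (long threshold = 1; threshold <= 3000000; threshold = next(threshold)) {
  //       System.out.println(threshold);
  //     }
  //   }
  // }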
 
+  @Override
   public void close() {
     // No row was processed
-    if(oc == null) {
+    if (oc == null) {
       l4j.trace("Close called. no row processed by map.");
     }
 
@@ -212,24 +229,26 @@
       if (fetchOperators != null) {
         mapredLocalWork localWork = mo.getConf().getMapLocalWork();
         for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-          Operator<? extends Serializable> forwardOp = localWork.getAliasToWork().get(entry.getKey()); 
+          Operator<? extends Serializable> forwardOp = localWork
+              .getAliasToWork().get(entry.getKey());
           forwardOp.close(abort);
         }
       }
-      
+
       if (l4j.isInfoEnabled()) {
         long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-        l4j.info("ExecMapper: processed " + numRows + " rows: used memory = " + used_memory);
+        l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
+            + used_memory);
       }
-      
-      reportStats rps = new reportStats (rp);
+
+      reportStats rps = new reportStats(rp);
       mo.preorderMap(rps);
       return;
     } catch (Exception e) {
-      if(!abort) {
+      if (!abort) {
         // signal new failure to map-reduce
         l4j.error("Hit error while closing operators - failing tree");
-        throw new RuntimeException ("Error while closing operators", e);
+        throw new RuntimeException("Error while closing operators", e);
       }
     }
   }
@@ -240,15 +259,17 @@
 
   public static class reportStats implements Operator.OperatorFunc {
     Reporter rp;
-    public reportStats (Reporter rp) {
+
+    public reportStats(Reporter rp) {
       this.rp = rp;
     }
+
     public void func(Operator op) {
       Map<Enum, Long> opStats = op.getStats();
-      for(Map.Entry<Enum, Long> e: opStats.entrySet()) {
-          if(this.rp != null) {
-              rp.incrCounter(e.getKey(), e.getValue());
-          }
+      for (Map.Entry<Enum, Long> e : opStats.entrySet()) {
+        if (rp != null) {
+          rp.incrCounter(e.getKey(), e.getValue());
+        }
       }
     }
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Thu Jan 21 10:37:58 2010
@@ -18,23 +18,20 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.*;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
 import java.net.URLClassLoader;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-
+import org.apache.hadoop.hive.ql.exec.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.exec.ExecMapper.reportStats;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -43,12 +40,17 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
 
 public class ExecReducer extends MapReduceBase implements Reducer {
 
   private JobConf jc;
-  private OutputCollector<?,?> oc;
+  private OutputCollector<?, ?> oc;
   private Operator<?> reducer;
   private Reporter rp;
   private boolean abort = false;
@@ -56,101 +58,108 @@
   private long cntr = 0;
   private long nextCntr = 1;
 
-  private static String [] fieldNames;
+  private static String[] fieldNames;
   public static final Log l4j = LogFactory.getLog("ExecReducer");
-  
+
   // used to log memory usage periodically
   private MemoryMXBean memoryMXBean;
-  
+
   // TODO: move to DynamicSerDe when it's ready
   private Deserializer inputKeyDeserializer;
-  // Input value serde needs to be an array to support different SerDe 
+  // Input value serde needs to be an array to support different SerDe
   // for different tags
-  private SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
+  private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
   static {
-    ArrayList<String> fieldNameArray =  new ArrayList<String> ();
-    for(Utilities.ReduceField r: Utilities.ReduceField.values()) {
+    ArrayList<String> fieldNameArray = new ArrayList<String>();
+    for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
       fieldNameArray.add(r.toString());
     }
-    fieldNames = fieldNameArray.toArray(new String [0]);
+    fieldNames = fieldNameArray.toArray(new String[0]);
   }
 
   tableDesc keyTableDesc;
   tableDesc[] valueTableDesc;
-  
+
+  @Override
   public void configure(JobConf job) {
     ObjectInspector[] rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector keyObjectInspector;
 
-    // Allocate the bean at the beginning - 
+    // Allocate the bean at the beginning -
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-    
+
     try {
-      l4j.info("conf classpath = " 
-          + Arrays.asList(((URLClassLoader)job.getClassLoader()).getURLs()));
-      l4j.info("thread classpath = " 
-          + Arrays.asList(((URLClassLoader)Thread.currentThread().getContextClassLoader()).getURLs()));
+      l4j.info("conf classpath = "
+          + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+      l4j.info("thread classpath = "
+          + Arrays.asList(((URLClassLoader) Thread.currentThread()
+              .getContextClassLoader()).getURLs()));
     } catch (Exception e) {
       l4j.info("cannot get classpath: " + e.getMessage());
     }
     jc = job;
     mapredWork gWork = Utilities.getMapRedWork(job);
     reducer = gWork.getReducer();
-    reducer.setParentOperators(null); // clear out any parents as reducer is the root
+    reducer.setParentOperators(null); // clear out any parents as reducer is the
+                                      // root
     isTagged = gWork.getNeedsTagging();
     try {
       keyTableDesc = gWork.getKeyDesc();
-      inputKeyDeserializer = (SerDe)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+      inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+          .getDeserializerClass(), null);
       inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
       keyObjectInspector = inputKeyDeserializer.getObjectInspector();
-      valueTableDesc = new tableDesc[gWork.getTagToValueDesc().size()]; 
-      for(int tag=0; tag<gWork.getTagToValueDesc().size(); tag++) {
+      valueTableDesc = new tableDesc[gWork.getTagToValueDesc().size()];
+      for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
         // We should initialize the SerDe with the TypeInfo when available.
         valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
-        inputValueDeserializer[tag] = (SerDe)ReflectionUtils.newInstance(valueTableDesc[tag].getDeserializerClass(), null);
-        inputValueDeserializer[tag].initialize(null, valueTableDesc[tag].getProperties());
-        valueObjectInspector[tag] = inputValueDeserializer[tag].getObjectInspector();
-        
+        inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
+            valueTableDesc[tag].getDeserializerClass(), null);
+        inputValueDeserializer[tag].initialize(null, valueTableDesc[tag]
+            .getProperties());
+        valueObjectInspector[tag] = inputValueDeserializer[tag]
+            .getObjectInspector();
+
         ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
         ois.add(keyObjectInspector);
         ois.add(valueObjectInspector[tag]);
         ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
-        rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
-            Arrays.asList(fieldNames), ois);
+        rowObjectInspector[tag] = ObjectInspectorFactory
+            .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    
-    //initialize reduce operator tree
+
+    // initialize reduce operator tree
     try {
       l4j.info(reducer.dump(0));
       reducer.initialize(jc, rowObjectInspector);
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
-        // Don't create a new object if we are already out of memory 
-        throw (OutOfMemoryError) e; 
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
       } else {
-        throw new RuntimeException ("Reduce operator initialization failed", e);
+        throw new RuntimeException("Reduce operator initialization failed", e);
       }
     }
   }
 
   private Object keyObject;
-  private Object[] valueObject = new Object[Byte.MAX_VALUE];
-  
+  private final Object[] valueObject = new Object[Byte.MAX_VALUE];
+
   private BytesWritable groupKey;
-  
+
   ArrayList<Object> row = new ArrayList<Object>(3);
   ByteWritable tag = new ByteWritable();
-  public void reduce(Object key, Iterator values,
-                     OutputCollector output,
-                     Reporter reporter) throws IOException {
 
-    if(oc == null) {
+  public void reduce(Object key, Iterator values, OutputCollector output,
+      Reporter reporter) throws IOException {
+
+    if (oc == null) {
      // propagate reporter and output collector to all operators
       oc = output;
       rp = reporter;
@@ -159,34 +168,35 @@
     }
 
     try {
-      BytesWritable keyWritable = (BytesWritable)key;
-      tag.set((byte)0);
+      BytesWritable keyWritable = (BytesWritable) key;
+      tag.set((byte) 0);
       if (isTagged) {
         // remove the tag
         int size = keyWritable.getSize() - 1;
-        tag.set(keyWritable.get()[size]); 
+        tag.set(keyWritable.get()[size]);
         keyWritable.setSize(size);
       }
-      
+
       if (!keyWritable.equals(groupKey)) {
        // If an operator wants to do some work at the beginning of a group
-        if (groupKey == null) { //the first group
+        if (groupKey == null) { // the first group
           groupKey = new BytesWritable();
         } else {
          // If an operator wants to do some work at the end of a group
           l4j.trace("End Group");
           reducer.endGroup();
         }
-        
+
         try {
           keyObject = inputKeyDeserializer.deserialize(keyWritable);
         } catch (Exception e) {
-          throw new HiveException("Unable to deserialize reduce input key from " + 
-              Utilities.formatBinaryString(keyWritable.get(), 0, keyWritable.getSize())
-              + " with properties " + keyTableDesc.getProperties(),
-              e);
+          throw new HiveException(
+              "Unable to deserialize reduce input key from "
+                  + Utilities.formatBinaryString(keyWritable.get(), 0,
+                      keyWritable.getSize()) + " with properties "
+                  + keyTableDesc.getProperties(), e);
         }
-        
+
         groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
         l4j.trace("Start Group");
         reducer.startGroup();
@@ -195,15 +205,18 @@
       // System.err.print(keyObject.toString());
       while (values.hasNext()) {
         BytesWritable valueWritable = (BytesWritable) values.next();
-        //System.err.print(who.getHo().toString());
+        // System.err.print(who.getHo().toString());
         try {
-          valueObject[tag.get()] = inputValueDeserializer[tag.get()].deserialize(valueWritable);
+          valueObject[tag.get()] = inputValueDeserializer[tag.get()]
+              .deserialize(valueWritable);
         } catch (SerDeException e) {
-          throw new HiveException("Unable to deserialize reduce input value (tag=" + tag.get()
-              + ") from " + 
-              Utilities.formatBinaryString(valueWritable.get(), 0, valueWritable.getSize())
-              + " with properties " + valueTableDesc[tag.get()].getProperties(),
-              e);
+          throw new HiveException(
+              "Unable to deserialize reduce input value (tag="
+                  + tag.get()
+                  + ") from "
+                  + Utilities.formatBinaryString(valueWritable.get(), 0,
+                      valueWritable.getSize()) + " with properties "
+                  + valueTableDesc[tag.get()].getProperties(), e);
         }
         row.clear();
         row.add(keyObject);
@@ -214,7 +227,8 @@
           cntr++;
           if (cntr == nextCntr) {
             long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            l4j.info("ExecReducer: processing " + cntr + " rows: used memory = " + used_memory);
+            l4j.info("ExecReducer: processing " + cntr
+                + " rows: used memory = " + used_memory);
             nextCntr = getNextCntr(cntr);
           }
         }
@@ -224,27 +238,30 @@
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
-        // Don't create a new object if we are already out of memory 
-        throw (OutOfMemoryError) e; 
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
       } else {
-        throw new IOException (e);
+        throw new IOException(e);
       }
     }
   }
 
   private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the reducer. It dumps
+    // A very simple counter to keep track of number of rows processed by the
+    // reducer. It dumps
     // every 1 million times, and quickly before that
-    if (cntr >= 1000000)
+    if (cntr >= 1000000) {
       return cntr + 1000000;
-    
+    }
+
     return 10 * cntr;
   }
 
+  @Override
   public void close() {
 
     // No row was processed
-    if(oc == null) {
+    if (oc == null) {
       l4j.trace("Close called no row");
     }
 
@@ -255,18 +272,20 @@
         reducer.endGroup();
       }
       if (l4j.isInfoEnabled()) {
-        l4j.info("ExecReducer: processed " + cntr + " rows: used memory = " + memoryMXBean.getHeapMemoryUsage().getUsed());
+        l4j.info("ExecReducer: processed " + cntr + " rows: used memory = "
+            + memoryMXBean.getHeapMemoryUsage().getUsed());
       }
-      
+
       reducer.close(abort);
-      reportStats rps = new reportStats (rp);
+      reportStats rps = new reportStats(rp);
       reducer.preorderMap(rps);
       return;
     } catch (Exception e) {
-      if(!abort) {
+      if (!abort) {
         // signal new failure to map-reduce
         l4j.error("Hit error while closing operators - failing tree");
-        throw new RuntimeException ("Error while closing operators: " + e.getMessage(), e);
+        throw new RuntimeException("Error while closing operators: "
+            + e.getMessage(), e);
       }
     }
   }
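
A minimal, self-contained sketch of the row-count logging cadence that getNextCntr() above implements: log at 1, 10, 100, ... rows, then every additional million once the count passes 1,000,000. The class name RowCounterCadenceSketch and the main() driver are illustrative only and not part of this commit.

    public class RowCounterCadenceSketch {
      // Same growth rule as ExecReducer.getNextCntr(): multiply by 10 until one
      // million rows, then step in increments of one million.
      static long getNextCntr(long cntr) {
        if (cntr >= 1000000) {
          return cntr + 1000000;
        }
        return 10 * cntr;
      }

      public static void main(String[] args) {
        long cntr = 0;
        long nextCntr = 1;
        for (long row = 0; row < 3000000; row++) {
          cntr++;
          if (cntr == nextCntr) {
            // In ExecReducer this is the point where heap usage is logged via MemoryMXBean.
            System.out.println("processed " + cntr + " rows");
            nextCntr = getNextCntr(cntr);
          }
        }
      }
    }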

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Thu Jan 21 10:37:58 2010
@@ -36,7 +36,6 @@
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.util.StringUtils;
 
-
 /**
  * ExplainTask implementation
  * 
@@ -47,52 +46,53 @@
   public ExplainTask() {
     super();
   }
-  
+
+  @Override
   public int execute() {
-    
+
     try {
-      OutputStream outS = work.getResFile().getFileSystem(conf).create(work.getResFile());
+      OutputStream outS = work.getResFile().getFileSystem(conf).create(
+          work.getResFile());
       PrintStream out = new PrintStream(outS);
-    	
+
       // Print out the parse AST
       outputAST(work.getAstStringTree(), out, 0);
       out.println();
-      
+
       outputDependencies(out, work.getRootTasks(), 0);
       out.println();
-      
+
       // Go over all the tasks and dump out the plans
       outputStagePlans(out, work.getRootTasks(), 0);
       out.close();
-      
+
       return (0);
-    }
-    catch (Exception e) {
-      console.printError("Failed with exception " +   e.getMessage(), "\n" + StringUtils.stringifyException(e));
+    } catch (Exception e) {
+      console.printError("Failed with exception " + e.getMessage(), "\n"
+          + StringUtils.stringifyException(e));
       return (1);
     }
   }
 
   private String indentString(int indent) {
     StringBuilder sb = new StringBuilder();
-    for(int i=0; i<indent; ++i) {
+    for (int i = 0; i < indent; ++i) {
       sb.append(" ");
     }
-    
+
     return sb.toString();
   }
 
-  private void outputMap(Map<?, ?> mp, String header,
-                         PrintStream out, boolean extended, int indent) 
-    throws Exception {
+  private void outputMap(Map<?, ?> mp, String header, PrintStream out,
+      boolean extended, int indent) throws Exception {
 
     boolean first_el = true;
-    for(Entry<?,?> ent: mp.entrySet()) {
+    for (Entry<?, ?> ent : mp.entrySet()) {
       if (first_el) {
         out.println(header);
       }
       first_el = false;
-              
+
       // Print the key
       out.print(indentString(indent));
       out.printf("%s ", ent.getKey().toString());
@@ -105,78 +105,74 @@
         out.println();
       } else if (ent.getValue() instanceof Serializable) {
         out.println();
-        outputPlan((Serializable)ent.getValue(), out, extended, indent+2);
-      } 
+        outputPlan((Serializable) ent.getValue(), out, extended, indent + 2);
+      }
     }
   }
 
-  private void outputList(List<?> l, String header,
-                          PrintStream out, boolean extended, int indent) 
-    throws Exception {
-  
+  private void outputList(List<?> l, String header, PrintStream out,
+      boolean extended, int indent) throws Exception {
+
     boolean first_el = true;
     boolean nl = false;
-    for(Object o: l) {
+    for (Object o : l) {
       if (first_el) {
         out.print(header);
       }
-              
+
       if (isPrintable(o)) {
         if (!first_el) {
           out.print(", ");
         } else {
           out.print(" ");
         }
-                
+
         out.print(o);
         nl = true;
-      }
-      else if (o instanceof Serializable) {
+      } else if (o instanceof Serializable) {
         if (first_el) {
           out.println();
         }
-        outputPlan((Serializable)o, out, extended, indent+2);
+        outputPlan((Serializable) o, out, extended, indent + 2);
       }
-              
+
       first_el = false;
     }
-            
+
     if (nl) {
       out.println();
     }
   }
 
   private boolean isPrintable(Object val) {
-    if (val instanceof Boolean ||
-        val instanceof String ||
-        val instanceof Integer ||
-        val instanceof Byte ||
-        val instanceof Float ||
-        val instanceof Double) {
+    if (val instanceof Boolean || val instanceof String
+        || val instanceof Integer || val instanceof Byte
+        || val instanceof Float || val instanceof Double) {
       return true;
     }
 
     if (val.getClass().isPrimitive()) {
       return true;
     }
-    
+
     return false;
   }
 
-  private void outputPlan(Serializable work, PrintStream out, boolean extended, int indent) 
-    throws Exception {
+  private void outputPlan(Serializable work, PrintStream out, boolean extended,
+      int indent) throws Exception {
     // Check if work has an explain annotation
     Annotation note = work.getClass().getAnnotation(explain.class);
-    
+
     if (note instanceof explain) {
-      explain xpl_note = (explain)note;
+      explain xpl_note = (explain) note;
       if (extended || xpl_note.normalExplain()) {
         out.print(indentString(indent));
         out.println(xpl_note.displayName());
       }
     }
 
-    // If this is an operator then we need to call the plan generation on the conf and then
+    // If this is an operator then we need to call the plan generation on the
+    // conf and then
     // the children
     if (work instanceof Operator) {
       Operator<? extends Serializable> operator = (Operator<? extends Serializable>) work;
@@ -184,42 +180,42 @@
         outputPlan(operator.getConf(), out, extended, indent);
       }
       if (operator.getChildOperators() != null) {
-        for(Operator<? extends Serializable> op: operator.getChildOperators()) {
-          outputPlan(op, out, extended, indent+2);
+        for (Operator<? extends Serializable> op : operator.getChildOperators()) {
+          outputPlan(op, out, extended, indent + 2);
         }
       }
       return;
     }
-    
+
     // We look at all methods that generate values for explain
     Method[] methods = work.getClass().getMethods();
     Arrays.sort(methods, new MethodComparator());
 
-    for(Method m: methods) {
-      int prop_indents = indent+2;
+    for (Method m : methods) {
+      int prop_indents = indent + 2;
       note = m.getAnnotation(explain.class);
 
       if (note instanceof explain) {
-        explain xpl_note = (explain)note;
+        explain xpl_note = (explain) note;
 
         if (extended || xpl_note.normalExplain()) {
-          
+
           Object val = m.invoke(work);
 
           if (val == null) {
             continue;
           }
-          
+
           String header = null;
-          if (!xpl_note.displayName().equals("")){
-            header = indentString(prop_indents) + xpl_note.displayName() +":";
+          if (!xpl_note.displayName().equals("")) {
+            header = indentString(prop_indents) + xpl_note.displayName() + ":";
           } else {
             prop_indents = indent;
             header = indentString(prop_indents);
           }
 
           if (isPrintable(val)) {
-            
+
             out.printf("%s ", header);
             out.println(val);
             continue;
@@ -227,98 +223,100 @@
           // Try this as a map
           try {
             // Go through the map and print out the stuff
-            Map<?,?> mp = (Map<?,?>)val;
-            outputMap(mp, header, out, extended, prop_indents+2);
+            Map<?, ?> mp = (Map<?, ?>) val;
+            outputMap(mp, header, out, extended, prop_indents + 2);
             continue;
-          }
-          catch (ClassCastException ce) {
+          } catch (ClassCastException ce) {
             // Ignore - all this means is that this is not a map
           }
 
           // Try this as a list
           try {
-            List<?> l = (List<?>)val;
-            outputList(l, header, out, extended, prop_indents+2);
-            
+            List<?> l = (List<?>) val;
+            outputList(l, header, out, extended, prop_indents + 2);
+
             continue;
-          }
-          catch (ClassCastException ce) {
+          } catch (ClassCastException ce) {
             // Ignore
           }
-          
 
           // Finally check if it is serializable
           try {
-            Serializable s = (Serializable)val;
+            Serializable s = (Serializable) val;
             out.println(header);
-            outputPlan(s, out, extended, prop_indents+2);
-            
+            outputPlan(s, out, extended, prop_indents + 2);
+
             continue;
-          }
-          catch (ClassCastException ce) {
+          } catch (ClassCastException ce) {
             // Ignore
           }
         }
       }
     }
   }
-  
-  private void outputPlan(Task<? extends Serializable> task, PrintStream out, 
-                          boolean extended, HashSet<Task<? extends Serializable>> displayedSet,
-                          int indent) 
-    throws Exception {
-  
+
+  private void outputPlan(Task<? extends Serializable> task, PrintStream out,
+      boolean extended, HashSet<Task<? extends Serializable>> displayedSet,
+      int indent) throws Exception {
+
     if (displayedSet.contains(task)) {
       return;
     }
     displayedSet.add(task);
-    
+
     out.print(indentString(indent));
     out.printf("Stage: %s\n", task.getId());
-    // Start by getting the work part of the task and call the output plan for the work
-    outputPlan(task.getWork(), out, extended, indent+2);
+    // Start by getting the work part of the task and call the output plan for
+    // the work
+    outputPlan(task.getWork(), out, extended, indent + 2);
     out.println();
-    if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
-      for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+    if (task instanceof ConditionalTask
+        && ((ConditionalTask) task).getListTasks() != null) {
+      for (Task<? extends Serializable> con : ((ConditionalTask) task)
+          .getListTasks()) {
         outputPlan(con, out, extended, displayedSet, indent);
       }
     }
     if (task.getChildTasks() != null) {
-      for(Task<? extends Serializable> child: task.getChildTasks()) {
+      for (Task<? extends Serializable> child : task.getChildTasks()) {
         outputPlan(child, out, extended, displayedSet, indent);
       }
     }
   }
 
-  private Set<Task<? extends Serializable>> dependeciesTaskSet = new HashSet<Task<? extends Serializable>>();
-  private void outputDependencies(Task<? extends Serializable> task, PrintStream out, int indent, boolean rootTskCandidate) 
-    throws Exception {
-    
-    if(dependeciesTaskSet.contains(task))
+  private final Set<Task<? extends Serializable>> dependeciesTaskSet = new HashSet<Task<? extends Serializable>>();
+
+  private void outputDependencies(Task<? extends Serializable> task,
+      PrintStream out, int indent, boolean rootTskCandidate) throws Exception {
+
+    if (dependeciesTaskSet.contains(task)) {
       return;
+    }
     dependeciesTaskSet.add(task);
-    
+
     out.print(indentString(indent));
     out.printf("%s", task.getId());
     if ((task.getParentTasks() == null || task.getParentTasks().isEmpty())) {
-      if(rootTskCandidate)
+      if (rootTskCandidate) {
         out.print(" is a root stage");
-    }
-    else {
+      }
+    } else {
       out.print(" depends on stages: ");
       boolean first = true;
-      for(Task<? extends Serializable> parent: task.getParentTasks()) {
+      for (Task<? extends Serializable> parent : task.getParentTasks()) {
         if (!first) {
           out.print(", ");
         }
         first = false;
         out.print(parent.getId());
       }
-      
-      if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
+
+      if (task instanceof ConditionalTask
+          && ((ConditionalTask) task).getListTasks() != null) {
         out.print(" , consists of ");
         first = true;
-        for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+        for (Task<? extends Serializable> con : ((ConditionalTask) task)
+            .getListTasks()) {
           if (!first) {
             out.print(", ");
           }
@@ -326,63 +324,63 @@
           out.print(con.getId());
         }
       }
-      
+
     }
     out.println();
-    
-    if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
-      for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+
+    if (task instanceof ConditionalTask
+        && ((ConditionalTask) task).getListTasks() != null) {
+      for (Task<? extends Serializable> con : ((ConditionalTask) task)
+          .getListTasks()) {
         outputDependencies(con, out, indent, false);
       }
     }
-    
+
     if (task.getChildTasks() != null) {
-      for(Task<? extends Serializable> child: task.getChildTasks()) {
+      for (Task<? extends Serializable> child : task.getChildTasks()) {
         outputDependencies(child, out, indent, true);
       }
     }
-    
+
   }
 
   public void outputAST(String treeString, PrintStream out, int indent) {
     out.print(indentString(indent));
     out.println("ABSTRACT SYNTAX TREE:");
-    out.print(indentString(indent+2));
-    out.println(treeString);    
+    out.print(indentString(indent + 2));
+    out.println(treeString);
   }
 
-  public void outputDependencies(PrintStream out, 
-                                 List<Task<? extends Serializable>> rootTasks,
-                                 int indent) 
-    throws Exception {
+  public void outputDependencies(PrintStream out,
+      List<Task<? extends Serializable>> rootTasks, int indent)
+      throws Exception {
     out.print(indentString(indent));
     out.println("STAGE DEPENDENCIES:");
-    for(Task<? extends Serializable> rootTask: rootTasks) {
-      outputDependencies(rootTask, out, indent+2, true);
+    for (Task<? extends Serializable> rootTask : rootTasks) {
+      outputDependencies(rootTask, out, indent + 2, true);
     }
   }
 
-  public void outputStagePlans(PrintStream out, 
-                               List<Task<? extends Serializable>> rootTasks,
-                               int indent) 
-    throws Exception {
+  public void outputStagePlans(PrintStream out,
+      List<Task<? extends Serializable>> rootTasks, int indent)
+      throws Exception {
     out.print(indentString(indent));
     out.println("STAGE PLANS:");
     HashSet<Task<? extends Serializable>> displayedSet = new HashSet<Task<? extends Serializable>>();
-    for(Task<? extends Serializable> rootTask: rootTasks) {
-      outputPlan(rootTask, out, work.getExtended(),
-                 displayedSet, indent+2);
+    for (Task<? extends Serializable> rootTask : rootTasks) {
+      outputPlan(rootTask, out, work.getExtended(), displayedSet, indent + 2);
     }
   }
 
   public static class MethodComparator implements Comparator {
     public int compare(Object o1, Object o2) {
-      Method m1 = (Method)o1;
-      Method m2 = (Method)o2;
+      Method m1 = (Method) o1;
+      Method m2 = (Method) o2;
       return m1.getName().compareTo(m2.getName());
     }
   }
 
+  @Override
   public int getType() {
     return StageType.EXPLAIN;
   }
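
The reflection pattern used by outputPlan() above, where only getter methods carrying an explain-style annotation contribute lines to the output, can be sketched without the Hive classes. The Explained annotation and SampleWork class below are hypothetical stand-ins for ql.plan.explain and a work descriptor; only the pattern is taken from the code above.

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.reflect.Method;
    import java.util.Arrays;
    import java.util.Comparator;

    public class ExplainReflectionSketch {
      @Retention(RetentionPolicy.RUNTIME)
      @interface Explained {
        String displayName() default "";
      }

      public static class SampleWork {
        @Explained(displayName = "alias")
        public String getAlias() { return "src"; }

        @Explained(displayName = "limit")
        public Integer getLimit() { return 10; }

        public String getIgnored() { return "no annotation, so never printed"; }
      }

      public static void main(String[] args) throws Exception {
        SampleWork work = new SampleWork();
        Method[] methods = work.getClass().getMethods();
        // Sort by name for deterministic output, as MethodComparator does above.
        Arrays.sort(methods, new Comparator<Method>() {
          public int compare(Method a, Method b) {
            return a.getName().compareTo(b.getName());
          }
        });
        for (Method m : methods) {
          Explained note = m.getAnnotation(Explained.class);
          if (note != null) {
            // Only annotated getters are invoked and printed.
            System.out.println("  " + note.displayName() + ": " + m.invoke(work));
          }
        }
      }
    }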

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java Thu Jan 21 10:37:58 2010
@@ -18,13 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.Arrays;
-
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 /**
@@ -33,10 +30,10 @@
 public class ExprNodeColumnEvaluator extends ExprNodeEvaluator {
 
   protected exprNodeColumnDesc expr;
-  
+
   transient StructObjectInspector[] inspectors;
   transient StructField[] fields;
-  
+
   public ExprNodeColumnEvaluator(exprNodeColumnDesc expr) {
     this.expr = expr;
   }
@@ -45,27 +42,28 @@
   public ObjectInspector initialize(ObjectInspector rowInspector)
       throws HiveException {
 
-    // We need to support field names like KEY.0, VALUE.1 between 
+    // We need to support field names like KEY.0, VALUE.1 between
     // map-reduce boundary.
     String[] names = expr.getColumn().split("\\.");
     inspectors = new StructObjectInspector[names.length];
     fields = new StructField[names.length];
-    
-    for(int i=0; i<names.length; i++) {
-      if (i==0) {
+
+    for (int i = 0; i < names.length; i++) {
+      if (i == 0) {
         inspectors[0] = (StructObjectInspector) rowInspector;
       } else {
-        inspectors[i] = (StructObjectInspector) fields[i-1].getFieldObjectInspector();
+        inspectors[i] = (StructObjectInspector) fields[i - 1]
+            .getFieldObjectInspector();
       }
       fields[i] = inspectors[i].getStructFieldRef(names[i]);
     }
-    return fields[names.length-1].getFieldObjectInspector();
+    return fields[names.length - 1].getFieldObjectInspector();
   }
-  
+
   @Override
   public Object evaluate(Object row) throws HiveException {
     Object o = row;
-    for(int i=0; i<fields.length; i++) {
+    for (int i = 0; i < fields.length; i++) {
       o = inspectors[i].getStructFieldData(o, fields[i]);
     }
     return o;
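
The dotted-name resolution above ("KEY.0"-style column names split on '.' and resolved one struct level at a time) can be illustrated with plain nested Maps in place of StructObjectInspector/StructField. This sketch is not Hive code; the class name and sample data are invented for the example.

    import java.util.HashMap;
    import java.util.Map;

    public class DottedColumnLookupSketch {
      static Object resolve(Map<String, Object> row, String column) {
        String[] names = column.split("\\.");
        Object o = row;
        for (String name : names) {
          // Each path segment descends one nesting level, mirroring the
          // inspectors[i].getStructFieldData(o, fields[i]) loop in evaluate().
          o = ((Map<?, ?>) o).get(name);
        }
        return o;
      }

      public static void main(String[] args) {
        Map<String, Object> key = new HashMap<String, Object>();
        key.put("0", "reduce-key-field");
        Map<String, Object> row = new HashMap<String, Object>();
        row.put("KEY", key);
        System.out.println(resolve(row, "KEY.0")); // prints reduce-key-field
      }
    }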

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java Thu Jan 21 10:37:58 2010
@@ -20,7 +20,6 @@
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeConstantDesc;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -31,25 +30,25 @@
   protected exprNodeConstantDesc expr;
   transient ObjectInspector writableObjectInspector;
   transient Object writableValue;
-  
+
   public ExprNodeConstantEvaluator(exprNodeConstantDesc expr) {
     this.expr = expr;
-    PrimitiveCategory pc = ((PrimitiveTypeInfo)expr.getTypeInfo())
+    PrimitiveCategory pc = ((PrimitiveTypeInfo) expr.getTypeInfo())
         .getPrimitiveCategory();
     writableObjectInspector = PrimitiveObjectInspectorFactory
         .getPrimitiveWritableObjectInspector(pc);
-    // Convert from Java to Writable 
+    // Convert from Java to Writable
     writableValue = PrimitiveObjectInspectorFactory
-        .getPrimitiveJavaObjectInspector(pc)
-        .getPrimitiveWritableObject(expr.getValue());
+        .getPrimitiveJavaObjectInspector(pc).getPrimitiveWritableObject(
+            expr.getValue());
   }
 
   @Override
   public ObjectInspector initialize(ObjectInspector rowInspector)
-    throws HiveException {
+      throws HiveException {
     return writableObjectInspector;
   }
-  
+
   @Override
   public Object evaluate(Object row) throws HiveException {
     return writableValue;
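
A small sketch of the Java-to-Writable conversion performed once in the constructor above, using the same PrimitiveObjectInspectorFactory calls that appear in the diff. It assumes hive-serde and hadoop-core are on the classpath; the INT category and the literal 42 are illustrative only.

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class ConstantToWritableSketch {
      public static void main(String[] args) {
        PrimitiveCategory pc = PrimitiveCategory.INT;
        // Inspector describing the Writable form handed back from initialize().
        ObjectInspector writableOI =
            PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(pc);
        // Convert the Java-side constant into its Writable equivalent up front,
        // so evaluate() can return the cached object for every row.
        Object writableValue = PrimitiveObjectInspectorFactory
            .getPrimitiveJavaObjectInspector(pc).getPrimitiveWritableObject(42);
        System.out.println(writableOI.getTypeName() + " -> " + writableValue);
      }
    }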

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java Thu Jan 21 10:37:58 2010
@@ -24,16 +24,16 @@
 public abstract class ExprNodeEvaluator {
 
   /**
-   * Initialize should be called once and only once.
-   * Return the ObjectInspector for the return value, given the rowInspector.
+   * Initialize should be called once and only once. Return the ObjectInspector
+   * for the return value, given the rowInspector.
    */
-  public abstract ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException;
+  public abstract ObjectInspector initialize(ObjectInspector rowInspector)
+      throws HiveException;
 
   /**
-   * Evaluate the expression given the row.
-   * This method should use the rowInspector passed in from initialize to 
-   * inspect the row object.
-   * The return value will be inspected by the return value of initialize. 
+   * Evaluate the expression given the row. This method should use the
+   * rowInspector passed in from initialize to inspect the row object. The
+   * return value will be inspected by the return value of initialize.
    */
   public abstract Object evaluate(Object row) throws HiveException;
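
A hedged illustration of the initialize-once / evaluate-per-row contract documented above, assuming the two abstract methods shown are the whole contract. ConstantLikeEvaluator is a hypothetical example class, not part of this commit; callers would invoke initialize() exactly once to learn the output ObjectInspector, then call evaluate() for each row.

    import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class ConstantLikeEvaluator extends ExprNodeEvaluator {
      @Override
      public ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException {
        // Called once; the returned inspector describes every value that
        // evaluate() will subsequently produce.
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
      }

      @Override
      public Object evaluate(Object row) throws HiveException {
        // Called per row; a real evaluator would read the row through the
        // inspector passed to initialize(), which this trivial example ignores.
        return "constant";
      }
    }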