You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2011/02/24 00:58:42 UTC

svn commit: r1073992 [1/3] - in /hive/trunk: ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/ ql/src/java/org/a...

Author: nzhang
Date: Wed Feb 23 23:58:41 2011
New Revision: 1073992

URL: http://svn.apache.org/viewvc?rev=1073992&view=rev
Log:
HIVE-1950. Block merge for RCFile (Yongqiang He via Ning Zhang)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
    hive/trunk/ql/src/test/queries/clientnegative/merge_negative_1.q
    hive/trunk/ql/src/test/queries/clientnegative/merge_negative_2.q
    hive/trunk/ql/src/test/queries/clientpositive/alter_merge.q
    hive/trunk/ql/src/test/queries/clientpositive/alter_merge_stats.q
    hive/trunk/ql/src/test/results/clientnegative/merge_negative_1.q.out
    hive/trunk/ql/src/test/results/clientnegative/merge_negative_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/alter_merge.q.out
    hive/trunk/ql/src/test/results/clientpositive/alter_merge_stats.q.out
    hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/CombineHiveKey.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Wed Feb 23 23:58:41 2011
@@ -84,7 +84,7 @@ public class ConditionalTask extends Tas
     for (Task<? extends Serializable> tsk : getListTasks()) {
       if (!resTasks.contains(tsk)) {
         driverContext.getRunnable().remove(tsk);
-        console.printInfo(ExecDriver.getJobEndMsg("" + Utilities.randGen.nextInt())
+        console.printInfo(HadoopJobExecHelper.getJobEndMsg("" + Utilities.randGen.nextInt())
             + ", job is filtered out (removed at runtime).");
         if (tsk.isMapRedTask()) {
           driverContext.incCurJobNo(1);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Feb 23 23:58:41 2011
@@ -75,6 +75,9 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
@@ -89,6 +92,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.MetaDataFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.AlterTablePartMergeFilesDesc;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterIndexDesc;
@@ -138,6 +142,7 @@ import org.apache.hadoop.hive.serde2.dyn
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -350,6 +355,11 @@ public class DDLTask extends Task<DDLWor
       if (showIndexes != null) {
         return showIndexes(db, showIndexes);
       }
+      
+      AlterTablePartMergeFilesDesc mergeFilesDesc = work.getMergeFilesDesc();
+      if(mergeFilesDesc != null) {
+        return mergeFiles(db, mergeFilesDesc);
+      }
 
     } catch (InvalidTableException e) {
       console.printError("Table " + e.getTableName() + " does not exist");
@@ -369,6 +379,33 @@ public class DDLTask extends Task<DDLWor
     return 0;
   }
 
+  /**
+   * First, make sure the source table/partition is not
+   * archived/indexes/non-rcfile. If either of these is true, throw an
+   * exception.
+   * 
+   * The way how it does the merge is to create a BlockMergeTask from the
+   * mergeFilesDesc.
+   * 
+   * @param db
+   * @param mergeFilesDesc
+   * @return
+   * @throws HiveException
+   */
+  private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc)
+      throws HiveException {
+    // merge work only needs input and output.
+    MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
+        mergeFilesDesc.getOutputDir());
+    DriverContext driverCxt = new DriverContext();
+    BlockMergeTask taskExec = new BlockMergeTask();
+    taskExec.initialize(db.getConf(), null, driverCxt);
+    taskExec.setWork(mergeWork);
+    int ret = taskExec.execute(driverCxt);
+
+    return ret;
+  }
+
   private int grantOrRevokeRole(GrantRevokeRoleDDL grantOrRevokeRoleDDL)
       throws HiveException {
     try {
@@ -894,23 +931,6 @@ public class DDLTask extends Task<DDLWor
     return 0;
   }
 
-  /**
-   * Determines whether a partition has been archived
-   *
-   * @param p
-   * @return
-   */
-
-  private boolean isArchived(Partition p) {
-    Map<String, String> params = p.getParameters();
-    if ("true".equalsIgnoreCase(params.get(
-        org.apache.hadoop.hive.metastore.api.Constants.IS_ARCHIVED))) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
   private void setIsArchived(Partition p, boolean state) {
     Map<String, String> params = p.getParameters();
     if (state) {
@@ -958,7 +978,7 @@ public class DDLTask extends Task<DDLWor
    */
   private void setArchived(Partition p, Path parentDir, String dirInArchive, String archiveName)
       throws URISyntaxException {
-    assert(isArchived(p) == false);
+    assert(Utilities.isArchived(p) == false);
     Map<String, String> params = p.getParameters();
 
     URI parentUri = parentDir.toUri();
@@ -996,7 +1016,7 @@ public class DDLTask extends Task<DDLWor
    * @param p - the partition to modify
    */
   private void setUnArchived(Partition p) {
-    assert(isArchived(p) == true);
+    assert(Utilities.isArchived(p) == true);
     String parentDir = getOriginalLocation(p);
     setIsArchived(p, false);
     setOriginalLocation(p, null);
@@ -1051,7 +1071,7 @@ public class DDLTask extends Task<DDLWor
       throw new HiveException("Specified partition does not exist");
     }
 
-    if (isArchived(p)) {
+    if (Utilities.isArchived(p)) {
       // If there were a failure right after the metadata was updated in an
       // archiving operation, it's possible that the original, unarchived files
       // weren't deleted.
@@ -1236,7 +1256,7 @@ public class DDLTask extends Task<DDLWor
       throw new HiveException("Specified partition does not exist");
     }
 
-    if (!isArchived(p)) {
+    if (!Utilities.isArchived(p)) {
       Path location = new Path(p.getLocation());
       Path leftOverArchiveDir = new Path(location.getParent(),
           location.getName() + INTERMEDIATE_ARCHIVED_DIR_SUFFIX);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Feb 23 23:58:41 2011
@@ -28,13 +28,9 @@ import java.lang.management.MemoryMXBean
 import java.net.URL;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.Collections;
 import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,10 +52,6 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
-import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
-import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
-import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
@@ -85,7 +77,6 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.log4j.Appender;
 import org.apache.log4j.BasicConfigurator;
@@ -98,30 +89,22 @@ import org.apache.log4j.varia.NullAppend
  * ExecDriver.
  *
  */
-public class ExecDriver extends Task<MapredWork> implements Serializable {
+public class ExecDriver extends Task<MapredWork> implements Serializable, HadoopJobExecHook {
 
   private static final long serialVersionUID = 1L;
 
   protected transient JobConf job;
-  protected transient int mapProgress = 0;
-  protected transient int reduceProgress = 0;
-  public transient String jobId;
-
-  public String getJobId() {
-    return jobId;
-  }
-
-  public void setJobId(String jobId) {
-    this.jobId = jobId;
-  }
-
   public static MemoryMXBean memoryMXBean;
+  protected HadoopJobExecHelper jobExecHelper;
 
   /**
    * Constructor when invoked from QL.
    */
   public ExecDriver() {
     super();
+    LOG = LogFactory.getLog(this.getClass().getName());
+    console = new LogHelper(LOG);
+    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
   protected static String getResourceFiles(Configuration conf, SessionState.ResourceType t) {
@@ -179,6 +162,7 @@ public class ExecDriver extends Task<Map
     if (StringUtils.isNotBlank(addedArchives)) {
       HiveConf.setVar(job, ConfVars.HIVEADDEDARCHIVES, addedArchives);
     }
+    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
   /**
@@ -189,101 +173,7 @@ public class ExecDriver extends Task<Map
     this.job = job;
     LOG = LogFactory.getLog(this.getClass().getName());
     console = new LogHelper(LOG, isSilent);
-  }
-
-  /**
-   * A list of the currently running jobs spawned in this Hive instance that is used to kill all
-   * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
-   * still jobs running.
-   */
-  private static Map<String, String> runningJobKillURIs = Collections
-      .synchronizedMap(new HashMap<String, String>());
-
-  /**
-   * In Hive, when the user control-c's the command line, any running jobs spawned from that command
-   * line are best-effort killed.
-   *
-   * This static constructor registers a shutdown thread to iterate over all the running job kill
-   * URLs and do a get on them.
-   *
-   */
-  static {
-    if (new org.apache.hadoop.conf.Configuration()
-        .getBoolean("webinterface.private.actions", false)) {
-      Runtime.getRuntime().addShutdownHook(new Thread() {
-        @Override
-        public void run() {
-          synchronized (runningJobKillURIs) {
-            for (String uri : runningJobKillURIs.values()) {
-              try {
-                System.err.println("killing job with: " + uri);
-                java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
-                    .openConnection();
-                conn.setRequestMethod("POST");
-                int retCode = conn.getResponseCode();
-                if (retCode != 200) {
-                  System.err.println("Got an error trying to kill job with URI: " + uri + " = "
-                      + retCode);
-                }
-              } catch (Exception e) {
-                System.err.println("trying to kill job, caught: " + e);
-                // do nothing
-              }
-            }
-          }
-        }
-      });
-    }
-  }
-
-  /**
-   * from StreamJob.java.
-   */
-  private void jobInfo(RunningJob rj) {
-    if (job.get("mapred.job.tracker", "local").equals("local")) {
-      console.printInfo("Job running in-process (local Hadoop)");
-    } else {
-      String hp = job.get("mapred.job.tracker");
-      if (SessionState.get() != null) {
-        SessionState.get().getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
-            getId(), Keys.TASK_HADOOP_ID, rj.getJobID());
-      }
-      console.printInfo(ExecDriver.getJobStartMsg(rj.getJobID()) + ", Tracking URL = "
-          + rj.getTrackingURL());
-      console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
-          + " job  -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
-    }
-  }
-
-  /**
-   * This class contains the state of the running task Going forward, we will return this handle
-   * from execute and Driver can split execute into start, monitorProgess and postProcess.
-   */
-  private static class ExecDriverTaskHandle extends TaskHandle {
-    JobClient jc;
-    RunningJob rj;
-
-    JobClient getJobClient() {
-      return jc;
-    }
-
-    RunningJob getRunningJob() {
-      return rj;
-    }
-
-    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
-      this.jc = jc;
-      this.rj = rj;
-    }
-
-    public void setRunningJob(RunningJob job) {
-      rj = job;
-    }
-
-    @Override
-    public Counters getCounters() throws IOException {
-      return rj.getCounters();
-    }
+    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
   /**
@@ -296,20 +186,7 @@ public class ExecDriver extends Task<Map
    *
    * @return true if fatal errors happened during job execution, false otherwise.
    */
-  private boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
-    if (ctrs == null) {
-      // hadoop might return null if it cannot locate the job.
-      // we may still be able to retrieve the job status - so ignore
-      return false;
-    }
-    // check for number of created files
-    long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
-    long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
-    if (numFiles > upperLimit) {
-      errMsg.append("total number of created files exceeds ").append(upperLimit);
-      return true;
-    }
-
+  public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
       if (op.checkFatalErrors(ctrs, errMsg)) {
         return true;
@@ -323,186 +200,7 @@ public class ExecDriver extends Task<Map
     return false;
   }
 
-  private boolean progress(ExecDriverTaskHandle th) throws IOException {
-    JobClient jc = th.getJobClient();
-    RunningJob rj = th.getRunningJob();
-    String lastReport = "";
-    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-    long reportTime = System.currentTimeMillis();
-    long maxReportInterval = 60 * 1000; // One minute
-    boolean fatal = false;
-    StringBuilder errMsg = new StringBuilder();
-    long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
-    boolean initializing = true;
-    boolean initOutputPrinted = false;
-    while (!rj.isComplete()) {
-      try {
-        Thread.sleep(pullInterval);
-      } catch (InterruptedException e) {
-      }
-
-      if (initializing && ShimLoader.getHadoopShims().isJobPreparing(rj)) {
-        // No reason to poll untill the job is initialized
-        continue;
-      } else {
-        // By now the job is initialized so no reason to do
-        // rj.getJobState() again and we do not want to do an extra RPC call
-        initializing = false;
-      }
-
-      if (!initOutputPrinted) {
-        SessionState ss = SessionState.get();
-
-        String logMapper;
-        String logReducer;
-
-        TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
-        if (mappers == null) {
-          logMapper = "no information for number of mappers; ";
-        } else {
-          int numMap = mappers.length;
-          if (ss != null) {
-            ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
-                Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
-          }
-          logMapper = "number of mappers: " + numMap + "; ";
-        }
-
-        TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
-        if (reducers == null) {
-          logReducer = "no information for number of reducers. ";
-        } else {
-          int numReduce = reducers.length;
-          if (ss != null) {
-            ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
-                Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
-          }
-          logReducer = "number of reducers: " + numReduce;
-        }
-
-        console
-            .printInfo("Hadoop job information for " + getId() + ": " + logMapper + logReducer);
-        initOutputPrinted = true;
-      }
-
-      RunningJob newRj = jc.getJob(rj.getJobID());
-      if (newRj == null) {
-        // under exceptional load, hadoop may not be able to look up status
-        // of finished jobs (because it has purged them from memory). From
-        // hive's perspective - it's equivalent to the job having failed.
-        // So raise a meaningful exception
-        throw new IOException("Could not find status of job: + rj.getJobID()");
-      } else {
-        th.setRunningJob(newRj);
-        rj = newRj;
-      }
-
-      // If fatal errors happen we should kill the job immediately rather than
-      // let the job retry several times, which eventually lead to failure.
-      if (fatal) {
-        continue; // wait until rj.isComplete
-      }
-
-      Counters ctrs = th.getCounters();
-
-      if (fatal = checkFatalErrors(ctrs, errMsg)) {
-        console.printError("[Fatal Error] " + errMsg.toString() + ". Killing the job.");
-        rj.killJob();
-        continue;
-      }
-      errMsg.setLength(0);
-
-      updateCounters(ctrs, rj);
-
-      String report = " " + getId() + " map = " + mapProgress + "%,  reduce = " + reduceProgress
-          + "%";
-
-      if (!report.equals(lastReport)
-          || System.currentTimeMillis() >= reportTime + maxReportInterval) {
-
-        // write out serialized plan with counters to log file
-        // LOG.info(queryPlan);
-        String output = dateFormat.format(Calendar.getInstance().getTime()) + report;
-        SessionState ss = SessionState.get();
-        if (ss != null) {
-          ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(), getId(), ctrs);
-          ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
-              Keys.TASK_HADOOP_PROGRESS, output);
-          ss.getHiveHistory().progressTask(SessionState.get().getQueryId(), this);
-          ss.getHiveHistory().logPlanProgress(queryPlan);
-        }
-        console.printInfo(output);
-        lastReport = report;
-        reportTime = System.currentTimeMillis();
-      }
-    }
-
-    boolean success;
-    Counters ctrs = th.getCounters();
-
-    if (fatal) {
-      success = false;
-    } else {
-      // check for fatal error again in case it occurred after
-      // the last check before the job is completed
-      if (checkFatalErrors(ctrs, errMsg)) {
-        console.printError("[Fatal Error] " + errMsg.toString());
-        success = false;
-      } else {
-        success = rj.isSuccessful();
-      }
-    }
-
-    setDone();
-    // update based on the final value of the counters
-    updateCounters(ctrs, rj);
-
-    SessionState ss = SessionState.get();
-    if (ss != null) {
-      ss.getHiveHistory().logPlanProgress(queryPlan);
-    }
-    // LOG.info(queryPlan);
-    return (success);
-  }
-
-  /**
-   * Update counters relevant to this task.
-   */
-  private void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
-    mapProgress = Math.round(rj.mapProgress() * 100);
-    reduceProgress = Math.round(rj.reduceProgress() * 100);
-    taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long.valueOf(mapProgress));
-    taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long.valueOf(reduceProgress));
-    if (ctrs == null) {
-      // hadoop might return null if it cannot locate the job.
-      // we may still be able to retrieve the job status - so ignore
-      return;
-    }
-    for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
-      op.updateCounters(ctrs);
-    }
-    if (work.getReducer() != null) {
-      work.getReducer().updateCounters(ctrs);
-    }
-  }
-
-  public boolean mapStarted() {
-    return mapProgress > 0;
-  }
-
-  public boolean reduceStarted() {
-    return reduceProgress > 0;
-  }
-
-  public boolean mapDone() {
-    return mapProgress == 100;
-  }
-
-  public boolean reduceDone() {
-    return reduceProgress == 100;
-  }
-
-  /**
+   /**
    * Execute a query plan using Hadoop.
    */
   @Override
@@ -676,7 +374,6 @@ public class ExecDriver extends Task<Map
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE");
       }
       JobClient jc = new JobClient(job);
-
       // make this client wait if job trcker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
 
@@ -692,35 +389,13 @@ public class ExecDriver extends Task<Map
 
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
-
-      jobId = rj.getJobID();
-
       // replace it back
       if (pwd != null) {
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
       }
-
-      // add to list of running jobs to kill in case of abnormal shutdown
-
-      runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill");
-
-      ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
-      jobInfo(rj);
-      success = progress(th);
-
-      String statusMesg = getJobEndMsg(rj.getJobID());
-      if (!success) {
-        statusMesg += " with errors";
-        returnVal = 2;
-        console.printError(statusMesg);
-        if (HiveConf.getBoolVar(job, HiveConf.ConfVars.SHOW_JOB_FAIL_DEBUG_INFO)) {
-          showJobFailDebugInfo(job, rj);
-        }
-      } else {
-        console.printInfo(statusMesg);
-      }
-
-
+      
+      returnVal = jobExecHelper.progress(rj, jc);
+      success = (returnVal == 0);
     } catch (Exception e) {
       e.printStackTrace();
       String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
@@ -747,7 +422,7 @@ public class ExecDriver extends Task<Map
           if (returnVal != 0) {
             rj.killJob();
           }
-          runningJobKillURIs.remove(rj.getJobID());
+          HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
         }
       } catch (Exception e) {
       }
@@ -778,179 +453,21 @@ public class ExecDriver extends Task<Map
 
     return (returnVal);
   }
-
-  /**
-   * This msg pattern is used to track when a job is started.
-   *
-   * @param jobId
-   * @return
-   */
-  private static String getJobStartMsg(String jobId) {
-    return "Starting Job = " + jobId;
-  }
-
-  /**
-   * this msg pattern is used to track when a job is successfully done.
-   *
-   * @param jobId
-   * @return
-   */
-  public static String getJobEndMsg(String jobId) {
-    return "Ended Job = " + jobId;
+  
+  public boolean mapStarted() {
+    return this.jobExecHelper.mapStarted();
   }
 
-  private String getTaskAttemptLogUrl(String taskTrackerHttpAddress, String taskAttemptId) {
-    return taskTrackerHttpAddress + "/tasklog?taskid=" + taskAttemptId + "&all=true";
+  public boolean reduceStarted() {
+    return this.jobExecHelper.reduceStarted();
   }
 
-  // Used for showJobFailDebugInfo
-  private static class TaskInfo {
-    String jobId;
-    HashSet<String> logUrls;
-
-    public TaskInfo(String jobId) {
-      this.jobId = jobId;
-      logUrls = new HashSet<String>();
-    }
-
-    public void addLogUrl(String logUrl) {
-      logUrls.add(logUrl);
-    }
-
-    public HashSet<String> getLogUrls() {
-      return logUrls;
-    }
-
-    public String getJobId() {
-      return jobId;
-    }
+  public boolean mapDone() {
+    return this.jobExecHelper.mapDone();
   }
 
-  @SuppressWarnings("deprecation")
-  private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
-    // Mapping from task ID to the number of failures
-    Map<String, Integer> failures = new HashMap<String, Integer>();
-    // Successful task ID's
-    Set<String> successes = new HashSet<String>();
-
-    Map<String, TaskInfo> taskIdToInfo = new HashMap<String, TaskInfo>();
-
-    int startIndex = 0;
-
-    // Loop to get all task completion events because getTaskCompletionEvents
-    // only returns a subset per call
-    while (true) {
-      TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
-
-      if (taskCompletions == null || taskCompletions.length == 0) {
-        break;
-      }
-
-      boolean more = true;
-      for (TaskCompletionEvent t : taskCompletions) {
-        // getTaskJobIDs returns Strings for compatibility with Hadoop versions
-        // without TaskID or TaskAttemptID
-        String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
-
-        if (taskJobIds == null) {
-          console.printError("Task attempt info is unavailable in this Hadoop version");
-          more = false;
-          break;
-        }
-
-        // For each task completion event, get the associated task id, job id
-        // and the logs
-        String taskId = taskJobIds[0];
-        String jobId = taskJobIds[1];
-
-        TaskInfo ti = taskIdToInfo.get(taskId);
-        if (ti == null) {
-          ti = new TaskInfo(jobId);
-          taskIdToInfo.put(taskId, ti);
-        }
-        // These tasks should have come from the same job.
-        assert (ti.getJobId() == jobId);
-        ti.getLogUrls().add(getTaskAttemptLogUrl(t.getTaskTrackerHttp(), t.getTaskId()));
-
-        // If a task failed, then keep track of the total number of failures
-        // for that task (typically, a task gets re-run up to 4 times if it
-        // fails
-
-        if (t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
-          Integer failAttempts = failures.get(taskId);
-          if (failAttempts == null) {
-            failAttempts = Integer.valueOf(0);
-          }
-          failAttempts = Integer.valueOf(failAttempts.intValue() + 1);
-          failures.put(taskId, failAttempts);
-        } else {
-          successes.add(taskId);
-        }
-      }
-      if (!more) {
-        break;
-      }
-      startIndex += taskCompletions.length;
-    }
-    // Remove failures for tasks that succeeded
-    for (String task : successes) {
-      failures.remove(task);
-    }
-
-    if (failures.keySet().size() == 0) {
-      return;
-    }
-
-    // Find the highest failure count
-    int maxFailures = 0;
-    for (Integer failCount : failures.values()) {
-      if (maxFailures < failCount.intValue()) {
-        maxFailures = failCount.intValue();
-      }
-    }
-
-    // Display Error Message for tasks with the highest failure count
-    String jtUrl = JobTrackerURLResolver.getURL(conf);
-
-    for (String task : failures.keySet()) {
-      if (failures.get(task).intValue() == maxFailures) {
-        TaskInfo ti = taskIdToInfo.get(task);
-        String jobId = ti.getJobId();
-        String taskUrl = jtUrl + "/taskdetails.jsp?jobid=" + jobId + "&tipid=" + task.toString();
-
-        TaskLogProcessor tlp = new TaskLogProcessor(conf);
-        for (String logUrl : ti.getLogUrls()) {
-          tlp.addTaskAttemptLogUrl(logUrl);
-        }
-
-        List<ErrorAndSolution> errors = tlp.getErrors();
-
-        StringBuilder sb = new StringBuilder();
-        // We use a StringBuilder and then call printError only once as
-        // printError will write to both stderr and the error log file. In
-        // situations where both the stderr and the log file output is
-        // simultaneously output to a single stream, this will look cleaner.
-        sb.append("\n");
-        sb.append("Task with the most failures(" + maxFailures + "): \n");
-        sb.append("-----\n");
-        sb.append("Task ID:\n  " + task + "\n\n");
-        sb.append("URL:\n  " + taskUrl + "\n");
-
-        for (ErrorAndSolution e : errors) {
-          sb.append("\n");
-          sb.append("Possible error:\n  " + e.getError() + "\n\n");
-          sb.append("Solution:\n  " + e.getSolution() + "\n");
-        }
-        sb.append("-----\n");
-
-        console.printError(sb.toString());
-
-        // Only print out one task because that's good enough for debugging.
-        break;
-      }
-    }
-    return;
-
+  public boolean reduceDone() {
+    return this.jobExecHelper.reduceDone();
   }
 
   private static void printUsage() {
@@ -1375,4 +892,19 @@ public class ExecDriver extends Task<Map
       }
     }
   }
+
+  @Override
+  public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
+    for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+      op.updateCounters(ctrs);
+    }
+    if (work.getReducer() != null) {
+      work.getReducer().updateCounters(ctrs);
+    }
+  }
+
+  @Override
+  public void logPlanProgress(SessionState ss) throws IOException {
+    ss.getHiveHistory().logPlanProgress(queryPlan);
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Wed Feb 23 23:58:41 2011
@@ -673,7 +673,7 @@ public class FileSinkOperator extends Te
       if ((conf != null) && isNativeTable) {
         String specPath = conf.getDirName();
         DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
-        mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx);
+        Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf);
       }
     } catch (IOException e) {
       throw new HiveException(e);
@@ -681,83 +681,6 @@ public class FileSinkOperator extends Te
     super.jobClose(hconf, success, feedBack);
   }
 
-  public void mvFileToFinalPath(String specPath, Configuration hconf,
-      boolean success, Log log, DynamicPartitionCtx dpCtx) throws IOException, HiveException {
-
-    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
-    Path tmpPath = Utilities.toTempPath(specPath);
-    Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
-        + ".intermediate");
-    Path finalPath = new Path(specPath);
-    if (success) {
-      if (fs.exists(tmpPath)) {
-        // Step1: rename tmp output folder to intermediate path. After this
-        // point, updates from speculative tasks still writing to tmpPath
-        // will not appear in finalPath.
-        log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
-        Utilities.rename(fs, tmpPath, intermediatePath);
-        // Step2: remove any tmp file or double-committed output files
-        ArrayList<String> emptyBuckets =
-          Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
-        // create empty buckets if necessary
-        if (emptyBuckets.size() > 0) {
-          createEmptyBuckets(hconf, emptyBuckets);
-        }
-
-        // Step3: move to the file destination
-        log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
-        Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
-      }
-    } else {
-      fs.delete(tmpPath, true);
-    }
-  }
-
-  /**
-   * Check the existence of buckets according to bucket specification. Create empty buckets if
-   * needed.
-   * @param specPath The final path where the dynamic partitions should be in.
-   * @param conf FileSinkDesc.
-   * @param dpCtx dynamic partition context.
-   * @throws HiveException
-   * @throws IOException
-   */
-  private void createEmptyBuckets(Configuration hconf, ArrayList<String> paths)
-      throws HiveException, IOException {
-
-    JobConf jc;
-    if (hconf instanceof JobConf) {
-      jc = new JobConf(hconf);
-    } else {
-      // test code path
-      jc = new JobConf(hconf, ExecDriver.class);
-    }
-    HiveOutputFormat<?, ?> hiveOutputFormat = null;
-    Class<? extends Writable> outputClass = null;
-    boolean isCompressed = conf.getCompressed();
-    TableDesc tableInfo = conf.getTableInfo();
-    try {
-      Serializer serializer = (Serializer) tableInfo.getDeserializerClass().newInstance();
-      serializer.initialize(null, tableInfo.getProperties());
-      outputClass = serializer.getSerializedClass();
-      hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    } catch (InstantiationException e) {
-      throw new HiveException(e);
-    } catch (IllegalAccessException e) {
-      throw new HiveException(e);
-    }
-
-    for (String p: paths) {
-      Path path = new Path(p);
-      RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
-          jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);
-      writer.close(false);
-      LOG.info("created empty bucket for enforcing bucketing at " + path);
-    }
-  }
-
   @Override
   public OperatorType getType() {
     return OperatorType.FILESINK;

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1073992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Wed Feb 23 23:58:41 2011
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
+import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
+import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+
+public class HadoopJobExecHelper {
+  
+  protected transient JobConf job;
+  protected Task<? extends Serializable> task;
+  
+  protected transient int mapProgress = 0;
+  protected transient int reduceProgress = 0;
+  public transient String jobId;
+  private LogHelper console;
+  private HadoopJobExecHook callBackObj;
+
+  /**
+   * Update counters relevant to this task.
+   */
+  private void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
+    mapProgress = Math.round(rj.mapProgress() * 100);
+    reduceProgress = Math.round(rj.reduceProgress() * 100);
+    task.taskCounters.put("CNTR_NAME_" + task.getId() + "_MAP_PROGRESS", Long.valueOf(mapProgress));
+    task.taskCounters.put("CNTR_NAME_" + task.getId() + "_REDUCE_PROGRESS", Long.valueOf(reduceProgress));
+    if (ctrs == null) {
+      // hadoop might return null if it cannot locate the job.
+      // we may still be able to retrieve the job status - so ignore
+      return;
+    }
+    if(callBackObj != null) {
+      callBackObj.updateCounters(ctrs, rj);      
+    }
+  }
+  
+  /**
+   * This msg pattern is used to track when a job is started.
+   *
+   * @param jobId
+   * @return
+   */
+  private static String getJobStartMsg(String jobId) {
+    return "Starting Job = " + jobId;
+  }
+
+  /**
+   * this msg pattern is used to track when a job is successfully done.
+   *
+   * @param jobId
+   * @return
+   */
+  public static String getJobEndMsg(String jobId) {
+    return "Ended Job = " + jobId;
+  }
+
+  private String getTaskAttemptLogUrl(String taskTrackerHttpAddress, String taskAttemptId) {
+    return taskTrackerHttpAddress + "/tasklog?taskid=" + taskAttemptId + "&all=true";
+  }
+
+  public boolean mapStarted() {
+    return mapProgress > 0;
+  }
+
+  public boolean reduceStarted() {
+    return reduceProgress > 0;
+  }
+
+  public boolean mapDone() {
+    return mapProgress == 100;
+  }
+
+  public boolean reduceDone() {
+    return reduceProgress == 100;
+  }
+
+  
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+
+  
+  public HadoopJobExecHelper() {
+  }
+  
+  public HadoopJobExecHelper(JobConf job, LogHelper console,
+      Task<? extends Serializable> task, HadoopJobExecHook hookCallBack) {
+    this.job = job;
+    this.console = console;
+    this.task = task;
+    this.callBackObj = hookCallBack;
+  }
+
+  
+  /**
+   * A list of the currently running jobs spawned in this Hive instance that is used to kill all
+   * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
+   * still jobs running.
+   */
+  public static Map<String, String> runningJobKillURIs = Collections
+      .synchronizedMap(new HashMap<String, String>());
+
+  
+  /**
+   * In Hive, when the user control-c's the command line, any running jobs spawned from that command
+   * line are best-effort killed.
+   *
+   * This static constructor registers a shutdown thread to iterate over all the running job kill
+   * URLs and do a get on them.
+   *
+   */
+  static {
+    if (new org.apache.hadoop.conf.Configuration()
+        .getBoolean("webinterface.private.actions", false)) {
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        @Override
+        public void run() {
+          synchronized (runningJobKillURIs) {
+            for (String uri : runningJobKillURIs.values()) {
+              try {
+                System.err.println("killing job with: " + uri);
+                java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
+                    .openConnection();
+                conn.setRequestMethod("POST");
+                int retCode = conn.getResponseCode();
+                if (retCode != 200) {
+                  System.err.println("Got an error trying to kill job with URI: " + uri + " = "
+                      + retCode);
+                }
+              } catch (Exception e) {
+                System.err.println("trying to kill job, caught: " + e);
+                // do nothing
+              }
+            }
+          }
+        }
+      });
+    }
+  }
+  
+  public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
+    if (ctrs == null) {
+      // hadoop might return null if it cannot locate the job.
+      // we may still be able to retrieve the job status - so ignore
+      return false;
+    }
+    // check for number of created files
+    long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+    long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
+    if (numFiles > upperLimit) {
+      errMsg.append("total number of created files exceeds ").append(upperLimit);
+      return true;
+    }
+    return this.callBackObj.checkFatalErrors(ctrs, errMsg);
+  }
+  
+  private boolean progress(ExecDriverTaskHandle th) throws IOException {
+    JobClient jc = th.getJobClient();
+    RunningJob rj = th.getRunningJob();
+    String lastReport = "";
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+    long reportTime = System.currentTimeMillis();
+    long maxReportInterval = 60 * 1000; // One minute
+    boolean fatal = false;
+    StringBuilder errMsg = new StringBuilder();
+    long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
+    boolean initializing = true;
+    while (!rj.isComplete()) {
+      try {
+        Thread.sleep(pullInterval);
+      } catch (InterruptedException e) {
+      }
+
+      if (initializing && ShimLoader.getHadoopShims().isJobPreparing(rj)) {
+        // No reason to poll untill the job is initialized
+        continue;
+      } else {
+        // By now the job is initialized so no reason to do
+        // rj.getJobState() again and we do not want to do an extra RPC call
+        initializing = false;
+      }
+
+      RunningJob newRj = jc.getJob(rj.getJobID());
+      if (newRj == null) {
+        // under exceptional load, hadoop may not be able to look up status
+        // of finished jobs (because it has purged them from memory). From
+        // hive's perspective - it's equivalent to the job having failed.
+        // So raise a meaningful exception
+        throw new IOException("Could not find status of job: + rj.getJobID()");
+      } else {
+        th.setRunningJob(newRj);
+        rj = newRj;
+      }
+
+      // If fatal errors happen we should kill the job immediately rather than
+      // let the job retry several times, which eventually lead to failure.
+      if (fatal) {
+        continue; // wait until rj.isComplete
+      }
+
+      Counters ctrs = th.getCounters();
+
+      if (fatal = this.callBackObj.checkFatalErrors(ctrs, errMsg)) {
+        console.printError("[Fatal Error] " + errMsg.toString() + ". Killing the job.");
+        rj.killJob();
+        continue;
+      }
+      errMsg.setLength(0);
+
+      updateCounters(ctrs, rj);
+
+      String report = " " + getId() + " map = " + mapProgress + "%,  reduce = " + reduceProgress
+          + "%";
+
+      if (!report.equals(lastReport)
+          || System.currentTimeMillis() >= reportTime + maxReportInterval) {
+
+        // write out serialized plan with counters to log file
+        // LOG.info(queryPlan);
+        String output = dateFormat.format(Calendar.getInstance().getTime()) + report;
+        SessionState ss = SessionState.get();
+        if (ss != null) {
+          ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(), getId(), ctrs);
+          ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
+              Keys.TASK_HADOOP_PROGRESS, output);
+          ss.getHiveHistory().progressTask(SessionState.get().getQueryId(), this.task);
+          this.callBackObj.logPlanProgress(ss);
+        }
+        console.printInfo(output);
+        lastReport = report;
+        reportTime = System.currentTimeMillis();
+      }
+    }
+
+    boolean success;
+    Counters ctrs = th.getCounters();
+
+    if (fatal) {
+      success = false;
+    } else {
+      // check for fatal error again in case it occurred after
+      // the last check before the job is completed
+      if (checkFatalErrors(ctrs, errMsg)) {
+        console.printError("[Fatal Error] " + errMsg.toString());
+        success = false;
+      } else {
+        success = rj.isSuccessful();
+      }
+    }
+
+    this.task.setDone();
+    // update based on the final value of the counters
+    updateCounters(ctrs, rj);
+
+    SessionState ss = SessionState.get();
+    if (ss != null) {
+      this.callBackObj.logPlanProgress(ss);
+    }
+    // LOG.info(queryPlan);
+    return (success);
+  }
+  
+  private String getId() {
+    return this.task.getId();
+  }
+
+  /**
+   * from StreamJob.java.
+   */
+  public void jobInfo(RunningJob rj) {
+    if (job.get("mapred.job.tracker", "local").equals("local")) {
+      console.printInfo("Job running in-process (local Hadoop)");
+    } else {
+      String hp = job.get("mapred.job.tracker");
+      if (SessionState.get() != null) {
+        SessionState.get().getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
+            getId(), Keys.TASK_HADOOP_ID, rj.getJobID());
+      }
+      console.printInfo(getJobStartMsg(rj.getJobID()) + ", Tracking URL = "
+          + rj.getTrackingURL());
+      console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
+          + " job  -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
+    }
+  }
+
+  /**
+   * This class contains the state of the running task Going forward, we will return this handle
+   * from execute and Driver can split execute into start, monitorProgess and postProcess.
+   */
+  private static class ExecDriverTaskHandle extends TaskHandle {
+    JobClient jc;
+    RunningJob rj;
+
+    JobClient getJobClient() {
+      return jc;
+    }
+
+    RunningJob getRunningJob() {
+      return rj;
+    }
+
+    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
+      this.jc = jc;
+      this.rj = rj;
+    }
+
+    public void setRunningJob(RunningJob job) {
+      rj = job;
+    }
+
+    @Override
+    public Counters getCounters() throws IOException {
+      return rj.getCounters();
+    }
+  }
+  
+  // Used for showJobFailDebugInfo
+  private static class TaskInfo {
+    String jobId;
+    HashSet<String> logUrls;
+
+    public TaskInfo(String jobId) {
+      this.jobId = jobId;
+      logUrls = new HashSet<String>();
+    }
+
+    public void addLogUrl(String logUrl) {
+      logUrls.add(logUrl);
+    }
+
+    public HashSet<String> getLogUrls() {
+      return logUrls;
+    }
+
+    public String getJobId() {
+      return jobId;
+    }
+  }
+  
+  @SuppressWarnings("deprecation")
+  private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
+    // Mapping from task ID to the number of failures
+    Map<String, Integer> failures = new HashMap<String, Integer>();
+    // Successful task ID's
+    Set<String> successes = new HashSet<String>();
+
+    Map<String, TaskInfo> taskIdToInfo = new HashMap<String, TaskInfo>();
+
+    int startIndex = 0;
+
+    // Loop to get all task completion events because getTaskCompletionEvents
+    // only returns a subset per call
+    while (true) {
+      TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
+
+      if (taskCompletions == null || taskCompletions.length == 0) {
+        break;
+      }
+
+      boolean more = true;
+      for (TaskCompletionEvent t : taskCompletions) {
+        // getTaskJobIDs returns Strings for compatibility with Hadoop versions
+        // without TaskID or TaskAttemptID
+        String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
+
+        if (taskJobIds == null) {
+          console.printError("Task attempt info is unavailable in this Hadoop version");
+          more = false;
+          break;
+        }
+
+        // For each task completion event, get the associated task id, job id
+        // and the logs
+        String taskId = taskJobIds[0];
+        String jobId = taskJobIds[1];
+
+        TaskInfo ti = taskIdToInfo.get(taskId);
+        if (ti == null) {
+          ti = new TaskInfo(jobId);
+          taskIdToInfo.put(taskId, ti);
+        }
+        // These tasks should have come from the same job.
+        assert (ti.getJobId() == jobId);
+        ti.getLogUrls().add(getTaskAttemptLogUrl(t.getTaskTrackerHttp(), t.getTaskId()));
+
+        // If a task failed, then keep track of the total number of failures
+        // for that task (typically, a task gets re-run up to 4 times if it
+        // fails
+
+        if (t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
+          Integer failAttempts = failures.get(taskId);
+          if (failAttempts == null) {
+            failAttempts = Integer.valueOf(0);
+          }
+          failAttempts = Integer.valueOf(failAttempts.intValue() + 1);
+          failures.put(taskId, failAttempts);
+        } else {
+          successes.add(taskId);
+        }
+      }
+      if (!more) {
+        break;
+      }
+      startIndex += taskCompletions.length;
+    }
+    // Remove failures for tasks that succeeded
+    for (String task : successes) {
+      failures.remove(task);
+    }
+
+    if (failures.keySet().size() == 0) {
+      return;
+    }
+
+    // Find the highest failure count
+    int maxFailures = 0;
+    for (Integer failCount : failures.values()) {
+      if (maxFailures < failCount.intValue()) {
+        maxFailures = failCount.intValue();
+      }
+    }
+
+    // Display Error Message for tasks with the highest failure count
+    String jtUrl = JobTrackerURLResolver.getURL(conf);
+
+    for (String task : failures.keySet()) {
+      if (failures.get(task).intValue() == maxFailures) {
+        TaskInfo ti = taskIdToInfo.get(task);
+        String jobId = ti.getJobId();
+        String taskUrl = jtUrl + "/taskdetails.jsp?jobid=" + jobId + "&tipid=" + task.toString();
+
+        TaskLogProcessor tlp = new TaskLogProcessor(conf);
+        for (String logUrl : ti.getLogUrls()) {
+          tlp.addTaskAttemptLogUrl(logUrl);
+        }
+
+        List<ErrorAndSolution> errors = tlp.getErrors();
+
+        StringBuilder sb = new StringBuilder();
+        // We use a StringBuilder and then call printError only once as
+        // printError will write to both stderr and the error log file. In
+        // situations where both the stderr and the log file output is
+        // simultaneously output to a single stream, this will look cleaner.
+        sb.append("\n");
+        sb.append("Task with the most failures(" + maxFailures + "): \n");
+        sb.append("-----\n");
+        sb.append("Task ID:\n  " + task + "\n\n");
+        sb.append("URL:\n  " + taskUrl + "\n");
+
+        for (ErrorAndSolution e : errors) {
+          sb.append("\n");
+          sb.append("Possible error:\n  " + e.getError() + "\n\n");
+          sb.append("Solution:\n  " + e.getSolution() + "\n");
+        }
+        sb.append("-----\n");
+
+        console.printError(sb.toString());
+
+        // Only print out one task because that's good enough for debugging.
+        break;
+      }
+    }
+    return;
+
+  }
+
+  public int progress(RunningJob rj, JobClient jc) throws IOException {
+    jobId = rj.getJobID();
+    
+    int returnVal = 0;
+    
+    // remove the pwd from conf file so that job tracker doesn't show this
+    // logs
+    String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
+    if (pwd != null) {
+      HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE");
+    }
+
+    // replace it back
+    if (pwd != null) {
+      HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
+    }
+
+    // add to list of running jobs to kill in case of abnormal shutdown
+
+    runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill");
+
+    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
+    jobInfo(rj);
+    boolean success = progress(th);
+
+    String statusMesg = getJobEndMsg(rj.getJobID());
+    if (!success) {
+      statusMesg += " with errors";
+      returnVal = 2;
+      console.printError(statusMesg);
+      if (HiveConf.getBoolVar(job, HiveConf.ConfVars.SHOW_JOB_FAIL_DEBUG_INFO)) {
+        showJobFailDebugInfo(job, rj);
+      }
+    } else {
+      console.printInfo(statusMesg);
+    }
+    
+    return returnVal;
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java?rev=1073992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java Wed Feb 23 23:58:41 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.RunningJob;
+
+@SuppressWarnings("deprecation")
+public interface HadoopJobExecHook {
+  
+  public void updateCounters(Counters ctrs, RunningJob rj) throws IOException;
+  public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg);
+  public void logPlanProgress(SessionState ss) throws IOException;
+  
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Wed Feb 23 23:58:41 2011
@@ -207,6 +207,7 @@ public class StatsTask extends Task<Stat
        LOG.error("Cannot get table " + tableName, e);
        console.printError("Cannot get table " + tableName, e.toString());
     }
+    
     return aggregateStats();
   }
 
@@ -227,9 +228,7 @@ public class StatsTask extends Task<Stat
 
   private int aggregateStats() {
 
-    String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    StatsFactory.setImplementation(statsImplementationClass, conf);
-    StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+    StatsAggregator statsAggregator = null;
 
     try {
       // Stats setup:
@@ -237,38 +236,44 @@ public class StatsTask extends Task<Stat
       FileSystem fileSys;
       FileStatus[] fileStatus;
 
-      // manufacture a StatsAggregator
-      if (!statsAggregator.connect(conf)) {
-        throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
+      if(!this.getWork().getNoStatsAggregator()) {
+        String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+        StatsFactory.setImplementation(statsImplementationClass, conf);
+        statsAggregator = StatsFactory.getStatsAggregator();
+        // manufacture a StatsAggregator
+        if (!statsAggregator.connect(conf)) {
+          throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
+        }
       }
 
       TableStatistics tblStats = new TableStatistics();
 
-      //
-      // For partitioned table get the old table statistics for incremental update
-      //
-      if (table.isPartitioned()) {
-        org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
-        Map<String, String> parameters = tTable.getParameters();
-        if (parameters.containsKey(StatsSetupConst.ROW_COUNT)) {
-          tblStats.setNumRows(Long.parseLong(parameters.get(StatsSetupConst.ROW_COUNT)));
-        }
-        if (parameters.containsKey(StatsSetupConst.NUM_PARTITIONS)) {
-          tblStats.setNumPartitions(Integer.parseInt(parameters.get(StatsSetupConst.NUM_PARTITIONS)));
-        }
-        if (parameters.containsKey(StatsSetupConst.NUM_FILES)) {
-          tblStats.setNumFiles(Integer.parseInt(parameters.get(StatsSetupConst.NUM_FILES)));
-        }
-        if (parameters.containsKey(StatsSetupConst.TOTAL_SIZE)) {
-          tblStats.setSize(Long.parseLong(parameters.get(StatsSetupConst.TOTAL_SIZE)));
-        }
+      org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
+      Map<String, String> parameters = tTable.getParameters();
+      
+      boolean tableStatsExist = this.existStats(parameters);
+      
+      if (parameters.containsKey(StatsSetupConst.ROW_COUNT)) {
+        tblStats.setNumRows(Long.parseLong(parameters.get(StatsSetupConst.ROW_COUNT)));
+      }
+      if (parameters.containsKey(StatsSetupConst.NUM_PARTITIONS)) {
+        tblStats.setNumPartitions(Integer.parseInt(parameters.get(StatsSetupConst.NUM_PARTITIONS)));
+      }
+      if (parameters.containsKey(StatsSetupConst.NUM_FILES)) {
+        tblStats.setNumFiles(Integer.parseInt(parameters.get(StatsSetupConst.NUM_FILES)));
+      }
+      if (parameters.containsKey(StatsSetupConst.TOTAL_SIZE)) {
+        tblStats.setSize(Long.parseLong(parameters.get(StatsSetupConst.TOTAL_SIZE)));
       }
 
       List<Partition> partitions = getPartitionsList();
-
+      boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
+      
       if (partitions == null) {
         // non-partitioned tables:
-
+        if (!tableStatsExist && atomic) {
+          return 0;
+        }
         Path tablePath = wh.getDefaultTablePath(table.getDbName(), table.getTableName());
         fileSys = tablePath.getFileSystem(conf);
         fileStatus = Utilities.getFileStatusRecurse(tablePath, 1, fileSys);
@@ -280,12 +285,14 @@ public class StatsTask extends Task<Stat
         tblStats.setSize(tableSize);
 
         // In case of a non-partitioned table, the key for stats temporary store is "rootDir"
-        String rows = statsAggregator.aggregateStats(work.getAggKey(), StatsSetupConst.ROW_COUNT);
-        if (rows != null) {
-          tblStats.setNumRows(Long.parseLong(rows));
-        } else {
-          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
-            throw new HiveException("StatsAggregator failed to get numRows.");
+        if(statsAggregator != null) {
+          String rows = statsAggregator.aggregateStats(work.getAggKey(), StatsSetupConst.ROW_COUNT);
+          if (rows != null) {
+            tblStats.setNumRows(Long.parseLong(rows));
+          } else {
+            if (atomic) {
+              throw new HiveException("StatsAggregator failed to get numRows.");
+            }
           }
         }
       } else {
@@ -294,20 +301,45 @@ public class StatsTask extends Task<Stat
         // and update the table stats based on the old and new stats.
         for (Partition partn : partitions) {
           //
+          // get the old partition stats
+          //
+          org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
+          parameters = tPart.getParameters();
+
+          boolean hasStats = this.existStats(parameters);
+          if(!hasStats && atomic) {
+            continue;
+          }
+
+          int  nf = parameters.containsKey(StatsSetupConst.NUM_FILES) ?
+                    Integer.parseInt(parameters.get(StatsSetupConst.NUM_FILES)) :
+                    0;
+          long nr = parameters.containsKey(StatsSetupConst.ROW_COUNT) ?
+                    Long.parseLong(parameters.get(StatsSetupConst.ROW_COUNT)) :
+                    0L;
+          long sz = parameters.containsKey(StatsSetupConst.TOTAL_SIZE) ?
+                    Long.parseLong(parameters.get(StatsSetupConst.TOTAL_SIZE)) :
+                    0L;
+          
+          //
           // get the new partition stats
           //
           PartitionStatistics newPartStats = new PartitionStatistics();
 
           // In that case of a partition, the key for stats temporary store is "rootDir/[dynamic_partition_specs/]%"
           String partitionID = work.getAggKey() + Warehouse.makePartPath(partn.getSpec());
-
-          String rows = statsAggregator.aggregateStats(partitionID, StatsSetupConst.ROW_COUNT);
-          if (rows != null) {
-            newPartStats.setNumRows(Long.parseLong(rows));
-          } else {
-            if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
-              throw new HiveException("StatsAggregator failed to get numRows.");
+          
+          if (statsAggregator != null) {
+            String rows = statsAggregator.aggregateStats(partitionID, StatsSetupConst.ROW_COUNT);
+            if (rows != null) {
+              newPartStats.setNumRows(Long.parseLong(rows));
+            } else {
+              if (atomic) {
+                throw new HiveException("StatsAggregator failed to get numRows.");
+              }
             }
+          } else {
+            newPartStats.setNumRows(nr);
           }
 
           fileSys = partn.getPartitionPath().getFileSystem(conf);
@@ -320,26 +352,6 @@ public class StatsTask extends Task<Stat
           }
           newPartStats.setSize(partitionSize);
 
-          //
-          // get the old partition stats
-          //
-          org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
-          Map<String, String> parameters = tPart.getParameters();
-
-          boolean hasStats =
-            parameters.containsKey(StatsSetupConst.NUM_FILES) ||
-            parameters.containsKey(StatsSetupConst.ROW_COUNT) ||
-            parameters.containsKey(StatsSetupConst.TOTAL_SIZE);
-
-          int  nf = parameters.containsKey(StatsSetupConst.NUM_FILES) ?
-                    Integer.parseInt(parameters.get(StatsSetupConst.NUM_FILES)) :
-                    0;
-          long nr = parameters.containsKey(StatsSetupConst.ROW_COUNT) ?
-                    Long.parseLong(parameters.get(StatsSetupConst.ROW_COUNT)) :
-                    0L;
-          long sz = parameters.containsKey(StatsSetupConst.TOTAL_SIZE) ?
-                    Long.parseLong(parameters.get(StatsSetupConst.TOTAL_SIZE)) :
-                    0L;
           if (hasStats) {
             PartitionStatistics oldPartStats = new PartitionStatistics(nf, nr, sz);
             tblStats.updateStats(oldPartStats, newPartStats);
@@ -363,12 +375,10 @@ public class StatsTask extends Task<Stat
         }
       }
 
-
       //
       // write table stats to metastore
       //
-      org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
-      Map<String, String> parameters = tTable.getParameters();
+      parameters = tTable.getParameters();
       parameters.put(StatsSetupConst.ROW_COUNT, Long.toString(tblStats.getNumRows()));
       parameters.put(StatsSetupConst.NUM_PARTITIONS, Integer.toString(tblStats.getNumPartitions()));
       parameters.put(StatsSetupConst.NUM_FILES, Integer.toString(tblStats.getNumFiles()));
@@ -387,11 +397,20 @@ public class StatsTask extends Task<Stat
           "Failed with exception " + e.getMessage() + "\n"
           + StringUtils.stringifyException(e));
     } finally {
-      statsAggregator.closeConnection();
+      if(statsAggregator != null) {
+        statsAggregator.closeConnection();        
+      }
     }
     // StatsTask always return 0 so that the whole job won't fail
     return 0;
   }
+  
+  private boolean existStats(Map<String, String> parameters) {
+    return parameters.containsKey(StatsSetupConst.ROW_COUNT)
+        || parameters.containsKey(StatsSetupConst.NUM_FILES)
+        || parameters.containsKey(StatsSetupConst.TOTAL_SIZE)
+        || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
+  }
 
   /**
    * Get the list of partitions that need to update statistics.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java Wed Feb 23 23:58:41 2011
@@ -43,7 +43,7 @@ public final class Throttle {
   /**
    * Fetch http://tracker.om:/gc.jsp?threshold=period.
    */
-  static void checkJobTracker(JobConf conf, Log LOG) {
+  public static void checkJobTracker(JobConf conf, Log LOG) {
 
     try {
       byte[] buffer = new byte[1024];

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Feb 23 23:58:41 2011
@@ -80,6 +80,9 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
@@ -91,6 +94,7 @@ import org.apache.hadoop.hive.ql.parse.E
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -101,9 +105,12 @@ import org.apache.hadoop.hive.ql.plan.Pl
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
@@ -1133,6 +1140,83 @@ public final class Utilities {
     Path pathPattern = new Path(path, sb.toString());
     return fs.globStatus(pathPattern);
   }
+  
+  public static void mvFileToFinalPath(String specPath, Configuration hconf,
+      boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException, HiveException {
+
+    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+    Path tmpPath = Utilities.toTempPath(specPath);
+    Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
+        + ".intermediate");
+    Path finalPath = new Path(specPath);
+    if (success) {
+      if (fs.exists(tmpPath)) {
+        // Step1: rename tmp output folder to intermediate path. After this
+        // point, updates from speculative tasks still writing to tmpPath
+        // will not appear in finalPath.
+        log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
+        Utilities.rename(fs, tmpPath, intermediatePath);
+        // Step2: remove any tmp file or double-committed output files
+        ArrayList<String> emptyBuckets =
+          Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+        // create empty buckets if necessary
+        if (emptyBuckets.size() > 0) {
+          createEmptyBuckets(hconf, emptyBuckets, conf);
+        }
+
+        // Step3: move to the file destination
+        log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
+        Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+      }
+    } else {
+      fs.delete(tmpPath, true);
+    }
+  }
+  
+  /**
+   * Check the existence of buckets according to bucket specification. Create empty buckets if
+   * needed.
+   * @param specPath The final path where the dynamic partitions should be in.
+   * @param conf FileSinkDesc.
+   * @param dpCtx dynamic partition context.
+   * @throws HiveException
+   * @throws IOException
+   */
+  private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths, FileSinkDesc conf)
+      throws HiveException, IOException {
+
+    JobConf jc;
+    if (hconf instanceof JobConf) {
+      jc = new JobConf(hconf);
+    } else {
+      // test code path
+      jc = new JobConf(hconf, ExecDriver.class);
+    }
+    HiveOutputFormat<?, ?> hiveOutputFormat = null;
+    Class<? extends Writable> outputClass = null;
+    boolean isCompressed = conf.getCompressed();
+    TableDesc tableInfo = conf.getTableInfo();
+    try {
+      Serializer serializer = (Serializer) tableInfo.getDeserializerClass().newInstance();
+      serializer.initialize(null, tableInfo.getProperties());
+      outputClass = serializer.getSerializedClass();
+      hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+    } catch (SerDeException e) {
+      throw new HiveException(e);
+    } catch (InstantiationException e) {
+      throw new HiveException(e);
+    } catch (IllegalAccessException e) {
+      throw new HiveException(e);
+    }
+
+    for (String p: paths) {
+      Path path = new Path(p);
+      RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
+          jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);
+      writer.close(false);
+      LOG.info("created empty bucket for enforcing bucketing at " + path);
+    }
+  }
 
   /**
    * Remove all temporary files and duplicate (double-committed) files from a given directory.
@@ -1644,4 +1728,20 @@ public final class Utilities {
     double result = (double) time / (double)1000;
     return result;
   }
+
+  /**
+   * Determines whether a partition has been archived
+   *
+   * @param p
+   * @return
+   */
+  public static boolean isArchived(Partition p) {
+    Map<String, String> params = p.getParameters();
+    if ("true".equalsIgnoreCase(params.get(
+        org.apache.hadoop.hive.metastore.api.Constants.IS_ARCHIVED))) {
+      return true;
+    } else {
+      return false;
+    }
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Wed Feb 23 23:58:41 2011
@@ -387,10 +387,12 @@ public final class HiveFileFormatUtils {
   public static List<Operator<? extends Serializable>> doGetAliasesFromPath(
     Map<String, ArrayList<String>> pathToAliases,
     Map<String, Operator<? extends Serializable>> aliasToWork, Path dir) {
-
-    String path = getMatchingPath(pathToAliases, dir);
     List<Operator<? extends Serializable>> opList =
       new ArrayList<Operator<? extends Serializable>>();
+    if (pathToAliases == null) {
+      return opList;
+    }
+    String path = getMatchingPath(pathToAliases, dir);
     List<String> aliases = pathToAliases.get(path);
     for (String alias : aliases) {
       opList.add(aliasToWork.get(alias));

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Wed Feb 23 23:58:41 2011
@@ -374,7 +374,11 @@ public class HiveInputFormat<K extends W
     if (this.mrwork == null) {
       init(job);
     }
-
+    
+    if(this.mrwork.getPathToAliases() == null) {
+      return;
+    }
+    
     ArrayList<String> aliases = new ArrayList<String>();
     Iterator<Entry<String, ArrayList<String>>> iterator = this.mrwork
         .getPathToAliases().entrySet().iterator();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed Feb 23 23:58:41 2011
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.VersionMismatchException;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -190,7 +191,7 @@ public class RCFile {
    * <li>{the end of the key part}</li>
    * </ul>
    */
-  static class KeyBuffer implements Writable {
+  public static class KeyBuffer implements WritableComparable {
     // each column's value length in a split
     private int[] eachColumnValueLen = null;
     private int[] eachColumnUncompressedValueLen = null;
@@ -200,6 +201,14 @@ public class RCFile {
     private int numberRows = 0;
     // how many columns
     private int columnNumber = 0;
+    
+    // return the number of columns recorded in this file's header
+    public int getColumnNumber() {
+      return columnNumber;
+    }
+
+    public KeyBuffer(){
+    }
 
     KeyBuffer(int columnNumber) {
       this(0, columnNumber);
@@ -281,6 +290,12 @@ public class RCFile {
 
       return ret;
     }
+
+    @Override
+    public int compareTo(Object arg0) {
+      throw new RuntimeException("compareTo not supported in class "
+          + this.getClass().getName());
+    }
   }
 
   /**
@@ -293,7 +308,7 @@ public class RCFile {
    * column_2_row_2_value,....]</li>
    * </ul>
    */
-  static class ValueBuffer implements Writable {
+  public static class ValueBuffer implements WritableComparable {
 
     class LazyDecompressionCallbackImpl implements LazyDecompressionCallback {
 
@@ -347,6 +362,9 @@ public class RCFile {
     Decompressor valDecompressor = null;
     NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
     CompressionInputStream deflatFilter = null;
+    
+    public ValueBuffer() throws IOException {
+    }
 
     public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
       this(keyBuffer, null);
@@ -464,6 +482,12 @@ public class RCFile {
         CodecPool.returnDecompressor(valDecompressor);
       }
     }
+    
+    @Override
+    public int compareTo(Object arg0) {
+      throw new RuntimeException("compareTo not supported in class "
+          + this.getClass().getName());
+    }
   }
 
   /**
@@ -872,6 +896,33 @@ public class RCFile {
       bufferedRecords = 0;
       columnBufferSize = 0;
     }
+    
+    /**
+     * flush a block out without doing anything except compressing the key part.
+     */
+    public void flushBlock(KeyBuffer keyBuffer, ValueBuffer valueBuffer,
+        int recordLen, int keyLength, int compressedKeyLen) throws IOException {
+      checkAndWriteSync(); // sync
+      out.writeInt(recordLen); // total record length
+      out.writeInt(keyLength); // key portion length
+      
+      if(this.isCompressed()) {
+        //compress key and write key out
+        keyCompressionBuffer.reset();
+        keyDeflateFilter.resetState();
+        keyBuffer.write(keyDeflateOut);
+        keyDeflateOut.flush();
+        keyDeflateFilter.finish();
+        compressedKeyLen = keyCompressionBuffer.getLength();
+        out.writeInt(compressedKeyLen);
+        out.write(keyCompressionBuffer.getData(), 0, compressedKeyLen);
+      } else {
+        out.writeInt(compressedKeyLen);
+        keyBuffer.write(out);
+      }
+
+      valueBuffer.write(out); // value
+    }
 
     private void clearColumnBuffers() throws IOException {
       for (int i = 0; i < columnNumber; i++) {
@@ -1304,6 +1355,15 @@ public class RCFile {
       currentValue.readFields(in);
       currentValue.inited = true;
     }
+    
+    public boolean nextBlock() throws IOException {
+      int keyLength = nextKeyBuffer();
+      if(keyLength > 0) {
+        currentValueBuffer();
+        return true;
+      }
+      return false;
+    }
 
     private boolean rowFetched = false;
 
@@ -1500,5 +1560,44 @@ public class RCFile {
         CodecPool.returnDecompressor(keyDecompressor);
       }
     }
+
+    /**
+     * return the KeyBuffer object used in the reader. Internally in each
+     * reader, there is only one KeyBuffer object, which gets reused for every
+     * block.
+     */
+    public KeyBuffer getCurrentKeyBufferObj() {
+      return this.currentKey;
+    }
+    
+    /**
+     * return the ValueBuffer object used in the reader. Internally in each
+     * reader, there is only one ValueBuffer object, which gets reused for every
+     * block.
+     */
+    public ValueBuffer getCurrentValueBufferObj() {
+      return this.currentValue;
+    }
+    
+    //return the current block's length
+    public int getCurrentBlockLength() {
+      return this.currentRecordLength;
+    }
+
+    //return the current block's key length
+    public int getCurrentKeyLength() {
+      return this.currentKeyLength;
+    }
+
+    //return the current block's compressed key length
+    public int getCurrentCompressedKeyLen() {
+      return this.compressedKeyLen;
+    }
+    
+    //return the CompressionCodec used for this file
+    public CompressionCodec getCompressionCodec() {
+      return this.codec;
+    }
+    
   }
 }