Posted to commits@hive.apache.org by at...@apache.org on 2009/01/28 22:15:25 UTC

svn commit: r738626 - in /hadoop/hive/trunk: ./ cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/...

Author: athusoo
Date: Wed Jan 28 21:15:25 2009
New Revision: 738626

URL: http://svn.apache.org/viewvc?rev=738626&view=rev
Log:
HIVE-176. Added a history log for Hive. (Suresh Anthony via athusoo)


Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
    hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Jan 28 21:15:25 2009
@@ -9,6 +9,8 @@
 
   NEW FEATURES
 
+    HIVE-176. Added a history log for Hive. (Suresh Anthony via athusoo)
+
     HIVE-244. Add SQRT() UDF. (Jeff Hammerbacher via zshao)
 
     HIVE-216. Generate ruby bindings for service. (Raghotham Murthy via zshao)

Modified: hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (original)
+++ hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java Wed Jan 28 21:15:25 2009
@@ -250,7 +250,6 @@
       System.exit(3);
     }
 
-    SessionState.start(ss);
 
     if(! oproc.process_stage2(ss)) {
       System.exit(2);
@@ -261,6 +260,8 @@
     for(Map.Entry<Object, Object> item: ss.cmdProperties.entrySet()) {
       conf.set((String) item.getKey(), (String) item.getValue());
     }
+    
+    SessionState.start(ss);
 
     CliDriver cli = new CliDriver ();
 

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jan 28 21:15:25 2009
@@ -82,8 +82,13 @@
 
     // session identifier
     HIVESESSIONID("hive.session.id", ""),
+    
     // query being executed (multiple per session)
-    HIVEQUERYID("hive.query.string", ""),
+    HIVEQUERYSTRING("hive.query.string", ""),
+    
+    // id of query being executed (multiple per session)
+    HIVEQUERYID("hive.query.id", ""),
+    
     // id of the mapred plan being executed (multiple per query)
     HIVEPLANID("hive.query.planid", ""),
     // max jobname length
@@ -104,7 +109,11 @@
     
     // Default file format for CREATE TABLE statement
     // Options: TextFile, SequenceFile
-    HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile");
+    HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"),
+    
+    // Location of the Hive runtime structured log file
+    HIVEHISTORYFILELOC("hive.querylog.location",  "/tmp/"+System.getProperty("user.name"));
+    
     
     public final String varname;
     public final String defaultVal;

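The new hive.querylog.location variable (HIVEHISTORYFILELOC) controls where the structured history files are written and defaults to /tmp/<user.name>. Below is a minimal sketch, not part of this patch, of reading the value through the new ConfVars entry; the wrapper class and main() are illustrative only.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;

    public class HistoryLocExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(Driver.class);
        // Defaults to /tmp/<user.name>; override hive.querylog.location in the
        // Hive configuration before the session starts to move the log files.
        String logDir = conf.getVar(HiveConf.ConfVars.HIVEHISTORYFILELOC);
        System.out.println("Hive history files will be written under " + logDir);
      }
    }
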
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Jan 28 21:15:25 2009
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
+
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 
 import org.apache.commons.lang.StringUtils;
@@ -39,6 +40,8 @@
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.serde.ByteStream;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,21 +52,21 @@
 public class Driver implements CommandProcessor {
 
   static final private Log LOG = LogFactory.getLog("hive.ql.Driver");
-  private int maxRows   = 100;
+  private int maxRows = 100;
   ByteStream.Output bos = new ByteStream.Output();
-  
-  private ParseDriver  pd;
-  private HiveConf     conf;
-  private DataInput    resStream;
-  private LogHelper    console;
-  private Context      ctx;
+
+  private ParseDriver pd;
+  private HiveConf conf;
+  private DataInput resStream;
+  private LogHelper console;
+  private Context ctx;
   private BaseSemanticAnalyzer sem;
-  
+
   public int countJobs(List<Task<? extends Serializable>> tasks) {
     if (tasks == null)
       return 0;
     int jobs = 0;
-    for (Task<? extends Serializable> task: tasks) {
+    for (Task<? extends Serializable> task : tasks) {
       if (task.isMapRedTask()) {
         jobs++;
       }
@@ -81,11 +84,12 @@
         sem.setFetchTaskInit(true);
         sem.getFetchTask().initialize(conf);
       }
-      FetchTask ft = (FetchTask)sem.getFetchTask();
+      FetchTask ft = (FetchTask) sem.getFetchTask();
 
       tableDesc td = ft.getTblDesc();
       String tableName = "result";
-      List<FieldSchema> lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer());
+      List<FieldSchema> lst = MetaStoreUtils.getFieldsFromDeserializer(
+          tableName, td.getDeserializer());
       String schema = MetaStoreUtils.getDDLFromFieldSchema(tableName, lst);
       return schema;
     }
@@ -111,7 +115,7 @@
       return false;
 
     boolean hasReduce = false;
-    for (Task<? extends Serializable> task: tasks) {
+    for (Task<? extends Serializable> task : tasks) {
       if (task.hasReduce()) {
         return true;
       }
@@ -121,10 +125,9 @@
     return hasReduce;
   }
 
-
   /**
    * for backwards compatibility with current tests
-   */ 
+   */
   public Driver(HiveConf conf) {
     console = new LogHelper(LOG);
     this.conf = conf;
@@ -133,34 +136,55 @@
 
   public Driver() {
     console = new LogHelper(LOG);
-    if(SessionState.get() != null) {
+    if (SessionState.get() != null) {
       conf = SessionState.get().getConf();
       ctx = new Context(conf);
     }
   }
 
+  private  String makeQueryId() {
+    GregorianCalendar gc = new GregorianCalendar();
+    String userid = System.getProperty("user.name");
+
+    return userid + "_" +
+      String.format("%1$4d%2$02d%3$02d%4$02d%5$02d%6$02d", gc.get(Calendar.YEAR),
+                    gc.get(Calendar.MONTH) + 1,
+                    gc.get(Calendar.DAY_OF_MONTH),
+                    gc.get(Calendar.HOUR_OF_DAY),
+                    gc.get(Calendar.MINUTE), gc.get(Calendar.SECOND));
+  }
+
+  
   public int run(String command) {
 
-    boolean noName = StringUtils.isEmpty(conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME));
+    boolean noName = StringUtils.isEmpty(conf
+        .getVar(HiveConf.ConfVars.HADOOPJOBNAME));
     int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
     int jobs = 0;
 
-    conf.setVar(HiveConf.ConfVars.HIVEQUERYID, command);
+    conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, command);
+    
+    String queryId = makeQueryId();
+    conf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
 
     try {
-      
+
       TaskFactory.resetId();
       LOG.info("Starting command: " + command);
 
       ctx.clear();
       ctx.makeScratchDir();
+
+      if (SessionState.get() != null)
+        SessionState.get().getHiveHistory().startQuery(command, conf.getVar(HiveConf.ConfVars.HIVEQUERYID) );
+
       resStream = null;
-      
+
       pd = new ParseDriver();
       ASTNode tree = pd.parse(command);
 
-      while((tree.getToken() == null) && (tree.getChildCount() > 0)) {
-        tree = (ASTNode)tree.getChild(0);
+      while ((tree.getToken() == null) && (tree.getChildCount() > 0)) {
+        tree = (ASTNode) tree.getChild(0);
       }
 
       sem = SemanticAnalyzerFactory.get(conf, tree);
@@ -173,55 +197,71 @@
       if (jobs > 0) {
         console.printInfo("Total MapReduce jobs = " + jobs);
       }
-      
+      if (SessionState.get() != null)
+        SessionState.get().getHiveHistory().setQueryProperty(queryId,
+            Keys.QUERY_NUM_TASKS, String.valueOf(jobs));
+
       boolean hasReduce = hasReduceTasks(sem.getRootTasks());
+
       if (hasReduce) {
-        console.printInfo("Number of reducers = " + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS));
+        console.printInfo("Number of reducers = "
+            + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS));
         console.printInfo("In order to change numer of reducers use:");
         console.printInfo("  set mapred.reduce.tasks = <number>");
       }
 
       String jobname = Utilities.abbreviate(command, maxlen - 6);
       int curJob = 0;
-      for(Task<? extends Serializable> rootTask: sem.getRootTasks()) {
+      for (Task<? extends Serializable> rootTask : sem.getRootTasks()) {
         // assumption that only top level tasks are map-reduce tasks
         if (rootTask.isMapRedTask()) {
-          curJob ++;
-          if(noName) {
-            conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + curJob + "/" + jobs + ")");
+          curJob++;
+          if (noName) {
+            conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + curJob
+                + "/" + jobs + ")");
           }
         }
         rootTask.initialize(conf);
       }
 
       // A very simple runtime that keeps putting runnable takss
-      // on a list and when a job completes, it puts the children at the back of the list
+      // on a list and when a job completes, it puts the children at the back of
+      // the list
       // while taking the job to run from the front of the list
       Queue<Task<? extends Serializable>> runnable = new LinkedList<Task<? extends Serializable>>();
 
-      for(Task<? extends Serializable> rootTask:sem.getRootTasks()) {
+      for (Task<? extends Serializable> rootTask : sem.getRootTasks()) {
         if (runnable.offer(rootTask) == false) {
           LOG.error("Could not insert the first task into the queue");
           return (1);
         }
       }
 
-      while(runnable.peek() != null) {
+      while (runnable.peek() != null) {
         Task<? extends Serializable> tsk = runnable.remove();
 
+        if (SessionState.get() != null)
+          SessionState.get().getHiveHistory().startTask(queryId, tsk,
+              tsk.getClass().getName());
+
         int exitVal = tsk.execute();
+        if (SessionState.get() != null) {
+          SessionState.get().getHiveHistory().setTaskProperty(queryId,
+              tsk.getId(), Keys.TASK_RET_CODE, String.valueOf(exitVal));
+          SessionState.get().getHiveHistory().endTask(queryId, tsk);
+        }
         if (exitVal != 0) {
-          console.printError("FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName());
+          console.printError("FAILED: Execution Error, return code " + exitVal
+              + " from " + tsk.getClass().getName());
           return 9;
         }
-
         tsk.setDone();
 
         if (tsk.getChildTasks() == null) {
           continue;
         }
 
-        for(Task<? extends Serializable> child: tsk.getChildTasks()) {
+        for (Task<? extends Serializable> child : tsk.getChildTasks()) {
           // Check if the child is runnable
           if (!child.isRunnable()) {
             continue;
@@ -232,51 +272,66 @@
           }
         }
       }
+      if (SessionState.get() != null)
+        SessionState.get().getHiveHistory().setQueryProperty(queryId,
+            Keys.QUERY_RET_CODE, String.valueOf(0));
     } catch (SemanticException e) {
-      console.printError("FAILED: Error in semantic analysis: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      if (SessionState.get() != null)
+        SessionState.get().getHiveHistory().setQueryProperty(queryId,
+            Keys.QUERY_RET_CODE, String.valueOf(10));
+      console.printError("FAILED: Error in semantic analysis: "
+          + e.getMessage(), "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (10);
     } catch (ParseException e) {
-      console.printError("FAILED: Parse Error: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      if (SessionState.get() != null)
+        SessionState.get().getHiveHistory().setQueryProperty(queryId,
+            Keys.QUERY_RET_CODE, String.valueOf(11));
+      console.printError("FAILED: Parse Error: " + e.getMessage(), "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (11);
     } catch (Exception e) {
-      // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
-      console.printError("FAILED: Unknown exception : " + e.getMessage(),
-                         "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      if (SessionState.get() != null)
+        SessionState.get().getHiveHistory().setQueryProperty(queryId,
+            Keys.QUERY_RET_CODE, String.valueOf(12));
+      // Has to use full name to make sure it does not conflict with
+      // org.apache.commons.lang.StringUtils
+      console.printError("FAILED: Unknown exception : " + e.getMessage(), "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (12);
     } finally {
-      if(noName) {
+      if (SessionState.get() != null)
+        SessionState.get().getHiveHistory().endQuery(queryId);
+      if (noName) {
         conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, "");
-      } 
+      }
     }
 
     console.printInfo("OK");
     return (0);
   }
-  
-  
-  public boolean getResults(Vector<String> res) 
-  {
+
+  public boolean getResults(Vector<String> res) {
     if (sem != null && sem.getFetchTask() != null) {
       if (!sem.getFetchTaskInit()) {
         sem.setFetchTaskInit(true);
         sem.getFetchTask().initialize(conf);
       }
-      FetchTask ft = (FetchTask)sem.getFetchTask();
+      FetchTask ft = (FetchTask) sem.getFetchTask();
       ft.setMaxRows(maxRows);
       return ft.fetch(res);
     }
 
     if (resStream == null)
       resStream = ctx.getStream();
-    if (resStream == null) return false;
-    
+    if (resStream == null)
+      return false;
+
     int numRows = 0;
     String row = null;
 
-    while (numRows < maxRows)
-    {
-      if (resStream == null) 
-      {
+    while (numRows < maxRows) {
+      if (resStream == null) {
         if (numRows > 0)
           return true;
         else
@@ -285,8 +340,7 @@
 
       bos.reset();
       Utilities.streamStatus ss;
-      try
-      {
+      try {
         ss = Utilities.readColumn(resStream, bos);
         if (bos.getCount() > 0)
           row = new String(bos.getData(), 0, bos.getCount(), "UTF-8");
@@ -298,12 +352,13 @@
           res.add(row);
         }
       } catch (IOException e) {
-        console.printError("FAILED: Unexpected IO exception : " + e.getMessage());
+        console.printError("FAILED: Unexpected IO exception : "
+            + e.getMessage());
         res = null;
         return false;
       }
 
-      if (ss == Utilities.streamStatus.EOF) 
+      if (ss == Utilities.streamStatus.EOF)
         resStream = ctx.getStream();
     }
     return true;
@@ -314,14 +369,12 @@
       // Delete the scratch directory from the context
       ctx.removeScratchDir();
       ctx.clear();
+    } catch (Exception e) {
+      console.printError("FAILED: Unknown exception : " + e.getMessage(), "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      return (13);
     }
-    catch (Exception e) {
-      console.printError("FAILED: Unknown exception : " + e.getMessage(),
-                         "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
-      return(13);
-    }
-    
-    return(0);
+
+    return (0);
   }
 }
-

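With these changes, every Driver.run() now brackets a query with startQuery/endQuery (the latter from the finally block) and records per-task start, return-code and end events in between. The following is a rough sketch, not part of this patch, of exercising that path from client code using only classes this commit touches; the query text and table name are hypothetical and a working Hive installation is assumed.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class HistoryHookExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(Driver.class);
        // start() now also constructs the session's HiveHistory and opens its log file
        SessionState ss = SessionState.start(conf);
        Driver drv = new Driver();
        // run() emits QueryStart, TaskStart/TaskEnd per task, and QueryEnd records
        int ret = drv.run("SELECT key FROM src");   // hypothetical query and table
        System.out.println("return code = " + ret);
        System.out.println("history file = " + ss.getHiveHistory().getHistFileName());
      }
    }
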
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Jan 28 21:15:25 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.io.*;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -61,15 +62,16 @@
   public static String getRealFiles(Configuration conf) {
     // fill in local files to be added to the task environment
     SessionState ss = SessionState.get();
-    Set<String> files = (ss == null) ? null : ss.list_resource(SessionState.ResourceType.FILE, null);
-    if(files != null) {
-      ArrayList<String> realFiles = new ArrayList<String> (files.size());
-      for(String one: files) {
+    Set<String> files = (ss == null) ? null : ss.list_resource(
+        SessionState.ResourceType.FILE, null);
+    if (files != null) {
+      ArrayList<String> realFiles = new ArrayList<String>(files.size());
+      for (String one : files) {
         try {
           realFiles.add(Utilities.realFile(one, conf));
         } catch (IOException e) {
-          throw new RuntimeException ("Cannot validate file " + one +
-                                      "due to exception: " + e.getMessage(), e);
+          throw new RuntimeException("Cannot validate file " + one
+              + "due to exception: " + e.getMessage(), e);
         }
       }
       return StringUtils.join(realFiles, ",");
@@ -78,11 +80,10 @@
     }
   }
 
-
   /**
    * Initialization when invoked from QL
    */
-  public void initialize (HiveConf conf) {
+  public void initialize(HiveConf conf) {
     super.initialize(conf);
     job = new JobConf(conf, ExecDriver.class);
     String realFiles = getRealFiles(job);
@@ -91,7 +92,7 @@
 
       // workaround for hadoop-17 - jobclient only looks at commandlineconfig
       Configuration commandConf = JobClient.getCommandLineConfig();
-      if(commandConf != null) {
+      if (commandConf != null) {
         commandConf.set("tmpfiles", realFiles);
       }
     }
@@ -100,63 +101,70 @@
   /**
    * Constructor/Initialization for invocation as independent utility
    */
-  public ExecDriver(mapredWork plan, JobConf job, boolean isSilent) throws HiveException {
+  public ExecDriver(mapredWork plan, JobConf job, boolean isSilent)
+      throws HiveException {
     setWork(plan);
     this.job = job;
     LOG = LogFactory.getLog(this.getClass().getName());
-    console = new LogHelper(LOG, isSilent);    
+    console = new LogHelper(LOG, isSilent);
   }
 
   protected void fillInDefaults() {
     // this is a temporary hack to fix things that are not fixed in the compiler
-    if(work.getNumReduceTasks() == null) {
-      if(work.getReducer() == null) {
-        LOG.warn("Number of reduce tasks not specified. Defaulting to 0 since there's no reduce operator");
+    if (work.getNumReduceTasks() == null) {
+      if (work.getReducer() == null) {
+        LOG
+            .warn("Number of reduce tasks not specified. Defaulting to 0 since there's no reduce operator");
         work.setNumReduceTasks(Integer.valueOf(0));
       } else {
-        LOG.warn("Number of reduce tasks not specified. Defaulting to jobconf value of: " + job.getNumReduceTasks());
+        LOG
+            .warn("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+                + job.getNumReduceTasks());
         work.setNumReduceTasks(job.getNumReduceTasks());
       }
-    }
-    else
-      LOG.info("Number of reduce tasks determined at compile : " + work.getNumReduceTasks());
+    } else
+      LOG.info("Number of reduce tasks determined at compile : "
+          + work.getNumReduceTasks());
   }
 
   /**
-   * A list of the currently running jobs spawned in this Hive instance that is used
-   * to kill all running jobs in the event of an unexpected shutdown - i.e., the JVM shuts
-   * down while there are still jobs running.
+   * A list of the currently running jobs spawned in this Hive instance that is
+   * used to kill all running jobs in the event of an unexpected shutdown -
+   * i.e., the JVM shuts down while there are still jobs running.
    */
-  public static HashMap<String,String> runningJobKillURIs = new HashMap<String, String> ();
-
+  public static HashMap<String, String> runningJobKillURIs = new HashMap<String, String>();
 
   /**
-   * In Hive, when the user control-c's the command line, any running jobs spawned from that command 
-   * line are best-effort killed.
-   *
-   * This static constructor registers a shutdown thread to iterate over all the running job
-   * kill URLs and do a get on them.
-   *
+   * In Hive, when the user control-c's the command line, any running jobs
+   * spawned from that command line are best-effort killed.
+   * 
+   * This static constructor registers a shutdown thread to iterate over all the
+   * running job kill URLs and do a get on them.
+   * 
    */
   static {
-    if(new org.apache.hadoop.conf.Configuration().getBoolean("webinterface.private.actions", false)) {
+    if (new org.apache.hadoop.conf.Configuration().getBoolean(
+        "webinterface.private.actions", false)) {
       Runtime.getRuntime().addShutdownHook(new Thread() {
-          public void run() {
-            for(Iterator<String> elems = runningJobKillURIs.values().iterator(); elems.hasNext() ;  ) {
-              String uri = elems.next();
-              try {
-                System.err.println("killing job with: " + uri);
-                int retCode = ((java.net.HttpURLConnection)new java.net.URL(uri).openConnection()).getResponseCode();
-                if(retCode != 200) {
-                  System.err.println("Got an error trying to kill job with URI: " + uri + " = " + retCode);
-                }
-              } catch(Exception e) {
-                System.err.println("trying to kill job, caught: " + e);
-                // do nothing 
+        public void run() {
+          for (Iterator<String> elems = runningJobKillURIs.values().iterator(); elems
+              .hasNext();) {
+            String uri = elems.next();
+            try {
+              System.err.println("killing job with: " + uri);
+              int retCode = ((java.net.HttpURLConnection) new java.net.URL(uri)
+                  .openConnection()).getResponseCode();
+              if (retCode != 200) {
+                System.err.println("Got an error trying to kill job with URI: "
+                    + uri + " = " + retCode);
               }
+            } catch (Exception e) {
+              System.err.println("trying to kill job, caught: " + e);
+              // do nothing
             }
           }
-        });
+        }
+      });
     }
   }
 
@@ -168,19 +176,23 @@
       console.printInfo("Job running in-process (local Hadoop)");
     } else {
       String hp = job.get("mapred.job.tracker");
-      console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = " + rj.getTrackingURL());
-      console.printInfo("Kill Command = " +
-                  HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN) +
-                  " job  -Dmapred.job.tracker=" + hp + " -kill "
-                  + rj.getJobID());
+      if (SessionState.get() != null) {
+        SessionState.get().getHiveHistory().setTaskProperty(
+            SessionState.get().getQueryId(), getId(),
+            Keys.TASK_HADOOP_ID, rj.getJobID());
+      }
+      console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = "
+          + rj.getTrackingURL());
+      console.printInfo("Kill Command = "
+          + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
+          + " job  -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
     }
   }
 
   /**
    * from StreamJob.java
    */
-  public RunningJob jobProgress(JobClient jc, RunningJob rj)
-    throws IOException {
+  public RunningJob jobProgress(JobClient jc, RunningJob rj) throws IOException {
     String lastReport = "";
     while (!rj.isComplete()) {
       try {
@@ -190,9 +202,20 @@
       rj = jc.getJob(rj.getJobID());
       String report = null;
       report = " map = " + Math.round(rj.mapProgress() * 100) + "%,  reduce ="
-        + Math.round(rj.reduceProgress() * 100) + "%";
-      
+          + Math.round(rj.reduceProgress() * 100) + "%";
+
       if (!report.equals(lastReport)) {
+
+        SessionState ss = SessionState.get();
+        if (ss != null) {
+          ss.getHiveHistory().setTaskCounters(
+              SessionState.get().getQueryId(), getId(), rj);
+          ss.getHiveHistory().setTaskProperty(
+              SessionState.get().getQueryId(), getId(),
+              Keys.TASK_HADOOP_PROGRESS, report);
+          ss.getHiveHistory().progressTask(
+              SessionState.get().getQueryId(), this);
+        }
         console.printInfo(report);
         lastReport = report;
       }
@@ -202,48 +225,51 @@
 
   private void inferNumReducers() throws Exception {
     FileSystem fs = FileSystem.get(job);
-    
+
     if ((work.getReducer() != null) && (work.getInferNumReducers() == true)) {
       long inpSz = 0;
-      
+
       // based on the input size - estimate the number of reducers
       Path[] inputPaths = FileInputFormat.getInputPaths(job);
-      
+
       for (Path inputP : inputPaths) {
         if (fs.exists(inputP)) {
           FileStatus[] fStats = fs.listStatus(inputP);
-          for (FileStatus fStat:fStats) 
+          for (FileStatus fStat : fStats)
             inpSz += fStat.getLen();
         }
       }
 
-      
-      int newRed = (int)(inpSz / LOAD_PER_REDUCER) + 1;
-      if (newRed < work.getNumReduceTasks().intValue())
-      {
-        LOG.warn("Number of reduce tasks inferred based on input size to : " + newRed);
+      int newRed = (int) (inpSz / LOAD_PER_REDUCER) + 1;
+      if (newRed < work.getNumReduceTasks().intValue()) {
+
+        LOG.warn("Number of reduce tasks inferred based on input size to : "
+            + newRed);
         work.setNumReduceTasks(Integer.valueOf(newRed));
+
       }
     }
   }
 
   /**
    * Add new elements to the classpath
-   * @param newPaths Array of classpath elements
+   * 
+   * @param newPaths
+   *          Array of classpath elements
    */
-  private static void addToClassPath(String [] newPaths) throws Exception {
+  private static void addToClassPath(String[] newPaths) throws Exception {
     Thread curThread = Thread.currentThread();
-    URLClassLoader loader = (URLClassLoader)curThread.getContextClassLoader();
+    URLClassLoader loader = (URLClassLoader) curThread.getContextClassLoader();
     List<URL> curPath = Arrays.asList(loader.getURLs());
-    ArrayList<URL> newPath = new ArrayList<URL> ();
+    ArrayList<URL> newPath = new ArrayList<URL>();
 
-    for(String onestr: newPaths) {
+    for (String onestr : newPaths) {
       URL oneurl = (new File(onestr)).toURL();
-      if(!curPath.contains(oneurl)) {
+      if (!curPath.contains(oneurl)) {
         newPath.add(oneurl);
       }
     }
-    
+
     loader = new URLClassLoader(newPath.toArray(new URL[0]), loader);
     curThread.setContextClassLoader(loader);
   }
@@ -256,35 +282,34 @@
     fillInDefaults();
 
     String invalidReason = work.isInvalid();
-    if(invalidReason != null) {
-      throw new RuntimeException("Plan invalid, Reason: "+invalidReason);
+    if (invalidReason != null) {
+      throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
     }
 
     Utilities.setMapRedWork(job, work);
-    
-    
-    for(String onefile: work.getPathToAliases().keySet()) {
+
+    for (String onefile : work.getPathToAliases().keySet()) {
       LOG.info("Adding input file " + onefile);
       FileInputFormat.addInputPaths(job, onefile);
     }
-        
+
     String hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
     String jobScratchDir = hiveScratchDir + Utilities.randGen.nextInt();
     FileOutputFormat.setOutputPath(job, new Path(jobScratchDir));
     job.setMapperClass(ExecMapper.class);
-    
-    job.setMapOutputKeyClass(HiveKey.class);    
+
+    job.setMapOutputKeyClass(HiveKey.class);
     job.setMapOutputValueClass(BytesWritable.class);
-    
+
     job.setNumReduceTasks(work.getNumReduceTasks().intValue());
     job.setReducerClass(ExecReducer.class);
-    
+
     job.setInputFormat(org.apache.hadoop.hive.ql.io.HiveInputFormat.class);
-    
-    // No-Op - we don't really write anything here .. 
+
+    // No-Op - we don't really write anything here ..
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
-    
+
     String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
     if (StringUtils.isNotBlank(auxJars)) {
       LOG.info("adding libjars: " + auxJars);
@@ -294,47 +319,62 @@
     int returnVal = 0;
     FileSystem fs = null;
     RunningJob rj = null;
-    
+
     try {
       fs = FileSystem.get(job);
-      
+
       // if the input is empty exit gracefully
       Path[] inputPaths = FileInputFormat.getInputPaths(job);
       boolean emptyInput = true;
       for (Path inputP : inputPaths) {
-        if(!fs.exists(inputP))
+        if (!fs.exists(inputP))
           continue;
-        
+
         FileStatus[] fStats = fs.listStatus(inputP);
-        for (FileStatus fStat:fStats) {
-        	if (fStat.getLen() > 0) {
-        	  emptyInput = false;
-        		break;
-        	}
+        for (FileStatus fStat : fStats) {
+          if (fStat.getLen() > 0) {
+            emptyInput = false;
+            break;
+          }
         }
       }
-      	
+
       if (emptyInput) {
         console.printInfo("Job need not be submitted: no output: Success");
-      	return 0;
+        return 0;
       }
-      
+
       inferNumReducers();
-      JobClient jc = new JobClient(job);
+
       
+      if (SessionState.get() != null) {
+        if (work.getReducer() != null) {
+          SessionState.get().getHiveHistory().setTaskProperty(
+              SessionState.get().getQueryId(), getId(),
+              Keys.TASK_NUM_REDUCERS, String.valueOf(work.getNumReduceTasks()));
+        } else {
+          SessionState.get().getHiveHistory().setTaskProperty(
+              SessionState.get().getQueryId(), getId(),
+              Keys.TASK_NUM_REDUCERS, String.valueOf(0));
+        }
+      }
+      JobClient jc = new JobClient(job);
+
       // make this client wait if job trcker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
 
       rj = jc.submitJob(job);
 
-      // add to list of running jobs so in case of abnormal shutdown can kill it.
-      runningJobKillURIs.put(rj.getJobID(),  rj.getTrackingURL() + "&action=kill");
+      // add to list of running jobs so in case of abnormal shutdown can kill
+      // it.
+      runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL()
+          + "&action=kill");
 
       jobInfo(rj);
       rj = jobProgress(jc, rj);
 
       String statusMesg = "Ended Job = " + rj.getJobID();
-      if(!rj.isSuccessful()) {
+      if (!rj.isSuccessful()) {
         statusMesg += " with errors";
         returnVal = 2;
         console.printError(statusMesg);
@@ -343,45 +383,49 @@
       }
     } catch (Exception e) {
       String mesg = " with exception '" + e.getMessage() + "'";
-      if(rj != null) {
+      if (rj != null) {
         mesg = "Ended Job = " + rj.getJobID() + mesg;
       } else {
         mesg = "Job Submission failed" + mesg;
       }
-      // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
-      console.printError(mesg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      // Has to use full name to make sure it does not conflict with
+      // org.apache.commons.lang.StringUtils
+      console.printError(mesg, "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
 
       returnVal = 1;
     } finally {
       Utilities.clearMapRedWork(job);
       try {
         fs.delete(new Path(jobScratchDir), true);
-        if(returnVal != 0 && rj != null) {
+        if (returnVal != 0 && rj != null) {
           rj.killJob();
         }
         runningJobKillURIs.remove(rj.getJobID());
-      } catch (Exception e) {}
+      } catch (Exception e) {
+      }
     }
     return (returnVal);
   }
-  
+
   private static void printUsage() {
-    System.out.println("ExecDriver -plan <plan-file> [-jobconf k1=v1 [-jobconf k2=v2] ...] "+
-                       "[-files <file1>[,<file2>] ...]");
+    System.out
+        .println("ExecDriver -plan <plan-file> [-jobconf k1=v1 [-jobconf k2=v2] ...] "
+            + "[-files <file1>[,<file2>] ...]");
     System.exit(1);
   }
 
   public static void main(String[] args) throws IOException, HiveException {
     String planFileName = null;
-    ArrayList<String> jobConfArgs = new ArrayList<String> ();
+    ArrayList<String> jobConfArgs = new ArrayList<String>();
     boolean isSilent = false;
     String files = null;
 
-    try{
-      for(int i=0; i<args.length; i++) {
-        if(args[i].equals("-plan")) {
+    try {
+      for (int i = 0; i < args.length; i++) {
+        if (args[i].equals("-plan")) {
           planFileName = args[++i];
-          System.out.println("plan = "+planFileName);
+          System.out.println("plan = " + planFileName);
         } else if (args[i].equals("-jobconf")) {
           jobConfArgs.add(args[++i]);
         } else if (args[i].equals("-silent")) {
@@ -395,33 +439,33 @@
       printUsage();
     }
 
-    if(planFileName == null) {
+    if (planFileName == null) {
       System.err.println("Must specify Plan File Name");
       printUsage();
     }
 
     JobConf conf = new JobConf(ExecDriver.class);
-    for(String one: jobConfArgs) {
+    for (String one : jobConfArgs) {
       int eqIndex = one.indexOf('=');
-      if(eqIndex != -1) {
+      if (eqIndex != -1) {
         try {
-          conf.set(one.substring(0, eqIndex),
-                   URLDecoder.decode(one.substring(eqIndex+1), "UTF-8"));
+          conf.set(one.substring(0, eqIndex), URLDecoder.decode(one
+              .substring(eqIndex + 1), "UTF-8"));
         } catch (UnsupportedEncodingException e) {
-          System.err.println("Unexpected error " + e.getMessage() + " while encoding " +
-                             one.substring(eqIndex+1));
+          System.err.println("Unexpected error " + e.getMessage()
+              + " while encoding " + one.substring(eqIndex + 1));
           System.exit(3);
         }
       }
     }
 
-    if(files != null) {
+    if (files != null) {
       conf.set("tmpfiles", files);
     }
 
     URI pathURI = (new Path(planFileName)).toUri();
     InputStream pathData;
-    if(StringUtils.isEmpty(pathURI.getScheme())) {
+    if (StringUtils.isEmpty(pathURI.getScheme())) {
       // default to local file system
       pathData = new FileInputStream(planFileName);
     } else {
@@ -429,17 +473,19 @@
       FileSystem fs = FileSystem.get(conf);
       pathData = fs.open(new Path(planFileName));
     }
-    
-    // workaround for hadoop-17 - libjars are not added to classpath. this affects local
+
+    // workaround for hadoop-17 - libjars are not added to classpath. this
+    // affects local
     // mode execution
-    boolean localMode = HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT).equals("local");
-    if(localMode) {
+    boolean localMode = HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT)
+        .equals("local");
+    if (localMode) {
       String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
       if (StringUtils.isNotBlank(auxJars)) {
         try {
           addToClassPath(StringUtils.split(auxJars, ","));
         } catch (Exception e) {
-          throw new HiveException (e.getMessage(), e);
+          throw new HiveException(e.getMessage(), e);
         }
       }
     }
@@ -447,28 +493,30 @@
     mapredWork plan = Utilities.deserializeMapRedWork(pathData);
     ExecDriver ed = new ExecDriver(plan, conf, isSilent);
     int ret = ed.execute();
-    if(ret != 0) {
+    if (ret != 0) {
       System.out.println("Job Failed");
       System.exit(2);
     }
   }
 
   /**
-   * Given a Hive Configuration object - generate a command line
-   * fragment for passing such configuration information to ExecDriver
+   * Given a Hive Configuration object - generate a command line fragment for
+   * passing such configuration information to ExecDriver
    */
   public static String generateCmdLine(HiveConf hconf) {
     try {
-      StringBuilder sb = new StringBuilder ();
+      StringBuilder sb = new StringBuilder();
       Properties deltaP = hconf.getChangedProperties();
-      boolean localMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local");
+      boolean localMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals(
+          "local");
       String hadoopSysDir = "mapred.system.dir";
       String hadoopWorkDir = "mapred.local.dir";
 
-      for(Object one: deltaP.keySet()) {
-        String oneProp = (String)one;
-      
-        if(localMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir)))
+      for (Object one : deltaP.keySet()) {
+        String oneProp = (String) one;
+
+        if (localMode
+            && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir)))
           continue;
 
         String oneValue = deltaP.getProperty(oneProp);
@@ -480,19 +528,23 @@
         sb.append(" ");
       }
 
-      // Multiple concurrent local mode job submissions can cause collisions in working dirs
-      // Workaround is to rename map red working dir to a temp dir in such a case
-      if(localMode) {
+      // Multiple concurrent local mode job submissions can cause collisions in
+      // working dirs
+      // Workaround is to rename map red working dir to a temp dir in such a
+      // case
+      if (localMode) {
         sb.append("-jobconf ");
         sb.append(hadoopSysDir);
         sb.append("=");
-        sb.append(URLEncoder.encode(hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt(), "UTF-8"));
+        sb.append(URLEncoder.encode(hconf.get(hadoopSysDir) + "/"
+            + Utilities.randGen.nextInt(), "UTF-8"));
 
         sb.append(" ");
         sb.append("-jobconf ");
         sb.append(hadoopWorkDir);
         sb.append("=");
-        sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt(), "UTF-8"));
+        sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/"
+            + Utilities.randGen.nextInt(), "UTF-8"));
       }
 
       return sb.toString();
@@ -512,4 +564,3 @@
     return w.getReducer() != null;
   }
 }
-

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=738626&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Wed Jan 28 21:15:25 2009
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.history;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+
+
+public class HiveHistory {
+
+  PrintWriter histStream; // History File stream
+
+  String histFileName; // History file name
+
+  static final private Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory");
+
+  private LogHelper console;
+
+  // Job Hash Map
+  private HashMap<String, QueryInfo> queryInfoMap = new HashMap<String, QueryInfo>();
+
+  // Task Hash Map
+  private HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
+
+  private static final String DELIMITER = " ";
+
+  public static enum RecordTypes {
+    QueryStart, QueryEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd
+  };
+
+  public static enum Keys {
+    SESSION_ID, QUERY_ID, TASK_ID, QUERY_RET_CODE, QUERY_NUM_TASKS, QUERY_STRING, TIME,
+    TASK_RET_CODE, TASK_NAME, TASK_HADOOP_ID, TASK_HADOOP_PROGRESS, TASK_COUNTERS, TASK_NUM_REDUCERS
+  };
+
+  private static final String KEY = "(\\w+)";
+  private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""
+
+  private static final Pattern pattern = Pattern.compile(KEY + "=" + "\""
+      + VALUE + "\"");
+
+  // temp buffer for parsed data
+  private static Map<String, String> parseBuffer = new HashMap<String, String>();
+
+  /**
+   * Listener interface. The parser will call the handle function for each record type.
+   */
+  public static interface Listener {
+
+    public void handle(RecordTypes recType, Map<String, String> values)
+        throws IOException;
+  }
+
+  /**
+   * Parses the history file and calls callback functions.
+   * 
+   * @param path
+   * @param l
+   * @throws IOException
+   */
+  public static void parseHiveHistory(String path, Listener l)
+      throws IOException {
+    FileInputStream fi = new FileInputStream(path);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(fi));
+    try {
+      String line = null;
+      StringBuffer buf = new StringBuffer();
+      while ((line = reader.readLine()) != null) {
+        buf.append(line);
+        //if it does not end with " then it is line continuation
+        if (!line.trim().endsWith("\"")) {
+          continue;
+        }
+        parseLine(buf.toString(), l);
+        buf = new StringBuffer();
+      }
+    } finally {
+      try {
+        reader.close();
+      } catch (IOException ex) {
+      }
+    }
+  }
+
+  /**
+   * Parse a single line of history.
+   * 
+   * @param line
+   * @param l
+   * @throws IOException
+   */
+  private static void parseLine(String line, Listener l) throws IOException {
+    // extract the record type
+    int idx = line.indexOf(' ');
+    String recType = line.substring(0, idx);
+    String data = line.substring(idx + 1, line.length());
+
+    Matcher matcher = pattern.matcher(data);
+
+    while (matcher.find()) {
+      String tuple = matcher.group(0);
+      String[] parts = tuple.split("=");
+
+      parseBuffer.put(parts[0], parts[1].substring(1, parts[1].length() - 1));
+    }
+
+    l.handle(RecordTypes.valueOf(recType), parseBuffer);
+
+    parseBuffer.clear();
+  }
+
+  public static class Info {
+
+  }
+
+  public static class SessionInfo extends Info {
+    public String sessionId;
+  };
+
+  public static class QueryInfo extends Info {
+    public Map<String, String> hm = new HashMap<String, String>();
+  };
+
+  public static class TaskInfo extends Info {
+    public Map<String, String> hm = new HashMap<String, String>();
+
+  };
+
+  /**
+   * Construct HiveHistory object and open the history log file.
+   * 
+   * @param ss
+   */
+  public HiveHistory(SessionState ss) {
+
+    try {
+      console = new LogHelper(LOG);
+      String conf_file_loc = ss.getConf().getVar(
+          HiveConf.ConfVars.HIVEHISTORYFILELOC);
+      if ((conf_file_loc == null) || conf_file_loc.length() == 0)
+      {
+        console.printError("No history file location given");
+        return;
+      }
+      
+      //Create directory 
+      File f = new File(conf_file_loc);
+      if (!f.exists()){
+        if (!f.mkdir()){
+          console.printError("Unable to create log directory "+conf_file_loc );
+          return;
+        }
+      }
+      histFileName = conf_file_loc + "/hive_job_log_" + ss.getSessionId()
+          + ".txt";
+      console.printInfo("Hive history file=" + histFileName);
+      histStream = new PrintWriter(histFileName);
+
+      HashMap<String, String> hm = new HashMap<String, String>();
+      hm.put(Keys.SESSION_ID.name(), ss.getSessionId());
+      log(RecordTypes.SessionStart, hm);
+    } catch (FileNotFoundException e) {
+      console.printError("FAILED: Failed to open Query Log : " +histFileName+ " "+  e.getMessage(), "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+
+  }
+
+  /**
+   * @return historyFileName
+   */
+  public String getHistFileName() {
+    return histFileName;
+  }
+
+  /**
+   * Write a history record to the history file.
+   * 
+   * @param rt
+   * @param keyValMap
+   */
+  void log(RecordTypes rt, Map<String, String> keyValMap) {
+
+    if (histStream == null)
+      return;
+
+    StringBuffer sb = new StringBuffer();
+    sb.append(rt.name());
+
+    for (Map.Entry<String, String> ent : keyValMap.entrySet()) {
+      
+      sb.append(DELIMITER);
+      String key = ent.getKey();
+      String val = ent.getValue();
+      val = val.replace('\n', ' ');
+      sb.append(key + "=\"" + val + "\"");
+
+    }
+    sb.append(DELIMITER);
+    sb.append(Keys.TIME.name() + "=\"" + System.currentTimeMillis() + "\"");
+    histStream.println(sb);
+    histStream.flush();
+
+  }
+
+  /**
+   * Called at the start of job Driver.run()
+   */
+  public void startQuery(String cmd, String id) {
+    SessionState ss = SessionState.get();
+    if (ss == null)
+      return;
+    QueryInfo ji = new QueryInfo();
+
+    ji.hm.put(Keys.QUERY_ID.name(), id);
+    ji.hm.put(Keys.QUERY_STRING.name(), cmd);
+    
+    queryInfoMap.put(id, ji);
+    
+    
+
+    log(RecordTypes.QueryStart, ji.hm);
+
+  }
+
+  /**
+   * Used to set job status and other attributes of a job
+   * 
+   * @param queryId
+   * @param propName
+   * @param propValue
+   */
+  public void setQueryProperty(String queryId, Keys propName, String propValue) {
+    QueryInfo ji = queryInfoMap.get(queryId);
+    if (ji == null)
+      return;
+    ji.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Used to set task properties.
+   * 
+   * @param taskId
+   * @param propName
+   * @param propValue
+   */
+  public void setTaskProperty(String queryId, String taskId, Keys propName,
+      String propValue) {
+    String id = queryId + ":" + taskId;
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null)
+      return;
+    ti.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Serialize the task counters and set as a task property.
+   * 
+   * @param taskId
+   * @param rj
+   */
+  public void setTaskCounters(String queryId, String taskId, RunningJob rj) {
+    String id = queryId + ":" + taskId;
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null)
+      return;
+    StringBuilder sb = new StringBuilder("");
+    try {
+
+      boolean first = true;
+      for (Group group : rj.getCounters()) {
+        for (Counter counter : group) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(',');
+          }
+          sb.append(group.getDisplayName());
+          sb.append('.');
+          sb.append(counter.getDisplayName());
+          sb.append(':');
+          sb.append(counter.getCounter());
+        }
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
+  }
+
+  /**
+   * Called at the end of a job. A job is a SQL query.
+   * 
+   * @param queryId
+   */
+  public void endQuery(String queryId) {
+
+    QueryInfo ji = queryInfoMap.get(queryId);
+    if (ji == null)
+      return;
+    log(RecordTypes.QueryEnd, ji.hm);
+  }
+
+  /**
+   * Called at the start of a task, by Driver.run(). A job can have
+   * multiple tasks, and each task can have multiple operators.
+   * 
+   * @param task
+   */
+  public void startTask(String queryId, Task<? extends Serializable> task,
+      String taskName) {
+    SessionState ss = SessionState.get();
+    if (ss == null)
+      return;
+    TaskInfo ti = new TaskInfo();
+
+    ti.hm.put(Keys.QUERY_ID.name(), ss.getQueryId());
+    ti.hm.put(Keys.TASK_ID.name(), task.getId());
+    ti.hm.put(Keys.TASK_NAME.name(), taskName);
+
+    String id = queryId + ":" + task.getId();
+    taskInfoMap.put(id, ti);
+
+    log(RecordTypes.TaskStart, ti.hm);
+
+  }
+
+  /**
+   * Called at the end of a task.
+   * 
+   * @param task
+   */
+  public void endTask(String queryId, Task<? extends Serializable> task) {
+    String id = queryId + ":" + task.getId();
+    TaskInfo ti = taskInfoMap.get(id);
+
+    if (ti == null)
+      return;
+    log(RecordTypes.TaskEnd, ti.hm);
+  }
+
+  /**
+   * Called to log the progress of a task.
+   * 
+   * @param task
+   */
+  public void progressTask(String queryId, Task<? extends Serializable> task) {
+    String id = queryId + ":" + task.getId();
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null)
+      return;
+    log(RecordTypes.TaskProgress, ti.hm);
+
+  }
+
+}

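For reference, log() writes one space-delimited record per line: the RecordTypes name followed by KEY="value" pairs (newlines in values are flattened to spaces) and a trailing TIME field in milliseconds. An illustrative, made-up fragment of a resulting hive_job_log_<sessionId>.txt file follows; field order within a record is not guaranteed, since the keys come from a HashMap.

    SessionStart SESSION_ID="hadoop_200901282115" TIME="1233177325000"
    QueryStart QUERY_STRING="select count(1) from src" QUERY_ID="hadoop_20090128211530" TIME="1233177330000"
    TaskStart TASK_NAME="org.apache.hadoop.hive.ql.exec.ExecDriver" QUERY_ID="hadoop_20090128211530" TASK_ID="1" TIME="1233177331000"
    TaskEnd TASK_RET_CODE="0" TASK_NAME="org.apache.hadoop.hive.ql.exec.ExecDriver" QUERY_ID="hadoop_20090128211530" TASK_ID="1" TIME="1233177395000"
    QueryEnd QUERY_STRING="select count(1) from src" QUERY_ID="hadoop_20090128211530" QUERY_RET_CODE="0" QUERY_NUM_TASKS="1" TIME="1233177396000"
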
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java?rev=738626&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java Wed Jan 28 21:15:25 2009
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.history;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+
+public class HiveHistoryViewer implements
+    org.apache.hadoop.hive.ql.history.HiveHistory.Listener {
+
+  String historyFile;
+
+  String sessionId;
+
+  // Job Hash Map
+  private HashMap<String, QueryInfo> jobInfoMap = new HashMap<String, QueryInfo>();
+
+  // Task Hash Map
+  private HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
+
+  public HiveHistoryViewer(String path) {
+    historyFile = path;
+    init();
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public Map<String, QueryInfo> getJobInfoMap() {
+    return jobInfoMap;
+  }
+
+  public Map<String, TaskInfo> getTaskInfoMap() {
+    return taskInfoMap;
+  }
+
+  /**
+   * parse history files
+   */
+  void init() {
+
+    try {
+      HiveHistory.parseHiveHistory(historyFile, this);
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+
+  }
+
+  /**
+   * Implementation of the Listener interface handle function.
+   * 
+   * @see org.apache.hadoop.hive.ql.history.HiveHistory.Listener#handle(org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes,
+   *      java.util.Map)
+   */
+  public void handle(RecordTypes recType, Map<String, String> values) {
+
+    if (recType == RecordTypes.SessionStart) {
+      sessionId = values.get(Keys.SESSION_ID.name());
+    } else if (recType == RecordTypes.QueryStart || recType == RecordTypes.QueryEnd) {
+      String key = values.get(Keys.QUERY_ID.name());
+      QueryInfo ji;
+      if (jobInfoMap.containsKey(key)) {
+        ji = jobInfoMap.get(key);
+
+        ji.hm.putAll(values);
+
+      } else {
+        ji = new QueryInfo();
+        ji.hm = new HashMap<String, String>();
+        ji.hm.putAll(values);
+
+        jobInfoMap.put(key, ji);
+
+      }
+    } else if (recType == RecordTypes.TaskStart
+        || recType == RecordTypes.TaskEnd
+        || recType == RecordTypes.TaskProgress) {
+
+      String jobid = values.get(Keys.QUERY_ID.name());
+      String taskid = values.get(Keys.TASK_ID.name());
+      String key = jobid + ":" + taskid;
+      TaskInfo ti;
+      if (taskInfoMap.containsKey(key)) {
+        ti = taskInfoMap.get(key);
+        ti.hm.putAll(values);
+      } else {
+        ti = new TaskInfo();
+        ti.hm = new HashMap<String, String>();
+        ti.hm.putAll(values);
+        taskInfoMap.put(key, ti);
+
+      }
+
+    }
+
+  }
+
+}

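HiveHistoryViewer is the read-side counterpart: it replays a history file through the Listener callback and rebuilds per-query and per-task maps keyed the same way HiveHistory keeps them. A minimal sketch, not part of this patch, of dumping the recorded return codes; the file path is hypothetical.

    import java.util.Map;

    import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
    import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
    import org.apache.hadoop.hive.ql.history.HiveHistoryViewer;

    public class HistoryDump {
      public static void main(String[] args) {
        // Path is hypothetical; real files live under hive.querylog.location
        HiveHistoryViewer hv =
            new HiveHistoryViewer("/tmp/hadoop/hive_job_log_hadoop_200901282115.txt");
        System.out.println("session = " + hv.getSessionId());
        for (Map.Entry<String, QueryInfo> e : hv.getJobInfoMap().entrySet()) {
          System.out.println(e.getKey() + " -> return code "
              + e.getValue().hm.get(Keys.QUERY_RET_CODE.name()));
        }
      }
    }
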
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Wed Jan 28 21:15:25 2009
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -58,6 +59,10 @@
    */
   protected Hive db;
 
+  /*
+   * HiveHistory object used for structured logging of this session
+   */
+  protected HiveHistory hiveHist;
   /**
    * Streams to read/write from
    */
@@ -121,10 +126,15 @@
   }
 
   public void setCmd(String cmdString) {
-    conf.setVar(HiveConf.ConfVars.HIVEQUERYID, cmdString);
+    conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, cmdString);
   }
 
   public String getCmd() {
+    return (conf.getVar(HiveConf.ConfVars.HIVEQUERYSTRING));
+  }
+  
+  
+  public String getQueryId() {
     return (conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
   }
 
@@ -144,6 +154,7 @@
   public static SessionState start(HiveConf conf) {
     SessionState ss = new SessionState (conf);
     ss.getConf().setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId());
+    ss.hiveHist = new HiveHistory(ss);
     tss.set(ss);
     return (ss);
   }
@@ -154,10 +165,15 @@
    * session object when switching from one session to another
    */
   public static SessionState start(SessionState startSs) {
+   
     tss.set(startSs);
     if(StringUtils.isEmpty(startSs.getConf().getVar(HiveConf.ConfVars.HIVESESSIONID))) {
       startSs.getConf().setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId());
     }
+    
+    if (startSs.hiveHist == null){
+      startSs.hiveHist = new HiveHistory(startSs);
+    }
     return startSs;
   }
 
@@ -168,7 +184,16 @@
     return tss.get();
   }
 
-
+ 
+  /**
+   * Get the HiveHistory object, which does structured logging for this session.
+   * @return the session's HiveHistory
+   */
+  public HiveHistory getHiveHistory(){
+    return hiveHist;
+  }
+  
+  
   private static String makeSessionId() {
     GregorianCalendar gc = new GregorianCalendar();
     String userid = System.getProperty("user.name");
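
A short sketch, not part of this patch and assuming a usable Hive
configuration on the classpath, of what the SessionState change buys a
caller: once a session is started, the structured history log for that
session can be located through getHiveHistory(). The WhereIsMyHistory
class name is made up; the start(), getHiveHistory() and getHistFileName()
calls are the ones added or used in this commit.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.session.SessionState;

public class WhereIsMyHistory {
  public static void main(String[] args) {
    // start() creates the HiveHistory instance along with the session id
    SessionState ss = SessionState.start(new HiveConf(SessionState.class));
    HiveHistory hist = ss.getHiveHistory();
    System.out.println("history file: " + hist.getHistFileName());
  }
}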

Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java Wed Jan 28 21:15:25 2009
@@ -159,6 +159,8 @@
     this.logDir = logDir;
     conf = new HiveConf(Driver.class);
 
+    CliSessionState ss = new CliSessionState(conf);
+    SessionState.start(ss);
     // System.out.println(conf.toString());
     testFiles = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
 
@@ -371,8 +373,8 @@
       createSources();
     }
 
-    CliSessionState ss = new CliSessionState(conf);
-
+    //CliSessionState ss = new CliSessionState(conf);
+    SessionState ss = SessionState.get();
     ss.in = System.in;
 
     File qf = new File(outDir, tname);

Added: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java?rev=738626&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (added)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java Wed Jan 28 21:15:25 2009
@@ -0,0 +1,164 @@
+package org.apache.hadoop.hive.ql.history;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.tools.LineageInfo;
+import org.apache.hadoop.hive.service.HiveInterface;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import junit.framework.TestCase;
+
+public class TestHiveHistory extends TestCase {
+
+  static HiveConf conf;
+
+  static private String tmpdir = "/tmp/" + System.getProperty("user.name")
+      + "/";
+  static private Path tmppath = new Path(tmpdir);
+  static private Hive db;
+  static private FileSystem fs;
+
+  /*
+   * initialize the tables
+   */
+
+  protected void setUp(){
+    try {
+      conf = new HiveConf(HiveHistory.class);
+
+      fs = FileSystem.get(conf);
+      if (fs.exists(tmppath) && !fs.getFileStatus(tmppath).isDir()) {
+        throw new RuntimeException(tmpdir + " exists but is not a directory");
+      }
+
+      if (!fs.exists(tmppath)) {
+        if (!fs.mkdirs(tmppath)) {
+          throw new RuntimeException("Could not make scratch directory "
+              + tmpdir);
+        }
+      }
+
+      // copy the test files into hadoop if required.
+      int i = 0;
+      Path[] hadoopDataFile = new Path[2];
+      String[] testFiles = { "kv1.txt", "kv2.txt" };
+      String testFileDir = "file://"
+          + conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+      for (String oneFile : testFiles) {
+        Path localDataFile = new Path(testFileDir, oneFile);
+        hadoopDataFile[i] = new Path(tmppath, oneFile);
+        fs.copyFromLocalFile(false, true, localDataFile, hadoopDataFile[i]);
+        i++;
+      }
+
+      // load the test files into tables
+      i = 0;
+      db = Hive.get(conf);
+      String[] srctables = { "src", "src2" };
+      LinkedList<String> cols = new LinkedList<String>();
+      cols.add("key");
+      cols.add("value");
+      for (String src : srctables) {
+        db.dropTable(src, true, true);
+        db.createTable(src, cols, null, TextInputFormat.class,
+            IgnoreKeyTextOutputFormat.class);
+        db.loadTable(hadoopDataFile[i], src, false);
+        i++;
+      }
+
+    } catch (Throwable e) {
+      e.printStackTrace();
+      throw new RuntimeException("Encountered throwable");
+    }
+  }
+
+  /**
+   * Check the history file output for this query.
+   */
+  public void testSimpleQuery() {
+    LineageInfo lep = new LineageInfo();
+    try {
+
+      // NOTE: It is critical to do this here so that log4j is reinitialized
+      // before
+      // any of the other core hive classes are loaded
+      SessionState.initHiveLog4j();
+
+      CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
+      ss.in = System.in;
+      try {
+        ss.out = new PrintStream(System.out, true, "UTF-8");
+        ss.err = new PrintStream(System.err, true, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        System.exit(3);
+      }
+
+      SessionState.start(ss);
+
+      String cmd = "select a.key from src a";
+      Driver d = new Driver();
+      int ret = d.run(cmd);
+      if (ret != 0) {
+        fail("Failed");
+      }
+      HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get()
+          .getHiveHistory().getHistFileName());
+      Map<String, QueryInfo> jobInfoMap = hv.getJobInfoMap();
+      Map<String, TaskInfo> taskInfoMap = hv.getTaskInfoMap();
+      if (jobInfoMap.size() != 1) {
+        fail("jobInfo Map size not 1");
+      }
+
+      if (taskInfoMap.size() != 1) {
+        fail("jobInfo Map size not 1");
+      }
+
+
+      cmd = (String)jobInfoMap.keySet().toArray()[0];
+      QueryInfo ji = jobInfoMap.get(cmd);
+
+      if (!ji.hm.get(Keys.QUERY_NUM_TASKS.name()).equals("1")) {
+        fail("Wrong number of tasks");
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Failed");
+    }
+  }
+
+}
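
As a last illustration, also hypothetical: any class can implement
HiveHistory.Listener and replay a history file through
HiveHistory.parseHiveHistory(), which is exactly what HiveHistoryViewer
does above. The RecordCounter name below is made up; the Listener
interface, RecordTypes values and parse method are the ones introduced in
this patch.

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes;

public class RecordCounter implements HiveHistory.Listener {
  // number of records seen per record type
  private final Map<RecordTypes, Integer> counts =
      new EnumMap<RecordTypes, Integer>(RecordTypes.class);

  public void handle(RecordTypes recType, Map<String, String> values) {
    Integer c = counts.get(recType);
    counts.put(recType, c == null ? 1 : c + 1);
  }

  public static void main(String[] args) throws IOException {
    RecordCounter rc = new RecordCounter();
    // args[0]: path to a hive history file
    HiveHistory.parseHiveHistory(args[0], rc);
    System.out.println(rc.counts);
  }
}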

Modified: hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm (original)
+++ hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm Wed Jan 28 21:15:25 2009
@@ -9,6 +9,13 @@
 
 import org.apache.hadoop.hive.ql.QTestUtil;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.history.HiveHistoryViewer;
+import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+
 
 import org.antlr.runtime.*;
 import org.antlr.runtime.tree.*;
@@ -62,6 +69,19 @@
       if (ecode != 0) {
         fail("Client Execution failed with error code = " + ecode);
       }
+      if (SessionState.get() != null) {
+        HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get()
+          .getHiveHistory().getHistFileName());
+        Map<String, QueryInfo> jobInfoMap = hv.getJobInfoMap();
+        Map<String, TaskInfo> taskInfoMap = hv.getTaskInfoMap();
+
+        String cmd = (String)jobInfoMap.keySet().toArray()[0];
+        QueryInfo ji = jobInfoMap.get(cmd);
+
+        if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) {
+            fail("Wrong return code in hive history");
+        }
+      }
 
       ecode = qt.checkCliDriverResults("$fname");
       if (ecode != 0) {