You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/19 23:03:05 UTC

svn commit: r805973 [4/19] - in /hadoop/hive/trunk: ./ data/files/ eclipse-templates/ metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/ metastore/src/gen-py/hive_metastore/ ql/ ql/if/ ql/src/gen-javabean/ ql/src/gen-javabean/org/ ql/src/...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Wed Aug 19 21:02:57 2009
@@ -18,20 +18,63 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.LinkedList;
+import java.util.List;
 
+import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
+import org.apache.hadoop.hive.ql.plan.api.NodeType;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.plan.api.TaskType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TTransportException;
+
+
+public class QueryPlan implements Serializable {
+  private static final long serialVersionUID = 1L;
+  
+  static final private Log LOG = LogFactory.getLog(QueryPlan.class.getName());
 
-public class QueryPlan {
   private String queryString;
   private BaseSemanticAnalyzer plan;
   private String queryId;
+  private org.apache.hadoop.hive.ql.plan.api.Query query;
+  private Map<String, Map<String, Long>> counters;
+  private Set<String> done;
+  private Set<String> started;
+
+  private boolean add;
+
 
+  /**
+   * Creates the plan wrapper for one query and prepares the empty bookkeeping
+   * structures that are filled in later by the counter extraction methods.
+   *
+   * @param queryString text of the query; also stored under the "queryString"
+   *                    attribute of the api.Query object
+   * @param plan        semantic analyzer whose root tasks are walked when the
+   *                    api.Query object is populated
+   */
   public QueryPlan(String queryString, BaseSemanticAnalyzer plan) {
     this.queryString = queryString;
     this.plan = plan;
     this.queryId = makeQueryId();
+    // api.Query mirror of this plan, tagged with the generated id and the query text
+    query = new org.apache.hadoop.hive.ql.plan.api.Query();
+    query.setQueryId(this.queryId);
+    query.putToQueryAttributes("queryString", this.queryString);
+    // counters are keyed by task/operator id; started and done hold ids
+    counters = new HashMap<String, Map<String, Long>>();
+    done = new HashSet<String>();
+    started = new HashSet<String>();
   }
 
   public String getQueryStr() {
@@ -57,4 +100,574 @@
                     gc.get(Calendar.HOUR_OF_DAY),
                     gc.get(Calendar.MINUTE), gc.get(Calendar.SECOND));
   }
+
+  /**
+   * Maps an exec-layer task to its StageType constant by looking at the task's
+   * concrete class. The checks run in a fixed order and the first match wins.
+   *
+   * TODO each Task should define a getType() method which will return the values
+   * instead of this method
+   *
+   * @param task the task to classify
+   * @return the matching StageType value; -1 when no class matches (an assertion
+   *         fails first if assertions are enabled)
+   */
+  private int getStageType(Task<? extends Serializable> task) {
+    final int stageType;
+    if (task instanceof ConditionalTask) {
+      stageType = StageType.CONDITIONAL;
+    } else if (task instanceof CopyTask) {
+      stageType = StageType.COPY;
+    } else if (task instanceof DDLTask) {
+      stageType = StageType.DDL;
+    } else if (task instanceof ExecDriver) {
+      stageType = StageType.MAPRED;
+    } else if (task instanceof ExplainTask) {
+      stageType = StageType.EXPLAIN;
+    } else if (task instanceof FetchTask) {
+      stageType = StageType.FETCH;
+    } else if (task instanceof FunctionTask) {
+      stageType = StageType.FUNC;
+    } else if (task instanceof MapRedTask) {
+      stageType = StageType.MAPREDLOCAL;
+    } else if (task instanceof MoveTask) {
+      stageType = StageType.MOVE;
+    } else {
+      // unrecognized task class
+      assert false;
+      stageType = -1;
+    }
+    return stageType;
+  }
+
+  /**
+   * Maps an exec-layer operator to its OperatorType constant by looking at the
+   * operator's concrete class. The checks run in a fixed order and the first
+   * match wins.
+   *
+   * TODO remove this method. add a getType() method for each operator
+   *
+   * @param op the operator to classify
+   * @return the matching OperatorType value; -1 when no class matches (an
+   *         assertion fails first if assertions are enabled)
+   */
+  private int getOperatorType(Operator<? extends Serializable> op) {
+    final int operatorType;
+    if (op instanceof JoinOperator) {
+      operatorType = OperatorType.JOIN;
+    } else if (op instanceof MapJoinOperator) {
+      operatorType = OperatorType.MAPJOIN;
+    } else if (op instanceof ExtractOperator) {
+      operatorType = OperatorType.EXTRACT;
+    } else if (op instanceof FilterOperator) {
+      operatorType = OperatorType.FILTER;
+    } else if (op instanceof ForwardOperator) {
+      operatorType = OperatorType.FORWARD;
+    } else if (op instanceof GroupByOperator) {
+      operatorType = OperatorType.GROUPBY;
+    } else if (op instanceof LimitOperator) {
+      operatorType = OperatorType.LIMIT;
+    } else if (op instanceof ScriptOperator) {
+      operatorType = OperatorType.SCRIPT;
+    } else if (op instanceof SelectOperator) {
+      operatorType = OperatorType.SELECT;
+    } else if (op instanceof TableScanOperator) {
+      operatorType = OperatorType.TABLESCAN;
+    } else if (op instanceof FileSinkOperator) {
+      operatorType = OperatorType.FILESINK;
+    } else if (op instanceof ReduceSinkOperator) {
+      operatorType = OperatorType.REDUCESINK;
+    } else if (op instanceof UnionOperator) {
+      operatorType = OperatorType.UNION;
+    } else {
+      // unrecognized operator class
+      assert false;
+      operatorType = -1;
+    }
+    return operatorType;
+  }
+
+  /**
+   * generate the operator graph and operator list for the given task based on
+   * the operators corresponding to that task.
+   *
+   * Operators are walked breadth-first starting from topOps. An operator that is
+   * reachable through more than one parent is emitted only once, so neither the
+   * operator list nor the adjacency list contains duplicates.
+   *
+   * @param task   api.Task which needs its operator graph populated
+   * @param topOps the set of top operators from which the operator graph for the task
+   *               is hanging
+   */
+  private void populateOperatorGraph(org.apache.hadoop.hive.ql.plan.api.Task task,
+      Collection<Operator<? extends Serializable>> topOps) {
+
+    task.setOperatorGraph(new org.apache.hadoop.hive.ql.plan.api.Graph());
+    task.getOperatorGraph().setNodeType(NodeType.OPERATOR);
+
+    Queue<Operator<? extends Serializable>> opsToVisit = new LinkedList<Operator<? extends Serializable>>();
+    Set<Operator<? extends Serializable>> opsVisited = new HashSet<Operator<? extends Serializable>>();
+    opsToVisit.addAll(topOps);
+    while (opsToVisit.peek() != null) {
+      Operator<? extends Serializable> op = opsToVisit.remove();
+      // The enqueue-time check below only knows about operators that were already
+      // dequeued, so a child shared by several parents can be queued repeatedly.
+      // Skip the repeats here.
+      if (!opsVisited.add(op)) {
+        continue;
+      }
+      // populate the operator
+      org.apache.hadoop.hive.ql.plan.api.Operator operator = new org.apache.hadoop.hive.ql.plan.api.Operator();
+      operator.setOperatorId(op.getOperatorId());
+      operator.setOperatorType(getOperatorType(op));
+      task.addToOperatorList(operator);
+      // done processing the operator
+      if (op.getChildOperators() != null) {
+        org.apache.hadoop.hive.ql.plan.api.Adjacency entry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
+        entry.setAdjacencyType(AdjacencyType.CONJUNCTIVE);
+        entry.setNode(op.getOperatorId());
+        for (Operator<? extends Serializable> childOp: op.getChildOperators()) {
+          entry.addToChildren(childOp.getOperatorId());
+          if (!opsVisited.contains(childOp)) {
+            opsToVisit.add(childOp);
+          }
+        }
+        task.getOperatorGraph().addToAdjacencyList(entry);
+      }
+    }
+  }
+  
+  /**
+   * Populate api.QueryPlan from exec structures. This includes constructing the
+   * dependency graphs of stages and operators.
+   *
+   * Tasks are walked breadth-first from the plan's root tasks. A task that is
+   * reachable through more than one parent is emitted only once, so the stage
+   * list and the stage graph contain no duplicate entries.
+   *
+   * @throws IOException
+   */
+  private void populateQueryPlan() throws IOException {
+    query.setStageGraph(new org.apache.hadoop.hive.ql.plan.api.Graph());
+    query.getStageGraph().setNodeType(NodeType.STAGE);
+
+    Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
+    Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
+    tasksToVisit.addAll(plan.getRootTasks());
+    while (tasksToVisit.size() != 0) {
+      Task<? extends Serializable> task = tasksToVisit.remove();
+      // The enqueue-time checks below only know about tasks that were already
+      // dequeued, so a task shared by several parents can be queued repeatedly.
+      // Skip the repeats here.
+      if (!tasksVisited.add(task)) {
+        continue;
+      }
+      // populate stage
+      org.apache.hadoop.hive.ql.plan.api.Stage stage = new org.apache.hadoop.hive.ql.plan.api.Stage();
+      stage.setStageId(task.getId());
+      stage.setStageType(getStageType(task));
+      query.addToStageList(stage);
+
+      if (task instanceof ExecDriver) {
+        // populate map task
+        ExecDriver mrTask = (ExecDriver)task;
+        org.apache.hadoop.hive.ql.plan.api.Task mapTask = new org.apache.hadoop.hive.ql.plan.api.Task();
+        mapTask.setTaskId(stage.getStageId() + "_MAP");
+        mapTask.setTaskType(TaskType.MAP);
+        stage.addToTaskList(mapTask);
+        populateOperatorGraph(mapTask, mrTask.getWork().getAliasToWork().values());
+
+        // populate reduce task
+        if (mrTask.hasReduce()) {
+          org.apache.hadoop.hive.ql.plan.api.Task reduceTask = new org.apache.hadoop.hive.ql.plan.api.Task();
+          reduceTask.setTaskId(stage.getStageId() + "_REDUCE");
+          reduceTask.setTaskType(TaskType.REDUCE);
+          stage.addToTaskList(reduceTask);
+          Collection<Operator<? extends Serializable>> reducerTopOps = new ArrayList<Operator<? extends Serializable>>();
+          reducerTopOps.add(mrTask.getWork().getReducer());
+          populateOperatorGraph(reduceTask, reducerTopOps);
+        }
+      }
+      else {
+        // every non map-reduce stage gets a single placeholder task
+        org.apache.hadoop.hive.ql.plan.api.Task otherTask = new org.apache.hadoop.hive.ql.plan.api.Task();
+        otherTask.setTaskId(stage.getStageId() + "_OTHER");
+        otherTask.setTaskType(TaskType.OTHER);
+        stage.addToTaskList(otherTask);
+      }
+      if (task instanceof ConditionalTask) {
+        // the conditional task points at each of its candidate tasks
+        org.apache.hadoop.hive.ql.plan.api.Adjacency listEntry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
+        listEntry.setAdjacencyType(AdjacencyType.DISJUNCTIVE);
+        listEntry.setNode(task.getId());
+        ConditionalTask t = (ConditionalTask)task;
+
+        for (Task<? extends Serializable> listTask: t.getListTasks()) {
+          if (t.getChildTasks() != null) {
+            // each candidate task in turn points at the conditional task's children
+            org.apache.hadoop.hive.ql.plan.api.Adjacency childEntry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
+            childEntry.setAdjacencyType(AdjacencyType.DISJUNCTIVE);
+            childEntry.setNode(listTask.getId());
+            // done processing the task
+            for (Task<? extends Serializable> childTask: t.getChildTasks()) {
+              childEntry.addToChildren(childTask.getId());
+              if (!tasksVisited.contains(childTask)) {
+                tasksToVisit.add(childTask);
+              }
+            }
+            query.getStageGraph().addToAdjacencyList(childEntry);
+          }
+
+          listEntry.addToChildren(listTask.getId());
+          if (!tasksVisited.contains(listTask)) {
+            tasksToVisit.add(listTask);
+          }
+        }
+        query.getStageGraph().addToAdjacencyList(listEntry);
+      }
+      else if (task.getChildTasks() != null) {
+        org.apache.hadoop.hive.ql.plan.api.Adjacency entry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
+        entry.setAdjacencyType(AdjacencyType.CONJUNCTIVE);
+        entry.setNode(task.getId());
+        // done processing the task
+        for (Task<? extends Serializable> childTask: task.getChildTasks()) {
+          entry.addToChildren(childTask.getId());
+          if (!tasksVisited.contains(childTask)) {
+            tasksToVisit.add(childTask);
+          }
+        }
+        query.getStageGraph().addToAdjacencyList(entry);
+      }
+    }
+  }
+
+  /**
+   * From the counters extracted via extractCounters(), update the counters
+   * in the query plan.
+   *
+   * Copies the started/done flags and the counter maps onto the api.Query object
+   * and onto each of its stages, tasks and operators. Missing stage, task or
+   * operator lists are skipped, matching the null checks done by the JSON
+   * rendering methods of this class.
+   */
+  private void updateCountersInQueryPlan() {
+    query.setStarted(started.contains(query.getQueryId()));
+    query.setDone(done.contains(query.getQueryId()));
+    if (query.getStageList() == null) {
+      return;
+    }
+    for (org.apache.hadoop.hive.ql.plan.api.Stage stage: query.getStageList()) {
+      stage.setStarted(started.contains(stage.getStageId()));
+      stage.setStageCounters(counters.get(stage.getStageId()));
+      stage.setDone(done.contains(stage.getStageId()));
+      if (stage.getTaskList() == null) {
+        continue;
+      }
+      for (org.apache.hadoop.hive.ql.plan.api.Task task: stage.getTaskList()) {
+        task.setTaskCounters(counters.get(task.getTaskId()));
+        if (task.getTaskType() == TaskType.OTHER) {
+          // placeholder tasks carry no state of their own; mirror the stage
+          task.setStarted(started.contains(stage.getStageId()));
+          task.setDone(done.contains(stage.getStageId()));
+        } else {
+          task.setStarted(started.contains(task.getTaskId()));
+          task.setDone(done.contains(task.getTaskId()));
+          if (task.getOperatorList() == null) {
+            continue;
+          }
+          for (org.apache.hadoop.hive.ql.plan.api.Operator op: task.getOperatorList()) {
+            // if the task has started, all operators within the task have started
+            op.setStarted(started.contains(task.getTaskId()));
+            op.setOperatorCounters(counters.get(op.getOperatorId()));
+            // if the task is done, all operators are done as well
+            op.setDone(done.contains(task.getTaskId()));
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * extract all the counters from tasks and operators.
+   *
+   * Walks the tasks breadth-first from the plan's root tasks and records, keyed
+   * by id, each task's counters plus its started/done state in the counters,
+   * started and done members. For ExecDriver tasks the map side (id suffix
+   * "_MAP") and, when present, the reduce side (id suffix "_REDUCE") are
+   * recorded as well, together with the counters of their operators.
+   */
+  private void extractCounters() throws IOException {
+    Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
+    Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
+    tasksToVisit.addAll(plan.getRootTasks());
+    while (tasksToVisit.peek() != null) {
+      Task<? extends Serializable> task = tasksToVisit.remove();
+      tasksVisited.add(task);
+      // add children to tasksToVisit
+      if (task.getChildTasks() != null) {
+        for (Task<? extends Serializable> childTask: task.getChildTasks()) {
+          if (!tasksVisited.contains(childTask)) {
+            tasksToVisit.add(childTask);
+          }
+        }
+      }
+
+      // a task already recorded as both started and done is not refreshed again;
+      // its children were still queued above
+      if (started.contains(task.getId()) && done.contains(task.getId())) {
+        continue;
+      }
+
+      // get the counters for the task
+      counters.put(task.getId(), task.getCounters());
+
+      // check if task is started
+      if (task.started()) {
+        started.add(task.getId());
+      }
+      if (task.done()) {
+        done.add(task.getId());
+      }
+      if (task instanceof ExecDriver) {
+        ExecDriver mrTask = (ExecDriver)task;
+        // map side: operator counters plus map started/done state
+        extractOperatorCounters(mrTask.getWork().getAliasToWork().values(), task.getId() + "_MAP");
+        if (mrTask.mapStarted()) {
+          started.add(task.getId() + "_MAP");
+        }
+        if (mrTask.mapDone()) {
+          done.add(task.getId() + "_MAP");
+        }
+        // reduce side, only when the job has a reducer
+        if (mrTask.hasReduce()) {
+          Collection<Operator<? extends Serializable>> reducerTopOps = new ArrayList<Operator<? extends Serializable>>();
+          reducerTopOps.add(mrTask.getWork().getReducer());
+          extractOperatorCounters(reducerTopOps, task.getId() + "_REDUCE");
+          if (mrTask.reduceStarted()) {
+            started.add(task.getId() + "_REDUCE");
+          }
+          if (mrTask.reduceDone()) {
+            done.add(task.getId() + "_REDUCE");
+          }
+        }
+      }
+      else if (task instanceof ConditionalTask) {
+        // the candidate tasks of a conditional task are visited too
+        ConditionalTask cTask = (ConditionalTask)task;
+        for (Task<? extends Serializable> listTask: cTask.getListTasks()) {
+          if (!tasksVisited.contains(listTask)) {
+            tasksToVisit.add(listTask);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Records the counters of every operator reachable from the given top
+   * operators, and adds the ids of operators reporting done to the done set.
+   *
+   * @param topOps operators the breadth-first walk starts from
+   * @param taskId id of the owning map or reduce task; not used by this method
+   */
+  private void extractOperatorCounters(Collection<Operator<? extends Serializable>> topOps, String taskId) {
+    Set<Operator<? extends Serializable>> seen = new HashSet<Operator<? extends Serializable>>();
+    Queue<Operator<? extends Serializable>> pending = new LinkedList<Operator<? extends Serializable>>();
+    pending.addAll(topOps);
+    while (!pending.isEmpty()) {
+      Operator<? extends Serializable> current = pending.remove();
+      seen.add(current);
+      counters.put(current.getOperatorId(), current.getCounters());
+      if (current.getDone()) {
+        done.add(current.getOperatorId());
+      }
+      if (current.getChildOperators() == null) {
+        continue;
+      }
+      for (Operator<? extends Serializable> child: current.getChildOperators()) {
+        boolean alreadySeen = seen.contains(child);
+        if (!alreadySeen) {
+          pending.add(child);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the api.Query object for this plan with refreshed counters.
+   * The stage and operator graphs are built on the first call (detected by the
+   * stage graph still being null); counters and started/done flags are
+   * re-extracted on every call.
+   *
+   * @return the api.Query object held by this plan
+   * @throws IOException
+   */
+  public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException {
+    boolean graphBuilt = query.getStageGraph() != null;
+    if (!graphBuilt) {
+      populateQueryPlan();
+    }
+    extractCounters();
+    updateCountersInQueryPlan();
+    return query;
+  }
+
+  /**
+   * Renders a single value for the hand-built JSON output.
+   *
+   * A null value becomes the bare word null. A value whose string form starts
+   * with '[' or '{' is taken to be JSON that was already rendered and is passed
+   * through unchanged. Anything else, including the empty string, is wrapped in
+   * double quotes. No escaping of the string contents is performed.
+   *
+   * @param value value to render, may be null
+   * @return the JSON text for the value
+   */
+  public String getJSONValue(Object value) {
+    if (value == null) {
+      return "null";
+    }
+    String v = value.toString();
+    // an empty string has no first character to inspect; treat it as a plain value
+    boolean preRendered = v.length() > 0 && (v.charAt(0) == '[' || v.charAt(0) == '{');
+    if (preRendered) {
+      return v;
+    }
+    return "\"" + v + "\"";
+  }
+
+  /**
+   * Renders one "key":value pair followed by a trailing comma. Callers strip
+   * the comma that follows the last pair of an object.
+   *
+   * @param key   key, written between double quotes
+   * @param value value, rendered through getJSONValue()
+   * @return the pair as JSON text, always ending in ','
+   */
+  public String getJSONKeyValue(Object key, Object value) {
+    StringBuilder pair = new StringBuilder();
+    pair.append("\"").append(key).append("\":");
+    pair.append(getJSONValue(value));
+    pair.append(",");
+    return pair.toString();
+  }
+
+  /**
+   * Renders a list as a JSON array, each element going through getJSONValue().
+   *
+   * @param list list to render, may be null or empty
+   * @return "null" for a null list, "[]" for an empty list, otherwise the
+   *         comma-separated elements between square brackets
+   */
+  @SuppressWarnings("unchecked")
+  private String getJSONList(List list) {
+    if (list == null) {
+      return "null";
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    // write the separator before every element but the first, so an empty list
+    // yields "[]" instead of losing its opening bracket
+    boolean first = true;
+    for (Object entry: list) {
+      if (!first) {
+        sb.append(",");
+      }
+      sb.append(getJSONValue(entry));
+      first = false;
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+  /**
+   * Renders a map as a JSON object, each entry going through getJSONKeyValue().
+   *
+   * @param map map to render, may be null or empty
+   * @return "null" for a null map, "{}" for an empty map, otherwise the
+   *         comma-separated "key":value pairs between curly braces
+   */
+  @SuppressWarnings("unchecked")
+  public String getJSONMap(Map map) {
+    if (map == null) {
+      return "null";
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append("{");
+    for (Object entry: map.entrySet()) {
+      Map.Entry e = (Map.Entry)entry;
+      sb.append(getJSONKeyValue(e.getKey(), e.getValue()));
+    }
+    // every pair ends with ','; drop the last one, but leave the opening brace
+    // alone when the map had no entries
+    if (!map.isEmpty()) {
+      sb.deleteCharAt(sb.length()-1);
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Renders an api.Graph (node type, roots and adjacency list) as a JSON object.
+   *
+   * @param graph graph to render, may be null
+   * @return "null" for a null graph, otherwise the JSON text of the graph
+   */
+  private Object getJSONGraph(org.apache.hadoop.hive.ql.plan.api.Graph graph) {
+    if (graph == null) {
+      return "null";
+    }
+    StringBuilder json = new StringBuilder("{");
+    json.append(getJSONKeyValue("nodeType", graph.getNodeType()));
+    json.append(getJSONKeyValue("roots", getJSONList(graph.getRoots())));
+    // adjacency list: render each entry first, then emit them as one array
+    List<String> renderedAdjacencies = new ArrayList<String>();
+    if (graph.getAdjacencyList() != null) {
+      for (org.apache.hadoop.hive.ql.plan.api.Adjacency adjacency: graph.getAdjacencyList()) {
+        renderedAdjacencies.add(getJSONAdjacency(adjacency));
+      }
+    }
+    json.append(getJSONKeyValue("adjacencyList", getJSONList(renderedAdjacencies)));
+    // the last pair ends with ','; overwrite it with the closing brace
+    json.setCharAt(json.length() - 1, '}');
+    return json.toString();
+  }
+
+  /**
+   * Renders one adjacency entry (node, children, adjacency type) as a JSON object.
+   *
+   * @param adj adjacency entry to render, may be null
+   * @return "null" for a null entry, otherwise the JSON text of the entry
+   */
+  private String getJSONAdjacency(org.apache.hadoop.hive.ql.plan.api.Adjacency adj) {
+    if (adj == null) {
+      return "null";
+    }
+    StringBuilder json = new StringBuilder("{");
+    json.append(getJSONKeyValue("node", adj.getNode()))
+        .append(getJSONKeyValue("children", getJSONList(adj.getChildren())))
+        .append(getJSONKeyValue("adjacencyType", adj.getAdjacencyType()));
+    // the last pair ends with ','; overwrite it with the closing brace
+    json.setCharAt(json.length() - 1, '}');
+    return json.toString();
+  }
+
+  /**
+   * Renders one api.Operator (id, type, attributes, counters, done and started
+   * flags) as a JSON object.
+   *
+   * @param op operator to render
+   * @return the JSON text of the operator
+   */
+  private String getJSONOperator(org.apache.hadoop.hive.ql.plan.api.Operator op) {
+    StringBuilder json = new StringBuilder("{");
+    json.append(getJSONKeyValue("operatorId", op.getOperatorId()))
+        .append(getJSONKeyValue("operatorType", op.getOperatorType()))
+        .append(getJSONKeyValue("operatorAttributes", getJSONMap(op.getOperatorAttributes())))
+        .append(getJSONKeyValue("operatorCounters", getJSONMap(op.getOperatorCounters())))
+        .append(getJSONKeyValue("done", op.isDone()))
+        .append(getJSONKeyValue("started", op.isStarted()));
+    // the last pair ends with ','; overwrite it with the closing brace
+    json.setCharAt(json.length() - 1, '}');
+    return json.toString();
+  }
+
+  /**
+   * Renders one api.Task (id, type, attributes, counters, operator graph,
+   * operator list, done and started flags) as a JSON object.
+   *
+   * @param task task to render
+   * @return the JSON text of the task
+   */
+  private String getJSONTask(org.apache.hadoop.hive.ql.plan.api.Task task) {
+    StringBuilder json = new StringBuilder("{");
+    json.append(getJSONKeyValue("taskId", task.getTaskId()))
+        .append(getJSONKeyValue("taskType", task.getTaskType()))
+        .append(getJSONKeyValue("taskAttributes", getJSONMap(task.getTaskAttributes())))
+        .append(getJSONKeyValue("taskCounters", getJSONMap(task.getTaskCounters())))
+        .append(getJSONKeyValue("operatorGraph", getJSONGraph(task.getOperatorGraph())));
+    // operator list: a missing list is rendered like an empty one
+    List<String> renderedOperators = new ArrayList<String>();
+    if (task.getOperatorList() != null) {
+      for (org.apache.hadoop.hive.ql.plan.api.Operator operator: task.getOperatorList()) {
+        renderedOperators.add(getJSONOperator(operator));
+      }
+    }
+    json.append(getJSONKeyValue("operatorList", getJSONList(renderedOperators)))
+        .append(getJSONKeyValue("done", task.isDone()))
+        .append(getJSONKeyValue("started", task.isStarted()));
+    // the last pair ends with ','; overwrite it with the closing brace
+    json.setCharAt(json.length() - 1, '}');
+    return json.toString();
+  }
+
+  /**
+   * Renders one api.Stage (id, type, attributes, counters, task list, done and
+   * started flags) as a JSON object.
+   *
+   * @param stage stage to render
+   * @return the JSON text of the stage
+   */
+  private String getJSONStage(org.apache.hadoop.hive.ql.plan.api.Stage stage) {
+    StringBuilder json = new StringBuilder("{");
+    json.append(getJSONKeyValue("stageId", stage.getStageId()))
+        .append(getJSONKeyValue("stageType", stage.getStageType()))
+        .append(getJSONKeyValue("stageAttributes", getJSONMap(stage.getStageAttributes())))
+        .append(getJSONKeyValue("stageCounters", getJSONMap(stage.getStageCounters())));
+    // task list: a missing list is rendered like an empty one
+    List<String> renderedTasks = new ArrayList<String>();
+    if (stage.getTaskList() != null) {
+      for (org.apache.hadoop.hive.ql.plan.api.Task stageTask: stage.getTaskList()) {
+        renderedTasks.add(getJSONTask(stageTask));
+      }
+    }
+    json.append(getJSONKeyValue("taskList", getJSONList(renderedTasks)))
+        .append(getJSONKeyValue("done", stage.isDone()))
+        .append(getJSONKeyValue("started", stage.isStarted()));
+    // the last pair ends with ','; overwrite it with the closing brace
+    json.setCharAt(json.length() - 1, '}');
+    return json.toString();
+  }
+
+  /**
+   * Renders an api.Query (id, type, attributes, counters, stage graph, stage
+   * list, done and started flags) as a JSON object.
+   *
+   * @param query query to render
+   * @return the JSON text of the query
+   */
+  public String getJSONQuery(org.apache.hadoop.hive.ql.plan.api.Query query) {
+    StringBuilder json = new StringBuilder("{");
+    json.append(getJSONKeyValue("queryId", query.getQueryId()))
+        .append(getJSONKeyValue("queryType", query.getQueryType()))
+        .append(getJSONKeyValue("queryAttributes", getJSONMap(query.getQueryAttributes())))
+        .append(getJSONKeyValue("queryCounters", getJSONMap(query.getQueryCounters())))
+        .append(getJSONKeyValue("stageGraph", getJSONGraph(query.getStageGraph())));
+    // stageList: a missing list is rendered like an empty one
+    List<String> renderedStages = new ArrayList<String>();
+    if (query.getStageList() != null) {
+      for (org.apache.hadoop.hive.ql.plan.api.Stage queryStage: query.getStageList()) {
+        renderedStages.add(getJSONStage(queryStage));
+      }
+    }
+    json.append(getJSONKeyValue("stageList", getJSONList(renderedStages)))
+        .append(getJSONKeyValue("done", query.isDone()))
+        .append(getJSONKeyValue("started", query.isStarted()));
+    // the last pair ends with ','; overwrite it with the closing brace
+    json.setCharAt(json.length() - 1, '}');
+    return json.toString();
+  }
+
+  /**
+   * Returns the current query plan, including refreshed counters, rendered as
+   * JSON by getJSONQuery(). If building or rendering the plan fails, the failure
+   * is logged and the exception's string form is returned instead.
+   */
+  @Override
+  public String toString() {
+    try {
+      return getJSONQuery(getQueryPlan());
+    }
+    catch (Exception e) {
+      // toString() must not throw; report through the class logger rather than stderr
+      LOG.error("Unable to render query plan " + queryId + " as JSON", e);
+      return e.toString();
+    }
+  }
+
+  /**
+   * Serializes the current query plan with the Thrift JSON protocol.
+   *
+   * @return the serialized plan decoded as UTF-8; if Thrift serialization fails
+   *         the failure is logged and the plan's toString() form is returned
+   * @throws IOException
+   */
+  public String toThriftJSONString() throws IOException {
+    org.apache.hadoop.hive.ql.plan.api.Query q = getQueryPlan();
+    // initial buffer size derived from the plan's string form
+    TMemoryBuffer tmb = new TMemoryBuffer(q.toString().length()*5);
+    TJSONProtocol oprot = new TJSONProtocol(tmb);
+    try {
+      q.write(oprot);
+    } catch (TException e) {
+      LOG.error("Unable to serialize query plan " + queryId + " with the Thrift JSON protocol", e);
+      return q.toString();
+    }
+    return tmb.toString("UTF-8");
+  }
+
+  /**
+   * Serializes the current query plan with the Thrift binary protocol and
+   * returns the bytes wrapped in a String.
+   *
+   * @return the serialized plan; if Thrift serialization fails the failure is
+   *         logged and the plan's toString() form is returned
+   * @throws IOException
+   */
+  public String toBinaryString() throws IOException {
+    org.apache.hadoop.hive.ql.plan.api.Query q = getQueryPlan();
+    // initial buffer size derived from the plan's string form
+    TMemoryBuffer tmb = new TMemoryBuffer(q.toString().length()*5);
+    TBinaryProtocol oprot = new TBinaryProtocol(tmb);
+    try {
+      q.write(oprot);
+    } catch (TException e) {
+      LOG.error("Unable to serialize query plan " + queryId + " with the Thrift binary protocol", e);
+      return q.toString();
+    }
+    byte[] buf = new byte[tmb.length()];
+    tmb.read(buf, 0, tmb.length());
+    // NOTE(review): new String(byte[]) decodes with the platform default charset,
+    // which may not round-trip arbitrary binary data -- confirm what callers expect
+    return new String(buf);
+  }
+
+  /** Marks the query itself as started by recording its id in the started set. */
+  public void setStarted() {
+    this.started.add(this.queryId);
+  }
+
+  /** Marks the query itself as done by recording its id in the done set. */
+  public void setDone() {
+    this.done.add(this.queryId);
+  }
+
+  /**
+   * @return the set of ids recorded as started; this is the internal set itself,
+   *         not a copy
+   */
+  public Set<String> getStarted() {
+    return this.started;
+  }
+
+  /**
+   * @return the set of ids recorded as done; this is the internal set itself,
+   *         not a copy
+   */
+  public Set<String> getDone() {
+    return this.done;
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java Wed Aug 19 21:02:57 2009
@@ -45,7 +45,7 @@
   }
 
   boolean firstRow = true;
-  public void process(Object row, int tag)
+  public void processOp(Object row, int tag)
       throws HiveException {
     ObjectInspector rowInspector = inputObjInspectors[tag];
     if (firstRow) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Wed Aug 19 21:02:57 2009
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 
@@ -38,6 +39,10 @@
   private ConditionalResolver resolver;
   private Object              resolverCtx;
   
+  /** No-argument constructor; only invokes the superclass constructor. */
+  public ConditionalTask() {
+    super();
+  }
+  
   public boolean isMapRedTask() {
     for (Task<? extends Serializable> task : listTasks)
       if (task.isMapRedTask())
@@ -54,14 +59,15 @@
     return false;
   }
   
-  public void initialize (HiveConf conf) {
+  public void initialize (HiveConf conf, QueryPlan queryPlan) {
+    super.initialize(conf, queryPlan);
+    // the resolver returns an index into listTasks; that task is the one that will run
     resTask = listTasks.get(resolver.getTaskId(conf, resolverCtx));
-    resTask.initialize(conf);
+    resTask.initialize(conf, queryPlan);
   }
   
   @Override
   public int execute() {
-    return resTask.execute();
+    // run the task that initialize() selected and return its result
+    return resTask.executeTask();
   }
 
   /**

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java Wed Aug 19 21:02:57 2009
@@ -35,6 +35,10 @@
 
   private static final long serialVersionUID = 1L;
 
+  /** No-argument constructor; only invokes the superclass constructor. */
+  public CopyTask() {
+    super();
+  }
+  
   public int execute() {
     FileSystem dstFs = null;
     Path toPath = null;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Aug 19 21:02:57 2009
@@ -75,6 +75,7 @@
 import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.QueryPlan;
 
 /**
  * DDLTask implementation
@@ -88,11 +89,15 @@
   static final private int separator  = Utilities.tabCode;
   static final private int terminator = Utilities.newLineCode;
   
-  public void initialize(HiveConf conf) {
-    super.initialize(conf);
+  /** No-argument constructor; only invokes the superclass constructor. */
+  public DDLTask() {
+    super();
+  }
+  
+  /**
+   * Runs the superclass initialization and then stores the configuration in
+   * this task's conf field.
+   */
+  public void initialize(HiveConf conf, QueryPlan queryPlan) {
+    super.initialize(conf, queryPlan);
+    this.conf = conf;
+  }
-
+  
   public int execute() {
 
     // Create the db

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Aug 19 21:02:57 2009
@@ -24,8 +24,6 @@
 import java.net.URI;
 import java.net.URLEncoder;
 import java.net.URLDecoder;
-import java.net.URL;
-import java.net.URLClassLoader;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.lang.StringUtils;
@@ -43,24 +41,24 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
-import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.io.*;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.varia.NullAppender;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 
 public class ExecDriver extends Task<mapredWork> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
   transient protected JobConf job;
+  transient protected int mapProgress = 0;
+  transient protected int reduceProgress = 0;
 
   /**
    * Constructor when invoked from QL
@@ -99,8 +97,8 @@
   /**
    * Initialization when invoked from QL
    */
-  public void initialize(HiveConf conf) {
-    super.initialize(conf);
+  public void initialize(HiveConf conf, QueryPlan queryPlan) {
+    super.initialize(conf, queryPlan);
     job = new JobConf(conf, ExecDriver.class);
     // NOTE: initialize is only called if it is in non-local mode.
     // In case it's in non-local mode, we need to move the SessionState files
@@ -191,11 +189,37 @@
           + " job  -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
     }
   }
-
+  
   /**
-   * from StreamJob.java
+   * This class contains the state of the running task
+   * Going forward, we will return this handle from execute
+   * and Driver can split execute into start, monitorProgress and postProcess
    */
-  public RunningJob jobProgress(JobClient jc, RunningJob rj) throws IOException {
+  public static class ExecDriverTaskHandle extends TaskHandle {
+    // client and job handle supplied at construction time
+    JobClient jc;
+    RunningJob rj;
+
+    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
+      this.jc = jc;
+      this.rj = rj;
+    }
+
+    JobClient getJobClient() {
+      return this.jc;
+    }
+
+    RunningJob getRunningJob() {
+      return this.rj;
+    }
+
+    /** Replaces the running-job handle held by this task handle. */
+    public void setRunningJob(RunningJob job) {
+      this.rj = job;
+    }
+
+    /** Returns the counters reported by the current running-job handle. */
+    public Counters getCounters() throws IOException {
+      return this.rj.getCounters();
+    }
+  }
+
+  public void progress(TaskHandle taskHandle) throws IOException {
+    ExecDriverTaskHandle th = (ExecDriverTaskHandle)taskHandle; 
+    JobClient jc = th.getJobClient();
+    RunningJob rj = th.getRunningJob();
     String lastReport = "";
     SimpleDateFormat dateFormat
         = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
@@ -206,13 +230,16 @@
         Thread.sleep(1000);
       } catch (InterruptedException e) {
       }
-      rj = jc.getJob(rj.getJobID());
-      String report = " map = " + Math.round(rj.mapProgress() * 100) + "%,  reduce ="
-          + Math.round(rj.reduceProgress() * 100) + "%";
+      th.setRunningJob(jc.getJob(rj.getJobID()));
+      updateCounters(th);
+      
+      String report = " map = " + this.mapProgress + "%,  reduce = " + this.reduceProgress + "%";
 
       if (!report.equals(lastReport)
           || System.currentTimeMillis() >= reportTime + maxReportInterval) {
 
+        // write out serialized plan with counters to log file
+        // LOG.info(queryPlan);
         String output = dateFormat.format(Calendar.getInstance().getTime()) + report;
         SessionState ss = SessionState.get();
         if (ss != null) {
@@ -223,13 +250,21 @@
               Keys.TASK_HADOOP_PROGRESS, output);
           ss.getHiveHistory().progressTask(
               SessionState.get().getQueryId(), this);
+          ss.getHiveHistory().logPlanProgress(queryPlan);
         }
         console.printInfo(output);
         lastReport = report;
         reportTime = System.currentTimeMillis();
       }
     }
-    return rj;
+    setDone();
+    th.setRunningJob(jc.getJob(rj.getJobID()));
+    updateCounters(th);
+    SessionState ss = SessionState.get();
+    if (ss != null) {
+      ss.getHiveHistory().logPlanProgress(queryPlan);
+    }
+    //LOG.info(queryPlan);
   }
 
   /**
@@ -309,10 +344,46 @@
   }
 
   /**
+   * Refresh this task's map/reduce progress percentages and its operators' counters from the running job.
+   */
+  @Override
+  public void updateCounters(TaskHandle t) throws IOException {
+    ExecDriverTaskHandle th = (ExecDriverTaskHandle)t;
+    RunningJob rj = th.getRunningJob();
+    this.mapProgress = Math.round(rj.mapProgress() * 100);
+    this.reduceProgress = Math.round(rj.reduceProgress() * 100);
+    taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long.valueOf(this.mapProgress));
+    taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long.valueOf(this.reduceProgress));
+    Counters ctrs = th.getCounters();
+    for (Operator<? extends Serializable> op: work.getAliasToWork().values()) {
+      op.updateCounters(ctrs);
+    }
+    if (work.getReducer() != null) {
+      work.getReducer().updateCounters(ctrs);
+    }
+  }
+
+  public boolean mapStarted() {
+    return mapProgress > 0;
+  }
+
+  public boolean reduceStarted() {
+    return reduceProgress > 0;
+  }
+
+  public boolean mapDone() {
+    return mapProgress == 100;
+  }
+
+  public boolean reduceDone() {
+    return reduceProgress == 100;
+  }
+
+  
+  /**
    * Execute a query plan using Hadoop
    */
   public int execute() {
-
     try {
       setNumberOfReducers();
     } catch(IOException e) {
@@ -377,6 +448,7 @@
       if (pwd != null)
         job.set(HiveConf.ConfVars.METASTOREPWD.varname, "HIVE");
       JobClient jc = new JobClient(job);
+      
 
       // make this client wait if job trcker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
@@ -391,10 +463,11 @@
       runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL()
           + "&action=kill");
 
+      TaskHandle th = new ExecDriverTaskHandle(jc, rj);
       jobInfo(rj);
-      rj = jobProgress(jc, rj);
+      progress(th);
 
-      if(rj == null) {
+      if (rj == null) {
         // in the corner case where the running job has disappeared from JT memory
         // remember that we did actually submit the job.
         rj = orig_rj;
@@ -568,6 +641,7 @@
 
     mapredWork plan = Utilities.deserializeMapRedWork(pathData, conf);
     ExecDriver ed = new ExecDriver(plan, conf, isSilent);
+
     int ret = ed.execute();
     if (ret != 0) {
       System.out.println("Job Failed");

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Wed Aug 19 21:02:57 2009
@@ -44,6 +44,10 @@
 public class ExplainTask extends Task<explainWork> implements Serializable {
   private static final long serialVersionUID = 1L;
 
+  public ExplainTask() {
+    super();
+  }
+  
   public int execute() {
     
     try {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Wed Aug 19 21:02:57 2009
@@ -38,7 +38,7 @@
     initializeChildren(hconf);
   }
 
-  public void process(Object row, int tag) throws HiveException {
+  public void processOp(Object row, int tag) throws HiveException {
     forward(eval.evaluate(row), outputObjInspector);
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Wed Aug 19 21:02:57 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.plan.fetchWork;
 import org.apache.hadoop.hive.ql.plan.partitionDesc;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
@@ -70,8 +71,12 @@
  	private LazySimpleSerDe mSerde;
  	private int totalRows;
   
-  public void initialize (HiveConf conf) {
-    super.initialize(conf);
+ 	public FetchTask() {
+ 	  super();
+ 	}
+ 	
+  public void initialize (HiveConf conf, QueryPlan queryPlan) {
+    super.initialize(conf, queryPlan);
     
    	try {
        // Create a file system handle  

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Wed Aug 19 21:02:57 2009
@@ -155,7 +155,16 @@
   }
 
   Writable recordValue; 
-  public void process(Object row, int tag) throws HiveException {
+  public void processOp(Object row, int tag) throws HiveException {
+    // Since File Sink is a terminal operator, forward is not called - so, maintain the number of output rows explicitly
+    if (counterNameToEnum != null) {
+      ++this.outputRows;
+      if (this.outputRows % 1000 == 0) {
+        incrCounter(numOutputRowsCntr, outputRows);
+        this.outputRows = 0;
+      }
+    }
+
     try {
       if (reporter != null)
         reporter.progress();
@@ -173,10 +182,18 @@
     }
   }
 
+  @Override
   public void close(boolean abort) throws HiveException {
+
     if (state == State.CLOSE) 
       return;
-  
+
+    // Since close of super class is not invoked, update counters
+    if (counterNameToEnum != null) {
+      incrCounter(numInputRowsCntr, inputRows);
+      incrCounter(numOutputRowsCntr, outputRows);
+      incrCounter(timeTakenCntr, totalTime);
+    }
     state = State.CLOSE;
     if (!abort) {
       if (outWriter != null) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Wed Aug 19 21:02:57 2009
@@ -61,7 +61,7 @@
     initializeChildren(hconf);
   }
 
-  public void process(Object row, int tag) throws HiveException {
+  public void processOp(Object row, int tag) throws HiveException {
     ObjectInspector rowInspector = inputObjInspectors[tag];
     if (conditionInspector == null) {
       conditionInspector = (PrimitiveObjectInspector)conditionEvaluator.initialize(rowInspector);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Wed Aug 19 21:02:57 2009
@@ -31,7 +31,7 @@
   private static final long serialVersionUID = 1L;
 
   @Override
-  public void process(Object row, int tag)
+  public void processOp(Object row, int tag)
       throws HiveException {
     forward(row, inputObjInspectors[tag]);    
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java Wed Aug 19 21:02:57 2009
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FunctionWork;
 import org.apache.hadoop.hive.ql.plan.createFunctionDesc;
@@ -39,8 +40,12 @@
 
   transient HiveConf conf;
   
-  public void initialize(HiveConf conf) {
-    super.initialize(conf);
+  public FunctionTask() {
+    super();
+  }
+  
+  public void initialize(HiveConf conf, QueryPlan queryPlan) {
+    super.initialize(conf, queryPlan);
     this.conf = conf;
   }
   

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Aug 19 21:02:57 2009
@@ -458,7 +458,7 @@
       keysCurrentGroup.clear();
   }
   
-  public void process(Object row, int tag) throws HiveException {
+  public void processOp(Object row, int tag) throws HiveException {
     firstRow = false;
     ObjectInspector rowInspector = inputObjInspectors[tag];
     // Total number of input rows is needed for hash aggregation only

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Wed Aug 19 21:02:57 2009
@@ -40,7 +40,7 @@
     initializeChildren(hconf);
   }
   
-  public void process(Object row, int tag)
+  public void processOp(Object row, int tag)
       throws HiveException {
     try {
       // get alias

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Wed Aug 19 21:02:57 2009
@@ -40,7 +40,7 @@
     currCount = 0;
   }
 
-  public void process(Object row, int tag) throws HiveException {
+  public void processOp(Object row, int tag) throws HiveException {
     if (currCount < limit) {
       forward(row, inputObjInspectors[tag]);
       currCount++;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Wed Aug 19 21:02:57 2009
@@ -197,7 +197,7 @@
   }
 
   @Override
-  public void process(Object row, int tag) throws HiveException {
+  public void processOp(Object row, int tag) throws HiveException {
     try {
 
       // get alias

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Aug 19 21:02:57 2009
@@ -306,7 +306,7 @@
     }
   }
 
-  public void process(Object row, int tag)
+  public void processOp(Object row, int tag)
       throws HiveException {
     throw new HiveException("Hive 2 Internal error: should not be called!");
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Wed Aug 19 21:02:57 2009
@@ -43,6 +43,10 @@
     
   private static final long serialVersionUID = 1L;
 
+  public MapRedTask() {
+    super();
+  }
+  
   public int execute() {
 
     try {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Wed Aug 19 21:02:57 2009
@@ -43,6 +43,10 @@
 
   private static final long serialVersionUID = 1L;
 
+  public MoveTask() {
+    super();
+  }
+  
   public int execute() {
 
     try {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Aug 19 21:02:57 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.explain;
 import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -37,6 +39,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
@@ -50,6 +53,25 @@
   
   protected List<Operator<? extends Serializable>> childOperators;
   protected List<Operator<? extends Serializable>> parentOperators;
+  protected String operatorId;
+  /**
+   * List of counter names associated with the operator
+   * It contains the following default counters
+   *   NUM_INPUT_ROWS
+   *   NUM_OUTPUT_ROWS
+   *   TIME_TAKEN
+   * Individual operators can add to this list via addToCounterNames methods
+   */
+  protected ArrayList<String> counterNames;
+  
+  /**
+   * Each operator has its own map of its counter names to disjoint
+   * ProgressCounter - it is populated at compile time and is read in
+   * at run-time while extracting the operator specific counts
+   */
+  protected HashMap<String, ProgressCounter> counterNameToEnum;
+  
+
   private static int seqId;
 
   // It can be optimized later so that an operator operator (init/close) is performed
@@ -323,14 +345,27 @@
     initialize(hconf, null);    
   }
 
+  
    /**
    * Process the row.
    * @param row  The object representing the row.
    * @param tag  The tag of the row usually means which parent this row comes from.
    *             Rows with the same tag should have exactly the same rowInspector all the time.
    */
-  public abstract void process(Object row, int tag) throws HiveException;
- 
+  public abstract void processOp(Object row, int tag) throws HiveException;
+  
+  /**
+   * Process the row: wraps processOp(row, tag) between preProcessCounter() and postProcessCounter().
+   * @param row  The object representing the row.
+   * @param tag  The tag of the row usually means which parent this row comes from.
+   *             Rows with the same tag should have exactly the same rowInspector all the time.
+   */
+  public void process(Object row, int tag) throws HiveException {
+    preProcessCounter();
+    processOp(row, tag);
+    postProcessCounter();
+  }
+  
   // If a operator wants to do some work at the beginning of a group
   public void startGroup() throws HiveException {
     LOG.debug("Starting group");
@@ -345,7 +380,7 @@
     LOG.debug("Start group Done");
   }  
   
-  // If a operator wants to do some work at the beginning of a group
+  // If a operator wants to do some work at the end of a group
   public void endGroup() throws HiveException {
     LOG.debug("Ending group");
     
@@ -363,6 +398,12 @@
     if (state == State.CLOSE) 
       return;
 
+    if (counterNameToEnum != null) {
+      incrCounter(numInputRowsCntr, inputRows);
+      incrCounter(numOutputRowsCntr, outputRows);
+      incrCounter(timeTakenCntr, totalTime);
+    }
+    
     LOG.info(id + " forwarded " + cntr + " rows");
 
     try {
@@ -457,6 +498,14 @@
   }
 
   protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
+    
+    if ((++outputRows % 1000) == 0) {
+      if (counterNameToEnum != null) {
+        incrCounter(numOutputRowsCntr, outputRows);
+        outputRows = 0;
+      }
+    }
+    
     if (LOG.isInfoEnabled()) {
       cntr++;
       if (cntr == nextCntr) {
@@ -608,4 +657,197 @@
         Arrays.asList(fieldObjectInspectors));
   }
   
+  /**
+   * All counter stuff below this
+   */
+  
+  /**
+   * TODO This is a hack for hadoop 0.17 which only supports enum counters
+   */
+  public static enum ProgressCounter { 
+    C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16, 
+    C17, C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, 
+    C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48, 
+    C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64, 
+    C65, C66, C67, C68, C69, C70, C71, C72, C73, C74, C75, C76, C77, C78, C79, C80, 
+    C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, C91, C92, C93, C94, C95, C96, 
+    C97, C98, C99, C100, C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, C111, C112, 
+    C113, C114, C115, C116, C117, C118, C119, C120, C121, C122, C123, C124, C125, C126, C127, C128
+  };
+
+  private static int totalNumCntrs = 128;
+  
+  /**
+   * populated in the client at run time from the hadoop counters of the running job
+   */
+  transient protected Map<String, Long> counters;
+  
+  /**
+   * keeps track of unique ProgressCounter enums used
+   * this value is used at compile time while assigning ProgressCounter
+   * enums to counter names
+   */
+  private static int lastEnumUsed;  
+
+  transient protected long inputRows = 0;
+  transient protected long outputRows = 0;
+  transient protected long beginTime = 0;
+  transient protected long totalTime = 0;
+  
+  /**
+   * this is called before operator process to buffer some counters
+   */
+  private void preProcessCounter()
+  {
+    inputRows++;
+    
+    if (((inputRows % 1000) == 0) && (counterNameToEnum != null)) {
+      incrCounter(numInputRowsCntr, inputRows);
+      incrCounter(timeTakenCntr, totalTime);
+      inputRows = 0 ;
+      totalTime = 0;
+    }
+    beginTime = System.currentTimeMillis();
+  }
+
+  /**
+   * this is called after operator process to buffer some counters
+   */
+  private void postProcessCounter()
+  {
+    totalTime += (System.currentTimeMillis() - beginTime);
+  }
+
+  
+  /**
+   * Called by operators in map or reduce tasks to add to one of this operator's counters; no-op without a counter mapping.
+   * @param name   counter name suffix, e.g. NUM_INPUT_ROWS
+   * @param amount value to add to the counter
+   */
+  protected void incrCounter(String name, long amount) {
+    // counterNameToEnum is not populated for some old unit tests - nothing to report in that case
+    if (counterNameToEnum == null)
+      return;
+    String counterName = "CNTR_NAME_" + getOperatorId() + "_" + name;
+    ProgressCounter pc = counterNameToEnum.get(counterName);
+    // Currently, we maintain 128 counters per plan - in case of a bigger tree, we may run out of them
+    if (pc == null)
+      LOG.warn("Using too many counters. Increase the total number of counters for " + counterName);
+    else if (reporter != null)
+      reporter.incrCounter(pc, amount);
+  }
+
+  public ArrayList<String> getCounterNames() {
+    return counterNames;
+  }
+
+  public void setCounterNames(ArrayList<String> counterNames) {
+    this.counterNames = counterNames;
+  }
+
+  public String getOperatorId() {
+    return operatorId;
+  }
+
+  public void initOperatorId() {
+    setOperatorId(getName() + "_" + this.id);
+  }
+
+  public void setOperatorId(String operatorId) {
+    this.operatorId = operatorId;
+  }
+  
+  public Map<String, Long> getCounters() {
+    return counters;
+  }
+  
+  /**
+   * called in ExecDriver.progress periodically
+   * @param ctrs counters from the running job
+   */
+  @SuppressWarnings("unchecked")
+  public void updateCounters(Counters ctrs) {
+    if (counters == null) {
+      counters = new HashMap<String, Long>();
+    }
+
+    // For some old unit tests, the counters will not be populated. Eventually, the old tests should be removed
+    if (counterNameToEnum == null)
+      return;
+
+    for (Map.Entry<String, ProgressCounter> counter: counterNameToEnum.entrySet()) {
+      counters.put(counter.getKey(), ctrs.getCounter(counter.getValue()));
+    }
+    // update counters of child operators
+    // this wont be an infinite loop since the operator graph is acyclic
+    // but, some operators may be updated more than once and that's ok
+    if (getChildren() != null) {
+      for (Node op: getChildren()) {
+        ((Operator<? extends Serializable>)op).updateCounters(ctrs);
+      }
+    }
+  }
+
+  // A given query can have multiple map-reduce jobs
+  public static void resetLastEnumUsed() {
+    lastEnumUsed = 0;
+  }
+
+  /**
+   * Called only in SemanticAnalyzer after all operators have added their
+   * own set of counter names
+   */
+  public void assignCounterNameToEnum() {
+    if (counterNameToEnum != null) {
+      return;
+    }
+    counterNameToEnum = new HashMap<String, ProgressCounter>();
+    for (String counterName: getCounterNames()) {
+      ++lastEnumUsed;
+      
+      // TODO Hack for hadoop-0.17
+      // Currently, only maximum number of 'totalNumCntrs' can be used. If you want
+      // to add more counters, increase the number of counters in ProgressCounter
+      if (lastEnumUsed > totalNumCntrs) {
+        LOG.warn("Using too many counters. Increase the total number of counters");
+        return;
+      }      
+      String enumName = "C" + lastEnumUsed;
+      ProgressCounter ctr = ProgressCounter.valueOf(enumName);
+      counterNameToEnum.put(counterName, ctr);
+    }
+  }
+
+  protected static String numInputRowsCntr  = "NUM_INPUT_ROWS";
+  protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
+  protected static String timeTakenCntr     = "TIME_TAKEN";
+  
+  public void initializeCounters() {
+    initOperatorId();
+    counterNames = new ArrayList<String>();
+    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numInputRowsCntr);
+    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numOutputRowsCntr);
+    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + timeTakenCntr);
+    List<String> newCntrs = getAdditionalCounters();
+    if (newCntrs != null) {
+      counterNames.addAll(newCntrs);
+    }
+  }
+
+  /*
+   * By default there are no additional counters (null is returned) - an operator that wants more counters
+   * should override this method and return their names; it is protected so that subclasses can do so.
+   */
+  protected List<String> getAdditionalCounters() {
+    return null;
+  }
+
+  public HashMap<String, ProgressCounter> getCounterNameToEnum() {
+    return counterNameToEnum;
+  }
+
+  public void setCounterNameToEnum(HashMap<String, ProgressCounter> counterNameToEnum) {
+    this.counterNameToEnum = counterNameToEnum;
+  }
+
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Aug 19 21:02:57 2009
@@ -60,7 +60,9 @@
     for(opTuple o: opvec) {
       if(o.descClass == opClass) {
         try {
-          return (Operator<T>)o.opClass.newInstance();
+          Operator<T> op = (Operator<T>)o.opClass.newInstance();
+          op.initializeCounters();
+          return op;
         } catch (Exception e) {
           e.printStackTrace();
           throw new RuntimeException(e);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Wed Aug 19 21:02:57 2009
@@ -126,7 +126,7 @@
   boolean firstRow;
   
   transient Random random;
-  public void process(Object row, int tag) throws HiveException {
+  public void processOp(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
       if (firstRow) {
@@ -203,6 +203,12 @@
     try {
       if (out != null) {
         out.collect(keyWritable, value);
+        // Since this is a terminal operator, update counters explicitly - forward is not called
+        ++this.outputRows;
+        if (this.outputRows % 1000 == 0) {
+          incrCounter(numOutputRowsCntr, outputRows);
+          this.outputRows = 0;
+        }
       }
     } catch (IOException e) {
       throw new HiveException (e);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Wed Aug 19 21:02:57 2009
@@ -258,7 +258,7 @@
   }
 
   Text text = new Text();
-  public void process(Object row, int tag) throws HiveException {
+  public void processOp(Object row, int tag) throws HiveException {
 
     if(scriptError != null) {
       throw new HiveException(scriptError);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Wed Aug 19 21:02:57 2009
@@ -58,7 +58,7 @@
     initializeChildren(hconf);
   }
 
-  public void process(Object row, int tag)
+  public void processOp(Object row, int tag)
       throws HiveException {
 
     // Just forward the row as is

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Wed Aug 19 21:02:57 2009
@@ -37,7 +37,7 @@
    * i.e table data is not only read by the mapper, this operator will be enhanced to read the table.
    **/
   @Override
-  public void process(Object row, int tag)
+  public void processOp(Object row, int tag)
       throws HiveException {
     forward(row, inputObjInspectors[tag]);    
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Wed Aug 19 21:02:57 2009
@@ -21,6 +21,7 @@
 import java.io.*;
 import java.util.*;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -38,11 +39,15 @@
 public abstract class Task <T extends Serializable> implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  transient boolean isdone;
+  transient protected boolean started;
+  transient protected boolean isdone;
   transient protected HiveConf conf;
   transient protected Hive db;
   transient protected Log LOG;
   transient protected LogHelper console;
+  transient protected QueryPlan queryPlan;
+  transient protected TaskHandle taskHandle;
+  transient protected Map<String, Long> taskCounters;
 
   // Bean methods
 
@@ -51,13 +56,17 @@
 
   public Task() {
     isdone = false;
+    started = false;
     LOG = LogFactory.getLog(this.getClass().getName());
+    this.taskCounters = new HashMap<String, Long>();
   }
 
-  public void initialize (HiveConf conf) {
+  public void initialize (HiveConf conf, QueryPlan queryPlan) {
+    this.queryPlan = queryPlan;
     isdone = false;
+    started = false;
     this.conf = conf;
-    
+
     SessionState ss = SessionState.get();
     try {
       if (ss == null) {
@@ -77,7 +86,45 @@
     console = new LogHelper(LOG);
   }
 
-  public abstract int execute();
+  /**
+   * This method is called in the Driver on every task. It marks the task started, logs the plan
+   * progress before and after, and calls execute(), which is overridden in each task.
+   * @return return value of execute(); an IOException is rethrown as a RuntimeException
+   */
+  public int executeTask() {
+    try {
+      SessionState ss = SessionState.get();
+      this.setStarted();
+      if (ss != null) {
+        ss.getHiveHistory().logPlanProgress(queryPlan);
+      }
+      int retval = execute();
+      this.setDone();
+      if (ss != null) {
+        ss.getHiveHistory().logPlanProgress(queryPlan);
+      }
+      return retval;
+    } catch (IOException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * This method is overridden in each Task.
+   * TODO execute should return a TaskHandle.
+   * @return status of executing the task
+   */
+  protected abstract int execute();
+  
+  /**
+   * Hook for updating the progress of the task within taskHandle and dumping
+   * the progress information to the history file; this default does nothing.
+   * @param taskHandle task specific handle for the running task
+   * @throws IOException not thrown by this default implementation
+   */
+  public void progress(TaskHandle taskHandle) throws IOException {
+    // do nothing by default
+  }
   
   // dummy method - FetchTask overwrites this
   public boolean fetch(Vector<String> res) throws IOException { 
@@ -135,6 +182,14 @@
     }
   }
 
+  public void setStarted() {
+    this.started = true;
+  }
+
+  public boolean started() {
+    return started;
+  }
+
   public boolean done() {
     return isdone;
   }
@@ -182,4 +237,13 @@
   public boolean hasReduce() {
     return false;
   }
+
+  public void updateCounters(TaskHandle th) throws IOException {
+    // default, do nothing
+  }
+
+  public Map<String, Long> getCounters() {
+    return taskCounters;
+  }
+
 }

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java?rev=805973&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java Wed Aug 19 21:02:57 2009
@@ -0,0 +1,17 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.Counters;
+
+/**
+ * Handle for monitoring a task. The eventual goal is to monitor the progress of all the tasks,
+ * not only the map reduce task: execute() would return immediately with a task specific handle.
+ * Right now ExecDriver's execute method calls progress itself; it should be invoked by Driver.
+ */
+public class TaskHandle {
+  /** Default implementation: always returns null. */
+  public Counters getCounters() throws IOException {
+    return null;
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Wed Aug 19 21:02:57 2009
@@ -110,7 +110,7 @@
   }
   
   @Override
-  public void process(Object row, int tag) throws HiveException {
+  public void processOp(Object row, int tag) throws HiveException {
 
     StructObjectInspector soi = parentObjInspectors[tag];
       List<? extends StructField> fields = parentFields[tag];

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Aug 19 21:02:57 2009
@@ -169,6 +169,8 @@
     // workaround for java 1.5
     e.setPersistenceDelegate( ExpressionTypes.class, new EnumDelegate() );
     e.setPersistenceDelegate( groupByDesc.Mode.class, new EnumDelegate());
+    e.setPersistenceDelegate( Operator.ProgressCounter.class, new EnumDelegate());
+
     e.writeObject(t);
     e.close();
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Wed Aug 19 21:02:57 2009
@@ -35,6 +35,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -65,7 +66,7 @@
   private static final String DELIMITER = " ";
 
   public static enum RecordTypes {
-    QueryStart, QueryEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd
+    QueryStart, QueryEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd, Counters
   };
 
   public static enum Keys {
@@ -426,6 +427,18 @@
     log(RecordTypes.TaskProgress, ti.hm);
 
   }
+
+  /** Retained for compatibility; logPlanProgress no longer uses this shared map. */
+  static Map<String, String> ctrmap = null;
+  /**
+   * Write out the query plan (plan.toString()) under the "plan" key as a Counters record.
+   * A fresh map is built per call because a shared static map is unsafe across sessions.
+   */
+  public void logPlanProgress(QueryPlan plan) throws IOException {
+    Map<String, String> planRecord = new HashMap<String, String>();
+    planRecord.put("plan", plan.toString());
+    log(RecordTypes.Counters, planRecord);
+  }
   
   /**
    * Set the table to id map

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Aug 19 21:02:57 2009
@@ -4311,14 +4311,60 @@
     // reduce sink does not have any kids - since the plan by now has been broken up into multiple
     // tasks, iterate over all tasks.
     // For each task, go over all operators recursively
-    for(Task<? extends Serializable> rootTask: rootTasks)
+    for (Task<? extends Serializable> rootTask: rootTasks)
       breakTaskTree(rootTask);
 
     // For each task, set the key descriptor for the reducer
-    for(Task<? extends Serializable> rootTask: rootTasks)
+    for (Task<? extends Serializable> rootTask: rootTasks)
       setKeyDescTaskTree(rootTask);
+    
+    // For each operator, generate the counters
+    for (Task<? extends Serializable> rootTask: rootTasks)
+      generateCountersTask(rootTask);
+  }
+
+  // loop over all the tasks recursively, calling generateCountersOperator on their operator trees
+  private void generateCountersTask(Task<? extends Serializable> task) { 
+    if ((task instanceof MapRedTask) || (task instanceof ExecDriver)) {
+      HashMap<String, Operator<? extends Serializable>> opMap = ((mapredWork)task.getWork()).getAliasToWork();
+      if (!opMap.isEmpty()) {
+        for (Operator<? extends Serializable> op: opMap.values()) {
+          generateCountersOperator(op);
+        }
+      }
+      // then the reducer's operator tree, when the work has a reducer
+      Operator <? extends Serializable> reducer = ((mapredWork)task.getWork()).getReducer();
+      if (reducer != null) {
+        LOG.info("Generating counters for operator " + reducer);
+        generateCountersOperator(reducer);
+      }
+    }
+    else if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks = ((ConditionalTask)task).getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks)
+        generateCountersTask(tsk);
+    }
+
+    // Start the counters from scratch - a hack for hadoop 17.
+    Operator.resetLastEnumUsed();
+
+    if (task.getChildTasks() == null)
+      return;
+    // recurse into the child tasks
+    for (Task<? extends Serializable> childTask :  task.getChildTasks())
+      generateCountersTask(childTask);
   }
+  
+  private void generateCountersOperator(Operator<? extends Serializable> op) {
+    op.assignCounterNameToEnum();
 
+    if (op.getChildOperators() == null)
+      return;
+    // recurse so that assignCounterNameToEnum() is also called on every operator below op
+    for (Operator<? extends Serializable> child: op.getChildOperators())
+      generateCountersOperator(child);
+  }
+  
   // loop over all the tasks recursviely
   private void breakTaskTree(Task<? extends Serializable> task) { 
     
@@ -4329,6 +4375,11 @@
           breakOperatorTree(op);
         }
     }
+    else if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks = ((ConditionalTask)task).getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks)
+        breakTaskTree(tsk);
+    }
 
     if (task.getChildTasks() == null)
       return;

Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestAddPartition.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestAddPartition.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestAddPartition.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestAddPartition.java Wed Aug 19 21:02:57 2009
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.plan;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-
-public class TestAddPartition extends TestCase {
-
-  private static final String PART1_NAME = "part1";
-  private static final String PART2_NAME = "part2";
-
-  public TestAddPartition() throws Exception {
-    super();
-  }
-
-  public void testAddPartition() throws Exception {
-    Configuration conf = new Configuration();
-    HiveConf hiveConf = new HiveConf(conf, TestAddPartition.class);
-    HiveMetaStoreClient client = null;
-
-    try {
-      client = new HiveMetaStoreClient(hiveConf);
-
-      String dbName = "testdb";
-      String tableName = "tablename";
-
-      Table tbl = new Table();
-      tbl.setTableName(tableName);
-      tbl.setDbName(dbName);
-      tbl.setParameters(new HashMap<String, String>());
-
-      StorageDescriptor sd = new StorageDescriptor();
-      sd.setSerdeInfo(new SerDeInfo());
-      sd.getSerdeInfo().setName(tbl.getTableName());
-      sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
-
-      List<FieldSchema> fss = new ArrayList<FieldSchema>();
-      fss.add(new FieldSchema("name", Constants.STRING_TYPE_NAME, ""));
-      sd.setCols(fss);
-      tbl.setSd(sd);
-
-      tbl.setPartitionKeys(new ArrayList<FieldSchema>());
-      tbl.getPartitionKeys().add(
-          new FieldSchema(PART1_NAME, Constants.STRING_TYPE_NAME, ""));
-      tbl.getPartitionKeys().add(
-          new FieldSchema(PART2_NAME, Constants.STRING_TYPE_NAME, ""));
-
-      client.dropTable(dbName, tableName);
-      client.dropDatabase(dbName);
-
-      client.createDatabase(dbName, "newloc");
-      client.createTable(tbl);
-
-      tbl = client.getTable(dbName, tableName);
-
-      List<String> partValues = new ArrayList<String>();
-      partValues.add("value1");
-      partValues.add("value2");
-
-      Map<String, String> part1 = new HashMap<String, String>();
-      part1.put(PART1_NAME, "value1");
-      part1.put(PART2_NAME, "value2");
-      
-      List<Map<String, String>> partitions = new ArrayList<Map<String, String>>();
-      partitions.add(part1);
-      
-      // no partitions yet
-      List<Partition> parts = client.listPartitions(dbName, tableName,
-          (short) -1);
-      assertTrue(parts.isEmpty());
-
-      String partitionLocation = PART1_NAME + Path.SEPARATOR + PART2_NAME;
-      // add the partitions
-      for (Map<String,String> map : partitions) {
-        AddPartitionDesc addPartition = new AddPartitionDesc(dbName, 
-            tableName, map, partitionLocation);
-        Task<DDLWork> task = TaskFactory.get(new DDLWork(addPartition), hiveConf);
-        task.initialize(hiveConf);
-        assertEquals(0, task.execute());
-      }
-
-      // should have one
-      parts = client.listPartitions(dbName, tableName, (short) -1);
-      assertEquals(1, parts.size());
-      Partition insertedPart = parts.get(0);
-      assertEquals(tbl.getSd().getLocation() + Path.SEPARATOR + partitionLocation, 
-          insertedPart.getSd().getLocation());
-
-      client.dropPartition(dbName, tableName, insertedPart.getValues());
-
-      // add without location specified
-
-      AddPartitionDesc addPartition = new AddPartitionDesc(dbName, tableName, part1, null);
-      Task<DDLWork> task = TaskFactory.get(new DDLWork(addPartition), hiveConf);
-      task.initialize(hiveConf);
-      assertEquals(0, task.execute());
-      parts = client.listPartitions(dbName, tableName, (short) -1);
-      assertEquals(1, parts.size());
-
-      // see that this fails properly
-      addPartition = new AddPartitionDesc(dbName, "doesnotexist", part1, null);
-      task = TaskFactory.get(new DDLWork(addPartition), hiveConf);
-      task.initialize(hiveConf);
-      assertEquals(1, task.execute());
-    } finally {
-      if (client != null) {
-        client.close();
-      }
-    }
-  }
-
-}
\ No newline at end of file