Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/19 23:03:05 UTC
svn commit: r805973 [4/19] - in /hadoop/hive/trunk: ./ data/files/
eclipse-templates/
metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/
metastore/src/gen-py/hive_metastore/ ql/ ql/if/ ql/src/gen-javabean/
ql/src/gen-javabean/org/ ql/src/...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Wed Aug 19 21:02:57 2009
@@ -18,20 +18,63 @@
package org.apache.hadoop.hive.ql;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Collection;
import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
+import org.apache.hadoop.hive.ql.plan.api.NodeType;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.plan.api.TaskType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TTransportException;
+
+
+public class QueryPlan implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ static final private Log LOG = LogFactory.getLog(QueryPlan.class.getName());
-public class QueryPlan {
private String queryString;
private BaseSemanticAnalyzer plan;
private String queryId;
+ private org.apache.hadoop.hive.ql.plan.api.Query query;
+ private Map<String, Map<String, Long>> counters;
+ private Set<String> done;
+ private Set<String> started;
+
+ private boolean add;
+
public QueryPlan(String queryString, BaseSemanticAnalyzer plan) {
this.queryString = queryString;
this.plan = plan;
this.queryId = makeQueryId();
+ query = new org.apache.hadoop.hive.ql.plan.api.Query();
+ query.setQueryId(this.queryId);
+ query.putToQueryAttributes("queryString", this.queryString);
+ counters = new HashMap<String, Map<String, Long>>();
+ done = new HashSet<String>();
+ started = new HashSet<String>();
}
public String getQueryStr() {
@@ -57,4 +100,574 @@
gc.get(Calendar.HOUR_OF_DAY),
gc.get(Calendar.MINUTE), gc.get(Calendar.SECOND));
}
+
+ /**
+ * TODO each Task should define a getType() method that returns its own
+ * StageType, replacing this instanceof chain
+ * @param task
+ * @return
+ */
+ private int getStageType(Task<? extends Serializable> task) {
+ if (task instanceof ConditionalTask) {
+ return StageType.CONDITIONAL;
+ }
+ if (task instanceof CopyTask) {
+ return StageType.COPY;
+ }
+ if (task instanceof DDLTask) {
+ return StageType.DDL;
+ }
+ if (task instanceof ExecDriver) {
+ return StageType.MAPRED;
+ }
+ if (task instanceof ExplainTask) {
+ return StageType.EXPLAIN;
+ }
+ if (task instanceof FetchTask) {
+ return StageType.FETCH;
+ }
+ if (task instanceof FunctionTask) {
+ return StageType.FUNC;
+ }
+ if (task instanceof MapRedTask) {
+ return StageType.MAPREDLOCAL;
+ }
+ if (task instanceof MoveTask) {
+ return StageType.MOVE;
+ }
+ assert false;
+ return -1;
+ }
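
The TODO above envisions each Task reporting its own type instead of this
instanceof chain. A minimal sketch of that refactor, assuming a new
overridable method on Task (hypothetical, not part of this commit):

    // hypothetical: declared once in Task<T extends Serializable>
    public abstract int getType();

    // hypothetical: each subclass returns its own StageType constant,
    // e.g. in ExecDriver
    public int getType() {
      return StageType.MAPRED;
    }

getStageType(task) then reduces to task.getType().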
+
+ /**
+ * TODO remove this method; add a getType() method to each operator instead
+ * @param op
+ * @return
+ */
+ private int getOperatorType(Operator<? extends Serializable> op) {
+ if (op instanceof JoinOperator) {
+ return OperatorType.JOIN;
+ }
+ if (op instanceof MapJoinOperator) {
+ return OperatorType.MAPJOIN;
+ }
+ if (op instanceof ExtractOperator) {
+ return OperatorType.EXTRACT;
+ }
+ if (op instanceof FilterOperator) {
+ return OperatorType.FILTER;
+ }
+ if (op instanceof ForwardOperator) {
+ return OperatorType.FORWARD;
+ }
+ if (op instanceof GroupByOperator) {
+ return OperatorType.GROUPBY;
+ }
+ if (op instanceof LimitOperator) {
+ return OperatorType.LIMIT;
+ }
+ if (op instanceof ScriptOperator) {
+ return OperatorType.SCRIPT;
+ }
+ if (op instanceof SelectOperator) {
+ return OperatorType.SELECT;
+ }
+ if (op instanceof TableScanOperator) {
+ return OperatorType.TABLESCAN;
+ }
+ if (op instanceof FileSinkOperator) {
+ return OperatorType.FILESINK;
+ }
+ if (op instanceof ReduceSinkOperator) {
+ return OperatorType.REDUCESINK;
+ }
+ if (op instanceof UnionOperator) {
+ return OperatorType.UNION;
+ }
+ assert false;
+ return -1;
+ }
+
+ /**
+ * Generate the operator graph and operator list for the given task from
+ * the operators corresponding to that task.
+ * @param task api.Task whose operator graph needs to be populated
+ * @param topOps the set of top operators from which the operator graph
+ * of the task hangs
+ */
+ private void populateOperatorGraph(org.apache.hadoop.hive.ql.plan.api.Task task,
+ Collection<Operator<? extends Serializable>> topOps) {
+
+ task.setOperatorGraph(new org.apache.hadoop.hive.ql.plan.api.Graph());
+ task.getOperatorGraph().setNodeType(NodeType.OPERATOR);
+
+ Queue<Operator<? extends Serializable>> opsToVisit = new LinkedList<Operator<? extends Serializable>>();
+ Set<Operator<? extends Serializable>> opsVisited = new HashSet<Operator<? extends Serializable>>();
+ opsToVisit.addAll(topOps);
+ while (opsToVisit.peek() != null) {
+ Operator<? extends Serializable> op = opsToVisit.remove();
+ opsVisited.add(op);
+ // populate the operator
+ org.apache.hadoop.hive.ql.plan.api.Operator operator = new org.apache.hadoop.hive.ql.plan.api.Operator();
+ operator.setOperatorId(op.getOperatorId());
+ operator.setOperatorType(getOperatorType(op));
+ task.addToOperatorList(operator);
+ // done processing the operator
+ if (op.getChildOperators() != null) {
+ org.apache.hadoop.hive.ql.plan.api.Adjacency entry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
+ entry.setAdjacencyType(AdjacencyType.CONJUNCTIVE);
+ entry.setNode(op.getOperatorId());
+ for (Operator<? extends Serializable> childOp: op.getChildOperators()) {
+ entry.addToChildren(childOp.getOperatorId());
+ if (!opsVisited.contains(childOp)) {
+ opsToVisit.add(childOp);
+ }
+ }
+ task.getOperatorGraph().addToAdjacencyList(entry);
+ }
+ }
+ }
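
For orientation: a linear operator chain TS_0 -> FIL_1 -> SEL_2 -> FS_3
(ids illustrative; real ids come from Operator.initOperatorId()) would yield
four operatorList entries and three CONJUNCTIVE adjacency entries, one per
operator that has children, roughly:

    {"node":"TS_0","children":["FIL_1"],"adjacencyType":CONJUNCTIVE}
    {"node":"FIL_1","children":["SEL_2"],"adjacencyType":CONJUNCTIVE}
    {"node":"SEL_2","children":["FS_3"],"adjacencyType":CONJUNCTIVE}

FS_3 has no children, so it contributes no adjacency entry.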
+
+ /**
+ * Populate api.QueryPlan from exec structures. This includes constructing the
+ * dependency graphs of stages and operators.
+ *
+ * @throws IOException
+ */
+ private void populateQueryPlan() throws IOException {
+ query.setStageGraph(new org.apache.hadoop.hive.ql.plan.api.Graph());
+ query.getStageGraph().setNodeType(NodeType.STAGE);
+
+ Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
+ Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
+ tasksToVisit.addAll(plan.getRootTasks());
+ while (tasksToVisit.size() != 0) {
+ Task<? extends Serializable> task = tasksToVisit.remove();
+ tasksVisited.add(task);
+ // populate stage
+ org.apache.hadoop.hive.ql.plan.api.Stage stage = new org.apache.hadoop.hive.ql.plan.api.Stage();
+ stage.setStageId(task.getId());
+ stage.setStageType(getStageType(task));
+ query.addToStageList(stage);
+
+ if (task instanceof ExecDriver) {
+ // populate map task
+ ExecDriver mrTask = (ExecDriver)task;
+ org.apache.hadoop.hive.ql.plan.api.Task mapTask = new org.apache.hadoop.hive.ql.plan.api.Task();
+ mapTask.setTaskId(stage.getStageId() + "_MAP");
+ mapTask.setTaskType(TaskType.MAP);
+ stage.addToTaskList(mapTask);
+ populateOperatorGraph(mapTask, mrTask.getWork().getAliasToWork().values());
+
+ // populate reduce task
+ if (mrTask.hasReduce()) {
+ org.apache.hadoop.hive.ql.plan.api.Task reduceTask = new org.apache.hadoop.hive.ql.plan.api.Task();
+ reduceTask.setTaskId(stage.getStageId() + "_REDUCE");
+ reduceTask.setTaskType(TaskType.REDUCE);
+ stage.addToTaskList(reduceTask);
+ Collection<Operator<? extends Serializable>> reducerTopOps = new ArrayList<Operator<? extends Serializable>>();
+ reducerTopOps.add(mrTask.getWork().getReducer());
+ populateOperatorGraph(reduceTask, reducerTopOps);
+ }
+ }
+ else {
+ org.apache.hadoop.hive.ql.plan.api.Task otherTask = new org.apache.hadoop.hive.ql.plan.api.Task();
+ otherTask.setTaskId(stage.getStageId() + "_OTHER");
+ otherTask.setTaskType(TaskType.OTHER);
+ stage.addToTaskList(otherTask);
+ }
+ if (task instanceof ConditionalTask) {
+ org.apache.hadoop.hive.ql.plan.api.Adjacency listEntry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
+ listEntry.setAdjacencyType(AdjacencyType.DISJUNCTIVE);
+ listEntry.setNode(task.getId());
+ ConditionalTask t = (ConditionalTask)task;
+
+ for (Task<? extends Serializable> listTask: t.getListTasks()) {
+ if (t.getChildTasks() != null) {
+ org.apache.hadoop.hive.ql.plan.api.Adjacency childEntry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
+ childEntry.setAdjacencyType(AdjacencyType.DISJUNCTIVE);
+ childEntry.setNode(listTask.getId());
+ // done processing the task
+ for (Task<? extends Serializable> childTask: t.getChildTasks()) {
+ childEntry.addToChildren(childTask.getId());
+ if (!tasksVisited.contains(childTask)) {
+ tasksToVisit.add(childTask);
+ }
+ }
+ query.getStageGraph().addToAdjacencyList(childEntry);
+ }
+
+ listEntry.addToChildren(listTask.getId());
+ if (!tasksVisited.contains(listTask)) {
+ tasksToVisit.add(listTask);
+ }
+ }
+ query.getStageGraph().addToAdjacencyList(listEntry);
+ }
+ else if (task.getChildTasks() != null) {
+ org.apache.hadoop.hive.ql.plan.api.Adjacency entry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
+ entry.setAdjacencyType(AdjacencyType.CONJUNCTIVE);
+ entry.setNode(task.getId());
+ // done processing the task
+ for (Task<? extends Serializable> childTask: task.getChildTasks()) {
+ entry.addToChildren(childTask.getId());
+ if (!tasksVisited.contains(childTask)) {
+ tasksToVisit.add(childTask);
+ }
+ }
+ query.getStageGraph().addToAdjacencyList(entry);
+ }
+ }
+ }
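
Note the two adjacency types: ordinary parent/child task edges are marked
CONJUNCTIVE (all children execute), while every edge involving a
ConditionalTask, both from the conditional stage to its candidate list and
from each candidate to the common children, is DISJUNCTIVE (the resolver
picks one branch at run time). For a conditional Stage-1 choosing between
Stage-2 and Stage-3, both feeding Stage-4 (stage names illustrative), the
stage graph comes out roughly as:

    {"node":"Stage-1","children":["Stage-2","Stage-3"],"adjacencyType":DISJUNCTIVE}
    {"node":"Stage-2","children":["Stage-4"],"adjacencyType":DISJUNCTIVE}
    {"node":"Stage-3","children":["Stage-4"],"adjacencyType":DISJUNCTIVE}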
+
+ /**
+ * From the counters extracted via extractCounters(), update the counters
+ * in the query plan
+ */
+ private void updateCountersInQueryPlan() {
+ query.setStarted(started.contains(query.getQueryId()));
+ query.setDone(done.contains(query.getQueryId()));
+ if (query.getStageList() != null)
+ for (org.apache.hadoop.hive.ql.plan.api.Stage stage: query.getStageList()) {
+ stage.setStarted(started.contains(stage.getStageId()));
+ stage.setStageCounters(counters.get(stage.getStageId()));
+ stage.setDone(done.contains(stage.getStageId()));
+ for (org.apache.hadoop.hive.ql.plan.api.Task task: stage.getTaskList()) {
+ task.setTaskCounters(counters.get(task.getTaskId()));
+ if (task.getTaskType() == TaskType.OTHER) {
+ task.setStarted(started.contains(stage.getStageId()));
+ task.setDone(done.contains(stage.getStageId()));
+ } else {
+ task.setStarted(started.contains(task.getTaskId()));
+ task.setDone(done.contains(task.getTaskId()));
+ for (org.apache.hadoop.hive.ql.plan.api.Operator op: task.getOperatorList()) {
+ // if the task has started, all operators within the task have started
+ op.setStarted(started.contains(task.getTaskId()));
+ op.setOperatorCounters(counters.get(op.getOperatorId()));
+ // if the task is done, all operators are done as well
+ op.setDone(done.contains(task.getTaskId()));
+ }
+ }
+ }
+ }
+ }
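
The started/done sets are keyed by query id, stage id, and task id, so a
caller only marks the boundaries and this method projects them onto the
plan. A hedged usage sketch from the driver side (control flow illustrative):

    QueryPlan plan = new QueryPlan(queryString, sem);
    plan.setStarted();              // records queryId in 'started'
    // ... run tasks; their started()/done() state and counters are
    // snapshotted by extractCounters() on the next call ...
    String json = plan.toString();  // builds graphs, refreshes counters/flags
    plan.setDone();                 // records queryId in 'done'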
+
+ /**
+ * Extract all the counters from tasks and operators
+ */
+ private void extractCounters() throws IOException {
+ Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
+ Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
+ tasksToVisit.addAll(plan.getRootTasks());
+ while (tasksToVisit.peek() != null) {
+ Task<? extends Serializable> task = tasksToVisit.remove();
+ tasksVisited.add(task);
+ // add children to tasksToVisit
+ if (task.getChildTasks() != null) {
+ for (Task<? extends Serializable> childTask: task.getChildTasks()) {
+ if (!tasksVisited.contains(childTask)) {
+ tasksToVisit.add(childTask);
+ }
+ }
+ }
+
+ if (started.contains(task.getId()) && done.contains(task.getId())) {
+ continue;
+ }
+
+ // get the counters for the task
+ counters.put(task.getId(), task.getCounters());
+
+ // check if task is started
+ if (task.started()) {
+ started.add(task.getId());
+ }
+ if (task.done()) {
+ done.add(task.getId());
+ }
+ if (task instanceof ExecDriver) {
+ ExecDriver mrTask = (ExecDriver)task;
+ extractOperatorCounters(mrTask.getWork().getAliasToWork().values(), task.getId() + "_MAP");
+ if (mrTask.mapStarted()) {
+ started.add(task.getId() + "_MAP");
+ }
+ if (mrTask.mapDone()) {
+ done.add(task.getId() + "_MAP");
+ }
+ if (mrTask.hasReduce()) {
+ Collection<Operator<? extends Serializable>> reducerTopOps = new ArrayList<Operator<? extends Serializable>>();
+ reducerTopOps.add(mrTask.getWork().getReducer());
+ extractOperatorCounters(reducerTopOps, task.getId() + "_REDUCE");
+ if (mrTask.reduceStarted()) {
+ started.add(task.getId() + "_REDUCE");
+ }
+ if (mrTask.reduceDone()) {
+ done.add(task.getId() + "_REDUCE");
+ }
+ }
+ }
+ else if (task instanceof ConditionalTask) {
+ ConditionalTask cTask = (ConditionalTask)task;
+ for (Task<? extends Serializable> listTask: cTask.getListTasks()) {
+ if (!tasksVisited.contains(listTask)) {
+ tasksToVisit.add(listTask);
+ }
+ }
+ }
+ }
+ }
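
After a pass of extractCounters(), the snapshot structures are all keyed by
plan-entity ids; illustratively:

    counters: "Stage-1" -> task-level counters, "FIL_1" -> operator counters
    started:  queryId, "Stage-1", "Stage-1_MAP", "Stage-1_REDUCE"
    done:     the same ids once the corresponding piece has finished
              (operator ids such as "FIL_1" are added when op.getDone() is true)

updateCountersInQueryPlan() then resolves everything by simple lookups
against these ids.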
+
+ private void extractOperatorCounters(Collection<Operator<? extends Serializable>> topOps, String taskId) {
+ Queue<Operator<? extends Serializable>> opsToVisit = new LinkedList<Operator<? extends Serializable>>();
+ Set<Operator<? extends Serializable>> opsVisited = new HashSet<Operator<? extends Serializable>>();
+ opsToVisit.addAll(topOps);
+ while (opsToVisit.size() != 0) {
+ Operator<? extends Serializable> op = opsToVisit.remove();
+ opsVisited.add(op);
+ counters.put(op.getOperatorId(), op.getCounters());
+ if (op.getDone()) {
+ done.add(op.getOperatorId());
+ }
+ if (op.getChildOperators() != null) {
+ for (Operator<? extends Serializable> childOp: op.getChildOperators()) {
+ if (!opsVisited.contains(childOp)) {
+ opsToVisit.add(childOp);
+ }
+ }
+ }
+ }
+
+ }
+
+ public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException {
+ if (query.getStageGraph() == null) {
+ populateQueryPlan();
+ }
+ extractCounters();
+ updateCountersInQueryPlan();
+ return query;
+ }
+
+ public String getJSONValue(Object value) {
+ String v = "null";
+ if (value != null) {
+ v = value.toString();
+ if (v.charAt(0) != '[' && v.charAt(0) != '{') {
+ v = "\"" + v + "\"";
+ }
+ }
+ return v;
+ }
+
+ public String getJSONKeyValue(Object key, Object value) {
+ return "\"" + key + "\":" + getJSONValue(value) + ",";
+ }
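
These two helpers emit every pair with a trailing comma; each caller strips
the final comma before closing the brace or bracket. For example:

    getJSONValue("FIL_1")             -> "FIL_1"      (scalars get quoted)
    getJSONValue(null)                -> null
    getJSONKeyValue("node", "FIL_1")  -> "node":"FIL_1",   (trailing comma)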
+
+ @SuppressWarnings("unchecked")
+ private String getJSONList(List list) {
+ if (list == null) {
+ return "null";
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (Object entry: list) {
+ sb.append(getJSONValue(entry));
+ sb.append(",");
+ }
+ if (!list.isEmpty()) {
+ sb.deleteCharAt(sb.length()-1);
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ public String getJSONMap(Map map) {
+ if (map == null) {
+ return "null";
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ for (Object entry: map.entrySet()) {
+ Map.Entry e = (Map.Entry)entry;
+ sb.append(getJSONKeyValue(e.getKey(), e.getValue()));
+ }
+ if (!map.isEmpty()) {
+ sb.deleteCharAt(sb.length()-1);
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+ private Object getJSONGraph(org.apache.hadoop.hive.ql.plan.api.Graph graph) {
+ if (graph == null) {
+ return "null";
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ sb.append(getJSONKeyValue("nodeType", graph.getNodeType()));
+ sb.append(getJSONKeyValue("roots", getJSONList(graph.getRoots())));
+ // adjacency list
+ List<String> adjList = new ArrayList<String>();
+ if (graph.getAdjacencyList() != null) {
+ for (org.apache.hadoop.hive.ql.plan.api.Adjacency adj: graph.getAdjacencyList()) {
+ adjList.add(getJSONAdjacency(adj));
+ }
+ }
+ sb.append(getJSONKeyValue("adjacencyList", getJSONList(adjList)));
+ sb.deleteCharAt(sb.length()-1);
+ sb.append("}");
+ return sb.toString();
+ }
+
+ private String getJSONAdjacency(org.apache.hadoop.hive.ql.plan.api.Adjacency adj) {
+ if (adj == null) {
+ return "null";
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ sb.append(getJSONKeyValue("node", adj.getNode()));
+ sb.append(getJSONKeyValue("children", getJSONList(adj.getChildren())));
+ sb.append(getJSONKeyValue("adjacencyType", adj.getAdjacencyType()));
+ sb.deleteCharAt(sb.length()-1);
+ sb.append("}");
+ return sb.toString();
+ }
+
+ private String getJSONOperator(org.apache.hadoop.hive.ql.plan.api.Operator op) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ sb.append(getJSONKeyValue("operatorId", op.getOperatorId()));
+ sb.append(getJSONKeyValue("operatorType", op.getOperatorType()));
+ sb.append(getJSONKeyValue("operatorAttributes", getJSONMap(op.getOperatorAttributes())));
+ sb.append(getJSONKeyValue("operatorCounters", getJSONMap(op.getOperatorCounters())));
+ sb.append(getJSONKeyValue("done", op.isDone()));
+ sb.append(getJSONKeyValue("started", op.isStarted()));
+ sb.deleteCharAt(sb.length()-1);
+ sb.append("}");
+ return sb.toString();
+ }
+
+ private String getJSONTask(org.apache.hadoop.hive.ql.plan.api.Task task) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ sb.append(getJSONKeyValue("taskId", task.getTaskId()));
+ sb.append(getJSONKeyValue("taskType", task.getTaskType()));
+ sb.append(getJSONKeyValue("taskAttributes", getJSONMap(task.getTaskAttributes())));
+ sb.append(getJSONKeyValue("taskCounters", getJSONMap(task.getTaskCounters())));
+ sb.append(getJSONKeyValue("operatorGraph", getJSONGraph(task.getOperatorGraph())));
+ // operator list
+ List<String> opList = new ArrayList<String>();
+ if (task.getOperatorList() != null) {
+ for (org.apache.hadoop.hive.ql.plan.api.Operator op: task.getOperatorList()) {
+ opList.add(getJSONOperator(op));
+ }
+ }
+ sb.append(getJSONKeyValue("operatorList", getJSONList(opList)));
+ sb.append(getJSONKeyValue("done", task.isDone()));
+ sb.append(getJSONKeyValue("started", task.isStarted()));
+ sb.deleteCharAt(sb.length()-1);
+ sb.append("}");
+ return sb.toString();
+ }
+
+ private String getJSONStage(org.apache.hadoop.hive.ql.plan.api.Stage stage) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ sb.append(getJSONKeyValue("stageId", stage.getStageId()));
+ sb.append(getJSONKeyValue("stageType", stage.getStageType()));
+ sb.append(getJSONKeyValue("stageAttributes", getJSONMap(stage.getStageAttributes())));
+ sb.append(getJSONKeyValue("stageCounters", getJSONMap(stage.getStageCounters())));
+ List<String> taskList = new ArrayList<String>();
+ if (stage.getTaskList() != null) {
+ for (org.apache.hadoop.hive.ql.plan.api.Task task: stage.getTaskList()) {
+ taskList.add(getJSONTask(task));
+ }
+ }
+ sb.append(getJSONKeyValue("taskList", getJSONList(taskList)));
+ sb.append(getJSONKeyValue("done", stage.isDone()));
+ sb.append(getJSONKeyValue("started", stage.isStarted()));
+ sb.deleteCharAt(sb.length()-1);
+ sb.append("}");
+ return sb.toString();
+ }
+
+ public String getJSONQuery(org.apache.hadoop.hive.ql.plan.api.Query query) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ sb.append(getJSONKeyValue("queryId", query.getQueryId()));
+ sb.append(getJSONKeyValue("queryType", query.getQueryType()));
+ sb.append(getJSONKeyValue("queryAttributes", getJSONMap(query.getQueryAttributes())));
+ sb.append(getJSONKeyValue("queryCounters", getJSONMap(query.getQueryCounters())));
+ sb.append(getJSONKeyValue("stageGraph", getJSONGraph(query.getStageGraph())));
+ // stageList
+ List<String> stageList = new ArrayList<String>();
+ if (query.getStageList() != null) {
+ for (org.apache.hadoop.hive.ql.plan.api.Stage stage: query.getStageList()) {
+ stageList.add(getJSONStage(stage));
+ }
+ }
+ sb.append(getJSONKeyValue("stageList", getJSONList(stageList)));
+ sb.append(getJSONKeyValue("done", query.isDone()));
+ sb.append(getJSONKeyValue("started", query.isStarted()));
+ sb.deleteCharAt(sb.length()-1);
+ sb.append("}");
+ return sb.toString();
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return getJSONQuery(getQueryPlan());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ return e.toString();
+ }
+ }
+
+ public String toThriftJSONString() throws IOException {
+ org.apache.hadoop.hive.ql.plan.api.Query q = getQueryPlan();
+ TMemoryBuffer tmb = new TMemoryBuffer(q.toString().length()*5);
+ TJSONProtocol oprot = new TJSONProtocol(tmb);
+ try {
+ q.write(oprot);
+ } catch (TException e) {
+ // fall back to the default toString() if Thrift serialization fails
+ e.printStackTrace();
+ return q.toString();
+ }
+ return tmb.toString("UTF-8");
+ }
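
The JSON produced here can be read back into an api.Query elsewhere; a
minimal sketch, assuming the standard Thrift Java API and UTF-8 handling by
the caller:

    byte[] bytes = jsonString.getBytes("UTF-8");
    TMemoryBuffer buf = new TMemoryBuffer(bytes.length);
    buf.write(bytes, 0, bytes.length);
    org.apache.hadoop.hive.ql.plan.api.Query q =
        new org.apache.hadoop.hive.ql.plan.api.Query();
    q.read(new TJSONProtocol(buf));  // throws TException on malformed input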
+
+ public String toBinaryString() throws IOException {
+ org.apache.hadoop.hive.ql.plan.api.Query q = getQueryPlan();
+ TMemoryBuffer tmb = new TMemoryBuffer(q.toString().length()*5);
+ TBinaryProtocol oprot = new TBinaryProtocol(tmb);
+ try {
+ q.write(oprot);
+ } catch (TException e) {
+ // fall back to the default toString() if Thrift serialization fails
+ e.printStackTrace();
+ return q.toString();
+ }
+ byte[] buf = new byte[tmb.length()];
+ tmb.read(buf, 0, tmb.length());
+ return new String(buf);
+ //return getQueryPlan().toString();
+
+ }
+
+ public void setStarted() {
+ started.add(queryId);
+ }
+
+ public void setDone() {
+ done.add(queryId);
+ }
+
+ public Set<String> getStarted() {
+ return started;
+ }
+
+ public Set<String> getDone() {
+ return done;
+ }
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java Wed Aug 19 21:02:57 2009
@@ -45,7 +45,7 @@
}
boolean firstRow = true;
- public void process(Object row, int tag)
+ public void processOp(Object row, int tag)
throws HiveException {
ObjectInspector rowInspector = inputObjInspectors[tag];
if (firstRow) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Wed Aug 19 21:02:57 2009
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
@@ -38,6 +39,10 @@
private ConditionalResolver resolver;
private Object resolverCtx;
+ public ConditionalTask() {
+ super();
+ }
+
public boolean isMapRedTask() {
for (Task<? extends Serializable> task : listTasks)
if (task.isMapRedTask())
@@ -54,14 +59,15 @@
return false;
}
- public void initialize (HiveConf conf) {
+ public void initialize (HiveConf conf, QueryPlan queryPlan) {
+ super.initialize(conf, queryPlan);
resTask = listTasks.get(resolver.getTaskId(conf, resolverCtx));
- resTask.initialize(conf);
+ resTask.initialize(conf, queryPlan);
}
@Override
public int execute() {
- return resTask.execute();
+ return resTask.executeTask();
}
/**
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java Wed Aug 19 21:02:57 2009
@@ -35,6 +35,10 @@
private static final long serialVersionUID = 1L;
+ public CopyTask() {
+ super();
+ }
+
public int execute() {
FileSystem dstFs = null;
Path toPath = null;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Aug 19 21:02:57 2009
@@ -75,6 +75,7 @@
import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.QueryPlan;
/**
* DDLTask implementation
@@ -88,11 +89,15 @@
static final private int separator = Utilities.tabCode;
static final private int terminator = Utilities.newLineCode;
- public void initialize(HiveConf conf) {
- super.initialize(conf);
+ public DDLTask() {
+ super();
+ }
+
+ public void initialize(HiveConf conf, QueryPlan queryPlan) {
+ super.initialize(conf, queryPlan);
this.conf = conf;
}
-
+
public int execute() {
// Create the db
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Aug 19 21:02:57 2009
@@ -24,8 +24,6 @@
import java.net.URI;
import java.net.URLEncoder;
import java.net.URLDecoder;
-import java.net.URL;
-import java.net.URLClassLoader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.StringUtils;
@@ -43,24 +41,24 @@
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.ql.plan.mapredWork;
-import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.io.*;
+import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.varia.NullAppender;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
public class ExecDriver extends Task<mapredWork> implements Serializable {
private static final long serialVersionUID = 1L;
transient protected JobConf job;
+ transient protected int mapProgress = 0;
+ transient protected int reduceProgress = 0;
/**
* Constructor when invoked from QL
@@ -99,8 +97,8 @@
/**
* Initialization when invoked from QL
*/
- public void initialize(HiveConf conf) {
- super.initialize(conf);
+ public void initialize(HiveConf conf, QueryPlan queryPlan) {
+ super.initialize(conf, queryPlan);
job = new JobConf(conf, ExecDriver.class);
// NOTE: initialize is only called if it is in non-local mode.
// In case it's in non-local mode, we need to move the SessionState files
@@ -191,11 +189,37 @@
+ " job -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
}
}
-
+
/**
- * from StreamJob.java
+ * This class contains the state of the running task.
+ * Going forward, we will return this handle from execute, and Driver
+ * can split execute into start, monitorProgress and postProcess.
*/
- public RunningJob jobProgress(JobClient jc, RunningJob rj) throws IOException {
+ public static class ExecDriverTaskHandle extends TaskHandle {
+ JobClient jc;
+ RunningJob rj;
+ JobClient getJobClient() {
+ return jc;
+ }
+ RunningJob getRunningJob() {
+ return rj;
+ }
+ public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
+ this.jc = jc;
+ this.rj = rj;
+ }
+ public void setRunningJob(RunningJob job) {
+ this.rj = job;
+ }
+ public Counters getCounters() throws IOException {
+ return rj.getCounters();
+ }
+ }
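
With the state captured in a handle, the class comment's plan becomes
straightforward; a hedged sketch of the eventual driver loop (start() is
hypothetical, updateCounters() is the method added below in this diff):

    // hypothetical future control flow, not part of this commit
    TaskHandle th = mrTask.start();    // submit the job, return the handle
    while (!mrTask.done()) {
      mrTask.updateCounters(th);       // poll map/reduce progress and counters
      Thread.sleep(1000);              // caller handles InterruptedException
    }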
+
+ public void progress(TaskHandle taskHandle) throws IOException {
+ ExecDriverTaskHandle th = (ExecDriverTaskHandle)taskHandle;
+ JobClient jc = th.getJobClient();
+ RunningJob rj = th.getRunningJob();
String lastReport = "";
SimpleDateFormat dateFormat
= new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
@@ -206,13 +230,16 @@
Thread.sleep(1000);
} catch (InterruptedException e) {
}
- rj = jc.getJob(rj.getJobID());
- String report = " map = " + Math.round(rj.mapProgress() * 100) + "%, reduce ="
- + Math.round(rj.reduceProgress() * 100) + "%";
+ th.setRunningJob(jc.getJob(rj.getJobID()));
+ updateCounters(th);
+
+ String report = " map = " + this.mapProgress + "%, reduce = " + this.reduceProgress + "%";
if (!report.equals(lastReport)
|| System.currentTimeMillis() >= reportTime + maxReportInterval) {
+ // write out serialized plan with counters to log file
+ // LOG.info(queryPlan);
String output = dateFormat.format(Calendar.getInstance().getTime()) + report;
SessionState ss = SessionState.get();
if (ss != null) {
@@ -223,13 +250,21 @@
Keys.TASK_HADOOP_PROGRESS, output);
ss.getHiveHistory().progressTask(
SessionState.get().getQueryId(), this);
+ ss.getHiveHistory().logPlanProgress(queryPlan);
}
console.printInfo(output);
lastReport = report;
reportTime = System.currentTimeMillis();
}
}
- return rj;
+ setDone();
+ th.setRunningJob(jc.getJob(rj.getJobID()));
+ updateCounters(th);
+ SessionState ss = SessionState.get();
+ if (ss != null) {
+ ss.getHiveHistory().logPlanProgress(queryPlan);
+ }
+ //LOG.info(queryPlan);
}
/**
@@ -309,10 +344,46 @@
}
/**
+ * update counters relevant to this task
+ */
+ @Override
+ public void updateCounters(TaskHandle t) throws IOException {
+ ExecDriverTaskHandle th = (ExecDriverTaskHandle)t;
+ RunningJob rj = th.getRunningJob();
+ this.mapProgress = Math.round(rj.mapProgress() * 100);
+ this.reduceProgress = Math.round(rj.reduceProgress() * 100);
+ taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long.valueOf(this.mapProgress));
+ taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long.valueOf(this.reduceProgress));
+ Counters ctrs = th.getCounters();
+ for (Operator<? extends Serializable> op: work.getAliasToWork().values()) {
+ op.updateCounters(ctrs);
+ }
+ if (work.getReducer() != null) {
+ work.getReducer().updateCounters(ctrs);
+ }
+ }
+
+ public boolean mapStarted() {
+ return mapProgress > 0;
+ }
+
+ public boolean reduceStarted() {
+ return reduceProgress > 0;
+ }
+
+ public boolean mapDone() {
+ return mapProgress == 100;
+ }
+
+ public boolean reduceDone() {
+ return reduceProgress == 100;
+ }
+
+
+ /**
* Execute a query plan using Hadoop
*/
public int execute() {
-
try {
setNumberOfReducers();
} catch(IOException e) {
@@ -377,6 +448,7 @@
if (pwd != null)
job.set(HiveConf.ConfVars.METASTOREPWD.varname, "HIVE");
JobClient jc = new JobClient(job);
+
// make this client wait if job tracker is not behaving well.
Throttle.checkJobTracker(job, LOG);
@@ -391,10 +463,11 @@
runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL()
+ "&action=kill");
+ TaskHandle th = new ExecDriverTaskHandle(jc, rj);
jobInfo(rj);
- rj = jobProgress(jc, rj);
+ progress(th);
- if(rj == null) {
+ if (rj == null) {
// in the corner case where the running job has disappeared from JT memory
// remember that we did actually submit the job.
rj = orig_rj;
@@ -568,6 +641,7 @@
mapredWork plan = Utilities.deserializeMapRedWork(pathData, conf);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
+
int ret = ed.execute();
if (ret != 0) {
System.out.println("Job Failed");
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Wed Aug 19 21:02:57 2009
@@ -44,6 +44,10 @@
public class ExplainTask extends Task<explainWork> implements Serializable {
private static final long serialVersionUID = 1L;
+ public ExplainTask() {
+ super();
+ }
+
public int execute() {
try {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Wed Aug 19 21:02:57 2009
@@ -38,7 +38,7 @@
initializeChildren(hconf);
}
- public void process(Object row, int tag) throws HiveException {
+ public void processOp(Object row, int tag) throws HiveException {
forward(eval.evaluate(row), outputObjInspector);
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Wed Aug 19 21:02:57 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.plan.fetchWork;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
import org.apache.hadoop.hive.ql.plan.tableDesc;
@@ -70,8 +71,12 @@
private LazySimpleSerDe mSerde;
private int totalRows;
- public void initialize (HiveConf conf) {
- super.initialize(conf);
+ public FetchTask() {
+ super();
+ }
+
+ public void initialize (HiveConf conf, QueryPlan queryPlan) {
+ super.initialize(conf, queryPlan);
try {
// Create a file system handle
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Wed Aug 19 21:02:57 2009
@@ -155,7 +155,16 @@
}
Writable recordValue;
- public void process(Object row, int tag) throws HiveException {
+ public void processOp(Object row, int tag) throws HiveException {
+ // Since FileSink is a terminal operator, forward is not called; maintain the number of output rows explicitly
+ if (counterNameToEnum != null) {
+ ++this.outputRows;
+ if (this.outputRows % 1000 == 0) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ this.outputRows = 0;
+ }
+ }
+
try {
if (reporter != null)
reporter.progress();
@@ -173,10 +182,18 @@
}
}
+ @Override
public void close(boolean abort) throws HiveException {
+
if (state == State.CLOSE)
return;
-
+
+ // Since close of super class is not invoked, update counters
+ if (counterNameToEnum != null) {
+ incrCounter(numInputRowsCntr, inputRows);
+ incrCounter(numOutputRowsCntr, outputRows);
+ incrCounter(timeTakenCntr, totalTime);
+ }
state = State.CLOSE;
if (!abort) {
if (outWriter != null) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Wed Aug 19 21:02:57 2009
@@ -61,7 +61,7 @@
initializeChildren(hconf);
}
- public void process(Object row, int tag) throws HiveException {
+ public void processOp(Object row, int tag) throws HiveException {
ObjectInspector rowInspector = inputObjInspectors[tag];
if (conditionInspector == null) {
conditionInspector = (PrimitiveObjectInspector)conditionEvaluator.initialize(rowInspector);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Wed Aug 19 21:02:57 2009
@@ -31,7 +31,7 @@
private static final long serialVersionUID = 1L;
@Override
- public void process(Object row, int tag)
+ public void processOp(Object row, int tag)
throws HiveException {
forward(row, inputObjInspectors[tag]);
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java Wed Aug 19 21:02:57 2009
@@ -24,6 +24,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FunctionWork;
import org.apache.hadoop.hive.ql.plan.createFunctionDesc;
@@ -39,8 +40,12 @@
transient HiveConf conf;
- public void initialize(HiveConf conf) {
- super.initialize(conf);
+ public FunctionTask() {
+ super();
+ }
+
+ public void initialize(HiveConf conf, QueryPlan queryPlan) {
+ super.initialize(conf, queryPlan);
this.conf = conf;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Aug 19 21:02:57 2009
@@ -458,7 +458,7 @@
keysCurrentGroup.clear();
}
- public void process(Object row, int tag) throws HiveException {
+ public void processOp(Object row, int tag) throws HiveException {
firstRow = false;
ObjectInspector rowInspector = inputObjInspectors[tag];
// Total number of input rows is needed for hash aggregation only
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Wed Aug 19 21:02:57 2009
@@ -40,7 +40,7 @@
initializeChildren(hconf);
}
- public void process(Object row, int tag)
+ public void processOp(Object row, int tag)
throws HiveException {
try {
// get alias
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Wed Aug 19 21:02:57 2009
@@ -40,7 +40,7 @@
currCount = 0;
}
- public void process(Object row, int tag) throws HiveException {
+ public void processOp(Object row, int tag) throws HiveException {
if (currCount < limit) {
forward(row, inputObjInspectors[tag]);
currCount++;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Wed Aug 19 21:02:57 2009
@@ -197,7 +197,7 @@
}
@Override
- public void process(Object row, int tag) throws HiveException {
+ public void processOp(Object row, int tag) throws HiveException {
try {
// get alias
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Aug 19 21:02:57 2009
@@ -306,7 +306,7 @@
}
}
- public void process(Object row, int tag)
+ public void processOp(Object row, int tag)
throws HiveException {
throw new HiveException("Hive 2 Internal error: should not be called!");
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Wed Aug 19 21:02:57 2009
@@ -43,6 +43,10 @@
private static final long serialVersionUID = 1L;
+ public MapRedTask() {
+ super();
+ }
+
public int execute() {
try {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Wed Aug 19 21:02:57 2009
@@ -43,6 +43,10 @@
private static final long serialVersionUID = 1L;
+ public MoveTask() {
+ super();
+ }
+
public int execute() {
try {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Aug 19 21:02:57 2009
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -30,6 +31,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.explain;
import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -37,6 +39,7 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Reporter;
/**
@@ -50,6 +53,25 @@
protected List<Operator<? extends Serializable>> childOperators;
protected List<Operator<? extends Serializable>> parentOperators;
+ protected String operatorId;
+ /**
+ * List of counter names associated with the operator.
+ * It contains the following default counters:
+ * NUM_INPUT_ROWS
+ * NUM_OUTPUT_ROWS
+ * TIME_TAKEN
+ * Individual operators can add to this list via the addToCounterNames method.
+ */
+ protected ArrayList<String> counterNames;
+
+ /**
+ * Each operator has its own map from its counter names to disjoint
+ * ProgressCounter enums; it is populated at compile time and read at
+ * run time while extracting the operator-specific counts
+ */
+ protected HashMap<String, ProgressCounter> counterNameToEnum;
+
+
private static int seqId;
// It can be optimized later so that an operator operation (init/close) is performed
@@ -323,14 +345,27 @@
initialize(hconf, null);
}
+
/**
* Process the row.
* @param row The object representing the row.
* @param tag The tag of the row usually means which parent this row comes from.
* Rows with the same tag should have exactly the same rowInspector all the time.
*/
- public abstract void process(Object row, int tag) throws HiveException;
-
+ public abstract void processOp(Object row, int tag) throws HiveException;
+
+ /**
+ * Process the row.
+ * @param row The object representing the row.
+ * @param tag The tag of the row usually means which parent this row comes from.
+ * Rows with the same tag should have exactly the same rowInspector all the time.
+ */
+ public void process(Object row, int tag) throws HiveException {
+ preProcessCounter();
+ processOp(row, tag);
+ postProcessCounter();
+ }
+
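
With this split, process() is the template method: it brackets every row
with the counter bookkeeping, and subclasses only implement processOp(). A
minimal hypothetical pass-through operator (noopDesc is an assumed
descriptor class, not part of this commit):

    public class NoopOperator extends Operator<noopDesc> implements Serializable {
      private static final long serialVersionUID = 1L;
      @Override
      public void processOp(Object row, int tag) throws HiveException {
        // input/output row counters and timing are handled by process()
        forward(row, inputObjInspectors[tag]);
      }
    }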
// If an operator wants to do some work at the beginning of a group
public void startGroup() throws HiveException {
LOG.debug("Starting group");
@@ -345,7 +380,7 @@
LOG.debug("Start group Done");
}
- // If a operator wants to do some work at the beginning of a group
+ // If an operator wants to do some work at the end of a group
public void endGroup() throws HiveException {
LOG.debug("Ending group");
@@ -363,6 +398,12 @@
if (state == State.CLOSE)
return;
+ if (counterNameToEnum != null) {
+ incrCounter(numInputRowsCntr, inputRows);
+ incrCounter(numOutputRowsCntr, outputRows);
+ incrCounter(timeTakenCntr, totalTime);
+ }
+
LOG.info(id + " forwarded " + cntr + " rows");
try {
@@ -457,6 +498,14 @@
}
protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
+
+ if ((++outputRows % 1000) == 0) {
+ if (counterNameToEnum != null) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ outputRows = 0;
+ }
+ }
+
if (LOG.isInfoEnabled()) {
cntr++;
if (cntr == nextCntr) {
@@ -608,4 +657,197 @@
Arrays.asList(fieldObjectInspectors));
}
+ /**
+ * All counter stuff below this
+ */
+
+ /**
+ * TODO This is a hack for hadoop 0.17 which only supports enum counters
+ */
+ public static enum ProgressCounter {
+ C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16,
+ C17, C18, C19, C20, C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32,
+ C33, C34, C35, C36, C37, C38, C39, C40, C41, C42, C43, C44, C45, C46, C47, C48,
+ C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60, C61, C62, C63, C64,
+ C65, C66, C67, C68, C69, C70, C71, C72, C73, C74, C75, C76, C77, C78, C79, C80,
+ C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, C91, C92, C93, C94, C95, C96,
+ C97, C98, C99, C100, C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, C111, C112,
+ C113, C114, C115, C116, C117, C118, C119, C120, C121, C122, C123, C124, C125, C126, C127, C128
+ };
+
+ private static int totalNumCntrs = 128;
+
+ /**
+ * populated at runtime from hadoop counters at run time in the client
+ */
+ transient protected Map<String, Long> counters;
+
+ /**
+ * keeps track of unique ProgressCounter enums used
+ * this value is used at compile time while assigning ProgressCounter
+ * enums to counter names
+ */
+ private static int lastEnumUsed;
+
+ transient protected long inputRows = 0;
+ transient protected long outputRows = 0;
+ transient protected long beginTime = 0;
+ transient protected long totalTime = 0;
+
+ /**
+ * this is called before processOp() to flush buffered counters and record the start time
+ */
+ private void preProcessCounter()
+ {
+ inputRows++;
+
+ if (((inputRows % 1000) == 0) && (counterNameToEnum != null)) {
+ incrCounter(numInputRowsCntr, inputRows);
+ incrCounter(timeTakenCntr, totalTime);
+ inputRows = 0;
+ totalTime = 0;
+ }
+ beginTime = System.currentTimeMillis();
+ }
+
+ /**
+ * this is called after processOp() to accumulate the time taken
+ */
+ private void postProcessCounter()
+ {
+ totalTime += (System.currentTimeMillis() - beginTime);
+ }
+
+
+ /**
+ * this is called in operators in map or reduce tasks
+ * @param name
+ * @param amount
+ */
+ protected void incrCounter(String name, long amount)
+ {
+ String counterName = "CNTR_NAME_" + getOperatorId() + "_" + name;
+ ProgressCounter pc = counterNameToEnum.get(counterName);
+
+ // Currently, we maintain 128 counters per plan - in case of a bigger tree, we may run out of them
+ if (pc == null)
+ LOG.warn("Using too many counters. Increase the total number of counters for " + counterName);
+ else if (reporter != null)
+ reporter.incrCounter(pc, amount);
+ }
+
+ public ArrayList<String> getCounterNames() {
+ return counterNames;
+ }
+
+ public void setCounterNames(ArrayList<String> counterNames) {
+ this.counterNames = counterNames;
+ }
+
+ public String getOperatorId() {
+ return operatorId;
+ }
+
+ public void initOperatorId() {
+ setOperatorId(getName() + "_" + this.id);
+ }
+
+ public void setOperatorId(String operatorId) {
+ this.operatorId = operatorId;
+ }
+
+ public Map<String, Long> getCounters() {
+ return counters;
+ }
+
+ /**
+ * called in ExecDriver.progress periodically
+ * @param ctrs counters from the running job
+ */
+ @SuppressWarnings("unchecked")
+ public void updateCounters(Counters ctrs) {
+ if (counters == null) {
+ counters = new HashMap<String, Long>();
+ }
+
+ // For some old unit tests, the counters will not be populated. Eventually, the old tests should be removed
+ if (counterNameToEnum == null)
+ return;
+
+ for (Map.Entry<String, ProgressCounter> counter: counterNameToEnum.entrySet()) {
+ counters.put(counter.getKey(), ctrs.getCounter(counter.getValue()));
+ }
+ // update counters of child operators
+ // this won't be an infinite loop since the operator graph is acyclic
+ // but, some operators may be updated more than once and that's ok
+ if (getChildren() != null) {
+ for (Node op: getChildren()) {
+ ((Operator<? extends Serializable>)op).updateCounters(ctrs);
+ }
+ }
+ }
+
+ // A given query can have multiple map-reduce jobs
+ public static void resetLastEnumUsed() {
+ lastEnumUsed = 0;
+ }
+
+ /**
+ * Called only in SemanticAnalyzer after all operators have added their
+ * own set of counter names
+ */
+ public void assignCounterNameToEnum() {
+ if (counterNameToEnum != null) {
+ return;
+ }
+ counterNameToEnum = new HashMap<String, ProgressCounter>();
+ for (String counterName: getCounterNames()) {
+ ++lastEnumUsed;
+
+ // TODO Hack for hadoop-0.17
+ // Currently, only maximum number of 'totalNumCntrs' can be used. If you want
+ // to add more counters, increase the number of counters in ProgressCounter
+ if (lastEnumUsed > totalNumCntrs) {
+ LOG.warn("Using too many counters. Increase the total number of counters");
+ return;
+ }
+ String enumName = "C" + lastEnumUsed;
+ ProgressCounter ctr = ProgressCounter.valueOf(enumName);
+ counterNameToEnum.put(counterName, ctr);
+ }
+ }
+
+ protected static String numInputRowsCntr = "NUM_INPUT_ROWS";
+ protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
+ protected static String timeTakenCntr = "TIME_TAKEN";
+
+ public void initializeCounters() {
+ initOperatorId();
+ counterNames = new ArrayList<String>();
+ counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numInputRowsCntr);
+ counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numOutputRowsCntr);
+ counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + timeTakenCntr);
+ List<String> newCntrs = getAdditionalCounters();
+ if (newCntrs != null) {
+ counterNames.addAll(newCntrs);
+ }
+ }
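
For an operator whose id is, say, SEL_4, this produces the three default
counter names (plus any operator-specific additions):

    CNTR_NAME_SEL_4_NUM_INPUT_ROWS
    CNTR_NAME_SEL_4_NUM_OUTPUT_ROWS
    CNTR_NAME_SEL_4_TIME_TAKEN

assignCounterNameToEnum() later maps each of these to one of the 128
ProgressCounter enums.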
+
+ /*
+ * By default there are no additional counters; if an operator wants to add
+ * more counters, it should override this method and provide the new list.
+ */
+ protected List<String> getAdditionalCounters() {
+ return null;
+ }
+
+ public HashMap<String, ProgressCounter> getCounterNameToEnum() {
+ return counterNameToEnum;
+ }
+
+ public void setCounterNameToEnum(HashMap<String, ProgressCounter> counterNameToEnum) {
+ this.counterNameToEnum = counterNameToEnum;
+ }
+
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Wed Aug 19 21:02:57 2009
@@ -60,7 +60,9 @@
for(opTuple o: opvec) {
if(o.descClass == opClass) {
try {
- return (Operator<T>)o.opClass.newInstance();
+ Operator<T> op = (Operator<T>)o.opClass.newInstance();
+ op.initializeCounters();
+ return op;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Wed Aug 19 21:02:57 2009
@@ -126,7 +126,7 @@
boolean firstRow;
transient Random random;
- public void process(Object row, int tag) throws HiveException {
+ public void processOp(Object row, int tag) throws HiveException {
try {
ObjectInspector rowInspector = inputObjInspectors[tag];
if (firstRow) {
@@ -203,6 +203,12 @@
try {
if (out != null) {
out.collect(keyWritable, value);
+ // Since this is a terminal operator, update counters explicitly - forward is not called
+ ++this.outputRows;
+ if (this.outputRows % 1000 == 0) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ this.outputRows = 0;
+ }
}
} catch (IOException e) {
throw new HiveException (e);
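
The counter update inserted above follows a batching pattern: counter increments go through the job reporter, so the operator keeps a local tally and flushes it every 1000 rows instead of incrementing per row. A stripped-down sketch, with incrCounter standing in for the operator's real method:

public class BatchedCounterSketch {
  private long outputRows = 0;

  // Stand-in for Operator.incrCounter(name, amount).
  private void incrCounter(String name, long amount) {
    System.out.println(name + " += " + amount);
  }

  public void collect(Object key, Object value) {
    // ... the real operator calls out.collect(key, value) here ...
    ++outputRows;
    if (outputRows % 1000 == 0) {
      incrCounter("NUM_OUTPUT_ROWS", outputRows); // one flush per 1000 rows
      outputRows = 0;
    }
  }
}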
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Wed Aug 19 21:02:57 2009
@@ -258,7 +258,7 @@
}
Text text = new Text();
- public void process(Object row, int tag) throws HiveException {
+ public void processOp(Object row, int tag) throws HiveException {
if(scriptError != null) {
throw new HiveException(scriptError);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Wed Aug 19 21:02:57 2009
@@ -58,7 +58,7 @@
initializeChildren(hconf);
}
- public void process(Object row, int tag)
+ public void processOp(Object row, int tag)
throws HiveException {
// Just forward the row as is
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Wed Aug 19 21:02:57 2009
@@ -37,7 +37,7 @@
* i.e table data is not only read by the mapper, this operator will be enhanced to read the table.
**/
@Override
- public void process(Object row, int tag)
+ public void processOp(Object row, int tag)
throws HiveException {
forward(row, inputObjInspectors[tag]);
}
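
The same process() to processOp() rename shows up in ReduceSinkOperator, ScriptOperator, SelectOperator, TableScanOperator and UnionOperator in this commit. The base Operator hunk is not part of this section, so the following reconstruction is an assumption: process() becomes a template method on Operator that does the shared per-row counter bookkeeping once, and processOp() is the subclass hook it delegates to.

// Assumed shape of the base-class method (not shown in these hunks):
public void process(Object row, int tag) throws HiveException {
  ++inputRows;                        // shared bookkeeping happens once, here
  if (inputRows % 1000 == 0) {
    incrCounter(numInputRowsCntr, inputRows);
    inputRows = 0;
  }
  processOp(row, tag);                // operator-specific work
}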
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Wed Aug 19 21:02:57 2009
@@ -21,6 +21,7 @@
import java.io.*;
import java.util.*;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -38,11 +39,15 @@
public abstract class Task <T extends Serializable> implements Serializable {
private static final long serialVersionUID = 1L;
- transient boolean isdone;
+ transient protected boolean started;
+ transient protected boolean isdone;
transient protected HiveConf conf;
transient protected Hive db;
transient protected Log LOG;
transient protected LogHelper console;
+ transient protected QueryPlan queryPlan;
+ transient protected TaskHandle taskHandle;
+ transient protected Map<String, Long> taskCounters;
// Bean methods
@@ -51,13 +56,17 @@
public Task() {
isdone = false;
+ started = false;
LOG = LogFactory.getLog(this.getClass().getName());
+ this.taskCounters = new HashMap<String, Long>();
}
- public void initialize (HiveConf conf) {
+ public void initialize (HiveConf conf, QueryPlan queryPlan) {
+ this.queryPlan = queryPlan;
isdone = false;
+ started = false;
this.conf = conf;
-
+
SessionState ss = SessionState.get();
try {
if (ss == null) {
@@ -77,7 +86,45 @@
console = new LogHelper(LOG);
}
- public abstract int execute();
+ /**
+ * Called by the Driver on every task. Brackets execute(), which each task
+ * overrides, with started/done bookkeeping and plan-progress logging.
+ * @return the return value of execute()
+ */
+ public int executeTask() {
+ try {
+ SessionState ss = SessionState.get();
+ this.setStarted();
+ if (ss != null) {
+ ss.getHiveHistory().logPlanProgress(queryPlan);
+ }
+ int retval = execute();
+ this.setDone();
+ if (ss != null) {
+ ss.getHiveHistory().logPlanProgress(queryPlan);
+ }
+ return retval;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * This method is overridden in each Task.
+ * TODO execute should return a TaskHandle.
+ * @return status of executing the task
+ */
+ protected abstract int execute();
+
+ /**
+ * Update the progress of the task within taskHandle and also
+ * dump the progress information to the history file
+ * @param taskHandle task handle returned by execute
+ * @throws IOException
+ */
+ public void progress(TaskHandle taskHandle) throws IOException {
+ // do nothing by default
+ }
// dummy method - FetchTask overwrites this
public boolean fetch(Vector<String> res) throws IOException {
@@ -135,6 +182,14 @@
}
}
+ public void setStarted() {
+ this.started = true;
+ }
+
+ public boolean started() {
+ return started;
+ }
+
public boolean done() {
return isdone;
}
@@ -182,4 +237,13 @@
public boolean hasReduce() {
return false;
}
+
+ public void updateCounters(TaskHandle th) throws IOException {
+ // default, do nothing
+ }
+
+ public Map<String, Long> getCounters() {
+ return taskCounters;
+ }
+
}
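
How a caller drives the new lifecycle, as a hedged sketch; the loop and the runnableTasks/conf/queryPlan names are hypothetical, only initialize(conf, queryPlan) and executeTask() come from the diff:

for (Task<? extends Serializable> task : runnableTasks) {
  task.initialize(conf, queryPlan); // note the new QueryPlan argument
  int ret = task.executeTask();     // logs plan progress before and after execute()
  if (ret != 0) {
    break;                          // task failed - stop launching further tasks
  }
}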
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java?rev=805973&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java Wed Aug 19 21:02:57 2009
@@ -0,0 +1,17 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.Counters;
+
+public class TaskHandle {
+ // The eventual goal is to monitor the progress of all tasks, not only the map-reduce task.
+ // Each task's execute() method will then return immediately, handing back a task-specific
+ // handle through which that task's progress can be monitored.
+ // Right now the behavior is broken: ExecDriver's execute() calls progress() itself,
+ // whereas it should be invoked by the Driver.
+ public Counters getCounters() throws IOException {
+ // default implementation
+ return null;
+ }
+}
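
A sketch of the intended specialization, assuming a map-reduce handle wrapping Hadoop's RunningJob (this subclass is hypothetical - the commit only adds the base class):

package org.apache.hadoop.hive.ql.exec;

import java.io.IOException;

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.RunningJob;

public class MapRedTaskHandle extends TaskHandle {
  private final RunningJob job;

  public MapRedTaskHandle(RunningJob job) {
    this.job = job;
  }

  @Override
  public Counters getCounters() throws IOException {
    return job.getCounters(); // live counters from the running Hadoop job
  }
}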
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Wed Aug 19 21:02:57 2009
@@ -110,7 +110,7 @@
}
@Override
- public void process(Object row, int tag) throws HiveException {
+ public void processOp(Object row, int tag) throws HiveException {
StructObjectInspector soi = parentObjInspectors[tag];
List<? extends StructField> fields = parentFields[tag];
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Aug 19 21:02:57 2009
@@ -169,6 +169,8 @@
// workaround for java 1.5
e.setPersistenceDelegate( ExpressionTypes.class, new EnumDelegate() );
e.setPersistenceDelegate( groupByDesc.Mode.class, new EnumDelegate());
+ e.setPersistenceDelegate( Operator.ProgressCounter.class, new EnumDelegate());
+
e.writeObject(t);
e.close();
}
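
For context on the Utilities change, sketched under the assumption noted in the surrounding "workaround for java 1.5" comment: Java 1.5's XMLEncoder cannot persist enum constants on its own, so every enum type reachable from a serialized plan needs an explicit delegate, and ProgressCounter now joins the list. The out/plan names below are placeholders:

XMLEncoder e = new XMLEncoder(out);
e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
e.writeObject(plan); // counter enums now survive the XML round trip
e.close();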
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Wed Aug 19 21:02:57 2009
@@ -35,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -65,7 +66,7 @@
private static final String DELIMITER = " ";
public static enum RecordTypes {
- QueryStart, QueryEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd
+ QueryStart, QueryEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd, Counters
};
public static enum Keys {
@@ -426,6 +427,18 @@
log(RecordTypes.TaskProgress, ti.hm);
}
+
+ static Map<String, String> ctrmap = null;
+
+ /**
+ * Write out the counters: serialize the query plan and log it to the
+ * history file as a Counters record.
+ */
+ public void logPlanProgress(QueryPlan plan) throws IOException {
+ if (ctrmap == null) {
+ ctrmap = new HashMap<String, String>();
+ }
+ ctrmap.put("plan", plan.toString());
+ log(RecordTypes.Counters, ctrmap);
+ }
/**
* Set the table to id map
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Aug 19 21:02:57 2009
@@ -4311,14 +4311,60 @@
// reduce sink does not have any kids - since the plan by now has been broken up into multiple
// tasks, iterate over all tasks.
// For each task, go over all operators recursively
- for(Task<? extends Serializable> rootTask: rootTasks)
+ for (Task<? extends Serializable> rootTask: rootTasks)
breakTaskTree(rootTask);
// For each task, set the key descriptor for the reducer
- for(Task<? extends Serializable> rootTask: rootTasks)
+ for (Task<? extends Serializable> rootTask: rootTasks)
setKeyDescTaskTree(rootTask);
+
+ // For each operator, generate the counters
+ for (Task<? extends Serializable> rootTask: rootTasks)
+ generateCountersTask(rootTask);
+ }
+
+ // loop over all the tasks recursively
+ private void generateCountersTask(Task<? extends Serializable> task) {
+ if ((task instanceof MapRedTask) || (task instanceof ExecDriver)) {
+ HashMap<String, Operator<? extends Serializable>> opMap = ((mapredWork)task.getWork()).getAliasToWork();
+ if (!opMap.isEmpty()) {
+ for (Operator<? extends Serializable> op: opMap.values()) {
+ generateCountersOperator(op);
+ }
+ }
+
+ Operator <? extends Serializable> reducer = ((mapredWork)task.getWork()).getReducer();
+ if (reducer != null) {
+ LOG.info("Generating counters for operator " + reducer);
+ generateCountersOperator(reducer);
+ }
+ }
+ else if (task instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> listTasks = ((ConditionalTask)task).getListTasks();
+ for (Task<? extends Serializable> tsk : listTasks)
+ generateCountersTask(tsk);
+ }
+
+ // Restart the counter numbering from scratch - a hack for hadoop-0.17.
+ Operator.resetLastEnumUsed();
+
+ if (task.getChildTasks() == null)
+ return;
+
+ for (Task<? extends Serializable> childTask : task.getChildTasks())
+ generateCountersTask(childTask);
}
+
+ private void generateCountersOperator(Operator<? extends Serializable> op) {
+ op.assignCounterNameToEnum();
+ if (op.getChildOperators() == null)
+ return;
+
+ for (Operator<? extends Serializable> child: op.getChildOperators())
+ generateCountersOperator(child);
+ }
+
// loop over all the tasks recursively
private void breakTaskTree(Task<? extends Serializable> task) {
@@ -4329,6 +4375,11 @@
breakOperatorTree(op);
}
}
+ else if (task instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> listTasks = ((ConditionalTask)task).getListTasks();
+ for (Task<? extends Serializable> tsk : listTasks)
+ breakTaskTree(tsk);
+ }
if (task.getChildTasks() == null)
return;
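
Why resetLastEnumUsed() runs once per task above: Hadoop counters are scoped to a single MapReduce job, so each job can reuse the same fixed slots C1..Cn; without the reset, a multi-job query would quickly exhaust the hadoop-0.17 enum pool. A sketch (the operator variables are hypothetical):

generateCountersOperator(firstJobReducer);  // assigns C1, C2, C3, ...
Operator.resetLastEnumUsed();               // next MR job starts over at C1
generateCountersOperator(secondJobReducer); // reuses C1, C2, C3, ...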
Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestAddPartition.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestAddPartition.java?rev=805973&r1=805972&r2=805973&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestAddPartition.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestAddPartition.java Wed Aug 19 21:02:57 2009
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.plan;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-
-public class TestAddPartition extends TestCase {
-
- private static final String PART1_NAME = "part1";
- private static final String PART2_NAME = "part2";
-
- public TestAddPartition() throws Exception {
- super();
- }
-
- public void testAddPartition() throws Exception {
- Configuration conf = new Configuration();
- HiveConf hiveConf = new HiveConf(conf, TestAddPartition.class);
- HiveMetaStoreClient client = null;
-
- try {
- client = new HiveMetaStoreClient(hiveConf);
-
- String dbName = "testdb";
- String tableName = "tablename";
-
- Table tbl = new Table();
- tbl.setTableName(tableName);
- tbl.setDbName(dbName);
- tbl.setParameters(new HashMap<String, String>());
-
- StorageDescriptor sd = new StorageDescriptor();
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setName(tbl.getTableName());
- sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
-
- List<FieldSchema> fss = new ArrayList<FieldSchema>();
- fss.add(new FieldSchema("name", Constants.STRING_TYPE_NAME, ""));
- sd.setCols(fss);
- tbl.setSd(sd);
-
- tbl.setPartitionKeys(new ArrayList<FieldSchema>());
- tbl.getPartitionKeys().add(
- new FieldSchema(PART1_NAME, Constants.STRING_TYPE_NAME, ""));
- tbl.getPartitionKeys().add(
- new FieldSchema(PART2_NAME, Constants.STRING_TYPE_NAME, ""));
-
- client.dropTable(dbName, tableName);
- client.dropDatabase(dbName);
-
- client.createDatabase(dbName, "newloc");
- client.createTable(tbl);
-
- tbl = client.getTable(dbName, tableName);
-
- List<String> partValues = new ArrayList<String>();
- partValues.add("value1");
- partValues.add("value2");
-
- Map<String, String> part1 = new HashMap<String, String>();
- part1.put(PART1_NAME, "value1");
- part1.put(PART2_NAME, "value2");
-
- List<Map<String, String>> partitions = new ArrayList<Map<String, String>>();
- partitions.add(part1);
-
- // no partitions yet
- List<Partition> parts = client.listPartitions(dbName, tableName,
- (short) -1);
- assertTrue(parts.isEmpty());
-
- String partitionLocation = PART1_NAME + Path.SEPARATOR + PART2_NAME;
- // add the partitions
- for (Map<String,String> map : partitions) {
- AddPartitionDesc addPartition = new AddPartitionDesc(dbName,
- tableName, map, partitionLocation);
- Task<DDLWork> task = TaskFactory.get(new DDLWork(addPartition), hiveConf);
- task.initialize(hiveConf);
- assertEquals(0, task.execute());
- }
-
- // should have one
- parts = client.listPartitions(dbName, tableName, (short) -1);
- assertEquals(1, parts.size());
- Partition insertedPart = parts.get(0);
- assertEquals(tbl.getSd().getLocation() + Path.SEPARATOR + partitionLocation,
- insertedPart.getSd().getLocation());
-
- client.dropPartition(dbName, tableName, insertedPart.getValues());
-
- // add without location specified
-
- AddPartitionDesc addPartition = new AddPartitionDesc(dbName, tableName, part1, null);
- Task<DDLWork> task = TaskFactory.get(new DDLWork(addPartition), hiveConf);
- task.initialize(hiveConf);
- assertEquals(0, task.execute());
- parts = client.listPartitions(dbName, tableName, (short) -1);
- assertEquals(1, parts.size());
-
- // see that this fails properly
- addPartition = new AddPartitionDesc(dbName, "doesnotexist", part1, null);
- task = TaskFactory.get(new DDLWork(addPartition), hiveConf);
- task.initialize(hiveConf);
- assertEquals(1, task.execute());
- } finally {
- if (client != null) {
- client.close();
- }
- }
- }
-
-}
\ No newline at end of file