Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [2/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Thu Jan 21 10:37:58 2010
@@ -47,29 +47,25 @@
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
-
public class QueryPlan implements Serializable {
private static final long serialVersionUID = 1L;
-
- static final private Log LOG = LogFactory.getLog(QueryPlan.class.getName());
- private String queryString;
- private BaseSemanticAnalyzer plan;
- private String queryId;
- private org.apache.hadoop.hive.ql.plan.api.Query query;
- private Map<String, Map<String, Long>> counters;
- private Set<String> done;
- private Set<String> started;
-
- private boolean add;
+ static final private Log LOG = LogFactory.getLog(QueryPlan.class.getName());
+ private final String queryString;
+ private final BaseSemanticAnalyzer plan;
+ private final String queryId;
+ private final org.apache.hadoop.hive.ql.plan.api.Query query;
+ private final Map<String, Map<String, Long>> counters;
+ private final Set<String> done;
+ private final Set<String> started;
public QueryPlan(String queryString, BaseSemanticAnalyzer plan) {
this.queryString = queryString;
this.plan = plan;
- this.queryId = makeQueryId();
+ queryId = makeQueryId();
query = new org.apache.hadoop.hive.ql.plan.api.Query();
- query.setQueryId(this.queryId);
+ query.setQueryId(queryId);
query.putToQueryAttributes("queryString", this.queryString);
counters = new HashMap<String, Map<String, Long>>();
done = new HashSet<String>();
@@ -92,27 +88,31 @@
GregorianCalendar gc = new GregorianCalendar();
String userid = System.getProperty("user.name");
- return userid + "_" +
- String.format("%1$4d%2$02d%3$02d%4$02d%5$02d%5$02d", gc.get(Calendar.YEAR),
- gc.get(Calendar.MONTH) + 1,
- gc.get(Calendar.DAY_OF_MONTH),
- gc.get(Calendar.HOUR_OF_DAY),
- gc.get(Calendar.MINUTE), gc.get(Calendar.SECOND));
+ return userid
+ + "_"
+ + String.format("%1$4d%2$02d%3$02d%4$02d%5$02d%5$02d", gc
+ .get(Calendar.YEAR), gc.get(Calendar.MONTH) + 1, gc
+ .get(Calendar.DAY_OF_MONTH), gc.get(Calendar.HOUR_OF_DAY), gc
+ .get(Calendar.MINUTE), gc.get(Calendar.SECOND));
}
/**
* generate the operator graph and operator list for the given task based on
* the operators corresponding to that task
- * @param task api.Task which needs its operator graph populated
- * @param topOps the set of top operators from which the operator graph for the task
- * is hanging
+ *
+ * @param task
+ * api.Task which needs its operator graph populated
+ * @param topOps
+ * the set of top operators from which the operator graph for the
+ * task is hanging
*/
- private void populateOperatorGraph(org.apache.hadoop.hive.ql.plan.api.Task task,
+ private void populateOperatorGraph(
+ org.apache.hadoop.hive.ql.plan.api.Task task,
Collection<Operator<? extends Serializable>> topOps) {
-
+
task.setOperatorGraph(new org.apache.hadoop.hive.ql.plan.api.Graph());
task.getOperatorGraph().setNodeType(NodeType.OPERATOR);
-
+
Queue<Operator<? extends Serializable>> opsToVisit = new LinkedList<Operator<? extends Serializable>>();
Set<Operator<? extends Serializable>> opsVisited = new HashSet<Operator<? extends Serializable>>();
opsToVisit.addAll(topOps);
@@ -129,7 +129,7 @@
org.apache.hadoop.hive.ql.plan.api.Adjacency entry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
entry.setAdjacencyType(AdjacencyType.CONJUNCTIVE);
entry.setNode(op.getOperatorId());
- for (Operator<? extends Serializable> childOp: op.getChildOperators()) {
+ for (Operator<? extends Serializable> childOp : op.getChildOperators()) {
entry.addToChildren(childOp.getOperatorId());
if (!opsVisited.contains(childOp)) {
opsToVisit.add(childOp);
@@ -139,7 +139,7 @@
}
}
}
-
+
/**
* Populate api.QueryPlan from exec structures. This includes constructing the
* dependency graphs of stages and operators.
@@ -161,16 +161,17 @@
stage.setStageId(task.getId());
stage.setStageType(task.getType());
query.addToStageList(stage);
-
+
if (task instanceof ExecDriver) {
// populate map task
- ExecDriver mrTask = (ExecDriver)task;
+ ExecDriver mrTask = (ExecDriver) task;
org.apache.hadoop.hive.ql.plan.api.Task mapTask = new org.apache.hadoop.hive.ql.plan.api.Task();
mapTask.setTaskId(stage.getStageId() + "_MAP");
mapTask.setTaskType(TaskType.MAP);
stage.addToTaskList(mapTask);
- populateOperatorGraph(mapTask, mrTask.getWork().getAliasToWork().values());
-
+ populateOperatorGraph(mapTask, mrTask.getWork().getAliasToWork()
+ .values());
+
// populate reduce task
if (mrTask.hasReduce()) {
org.apache.hadoop.hive.ql.plan.api.Task reduceTask = new org.apache.hadoop.hive.ql.plan.api.Task();
@@ -181,8 +182,7 @@
reducerTopOps.add(mrTask.getWork().getReducer());
populateOperatorGraph(reduceTask, reducerTopOps);
}
- }
- else {
+ } else {
org.apache.hadoop.hive.ql.plan.api.Task otherTask = new org.apache.hadoop.hive.ql.plan.api.Task();
otherTask.setTaskId(stage.getStageId() + "_OTHER");
otherTask.setTaskType(TaskType.OTHER);
@@ -192,15 +192,15 @@
org.apache.hadoop.hive.ql.plan.api.Adjacency listEntry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
listEntry.setAdjacencyType(AdjacencyType.DISJUNCTIVE);
listEntry.setNode(task.getId());
- ConditionalTask t = (ConditionalTask)task;
-
- for (Task<? extends Serializable> listTask: t.getListTasks()) {
+ ConditionalTask t = (ConditionalTask) task;
+
+ for (Task<? extends Serializable> listTask : t.getListTasks()) {
if (t.getChildTasks() != null) {
org.apache.hadoop.hive.ql.plan.api.Adjacency childEntry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
childEntry.setAdjacencyType(AdjacencyType.DISJUNCTIVE);
childEntry.setNode(listTask.getId());
// done processing the task
- for (Task<? extends Serializable> childTask: t.getChildTasks()) {
+ for (Task<? extends Serializable> childTask : t.getChildTasks()) {
childEntry.addToChildren(childTask.getId());
if (!tasksVisited.contains(childTask)) {
tasksToVisit.add(childTask);
@@ -208,20 +208,19 @@
}
query.getStageGraph().addToAdjacencyList(childEntry);
}
-
+
listEntry.addToChildren(listTask.getId());
if (!tasksVisited.contains(listTask)) {
tasksToVisit.add(listTask);
}
}
query.getStageGraph().addToAdjacencyList(listEntry);
- }
- else if (task.getChildTasks() != null) {
+ } else if (task.getChildTasks() != null) {
org.apache.hadoop.hive.ql.plan.api.Adjacency entry = new org.apache.hadoop.hive.ql.plan.api.Adjacency();
entry.setAdjacencyType(AdjacencyType.CONJUNCTIVE);
entry.setNode(task.getId());
// done processing the task
- for (Task<? extends Serializable> childTask: task.getChildTasks()) {
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
entry.addToChildren(childTask.getId());
if (!tasksVisited.contains(childTask)) {
tasksToVisit.add(childTask);
@@ -233,37 +232,41 @@
}
/**
- * From the counters extracted via extractCounters(), update the counters
- * in the query plan
+ * From the counters extracted via extractCounters(), update the counters in
+ * the query plan
*/
private void updateCountersInQueryPlan() {
query.setStarted(started.contains(query.getQueryId()));
query.setDone(done.contains(query.getQueryId()));
- if (query.getStageList() != null)
- for (org.apache.hadoop.hive.ql.plan.api.Stage stage: query.getStageList()) {
- stage.setStarted(started.contains(stage.getStageId()));
- stage.setStageCounters(counters.get(stage.getStageId()));
- stage.setDone(done.contains(stage.getStageId()));
- for (org.apache.hadoop.hive.ql.plan.api.Task task: stage.getTaskList()) {
- task.setTaskCounters(counters.get(task.getTaskId()));
- if (task.getTaskType() == TaskType.OTHER) {
- task.setStarted(started.contains(stage.getStageId()));
- task.setDone(done.contains(stage.getStageId()));
- } else {
- task.setStarted(started.contains(task.getTaskId()));
- task.setDone(done.contains(task.getTaskId()));
- for (org.apache.hadoop.hive.ql.plan.api.Operator op: task.getOperatorList()) {
- // if the task has started, all operators within the task have started
- op.setStarted(started.contains(task.getTaskId()));
- op.setOperatorCounters(counters.get(op.getOperatorId()));
- // if the task is done, all operators are done as well
- op.setDone(done.contains(task.getTaskId()));
+ if (query.getStageList() != null) {
+ for (org.apache.hadoop.hive.ql.plan.api.Stage stage : query
+ .getStageList()) {
+ stage.setStarted(started.contains(stage.getStageId()));
+ stage.setStageCounters(counters.get(stage.getStageId()));
+ stage.setDone(done.contains(stage.getStageId()));
+ for (org.apache.hadoop.hive.ql.plan.api.Task task : stage.getTaskList()) {
+ task.setTaskCounters(counters.get(task.getTaskId()));
+ if (task.getTaskType() == TaskType.OTHER) {
+ task.setStarted(started.contains(stage.getStageId()));
+ task.setDone(done.contains(stage.getStageId()));
+ } else {
+ task.setStarted(started.contains(task.getTaskId()));
+ task.setDone(done.contains(task.getTaskId()));
+ for (org.apache.hadoop.hive.ql.plan.api.Operator op : task
+ .getOperatorList()) {
+ // if the task has started, all operators within the task have
+ // started
+ op.setStarted(started.contains(task.getTaskId()));
+ op.setOperatorCounters(counters.get(op.getOperatorId()));
+ // if the task is done, all operators are done as well
+ op.setDone(done.contains(task.getTaskId()));
+ }
}
}
}
}
}
-
+
/**
* extract all the counters from tasks and operators
*/
@@ -276,7 +279,7 @@
tasksVisited.add(task);
// add children to tasksToVisit
if (task.getChildTasks() != null) {
- for (Task<? extends Serializable> childTask: task.getChildTasks()) {
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
if (!tasksVisited.contains(childTask)) {
tasksToVisit.add(childTask);
}
@@ -298,8 +301,9 @@
done.add(task.getId());
}
if (task instanceof ExecDriver) {
- ExecDriver mrTask = (ExecDriver)task;
- extractOperatorCounters(mrTask.getWork().getAliasToWork().values(), task.getId() + "_MAP");
+ ExecDriver mrTask = (ExecDriver) task;
+ extractOperatorCounters(mrTask.getWork().getAliasToWork().values(),
+ task.getId() + "_MAP");
if (mrTask.mapStarted()) {
started.add(task.getId() + "_MAP");
}
@@ -317,10 +321,9 @@
done.add(task.getId() + "_REDUCE");
}
}
- }
- else if (task instanceof ConditionalTask) {
- ConditionalTask cTask = (ConditionalTask)task;
- for (Task<? extends Serializable> listTask: cTask.getListTasks()) {
+ } else if (task instanceof ConditionalTask) {
+ ConditionalTask cTask = (ConditionalTask) task;
+ for (Task<? extends Serializable> listTask : cTask.getListTasks()) {
if (!tasksVisited.contains(listTask)) {
tasksToVisit.add(listTask);
}
@@ -329,7 +332,8 @@
}
}
- private void extractOperatorCounters(Collection<Operator<? extends Serializable>> topOps, String taskId) {
+ private void extractOperatorCounters(
+ Collection<Operator<? extends Serializable>> topOps, String taskId) {
Queue<Operator<? extends Serializable>> opsToVisit = new LinkedList<Operator<? extends Serializable>>();
Set<Operator<? extends Serializable>> opsVisited = new HashSet<Operator<? extends Serializable>>();
opsToVisit.addAll(topOps);
@@ -341,7 +345,7 @@
done.add(op.getOperatorId());
}
if (op.getChildOperators() != null) {
- for (Operator<? extends Serializable> childOp: op.getChildOperators()) {
+ for (Operator<? extends Serializable> childOp : op.getChildOperators()) {
if (!opsVisited.contains(childOp)) {
opsToVisit.add(childOp);
}
@@ -351,7 +355,8 @@
}
- public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException {
+ public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan()
+ throws IOException {
if (query.getStageGraph() == null) {
populateQueryPlan();
}
@@ -372,7 +377,7 @@
}
public String getJSONKeyValue(Object key, Object value) {
- return "\"" + key + "\":" + getJSONValue(value) + ",";
+ return "\"" + key + "\":" + getJSONValue(value) + ",";
}
@SuppressWarnings("unchecked")
@@ -382,11 +387,11 @@
}
StringBuilder sb = new StringBuilder();
sb.append("[");
- for (Object entry: list) {
+ for (Object entry : list) {
sb.append(getJSONValue(entry));
sb.append(",");
}
- sb.deleteCharAt(sb.length()-1);
+ sb.deleteCharAt(sb.length() - 1);
sb.append("]");
return sb.toString();
}
@@ -398,11 +403,11 @@
}
StringBuilder sb = new StringBuilder();
sb.append("{");
- for (Object entry: map.entrySet()) {
- Map.Entry e = (Map.Entry)entry;
+ for (Object entry : map.entrySet()) {
+ Map.Entry e = (Map.Entry) entry;
sb.append(getJSONKeyValue(e.getKey(), e.getValue()));
}
- sb.deleteCharAt(sb.length()-1);
+ sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return sb.toString();
}
@@ -418,17 +423,19 @@
// adjacency list
List<String> adjList = new ArrayList<String>();
if (graph.getAdjacencyList() != null) {
- for (org.apache.hadoop.hive.ql.plan.api.Adjacency adj: graph.getAdjacencyList()) {
+ for (org.apache.hadoop.hive.ql.plan.api.Adjacency adj : graph
+ .getAdjacencyList()) {
adjList.add(getJSONAdjacency(adj));
}
}
sb.append(getJSONKeyValue("adjacencyList", getJSONList(adjList)));
- sb.deleteCharAt(sb.length()-1);
+ sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return sb.toString();
}
- private String getJSONAdjacency(org.apache.hadoop.hive.ql.plan.api.Adjacency adj) {
+ private String getJSONAdjacency(
+ org.apache.hadoop.hive.ql.plan.api.Adjacency adj) {
if (adj == null) {
return "null";
}
@@ -437,7 +444,7 @@
sb.append(getJSONKeyValue("node", adj.getNode()));
sb.append(getJSONKeyValue("children", getJSONList(adj.getChildren())));
sb.append(getJSONKeyValue("adjacencyType", adj.getAdjacencyType()));
- sb.deleteCharAt(sb.length()-1);
+ sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return sb.toString();
}
@@ -447,11 +454,13 @@
sb.append("{");
sb.append(getJSONKeyValue("operatorId", op.getOperatorId()));
sb.append(getJSONKeyValue("operatorType", op.getOperatorType()));
- sb.append(getJSONKeyValue("operatorAttributes", getJSONMap(op.getOperatorAttributes())));
- sb.append(getJSONKeyValue("operatorCounters", getJSONMap(op.getOperatorCounters())));
+ sb.append(getJSONKeyValue("operatorAttributes", getJSONMap(op
+ .getOperatorAttributes())));
+ sb.append(getJSONKeyValue("operatorCounters", getJSONMap(op
+ .getOperatorCounters())));
sb.append(getJSONKeyValue("done", op.isDone()));
sb.append(getJSONKeyValue("started", op.isStarted()));
- sb.deleteCharAt(sb.length()-1);
+ sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return sb.toString();
}
@@ -461,20 +470,24 @@
sb.append("{");
sb.append(getJSONKeyValue("taskId", task.getTaskId()));
sb.append(getJSONKeyValue("taskType", task.getTaskType()));
- sb.append(getJSONKeyValue("taskAttributes", getJSONMap(task.getTaskAttributes())));
- sb.append(getJSONKeyValue("taskCounters", getJSONMap(task.getTaskCounters())));
- sb.append(getJSONKeyValue("operatorGraph", getJSONGraph(task.getOperatorGraph())));
+ sb.append(getJSONKeyValue("taskAttributes", getJSONMap(task
+ .getTaskAttributes())));
+ sb.append(getJSONKeyValue("taskCounters",
+ getJSONMap(task.getTaskCounters())));
+ sb.append(getJSONKeyValue("operatorGraph", getJSONGraph(task
+ .getOperatorGraph())));
// operator list
List<String> opList = new ArrayList<String>();
if (task.getOperatorList() != null) {
- for (org.apache.hadoop.hive.ql.plan.api.Operator op: task.getOperatorList()) {
+ for (org.apache.hadoop.hive.ql.plan.api.Operator op : task
+ .getOperatorList()) {
opList.add(getJSONOperator(op));
}
}
sb.append(getJSONKeyValue("operatorList", getJSONList(opList)));
sb.append(getJSONKeyValue("done", task.isDone()));
sb.append(getJSONKeyValue("started", task.isStarted()));
- sb.deleteCharAt(sb.length()-1);
+ sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return sb.toString();
}
@@ -484,18 +497,20 @@
sb.append("{");
sb.append(getJSONKeyValue("stageId", stage.getStageId()));
sb.append(getJSONKeyValue("stageType", stage.getStageType()));
- sb.append(getJSONKeyValue("stageAttributes", getJSONMap(stage.getStageAttributes())));
- sb.append(getJSONKeyValue("stageCounters", getJSONMap(stage.getStageCounters())));
+ sb.append(getJSONKeyValue("stageAttributes", getJSONMap(stage
+ .getStageAttributes())));
+ sb.append(getJSONKeyValue("stageCounters", getJSONMap(stage
+ .getStageCounters())));
List<String> taskList = new ArrayList<String>();
if (stage.getTaskList() != null) {
- for (org.apache.hadoop.hive.ql.plan.api.Task task: stage.getTaskList()) {
+ for (org.apache.hadoop.hive.ql.plan.api.Task task : stage.getTaskList()) {
taskList.add(getJSONTask(task));
}
}
sb.append(getJSONKeyValue("taskList", getJSONList(taskList)));
sb.append(getJSONKeyValue("done", stage.isDone()));
sb.append(getJSONKeyValue("started", stage.isStarted()));
- sb.deleteCharAt(sb.length()-1);
+ sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return sb.toString();
}
@@ -505,20 +520,25 @@
sb.append("{");
sb.append(getJSONKeyValue("queryId", query.getQueryId()));
sb.append(getJSONKeyValue("queryType", query.getQueryType()));
- sb.append(getJSONKeyValue("queryAttributes", getJSONMap(query.getQueryAttributes())));
- sb.append(getJSONKeyValue("queryCounters", getJSONMap(query.getQueryCounters())));
- sb.append(getJSONKeyValue("stageGraph", getJSONGraph(query.getStageGraph())));
+ sb.append(getJSONKeyValue("queryAttributes", getJSONMap(query
+ .getQueryAttributes())));
+ sb.append(getJSONKeyValue("queryCounters", getJSONMap(query
+ .getQueryCounters())));
+ sb
+ .append(getJSONKeyValue("stageGraph", getJSONGraph(query
+ .getStageGraph())));
// stageList
List<String> stageList = new ArrayList<String>();
if (query.getStageList() != null) {
- for (org.apache.hadoop.hive.ql.plan.api.Stage stage: query.getStageList()) {
+ for (org.apache.hadoop.hive.ql.plan.api.Stage stage : query
+ .getStageList()) {
stageList.add(getJSONStage(stage));
}
}
sb.append(getJSONKeyValue("stageList", getJSONList(stageList)));
sb.append(getJSONKeyValue("done", query.isDone()));
sb.append(getJSONKeyValue("started", query.isStarted()));
- sb.deleteCharAt(sb.length()-1);
+ sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return sb.toString();
}
@@ -527,16 +547,15 @@
public String toString() {
try {
return getJSONQuery(getQueryPlan());
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
return e.toString();
}
- }
+ }
public String toThriftJSONString() throws IOException {
org.apache.hadoop.hive.ql.plan.api.Query q = getQueryPlan();
- TMemoryBuffer tmb = new TMemoryBuffer(q.toString().length()*5);
+ TMemoryBuffer tmb = new TMemoryBuffer(q.toString().length() * 5);
TJSONProtocol oprot = new TJSONProtocol(tmb);
try {
q.write(oprot);
@@ -550,7 +569,7 @@
public String toBinaryString() throws IOException {
org.apache.hadoop.hive.ql.plan.api.Query q = getQueryPlan();
- TMemoryBuffer tmb = new TMemoryBuffer(q.toString().length()*5);
+ TMemoryBuffer tmb = new TMemoryBuffer(q.toString().length() * 5);
TBinaryProtocol oprot = new TBinaryProtocol(tmb);
try {
q.write(oprot);
@@ -562,8 +581,8 @@
byte[] buf = new byte[tmb.length()];
tmb.read(buf, 0, tmb.length());
return new String(buf);
- //return getQueryPlan().toString();
-
+ // return getQueryPlan().toString();
+
}
public void setStarted() {
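[Editor's note] The populateOperatorGraph and extractOperatorCounters changes above keep the same breadth-first walk over the operator DAG: a queue of operators to visit plus a visited set, with one adjacency entry recorded per operator. The sketch below is only a hedged illustration of that traversal pattern in isolation; Node, id, children and adjacencies are hypothetical names, not Hive classes.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

class Node {
  final String id;
  final List<Node> children = new ArrayList<Node>();
  Node(String id) { this.id = id; }
}

public class OperatorGraphWalk {
  /** Breadth-first walk from the top operators, recording parent -> child ids once each. */
  public static Map<String, List<String>> adjacencies(List<Node> topOps) {
    Queue<Node> toVisit = new LinkedList<Node>(topOps);
    Set<Node> visited = new HashSet<Node>();
    Map<String, List<String>> adj = new LinkedHashMap<String, List<String>>();
    while (toVisit.peek() != null) {
      Node op = toVisit.remove();
      if (!visited.add(op)) {
        continue;                            // already processed via another parent
      }
      if (!op.children.isEmpty()) {
        List<String> childIds = new ArrayList<String>();
        for (Node child : op.children) {
          childIds.add(child.id);
          if (!visited.contains(child)) {
            toVisit.add(child);              // enqueue unvisited children, as the patch does
          }
        }
        adj.put(op.id, childIds);            // one adjacency entry per operator
      }
    }
    return adj;
  }
}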
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AmbiguousMethodException.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AmbiguousMethodException.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AmbiguousMethodException.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AmbiguousMethodException.java Thu Jan 21 10:37:58 2010
@@ -23,8 +23,9 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
- * Exception thrown by the UDF and UDAF method resolvers in case a unique method is not found.
- *
+ * Exception thrown by the UDF and UDAF method resolvers in case a unique method
+ * is not found.
+ *
*/
public class AmbiguousMethodException extends UDFArgumentException {
@@ -37,28 +38,31 @@
* The UDF or UDAF class that has the ambiguity.
*/
Class<?> funcClass;
-
+
/**
* The list of parameter types.
*/
List<TypeInfo> argTypeInfos;
-
+
/**
* Constructor.
*
- * @param funcClass The UDF or UDAF class.
- * @param argTypeInfos The list of argument types that lead to an ambiguity.
+ * @param funcClass
+ * The UDF or UDAF class.
+ * @param argTypeInfos
+ * The list of argument types that lead to an ambiguity.
*/
- public AmbiguousMethodException(Class<?> funcClass, List<TypeInfo> argTypeInfos) {
+ public AmbiguousMethodException(Class<?> funcClass,
+ List<TypeInfo> argTypeInfos) {
super("Ambiguous method for " + funcClass + " with " + argTypeInfos);
this.funcClass = funcClass;
this.argTypeInfos = argTypeInfos;
}
-
+
Class<?> getFunctionClass() {
return funcClass;
}
-
+
List<TypeInfo> getArgTypeList() {
return argTypeInfos;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java Thu Jan 21 10:37:58 2010
@@ -26,55 +26,61 @@
import org.apache.hadoop.mapred.Reporter;
/**
- * AutoProgressor periodically sends updates to the job tracker so that
- * it doesn't consider this task attempt dead if there is a long period of
+ * AutoProgressor periodically sends updates to the job tracker so that it
+ * doesn't consider this task attempt dead if there is a long period of
* inactivity.
*/
public class AutoProgressor {
protected Log LOG = LogFactory.getLog(this.getClass().getName());
- // Timer that reports every 5 minutes to the jobtracker. This ensures that
- // even if the operator returning rows for greater than that
- // duration, a progress report is sent to the tracker so that the tracker
+ // Timer that reports every 5 minutes to the jobtracker. This ensures that
+ // even if the operator returning rows for greater than that
+ // duration, a progress report is sent to the tracker so that the tracker
// does not think that the job is dead.
Timer rpTimer = null;
// Name of the class to report for
String logClassName = null;
int notificationInterval;
Reporter reporter;
-
+
class ReporterTask extends TimerTask {
-
+
/**
* Reporter to report progress to the jobtracker.
*/
private Reporter rp;
-
+
/**
* Constructor.
*/
public ReporterTask(Reporter rp) {
- if (rp != null)
+ if (rp != null) {
this.rp = rp;
+ }
}
-
+
@Override
public void run() {
if (rp != null) {
- LOG.info("ReporterTask calling reporter.progress() for " + logClassName);
+ LOG
+ .info("ReporterTask calling reporter.progress() for "
+ + logClassName);
rp.progress();
}
}
}
-
- AutoProgressor(String logClassName, Reporter reporter, int notificationInterval) {
+
+ AutoProgressor(String logClassName, Reporter reporter,
+ int notificationInterval) {
this.logClassName = logClassName;
this.reporter = reporter;
}
public void go() {
- LOG.info("Running ReporterTask every " + notificationInterval + " miliseconds.");
+ LOG.info("Running ReporterTask every " + notificationInterval
+ + " miliseconds.");
rpTimer = new Timer(true);
- rpTimer.scheduleAtFixedRate(new ReporterTask(reporter), 0, notificationInterval);
+ rpTimer.scheduleAtFixedRate(new ReporterTask(reporter), 0,
+ notificationInterval);
}
}
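[Editor's note] Per the class comment above, AutoProgressor exists to keep the job tracker from declaring a quiet task attempt dead by reporting progress on a fixed schedule. The following is a minimal, hedged sketch of that keep-alive pattern only; the Runnable heartbeat stands in for Reporter.progress(), and all names here are illustrative rather than Hive APIs.

import java.util.Timer;
import java.util.TimerTask;

public class KeepAliveSketch {
  /** Fire the heartbeat immediately and then every intervalMillis on a daemon timer thread. */
  public static Timer start(final Runnable heartbeat, long intervalMillis) {
    Timer timer = new Timer(true);           // daemon thread, never blocks JVM shutdown
    timer.scheduleAtFixedRate(new TimerTask() {
      @Override
      public void run() {
        heartbeat.run();                     // in Hive this would be reporter.progress()
      }
    }, 0, intervalMillis);
    return timer;
  }
}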
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ByteWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ByteWritable.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ByteWritable.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ByteWritable.java Thu Jan 21 10:37:58 2010
@@ -18,7 +18,10 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
@@ -28,67 +31,71 @@
public void write(DataOutput out) throws IOException {
out.writeByte(value);
}
+
public void readFields(DataInput in) throws IOException {
- value = (int)in.readByte();
+ value = in.readByte();
}
public ByteWritable(int b) {
value = b & 0xff;
}
+
public ByteWritable() {
value = 0;
}
- public void set (int b) {
+ public void set(int b) {
value = b & 0xff;
}
/** Compares two ByteWritables. */
public int compareTo(Object o) {
- int thisValue = this.value;
- int thatValue = ((ByteWritable)o).value;
- return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
+ int thisValue = value;
+ int thatValue = ((ByteWritable) o).value;
+ return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
}
- public boolean equals (Object o) {
- if(!(o instanceof ByteWritable)) {
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof ByteWritable)) {
return false;
}
- ByteWritable that = (ByteWritable)o;
- if(this == that)
+ ByteWritable that = (ByteWritable) o;
+ if (this == that) {
return true;
+ }
- if(this.value == that.value)
+ if (value == that.value) {
return true;
- else
+ } else {
return false;
+ }
}
+ @Override
public int hashCode() {
return (value);
}
-
- /** A Comparator optimized for BytesWritable. */
+
+ /** A Comparator optimized for BytesWritable. */
public static class Comparator extends WritableComparator {
public Comparator() {
super(ByteWritable.class);
}
-
+
/**
* Compare the buffers in serialized form.
*/
- public int compare(byte[] b1, int s1, int l1,
- byte[] b2, int s2, int l2) {
- /** ok - we are implementing a dummy byte
- int a = b1[s1] & 0xff;
- int b = b2[s1] & 0xff;
- if(a!=b)
- return a -b;
- */
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ /**
+ * ok - we are implementing a dummy byte int a = b1[s1] & 0xff; int b =
+ * b2[s1] & 0xff; if(a!=b) return a -b;
+ */
return 0;
}
}
-
+
static {
// register this comparator
WritableComparator.define(ByteWritable.class, new Comparator());
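[Editor's note] ByteWritable stores its byte in an int and masks with 0xff (value = b & 0xff) so the signed Java byte is treated as an unsigned 0..255 value for comparison. A tiny, hedged illustration of that idiom, outside any Hive class:

public class UnsignedByteDemo {
  public static void main(String[] args) {
    byte raw = (byte) 0xF0;                  // -16 as a signed Java byte
    int unsigned = raw & 0xff;               // 240, the value ByteWritable keeps
    System.out.println(raw + " -> " + unsigned);
  }
}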
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java Thu Jan 21 10:37:58 2010
@@ -18,42 +18,46 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.*;
-import java.io.*;
+import java.io.Serializable;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.collectDesc;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.conf.Configuration;
/**
* Buffers rows emitted by other operators
**/
-public class CollectOperator extends Operator <collectDesc> implements Serializable {
+public class CollectOperator extends Operator<collectDesc> implements
+ Serializable {
private static final long serialVersionUID = 1L;
transient protected ArrayList<Object> rowList;
transient protected ObjectInspector standardRowInspector;
transient int maxSize;
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
- rowList = new ArrayList<Object> ();
+ rowList = new ArrayList<Object>();
maxSize = conf.getBufferSize().intValue();
}
boolean firstRow = true;
- public void processOp(Object row, int tag)
- throws HiveException {
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
ObjectInspector rowInspector = inputObjInspectors[tag];
if (firstRow) {
firstRow = false;
// Get the standard ObjectInspector of the row
- this.standardRowInspector = ObjectInspectorUtils.getStandardObjectInspector(rowInspector);
+ standardRowInspector = ObjectInspectorUtils
+ .getStandardObjectInspector(rowInspector);
}
-
+
if (rowList.size() < maxSize) {
// Create a standard copy of the object.
Object o = ObjectInspectorUtils.copyToStandardObject(row, rowInspector);
@@ -61,9 +65,9 @@
}
forward(row, rowInspector);
}
-
+
public void retrieve(InspectableObject result) {
- assert(result != null);
+ assert (result != null);
if (rowList.isEmpty()) {
result.o = null;
result.oi = null;
@@ -73,5 +77,4 @@
}
}
-
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java Thu Jan 21 10:37:58 2010
@@ -18,16 +18,15 @@
package org.apache.hadoop.hive.ql.exec;
-import java.lang.Class;
-import java.io.*;
+import java.io.Serializable;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/**
- * Implementation for ColumnInfo which contains the internal name for the
- * column (the one that is used by the operator to access the column) and
- * the type (identified by a java class).
+ * Implementation for ColumnInfo which contains the internal name for the column
+ * (the one that is used by the operator to access the column) and the type
+ * (identified by a java class).
**/
public class ColumnInfo implements Serializable {
@@ -35,40 +34,41 @@
private static final long serialVersionUID = 1L;
private String internalName;
-
- private String alias = null; // [optional] alias of the column (external name as seen by the users)
+
+ private String alias = null; // [optional] alias of the column (external name
+ // as seen by the users)
/**
* Store the alias of the table where available.
*/
private String tabAlias;
-
+
/**
* Indicates whether the column is a partition column.
*/
private boolean isPartitionCol;
-
+
transient private TypeInfo type;
public ColumnInfo() {
}
- public ColumnInfo(String internalName, TypeInfo type,
- String tabAlias, boolean isPartitionCol) {
+ public ColumnInfo(String internalName, TypeInfo type, String tabAlias,
+ boolean isPartitionCol) {
this.internalName = internalName;
this.type = type;
this.tabAlias = tabAlias;
this.isPartitionCol = isPartitionCol;
}
-
- public ColumnInfo(String internalName, Class type,
- String tabAlias, boolean isPartitionCol) {
+
+ public ColumnInfo(String internalName, Class type, String tabAlias,
+ boolean isPartitionCol) {
this.internalName = internalName;
this.type = TypeInfoFactory.getPrimitiveTypeInfoFromPrimitiveWritable(type);
this.tabAlias = tabAlias;
this.isPartitionCol = isPartitionCol;
}
-
+
public TypeInfo getType() {
return type;
}
@@ -76,7 +76,7 @@
public String getInternalName() {
return internalName;
}
-
+
public void setType(TypeInfo type) {
this.type = type;
}
@@ -86,25 +86,27 @@
}
public String getTabAlias() {
- return this.tabAlias;
+ return tabAlias;
}
-
+
public boolean getIsPartitionCol() {
- return this.isPartitionCol;
+ return isPartitionCol;
}
+
/**
* Returns the string representation of the ColumnInfo.
*/
+ @Override
public String toString() {
return internalName + ": " + type;
}
-
+
public void setAlias(String col_alias) {
alias = col_alias;
}
-
+
public String getAlias() {
return alias;
}
-
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Thu Jan 21 10:37:58 2010
@@ -31,6 +31,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
@@ -45,16 +46,17 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Join operator implementation.
*/
-public abstract class CommonJoinOperator<T extends joinDesc> extends Operator<T> implements Serializable {
+public abstract class CommonJoinOperator<T extends joinDesc> extends
+ Operator<T> implements Serializable {
private static final long serialVersionUID = 1L;
- static final protected Log LOG = LogFactory.getLog(CommonJoinOperator.class.getName());
+ static final protected Log LOG = LogFactory.getLog(CommonJoinOperator.class
+ .getName());
public static class IntermediateObject {
ArrayList<Object>[] objs;
@@ -82,7 +84,7 @@
}
public Object topObj() {
- return objs[curSize-1];
+ return objs[curSize - 1];
}
}
@@ -100,28 +102,38 @@
*/
transient protected Map<Byte, List<ObjectInspector>> joinValuesStandardObjectInspectors;
- transient static protected Byte[] order; // order in which the results should be output
+ transient static protected Byte[] order; // order in which the results should
+ // be output
transient protected joinCond[] condn;
transient protected boolean noOuterJoin;
transient private Object[] dummyObj; // for outer joins, contains the
- // potential nulls for the concerned
- // aliases
- transient protected RowContainer<ArrayList<Object>>[] dummyObjVectors; // empty rows for each table
+ // potential nulls for the concerned
+ // aliases
+ transient protected RowContainer<ArrayList<Object>>[] dummyObjVectors; // empty
+ // rows
+ // for
+ // each
+ // table
transient protected int totalSz; // total size of the composite object
- // keys are the column names. basically this maps the position of the column in
+ // keys are the column names. basically this maps the position of the column
+ // in
// the output of the CommonJoinOperator to the input columnInfo.
transient private Map<Integer, Set<String>> posToAliasMap;
-
+
transient LazyBinarySerDe[] spillTableSerDe;
- transient protected Map<Byte, tableDesc> spillTableDesc; // spill tables are used if the join input is too large to fit in memory
+ transient protected Map<Byte, tableDesc> spillTableDesc; // spill tables are
+ // used if the join
+ // input is too large
+ // to fit in memory
- HashMap<Byte, RowContainer<ArrayList<Object>>> storage; // map b/w table alias to RowContainer
+ HashMap<Byte, RowContainer<ArrayList<Object>>> storage; // map b/w table alias
+ // to RowContainer
int joinEmitInterval = -1;
int joinCacheSize = 0;
int nextSz = 0;
transient Byte lastAlias = null;
-
+
transient boolean handleSkewJoin = false;
protected int populateJoinKeyValue(Map<Byte, List<ExprNodeEvaluator>> outMap,
@@ -129,19 +141,21 @@
int total = 0;
- Iterator<Map.Entry<Byte, List<exprNodeDesc>>> entryIter = inputMap.entrySet().iterator();
+ Iterator<Map.Entry<Byte, List<exprNodeDesc>>> entryIter = inputMap
+ .entrySet().iterator();
while (entryIter.hasNext()) {
- Map.Entry<Byte, List<exprNodeDesc>> e = (Map.Entry<Byte, List<exprNodeDesc>>) entryIter.next();
+ Map.Entry<Byte, List<exprNodeDesc>> e = entryIter.next();
Byte key = order[e.getKey()];
- List<exprNodeDesc> expr = (List<exprNodeDesc>) e.getValue();
+ List<exprNodeDesc> expr = e.getValue();
int sz = expr.size();
total += sz;
List<ExprNodeEvaluator> valueFields = new ArrayList<ExprNodeEvaluator>();
- for (int j = 0; j < sz; j++)
+ for (int j = 0; j < sz; j++) {
valueFields.add(ExprNodeEvaluatorFactory.get(expr.get(j)));
+ }
outMap.put(key, valueFields);
}
@@ -150,14 +164,15 @@
}
protected static HashMap<Byte, List<ObjectInspector>> getObjectInspectorsFromEvaluators(
- Map<Byte, List<ExprNodeEvaluator>> exprEntries, ObjectInspector[] inputObjInspector)
- throws HiveException {
+ Map<Byte, List<ExprNodeEvaluator>> exprEntries,
+ ObjectInspector[] inputObjInspector) throws HiveException {
HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
- for(Entry<Byte, List<ExprNodeEvaluator>> exprEntry : exprEntries.entrySet()) {
+ for (Entry<Byte, List<ExprNodeEvaluator>> exprEntry : exprEntries
+ .entrySet()) {
Byte alias = exprEntry.getKey();
List<ExprNodeEvaluator> exprList = exprEntry.getValue();
ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>();
- for (int i=0; i<exprList.size(); i++) {
+ for (int i = 0; i < exprList.size(); i++) {
fieldOIList.add(exprList.get(i).initialize(inputObjInspector[alias]));
}
result.put(alias, fieldOIList);
@@ -168,13 +183,15 @@
protected static HashMap<Byte, List<ObjectInspector>> getStandardObjectInspectors(
Map<Byte, List<ObjectInspector>> aliasToObjectInspectors) {
HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
- for(Entry<Byte, List<ObjectInspector>> oiEntry: aliasToObjectInspectors.entrySet()) {
+ for (Entry<Byte, List<ObjectInspector>> oiEntry : aliasToObjectInspectors
+ .entrySet()) {
Byte alias = oiEntry.getKey();
List<ObjectInspector> oiList = oiEntry.getValue();
- ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
- for (int i=0; i<oiList.size(); i++) {
- fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
- ObjectInspectorCopyOption.WRITABLE));
+ ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(
+ oiList.size());
+ for (int i = 0; i < oiList.size(); i++) {
+ fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList
+ .get(i), ObjectInspectorCopyOption.WRITABLE));
}
result.put(alias, fieldOIList);
}
@@ -182,8 +199,9 @@
}
- protected static <T extends joinDesc> ObjectInspector getJoinOutputObjectInspector(Byte[] order,
- Map<Byte, List<ObjectInspector>> aliasToObjectInspectors, T conf) {
+ protected static <T extends joinDesc> ObjectInspector getJoinOutputObjectInspector(
+ Byte[] order, Map<Byte, List<ObjectInspector>> aliasToObjectInspectors,
+ T conf) {
ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
for (Byte alias : order) {
List<ObjectInspector> oiList = aliasToObjectInspectors.get(alias);
@@ -191,15 +209,19 @@
}
StructObjectInspector joinOutputObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(conf.getOutputColumnNames(), structFieldObjectInspectors);
+ .getStandardStructObjectInspector(conf.getOutputColumnNames(),
+ structFieldObjectInspectors);
return joinOutputObjectInspector;
}
Configuration hconf;
+
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
this.handleSkewJoin = conf.getHandleSkewJoin();
this.hconf = hconf;
- LOG.info("COMMONJOIN " + ((StructObjectInspector)inputObjInspectors[0]).getTypeName());
+ LOG.info("COMMONJOIN "
+ + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
totalSz = 0;
// Map that contains the rows for each alias
storage = new HashMap<Byte, RowContainer<ArrayList<Object>>>();
@@ -216,32 +238,37 @@
totalSz = populateJoinKeyValue(joinValues, conf.getExprs());
- joinValuesObjectInspectors = getObjectInspectorsFromEvaluators(joinValues, inputObjInspectors);
+ joinValuesObjectInspectors = getObjectInspectorsFromEvaluators(joinValues,
+ inputObjInspectors);
joinValuesStandardObjectInspectors = getStandardObjectInspectors(joinValuesObjectInspectors);
dummyObj = new Object[numAliases];
dummyObjVectors = new RowContainer[numAliases];
- joinEmitInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEJOINEMITINTERVAL);
- joinCacheSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEJOINCACHESIZE);
-
- // construct dummy null row (indicating empty table) and
- // construct spill table serde which is used if input is too
+ joinEmitInterval = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVEJOINEMITINTERVAL);
+ joinCacheSize = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVEJOINCACHESIZE);
+
+ // construct dummy null row (indicating empty table) and
+ // construct spill table serde which is used if input is too
// large to fit into main memory.
byte pos = 0;
for (Byte alias : order) {
int sz = conf.getExprs().get(alias).size();
ArrayList<Object> nr = new ArrayList<Object>(sz);
-
- for (int j = 0; j < sz; j++)
+
+ for (int j = 0; j < sz; j++) {
nr.add(null);
+ }
dummyObj[pos] = nr;
// there should be only 1 dummy object in the RowContainer
- RowContainer<ArrayList<Object>> values = getRowContainer(hconf, pos, alias, 1);
+ RowContainer<ArrayList<Object>> values = getRowContainer(hconf, pos,
+ alias, 1);
values.add((ArrayList<Object>) dummyObj[pos]);
dummyObjVectors[pos] = values;
- // if serde is null, the input doesn't need to be spilled out
+ // if serde is null, the input doesn't need to be spilled out
// e.g., the output columns does not contains the input table
RowContainer rc = getRowContainer(hconf, pos, alias, joinCacheSize);
storage.put(pos, rc);
@@ -251,39 +278,45 @@
forwardCache = new Object[totalSz];
- outputObjInspector = getJoinOutputObjectInspector(order, joinValuesStandardObjectInspectors, conf);
- LOG.info("JOIN " + ((StructObjectInspector)outputObjInspector).getTypeName() + " totalsz = " + totalSz);
-
+ outputObjInspector = getJoinOutputObjectInspector(order,
+ joinValuesStandardObjectInspectors, conf);
+ LOG.info("JOIN "
+ + ((StructObjectInspector) outputObjInspector).getTypeName()
+ + " totalsz = " + totalSz);
+
}
- RowContainer getRowContainer(Configuration hconf, byte pos, Byte alias, int containerSize)
- throws HiveException {
+ RowContainer getRowContainer(Configuration hconf, byte pos, Byte alias,
+ int containerSize) throws HiveException {
tableDesc tblDesc = getSpillTableDesc(alias);
SerDe serde = getSpillSerDe(alias);
-
- if ( serde == null )
+
+ if (serde == null) {
containerSize = 1;
-
+ }
+
RowContainer rc = new RowContainer(containerSize, hconf);
StructObjectInspector rcOI = null;
- if(tblDesc != null) {
- // arbitrary column names used internally for serializing to spill table
+ if (tblDesc != null) {
+ // arbitrary column names used internally for serializing to spill table
List<String> colNames = Utilities.getColumnNames(tblDesc.getProperties());
// object inspector for serializing input tuples
rcOI = ObjectInspectorFactory.getStandardStructObjectInspector(colNames,
- joinValuesStandardObjectInspectors.get(pos));
+ joinValuesStandardObjectInspectors.get(pos));
}
rc.setSerDe(serde, rcOI);
rc.setTableDesc(tblDesc);
return rc;
}
-
+
private SerDe getSpillSerDe(byte alias) {
tableDesc desc = getSpillTableDesc(alias);
- if ( desc == null )
+ if (desc == null) {
return null;
- SerDe sd = (SerDe) ReflectionUtils.newInstance(desc.getDeserializerClass(), null);
+ }
+ SerDe sd = (SerDe) ReflectionUtils.newInstance(desc.getDeserializerClass(),
+ null);
try {
sd.initialize(null, desc.getProperties());
} catch (SerDeException e) {
@@ -292,64 +325,72 @@
}
return sd;
}
-
+
transient boolean newGroupStarted = false;
-
+
public tableDesc getSpillTableDesc(Byte alias) {
- if(spillTableDesc == null || spillTableDesc.size() == 0)
+ if (spillTableDesc == null || spillTableDesc.size() == 0) {
initSpillTables();
+ }
return spillTableDesc.get(alias);
}
-
+
public Map<Byte, tableDesc> getSpillTableDesc() {
- if(spillTableDesc == null)
+ if (spillTableDesc == null) {
initSpillTables();
+ }
return spillTableDesc;
}
-
+
private void initSpillTables() {
Map<Byte, List<exprNodeDesc>> exprs = conf.getExprs();
spillTableDesc = new HashMap<Byte, tableDesc>(exprs.size());
for (int tag = 0; tag < exprs.size(); tag++) {
- List<exprNodeDesc> valueCols = exprs.get((byte)tag);
+ List<exprNodeDesc> valueCols = exprs.get((byte) tag);
int columnSize = valueCols.size();
StringBuffer colNames = new StringBuffer();
StringBuffer colTypes = new StringBuffer();
- if ( columnSize <= 0 )
+ if (columnSize <= 0) {
continue;
+ }
for (int k = 0; k < columnSize; k++) {
- String newColName = tag + "_VALUE_" + k; // any name, it does not matter.
+ String newColName = tag + "_VALUE_" + k; // any name, it does not
+ // matter.
colNames.append(newColName);
- colNames.append(',');
- colTypes.append(valueCols.get(k).getTypeString());
- colTypes.append(',');
+ colNames.append(',');
+ colTypes.append(valueCols.get(k).getTypeString());
+ colTypes.append(',');
}
// remove the last ','
- colNames.setLength(colNames.length()-1);
- colTypes.setLength(colTypes.length()-1);
- tableDesc tblDesc =
- new tableDesc(LazyBinarySerDe.class,
- SequenceFileInputFormat.class,
- HiveSequenceFileOutputFormat.class,
- Utilities.makeProperties(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
- org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, colNames.toString(),
- org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes.toString()));
- spillTableDesc.put((byte)tag, tblDesc);
+ colNames.setLength(colNames.length() - 1);
+ colTypes.setLength(colTypes.length() - 1);
+ tableDesc tblDesc = new tableDesc(LazyBinarySerDe.class,
+ SequenceFileInputFormat.class, HiveSequenceFileOutputFormat.class,
+ Utilities.makeProperties(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, ""
+ + Utilities.ctrlaCode,
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, colNames
+ .toString(),
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
+ colTypes.toString()));
+ spillTableDesc.put((byte) tag, tblDesc);
}
}
-
+
+ @Override
public void startGroup() throws HiveException {
LOG.trace("Join: Starting new group");
newGroupStarted = true;
- for (RowContainer<ArrayList<Object>> alw: storage.values()) {
+ for (RowContainer<ArrayList<Object>> alw : storage.values()) {
alw.clear();
}
}
protected int getNextSize(int sz) {
// A very simple counter to keep track of join entries for a key
- if (sz >= 100000)
+ if (sz >= 100000) {
return sz + 100000;
+ }
return 2 * sz;
}
@@ -357,19 +398,19 @@
transient protected Byte alias;
/**
- * Return the value as a standard object.
- * StandardObject can be inspected by a standard ObjectInspector.
+ * Return the value as a standard object. StandardObject can be inspected by a
+ * standard ObjectInspector.
*/
protected static ArrayList<Object> computeValues(Object row,
- List<ExprNodeEvaluator> valueFields, List<ObjectInspector> valueFieldsOI) throws HiveException {
+ List<ExprNodeEvaluator> valueFields, List<ObjectInspector> valueFieldsOI)
+ throws HiveException {
// Compute the values
ArrayList<Object> nr = new ArrayList<Object>(valueFields.size());
- for (int i=0; i<valueFields.size(); i++) {
-
- nr.add((Object) ObjectInspectorUtils.copyToStandardObject(
- valueFields.get(i).evaluate(row),
- valueFieldsOI.get(i),
+ for (int i = 0; i < valueFields.size(); i++) {
+
+ nr.add(ObjectInspectorUtils.copyToStandardObject(valueFields.get(i)
+ .evaluate(row), valueFieldsOI.get(i),
ObjectInspectorCopyOption.WRITABLE));
}
@@ -399,15 +440,18 @@
}
private void copyOldArray(boolean[] src, boolean[] dest) {
- for (int i = 0; i < src.length; i++)
+ for (int i = 0; i < src.length; i++) {
dest[i] = src[i];
+ }
}
- private ArrayList<boolean[]> joinObjectsInnerJoin(ArrayList<boolean[]> resNulls,
- ArrayList<boolean[]> inputNulls, ArrayList<Object> newObj,
- IntermediateObject intObj, int left, boolean newObjNull) {
- if (newObjNull)
+ private ArrayList<boolean[]> joinObjectsInnerJoin(
+ ArrayList<boolean[]> resNulls, ArrayList<boolean[]> inputNulls,
+ ArrayList<Object> newObj, IntermediateObject intObj, int left,
+ boolean newObjNull) {
+ if (newObjNull) {
return resNulls;
+ }
Iterator<boolean[]> nullsIter = inputNulls.iterator();
while (nullsIter.hasNext()) {
boolean[] oldNulls = nullsIter.next();
@@ -425,14 +469,13 @@
/**
* Implement semi join operator.
*/
- private ArrayList<boolean[]> joinObjectsLeftSemiJoin(ArrayList<boolean[]> resNulls,
- ArrayList<boolean[]> inputNulls,
- ArrayList<Object> newObj,
- IntermediateObject intObj,
- int left,
- boolean newObjNull) {
- if (newObjNull)
+ private ArrayList<boolean[]> joinObjectsLeftSemiJoin(
+ ArrayList<boolean[]> resNulls, ArrayList<boolean[]> inputNulls,
+ ArrayList<Object> newObj, IntermediateObject intObj, int left,
+ boolean newObjNull) {
+ if (newObjNull) {
return resNulls;
+ }
Iterator<boolean[]> nullsIter = inputNulls.iterator();
while (nullsIter.hasNext()) {
boolean[] oldNulls = nullsIter.next();
@@ -457,10 +500,11 @@
boolean oldObjNull = oldNulls[left];
boolean[] newNulls = new boolean[intObj.getCurSize()];
copyOldArray(oldNulls, newNulls);
- if (oldObjNull)
+ if (oldObjNull) {
newNulls[oldNulls.length] = true;
- else
+ } else {
newNulls[oldNulls.length] = newObjNull;
+ }
resNulls.add(newNulls);
}
return resNulls;
@@ -470,14 +514,16 @@
ArrayList<boolean[]> resNulls, ArrayList<boolean[]> inputNulls,
ArrayList<Object> newObj, IntermediateObject intObj, int left,
boolean newObjNull, boolean firstRow) {
- if (newObjNull)
+ if (newObjNull) {
return resNulls;
+ }
if (inputNulls.isEmpty() && firstRow) {
boolean[] newNulls = new boolean[intObj.getCurSize()];
- for (int i = 0; i < intObj.getCurSize() - 1; i++)
+ for (int i = 0; i < intObj.getCurSize() - 1; i++) {
newNulls[i] = true;
- newNulls[intObj.getCurSize()-1] = newObjNull;
+ }
+ newNulls[intObj.getCurSize() - 1] = newObjNull;
resNulls.add(newNulls);
return resNulls;
}
@@ -505,8 +551,9 @@
resNulls.add(newNulls);
} else if (allOldObjsNull) {
boolean[] newNulls = new boolean[intObj.getCurSize()];
- for (int i = 0; i < intObj.getCurSize() - 1; i++)
+ for (int i = 0; i < intObj.getCurSize() - 1; i++) {
newNulls[i] = true;
+ }
newNulls[oldNulls.length] = newObjNull;
resNulls.add(newNulls);
return resNulls;
@@ -533,9 +580,10 @@
if (inputNulls.isEmpty() && firstRow) {
boolean[] newNulls = new boolean[intObj.getCurSize()];
- for (int i = 0; i < intObj.getCurSize() - 1; i++)
+ for (int i = 0; i < intObj.getCurSize() - 1; i++) {
newNulls[i] = true;
- newNulls[intObj.getCurSize()-1] = newObjNull;
+ }
+ newNulls[intObj.getCurSize() - 1] = newObjNull;
resNulls.add(newNulls);
return resNulls;
}
@@ -570,8 +618,9 @@
if (allOldObjsNull && !rhsPreserved) {
newNulls = new boolean[intObj.getCurSize()];
- for (int i = 0; i < oldNulls.length; i++)
+ for (int i = 0; i < oldNulls.length; i++) {
newNulls[i] = true;
+ }
newNulls[oldNulls.length] = false;
resNulls.add(newNulls);
rhsPreserved = true;
@@ -589,13 +638,14 @@
* inner join. The outer joins are processed appropriately.
*/
private ArrayList<boolean[]> joinObjects(ArrayList<boolean[]> inputNulls,
- ArrayList<Object> newObj, IntermediateObject intObj,
- int joinPos, boolean firstRow) {
+ ArrayList<Object> newObj, IntermediateObject intObj, int joinPos,
+ boolean firstRow) {
ArrayList<boolean[]> resNulls = new ArrayList<boolean[]>();
boolean newObjNull = newObj == dummyObj[joinPos] ? true : false;
if (joinPos == 0) {
- if (newObjNull)
+ if (newObjNull) {
return null;
+ }
boolean[] nulls = new boolean[1];
nulls[0] = newObjNull;
resNulls.add(nulls);
@@ -609,32 +659,35 @@
if (((type == joinDesc.RIGHT_OUTER_JOIN) || (type == joinDesc.FULL_OUTER_JOIN))
&& !newObjNull && (inputNulls == null) && firstRow) {
boolean[] newNulls = new boolean[intObj.getCurSize()];
- for (int i = 0; i < newNulls.length - 1; i++)
+ for (int i = 0; i < newNulls.length - 1; i++) {
newNulls[i] = true;
+ }
newNulls[newNulls.length - 1] = false;
resNulls.add(newNulls);
return resNulls;
}
- if (inputNulls == null)
+ if (inputNulls == null) {
return null;
+ }
- if (type == joinDesc.INNER_JOIN)
+ if (type == joinDesc.INNER_JOIN) {
return joinObjectsInnerJoin(resNulls, inputNulls, newObj, intObj, left,
newObjNull);
- else if (type == joinDesc.LEFT_OUTER_JOIN)
+ } else if (type == joinDesc.LEFT_OUTER_JOIN) {
return joinObjectsLeftOuterJoin(resNulls, inputNulls, newObj, intObj,
left, newObjNull);
- else if (type == joinDesc.RIGHT_OUTER_JOIN)
+ } else if (type == joinDesc.RIGHT_OUTER_JOIN) {
return joinObjectsRightOuterJoin(resNulls, inputNulls, newObj, intObj,
- left, newObjNull, firstRow);
- else if (type == joinDesc.LEFT_SEMI_JOIN)
+ left, newObjNull, firstRow);
+ } else if (type == joinDesc.LEFT_SEMI_JOIN) {
return joinObjectsLeftSemiJoin(resNulls, inputNulls, newObj, intObj,
- left, newObjNull);
+ left, newObjNull);
+ }
assert (type == joinDesc.FULL_OUTER_JOIN);
return joinObjectsFullOuterJoin(resNulls, inputNulls, newObj, intObj, left,
- newObjNull, firstRow);
+ newObjNull, firstRow);
}
/*
@@ -645,7 +698,7 @@
* are accounted for, the output is forwared appropriately.
*/
private void genObject(ArrayList<boolean[]> inputNulls, int aliasNum,
- IntermediateObject intObj, boolean firstRow) throws HiveException {
+ IntermediateObject intObj, boolean firstRow) throws HiveException {
boolean childFirstRow = firstRow;
boolean skipping = false;
@@ -653,23 +706,22 @@
// search for match in the rhs table
RowContainer<ArrayList<Object>> aliasRes = storage.get(order[aliasNum]);
-
- for (ArrayList<Object> newObj = aliasRes.first();
- newObj != null;
- newObj = aliasRes.next()) {
+
+ for (ArrayList<Object> newObj = aliasRes.first(); newObj != null; newObj = aliasRes
+ .next()) {
// check for skipping in case of left semi join
- if (aliasNum > 0 &&
- condn[aliasNum - 1].getType() == joinDesc.LEFT_SEMI_JOIN &&
- newObj != dummyObj[aliasNum] ) { // successful match
+ if (aliasNum > 0
+ && condn[aliasNum - 1].getType() == joinDesc.LEFT_SEMI_JOIN
+ && newObj != dummyObj[aliasNum]) { // successful match
skipping = true;
}
intObj.pushObj(newObj);
-
+
// execute the actual join algorithm
- ArrayList<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj,
- aliasNum, childFirstRow);
+ ArrayList<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj,
+ aliasNum, childFirstRow);
// recursively call the join the other rhs tables
genObject(newNulls, aliasNum + 1, intObj, firstRow);
@@ -677,14 +729,16 @@
intObj.popObj();
firstRow = false;
- // if left-semi-join found a match, skipping the rest of the rows in the rhs table of the semijoin
- if ( skipping ) {
+ // if left-semi-join found a match, skipping the rest of the rows in the
+ // rhs table of the semijoin
+ if (skipping) {
break;
}
}
} else {
- if (inputNulls == null)
+ if (inputNulls == null) {
return;
+ }
Iterator<boolean[]> nullsIter = inputNulls.iterator();
while (nullsIter.hasNext()) {
boolean[] nullsVec = nullsIter.next();
@@ -695,18 +749,18 @@
/**
* Forward a record of join results.
- *
+ *
* @throws HiveException
*/
+ @Override
public void endGroup() throws HiveException {
LOG.trace("Join Op: endGroup called: numValues=" + numAliases);
-
checkAndGenObject();
}
private void genUniqueJoinObject(int aliasNum, IntermediateObject intObj)
- throws HiveException {
+ throws HiveException {
if (aliasNum == numAliases) {
int p = 0;
for (int i = 0; i < numAliases; i++) {
@@ -722,33 +776,30 @@
}
RowContainer<ArrayList<Object>> alias = storage.get(order[aliasNum]);
- for (ArrayList<Object> row = alias.first();
- row != null;
- row = alias.next() ) {
+ for (ArrayList<Object> row = alias.first(); row != null; row = alias.next()) {
intObj.pushObj(row);
- genUniqueJoinObject(aliasNum+1, intObj);
+ genUniqueJoinObject(aliasNum + 1, intObj);
intObj.popObj();
}
}
protected void checkAndGenObject() throws HiveException {
if (condn[0].getType() == joinDesc.UNIQUE_JOIN) {
- IntermediateObject intObj =
- new IntermediateObject(new ArrayList[numAliases], 0);
+ new IntermediateObject(new ArrayList[numAliases], 0);
// Check if results need to be emitted.
// Results only need to be emitted if there is a non-null entry in a table
// that is preserved or if there are no non-null entries
boolean preserve = false; // Will be true if there is a non-null entry
- // in a preserved table
+ // in a preserved table
boolean hasNulls = false; // Will be true if there are null entries
for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
- RowContainer<ArrayList<Object>> alw = storage.get(alias);
- if ( alw.size() == 0) {
- alw.add((ArrayList<Object>)dummyObj[i]);
+ RowContainer<ArrayList<Object>> alw = storage.get(alias);
+ if (alw.size() == 0) {
+ alw.add((ArrayList<Object>) dummyObj[i]);
hasNulls = true;
- } else if(condn[i].getPreserved()) {
+ } else if (condn[i].getPreserved()) {
preserve = true;
}
}
@@ -758,38 +809,42 @@
}
LOG.trace("calling genUniqueJoinObject");
- genUniqueJoinObject(0, new IntermediateObject(new ArrayList[numAliases], 0));
+ genUniqueJoinObject(0, new IntermediateObject(new ArrayList[numAliases],
+ 0));
LOG.trace("called genUniqueJoinObject");
} else {
// does any result need to be emitted
for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
- RowContainer<ArrayList<Object>> alw = storage.get(alias);
+ RowContainer<ArrayList<Object>> alw = storage.get(alias);
if (alw.size() == 0) {
if (noOuterJoin) {
LOG.trace("No data for alias=" + i);
return;
} else {
- alw.add((ArrayList<Object>)dummyObj[i]);
+ alw.add((ArrayList<Object>) dummyObj[i]);
}
}
}
LOG.trace("calling genObject");
- genObject(null, 0, new IntermediateObject(new ArrayList[numAliases], 0), true);
+ genObject(null, 0, new IntermediateObject(new ArrayList[numAliases], 0),
+ true);
LOG.trace("called genObject");
}
}
/**
* All done
- *
+ *
*/
+ @Override
public void closeOp(boolean abort) throws HiveException {
LOG.trace("Join Op close");
- for ( RowContainer<ArrayList<Object>> alw: storage.values() ) {
- if(alw != null) //it maybe null for mapjoins
- alw.clear(); // clean up the temp files
+ for (RowContainer<ArrayList<Object>> alw : storage.values()) {
+      if (alw != null) { // may be null for map joins
+ alw.clear(); // clean up the temp files
+ }
}
storage.clear();
}
@@ -807,7 +862,8 @@
}
/**
- * @param posToAliasMap the posToAliasMap to set
+ * @param posToAliasMap
+ * the posToAliasMap to set
*/
public void setPosToAliasMap(Map<Integer, Set<String>> posToAliasMap) {
this.posToAliasMap = posToAliasMap;
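For orientation, the genObject() recursion changed above walks the buffered rows of each alias in join order: push a row for the current alias, recurse into the next alias, and pop it again, with a null-padded dummy row standing in for an empty side of an outer join. Below is a minimal standalone sketch of that traversal; the class and helper names are illustrative, not Hive's, and the real operator additionally tracks null vectors and per-alias join conditions.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the recursive generation in genObject(): for each
// alias in join order, iterate its buffered rows, push the row, recurse into
// the next alias, and pop again, emitting one concatenated record per
// complete combination. Names here are illustrative, not Hive's.
public class JoinRecursionSketch {

  // rows.get(i) holds the buffered rows for alias i; a single null-padded
  // "dummy" row stands in for an empty side of an outer join.
  static void gen(List<List<Object[]>> rows, int aliasNum,
      List<Object[]> stack, List<Object[]> out) {
    if (aliasNum == rows.size()) {
      // all aliases accounted for: flatten the stack into one output record
      List<Object> record = new ArrayList<Object>();
      for (Object[] part : stack) {
        record.addAll(Arrays.asList(part));
      }
      out.add(record.toArray());
      return;
    }
    for (Object[] row : rows.get(aliasNum)) {
      stack.add(row);                       // intObj.pushObj(newObj)
      gen(rows, aliasNum + 1, stack, out);  // recurse into the next rhs table
      stack.remove(stack.size() - 1);       // intObj.popObj()
    }
  }

  public static void main(String[] args) {
    List<Object[]> left = new ArrayList<Object[]>();
    left.add(new Object[] { 1, "a" });
    left.add(new Object[] { 2, "b" });
    List<Object[]> right = new ArrayList<Object[]>();
    right.add(new Object[] { null, null }); // dummy row for an empty outer side
    List<List<Object[]>> rows = new ArrayList<List<Object[]>>();
    rows.add(left);
    rows.add(right);
    List<Object[]> out = new ArrayList<Object[]>();
    gen(rows, 0, new ArrayList<Object[]>(), out);
    for (Object[] r : out) {
      System.out.println(Arrays.toString(r)); // e.g. [1, a, null, null]
    }
  }
}

With the right side empty and padded by the dummy row, the sketch prints one concatenated record per left row, which is the shape the real operator forwards.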
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ComparisonOpMethodResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ComparisonOpMethodResolver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ComparisonOpMethodResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ComparisonOpMethodResolver.java Thu Jan 21 10:37:58 2010
@@ -23,54 +23,52 @@
import java.util.Arrays;
import java.util.List;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
/**
- * The class implements the method resolution for operators like
- * (> < <= >= = <>). The resolution logic is as follows:
- * 1. If one of the parameters is null, then it resolves to
- * evaluate(Double, Double)
- * 2. If both of the parameters are of type T, then it resolves to
- * evaluate(T, T)
- * 3. If 1 and 2 fails then it resolves to evaluate(Double, Double).
+ * The class implements the method resolution for operators like (> < <= >= =
+ * <>). The resolution logic is as follows: (1) if one of the parameters is
+ * null, it resolves to evaluate(Double, Double); (2) if both parameters are of
+ * the same type T, it resolves to evaluate(T, T); (3) if neither 1 nor 2
+ * applies, it resolves to evaluate(Double, Double).
*/
public class ComparisonOpMethodResolver implements UDFMethodResolver {
/**
* The udfclass for which resolution is needed.
*/
- private Class<? extends UDF> udfClass;
-
+ private final Class<? extends UDF> udfClass;
+
/**
* Constructor.
*/
public ComparisonOpMethodResolver(Class<? extends UDF> udfClass) {
this.udfClass = udfClass;
}
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.hive.ql.exec.UDFMethodResolver#getEvalMethod(java.util.List)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.exec.UDFMethodResolver#getEvalMethod(java.util
+ * .List)
*/
@Override
public Method getEvalMethod(List<TypeInfo> argTypeInfos)
throws AmbiguousMethodException {
- assert(argTypeInfos.size() == 2);
+ assert (argTypeInfos.size() == 2);
List<TypeInfo> pTypeInfos = null;
- if (argTypeInfos.get(0).equals(TypeInfoFactory.voidTypeInfo) ||
- argTypeInfos.get(1).equals(TypeInfoFactory.voidTypeInfo)) {
+ if (argTypeInfos.get(0).equals(TypeInfoFactory.voidTypeInfo)
+ || argTypeInfos.get(1).equals(TypeInfoFactory.voidTypeInfo)) {
pTypeInfos = new ArrayList<TypeInfo>();
pTypeInfos.add(TypeInfoFactory.doubleTypeInfo);
pTypeInfos.add(TypeInfoFactory.doubleTypeInfo);
- }
- else if (argTypeInfos.get(0) == argTypeInfos.get(1)) {
+ } else if (argTypeInfos.get(0) == argTypeInfos.get(1)) {
pTypeInfos = argTypeInfos;
- }
- else {
+ } else {
pTypeInfos = new ArrayList<TypeInfo>();
pTypeInfos.add(TypeInfoFactory.doubleTypeInfo);
pTypeInfos.add(TypeInfoFactory.doubleTypeInfo);
@@ -78,18 +76,19 @@
Method udfMethod = null;
- for(Method m: Arrays.asList(udfClass.getMethods())) {
+ for (Method m : Arrays.asList(udfClass.getMethods())) {
if (m.getName().equals("evaluate")) {
- List<TypeInfo> acceptedTypeInfos = TypeInfoUtils.getParameterTypeInfos(m, pTypeInfos.size());
+ List<TypeInfo> acceptedTypeInfos = TypeInfoUtils.getParameterTypeInfos(
+ m, pTypeInfos.size());
if (acceptedTypeInfos == null) {
// null means the method does not accept number of arguments passed.
continue;
}
-
+
boolean match = (acceptedTypeInfos.size() == pTypeInfos.size());
- for(int i=0; i<pTypeInfos.size() && match; i++) {
+ for (int i = 0; i < pTypeInfos.size() && match; i++) {
TypeInfo accepted = acceptedTypeInfos.get(i);
if (accepted != pTypeInfos.get(i)) {
match = false;
@@ -99,14 +98,13 @@
if (match) {
if (udfMethod != null) {
throw new AmbiguousMethodException(udfClass, argTypeInfos);
- }
- else {
+ } else {
udfMethod = m;
}
}
}
}
- return udfMethod;
+ return udfMethod;
}
}
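The resolver above implements the rules listed in its javadoc. As a rough illustration of the same resolution order, here is a self-contained sketch against a hypothetical UDF with two evaluate() overloads; it uses plain reflection and is not the Hive resolver or its TypeInfo machinery.

import java.lang.reflect.Method;

// Self-contained sketch of the resolution order described above, applied to
// a hypothetical UDF. Illustrative only.
public class ComparisonResolutionSketch {

  // Hypothetical comparison UDF with two evaluate() overloads.
  public static class GreaterThan {
    public Boolean evaluate(Double a, Double b) {
      return a > b;
    }
    public Boolean evaluate(String a, String b) {
      return a.compareTo(b) > 0;
    }
  }

  static Method resolve(Class<?> udf, Class<?> t0, Class<?> t1)
      throws NoSuchMethodException {
    Class<?> p0;
    Class<?> p1;
    if (t0 == null || t1 == null || t0 != t1) {
      p0 = Double.class; // rules 1 and 3: fall back to evaluate(Double, Double)
      p1 = Double.class;
    } else {
      p0 = t0;           // rule 2: both arguments share the same type T
      p1 = t1;
    }
    return udf.getMethod("evaluate", p0, p1);
  }

  public static void main(String[] args) throws Exception {
    System.out.println(resolve(GreaterThan.class, String.class, String.class));
    System.out.println(resolve(GreaterThan.class, Integer.class, Double.class));
    System.out.println(resolve(GreaterThan.class, null, String.class));
  }
}

The fallback to evaluate(Double, Double) is what lets mixed-type and null comparisons still bind to a single method.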
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Thu Jan 21 10:37:58 2010
@@ -32,60 +32,70 @@
* Conditional Task implementation
**/
-public class ConditionalTask extends Task<ConditionalWork> implements Serializable {
+public class ConditionalTask extends Task<ConditionalWork> implements
+ Serializable {
private static final long serialVersionUID = 1L;
private List<Task<? extends Serializable>> listTasks;
-
+
private boolean resolved = false;
private List<Task<? extends Serializable>> resTasks;
-
+
private ConditionalResolver resolver;
- private Object resolverCtx;
-
+ private Object resolverCtx;
+
public ConditionalTask() {
super();
}
-
+
+ @Override
public boolean isMapRedTask() {
- for (Task<? extends Serializable> task : listTasks)
- if (task.isMapRedTask())
+ for (Task<? extends Serializable> task : listTasks) {
+ if (task.isMapRedTask()) {
return true;
-
+ }
+ }
+
return false;
}
-
+
+ @Override
public boolean hasReduce() {
- for (Task<? extends Serializable> task : listTasks)
- if (task.hasReduce())
+ for (Task<? extends Serializable> task : listTasks) {
+ if (task.hasReduce()) {
return true;
-
+ }
+ }
+
return false;
}
-
- public void initialize (HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+
+ @Override
+ public void initialize(HiveConf conf, QueryPlan queryPlan,
+ DriverContext driverContext) {
super.initialize(conf, queryPlan, driverContext);
}
-
+
@Override
public int execute() {
resTasks = resolver.getTasks(conf, resolverCtx);
resolved = true;
- for(Task<? extends Serializable> tsk: getListTasks()) {
- if(!resTasks.contains(tsk)) {
- this.driverContext.getRunnable().remove(tsk);
+ for (Task<? extends Serializable> tsk : getListTasks()) {
+ if (!resTasks.contains(tsk)) {
+ driverContext.getRunnable().remove(tsk);
console.printInfo(ExecDriver.getJobEndMsg(""
+ Utilities.randGen.nextInt())
+ ", job is filtered out (removed at runtime).");
- if(tsk.getChildTasks() != null) {
- for(Task<? extends Serializable> child : tsk.getChildTasks()) {
+ if (tsk.getChildTasks() != null) {
+ for (Task<? extends Serializable> child : tsk.getChildTasks()) {
child.parentTasks.remove(tsk);
- if(DriverContext.isLaunchable(child))
- this.driverContext.addToRunnable(child);
+ if (DriverContext.isLaunchable(child)) {
+ driverContext.addToRunnable(child);
+ }
}
}
- } else if(!this.driverContext.getRunnable().contains(tsk)){
- this.driverContext.addToRunnable(tsk);
+ } else if (!driverContext.getRunnable().contains(tsk)) {
+ driverContext.addToRunnable(tsk);
}
}
return 0;
@@ -99,7 +109,8 @@
}
/**
- * @param resolver the resolver to set
+ * @param resolver
+ * the resolver to set
*/
public void setResolver(ConditionalResolver resolver) {
this.resolver = resolver;
@@ -111,29 +122,34 @@
public Object getResolverCtx() {
return resolverCtx;
}
-
- // used to determine whether child tasks can be run.
+
+ // used to determine whether child tasks can be run.
+ @Override
public boolean done() {
boolean ret = true;
- List<Task<? extends Serializable>> parentTasks = this.getParentTasks();
+ List<Task<? extends Serializable>> parentTasks = getParentTasks();
if (parentTasks != null) {
- for(Task<? extends Serializable> par: parentTasks)
+ for (Task<? extends Serializable> par : parentTasks) {
ret = ret && par.done();
+ }
}
List<Task<? extends Serializable>> retTasks;
- if(resolved)
- retTasks = this.resTasks;
- else
+ if (resolved) {
+ retTasks = resTasks;
+ } else {
retTasks = getListTasks();
- if (ret && retTasks != null) {
- for (Task<? extends Serializable> tsk : retTasks)
+ }
+ if (ret && retTasks != null) {
+ for (Task<? extends Serializable> tsk : retTasks) {
ret = ret && tsk.done();
+ }
}
return ret;
}
/**
- * @param resolverCtx the resolverCtx to set
+ * @param resolverCtx
+ * the resolverCtx to set
*/
public void setResolverCtx(Object resolverCtx) {
this.resolverCtx = resolverCtx;
@@ -147,12 +163,14 @@
}
/**
- * @param listTasks the listTasks to set
+ * @param listTasks
+ * the listTasks to set
*/
public void setListTasks(List<Task<? extends Serializable>> listTasks) {
this.listTasks = listTasks;
}
-
+
+ @Override
public int getType() {
return StageType.CONDITIONAL;
}
@@ -169,10 +187,11 @@
*
* @return true if the task got added, false if it already existed
*/
+ @Override
public boolean addDependentTask(Task<? extends Serializable> dependent) {
boolean ret = false;
- if(this.getListTasks() != null) {
- for(Task<? extends Serializable> tsk: this.getListTasks()) {
+ if (getListTasks() != null) {
+ for (Task<? extends Serializable> tsk : getListTasks()) {
ret = ret & tsk.addDependentTask(dependent);
}
}
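For readers unfamiliar with ConditionalTask, execute() above asks the resolver which of the candidate tasks should actually run, removes the rest from the driver's runnable queue, and re-queues the children of the pruned tasks. A simplified, self-contained sketch of the selection step follows; the Resolver interface and the task names are illustrative only and omit the child re-parenting.

import java.util.ArrayList;
import java.util.List;

// Sketch of the runtime task selection performed by ConditionalTask.execute().
public class ConditionalSketch {

  interface Resolver {
    List<String> getTasks(List<String> candidates);
  }

  static void execute(List<String> candidates, Resolver resolver,
      List<String> runnable) {
    List<String> chosen = resolver.getTasks(candidates);
    for (String task : candidates) {
      if (!chosen.contains(task)) {
        runnable.remove(task);      // job is filtered out (removed at runtime)
      } else if (!runnable.contains(task)) {
        runnable.add(task);         // make sure every selected task is queued
      }
    }
  }

  public static void main(String[] args) {
    List<String> candidates = new ArrayList<String>();
    candidates.add("move-merged-files");
    candidates.add("merge-small-files");
    List<String> runnable = new ArrayList<String>(candidates);
    // a resolver that decides, e.g., that no small-file merge is needed
    Resolver pickFirstOnly = new Resolver() {
      public List<String> getTasks(List<String> c) {
        return c.subList(0, 1);
      }
    };
    execute(candidates, pickFirstOnly, runnable);
    System.out.println(runnable); // prints [move-merged-files]
  }
}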
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java Thu Jan 21 10:37:58 2010
@@ -20,13 +20,13 @@
import java.io.Serializable;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
import org.apache.hadoop.hive.ql.plan.copyWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
import org.apache.hadoop.util.StringUtils;
/**
@@ -39,7 +39,8 @@
public CopyTask() {
super();
}
-
+
+ @Override
public int execute() {
FileSystem dstFs = null;
Path toPath = null;
@@ -47,42 +48,46 @@
Path fromPath = new Path(work.getFromPath());
toPath = new Path(work.getToPath());
- console.printInfo("Copying data from " + fromPath.toString(), " to " + toPath.toString());
+ console.printInfo("Copying data from " + fromPath.toString(), " to "
+ + toPath.toString());
FileSystem srcFs = fromPath.getFileSystem(conf);
dstFs = toPath.getFileSystem(conf);
- FileStatus [] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
+ FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
- if(srcs == null || srcs.length == 0) {
+ if (srcs == null || srcs.length == 0) {
console.printError("No files matching path: " + fromPath.toString());
return 3;
}
if (!dstFs.mkdirs(toPath)) {
- console.printError("Cannot make target directory: " + toPath.toString());
+ console
+ .printError("Cannot make target directory: " + toPath.toString());
return 2;
- }
+ }
- for(FileStatus oneSrc: srcs) {
+ for (FileStatus oneSrc : srcs) {
LOG.debug("Copying file: " + oneSrc.getPath().toString());
- if(!FileUtil.copy(srcFs, oneSrc.getPath(), dstFs, toPath,
- false, // delete source
- true, // overwrite destination
- conf)) {
- console.printError("Failed to copy: '"+ oneSrc.getPath().toString() +
- "to: '" + toPath.toString() + "'");
+ if (!FileUtil.copy(srcFs, oneSrc.getPath(), dstFs, toPath, false, // delete
+ // source
+ true, // overwrite destination
+ conf)) {
+ console.printError("Failed to copy: '" + oneSrc.getPath().toString()
+            + "' to: '" + toPath.toString() + "'");
return 1;
}
}
return 0;
} catch (Exception e) {
- console.printError("Failed with exception " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
+ console.printError("Failed with exception " + e.getMessage(), "\n"
+ + StringUtils.stringifyException(e));
return (1);
}
}
-
+
+ @Override
public int getType() {
return StageType.COPY;
}
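CopyTask.execute() above expands the source path, creates the target directory, and copies every match without deleting the source. Below is a minimal sketch of the same flow using plain Hadoop filesystem APIs; globStatus() stands in for LoadSemanticAnalyzer.matchFilesOrDir(), and the paths in main() are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Sketch of the copy loop, without Hive's task/console plumbing.
public class CopySketch {

  public static int copyAll(String from, String to, Configuration conf)
      throws Exception {
    Path fromPath = new Path(from);
    Path toPath = new Path(to);
    FileSystem srcFs = fromPath.getFileSystem(conf);
    FileSystem dstFs = toPath.getFileSystem(conf);

    FileStatus[] srcs = srcFs.globStatus(fromPath);
    if (srcs == null || srcs.length == 0) {
      return 3;                        // nothing matched the source path
    }
    if (!dstFs.mkdirs(toPath)) {
      return 2;                        // could not create the target directory
    }
    for (FileStatus oneSrc : srcs) {
      // keep the source (deleteSource=false), overwrite the destination
      if (!FileUtil.copy(srcFs, oneSrc.getPath(), dstFs, toPath,
          false, true, conf)) {
        return 1;                      // copy of one file failed
      }
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(copyAll("/tmp/src/*", "/tmp/dst", new Configuration()));
  }
}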