Posted to commits@hive.apache.org by na...@apache.org on 2010/01/28 00:16:46 UTC
svn commit: r903901 - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/parse/
Author: namit
Date: Wed Jan 27 23:16:45 2010
New Revision: 903901
URL: http://svn.apache.org/viewvc?rev=903901&view=rev
Log:
HIVE-1108. Make QueryPlan serializable
(Zheng Shao via namit)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Jan 27 23:16:45 2010
@@ -82,6 +82,9 @@
HIVE-898. Hive ODBC build fails on OSX.
(Carl Steinbach and Ning Zhang via zshao)
+ HIVE-1108. Make QueryPlan serializable
+ (Zheng Shao via namit)
+
Release 0.5.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Jan 27 23:16:45 2010
@@ -18,26 +18,38 @@
package org.apache.hadoop.hive.ql;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
import java.io.DataInput;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Random;
import java.util.Set;
import java.util.Vector;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
@@ -48,6 +60,7 @@
import org.apache.hadoop.hive.ql.exec.TaskResult;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.hooks.PostExecute;
import org.apache.hadoop.hive.ql.hooks.PreExecute;
@@ -59,11 +72,15 @@
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -81,6 +98,8 @@
private DataInput resStream;
private Context ctx;
private QueryPlan plan;
+ private Schema schema;
+
private String errorMessage;
private String SQLState;
@@ -137,58 +156,59 @@
return cs;
}
+
+ public Schema getSchema() {
+ return schema;
+ }
+
/**
* Get a Schema with fields represented with native Hive types
*/
- public Schema getSchema() throws Exception {
+ public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
Schema schema = null;
- try {
- // If we have a plan, prefer its logical result schema if it's
- // available; otherwise, try digging out a fetch task; failing that,
- // give up.
- if (plan == null) {
- // can't get any info without a plan
- } else if (plan.getPlan().getResultSchema() != null) {
- List<FieldSchema> lst = plan.getPlan().getResultSchema();
- schema = new Schema(lst, null);
- } else if (plan.getPlan().getFetchTask() != null) {
- BaseSemanticAnalyzer sem = plan.getPlan();
-
- if (!sem.getFetchTaskInit()) {
- sem.setFetchTaskInit(true);
- sem.getFetchTask().initialize(conf, plan, null);
- }
- FetchTask ft = (FetchTask) sem.getFetchTask();
-
- TableDesc td = ft.getTblDesc();
- // partitioned tables don't have tableDesc set on the FetchTask. Instead
- // they have a list of PartitionDesc objects, each with a table desc.
- // Let's try to fetch the desc for the first partition and use its
- // deserializer.
- if (td == null && ft.getWork() != null
- && ft.getWork().getPartDesc() != null) {
- if (ft.getWork().getPartDesc().size() > 0) {
- td = ft.getWork().getPartDesc().get(0).getTableDesc();
- }
- }
-
- if (td == null) {
- throw new Exception("No table description found for fetch task: "
- + ft);
+
+ // If we have a plan, prefer its logical result schema if it's
+ // available; otherwise, try digging out a fetch task; failing that,
+ // give up.
+ if (sem == null) {
+ // can't get any info without a plan
+ } else if (sem.getResultSchema() != null) {
+ List<FieldSchema> lst = sem.getResultSchema();
+ schema = new Schema(lst, null);
+ } else if (sem.getFetchTask() != null) {
+ FetchTask ft = (FetchTask) sem.getFetchTask();
+ TableDesc td = ft.getTblDesc();
+ // partitioned tables don't have tableDesc set on the FetchTask. Instead
+ // they have a list of PartitionDesc objects, each with a table desc.
+ // Let's try to fetch the desc for the first partition and use its
+ // deserializer.
+ if (td == null && ft.getWork() != null
+ && ft.getWork().getPartDesc() != null) {
+ if (ft.getWork().getPartDesc().size() > 0) {
+ td = ft.getWork().getPartDesc().get(0).getTableDesc();
}
+ }
+ if (td == null) {
+ LOG.info("No returning schema.");
+ } else {
String tableName = "result";
- List<FieldSchema> lst = MetaStoreUtils.getFieldsFromDeserializer(
- tableName, td.getDeserializer());
- schema = new Schema(lst, null);
- }
- if (schema == null) {
- schema = new Schema();
+ List<FieldSchema> lst = null;
+ try {
+ lst = MetaStoreUtils.getFieldsFromDeserializer(
+ tableName, td.getDeserializer());
+ } catch (Exception e) {
+ LOG.warn("Error getting schema: " +
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ if (lst != null) {
+ schema = new Schema(lst, null);
+ }
}
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
+ }
+ if (schema == null) {
+ schema = new Schema();
}
LOG.info("Returning Hive schema: " + schema);
return schema;
@@ -302,7 +322,14 @@
sem.validate();
plan = new QueryPlan(command, sem);
-
+ // initialize FetchTask right here
+ if (sem.getFetchTask() != null) {
+ sem.getFetchTask().initialize(conf, plan, null);
+ }
+
+ // get the output schema
+ schema = getSchema(sem, conf);
+
return (0);
} catch (SemanticException e) {
errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
@@ -459,16 +486,14 @@
}
resStream = null;
- BaseSemanticAnalyzer sem = plan.getPlan();
-
// Get all the pre execution hooks and execute them.
for (PreExecute peh : getPreExecHooks()) {
- peh.run(SessionState.get(), sem.getInputs(), sem.getOutputs(),
+ peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
UnixUserGroupInformation.readFromConf(conf,
UnixUserGroupInformation.UGI_PROPERTY_NAME));
}
- int jobs = countJobs(sem.getRootTasks());
+ int jobs = countJobs(plan.getRootTasks());
if (jobs > 0) {
console.printInfo("Total MapReduce jobs = " + jobs);
}
@@ -476,7 +501,7 @@
SessionState.get().getHiveHistory().setQueryProperty(queryId,
Keys.QUERY_NUM_TASKS, String.valueOf(jobs));
SessionState.get().getHiveHistory().setIdToTableMap(
- sem.getIdToTableNameMap());
+ plan.getIdToTableNameMap());
}
String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
@@ -493,7 +518,7 @@
// Add root Tasks to runnable
- for (Task<? extends Serializable> tsk : sem.getRootTasks()) {
+ for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
driverCxt.addToRunnable(tsk);
}
@@ -542,7 +567,7 @@
// Get all the post execution hooks and execute them.
for (PostExecute peh : getPostExecHooks()) {
- peh.run(SessionState.get(), sem.getInputs(), sem.getOutputs(),
+ peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
UnixUserGroupInformation.readFromConf(conf,
UnixUserGroupInformation.UGI_PROPERTY_NAME));
}
@@ -580,6 +605,7 @@
}
}
console.printInfo("OK");
+
return (0);
}
@@ -677,13 +703,8 @@
}
public boolean getResults(Vector<String> res) throws IOException {
- if (plan != null && plan.getPlan().getFetchTask() != null) {
- BaseSemanticAnalyzer sem = plan.getPlan();
- if (!sem.getFetchTaskInit()) {
- sem.setFetchTaskInit(true);
- sem.getFetchTask().initialize(conf, plan, null);
- }
- FetchTask ft = (FetchTask) sem.getFetchTask();
+ if (plan != null && plan.getFetchTask() != null) {
+ FetchTask ft = (FetchTask) plan.getFetchTask();
ft.setMaxRows(maxRows);
return ft.fetch(res);
}
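
Taken together, the Driver.java changes above move schema computation to compile time: compile() now initializes the FetchTask eagerly and caches the result schema, so getSchema() becomes a plain getter and no longer declares "throws Exception". A minimal caller-side sketch, assuming a configured Hive classpath of this era; the class name SchemaSketch and the query text are illustrative, not part of this commit:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.Schema;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class SchemaSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(SessionState.class);
        SessionState.start(new SessionState(conf));
        Driver driver = new Driver(conf);
        // After this patch, compile() initializes the FetchTask and caches
        // the output schema, so no lazy-init handshake is needed here.
        if (driver.compile("SELECT key, value FROM src") == 0) {
          Schema schema = driver.getSchema();
          System.out.println("Result schema: " + schema);
        }
      }
    }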
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Wed Jan 27 23:16:45 2010
@@ -36,8 +36,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
import org.apache.hadoop.hive.ql.plan.api.NodeType;
@@ -47,27 +50,47 @@
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
+/**
+ * QueryPlan can be serialized to disk so that the execution of a query can
+ * be restarted or resumed later, either within or outside of the current
+ * JVM.
+ */
public class QueryPlan implements Serializable {
private static final long serialVersionUID = 1L;
static final private Log LOG = LogFactory.getLog(QueryPlan.class.getName());
private final String queryString;
- private final BaseSemanticAnalyzer plan;
+
+ private ArrayList<Task<? extends Serializable>> rootTasks;
+ private FetchTask fetchTask;
+ private HashSet<ReadEntity> inputs;
+ private HashSet<WriteEntity> outputs;
+
+ private HashMap<String, String> idToTableNameMap;
+
private final String queryId;
private final org.apache.hadoop.hive.ql.plan.api.Query query;
- private final Map<String, Map<String, Long>> counters;
- private final Set<String> done;
- private final Set<String> started;
+ private final HashMap<String, HashMap<String, Long>> counters;
+ private final HashSet<String> done;
+ private final HashSet<String> started;
- public QueryPlan(String queryString, BaseSemanticAnalyzer plan) {
+ public QueryPlan(String queryString, BaseSemanticAnalyzer sem) {
this.queryString = queryString;
- this.plan = plan;
+
+ rootTasks = new ArrayList<Task<? extends Serializable>>();
+ rootTasks.addAll(sem.getRootTasks());
+ fetchTask = sem.getFetchTask();
+ // Note that inputs and outputs can be changed when the query gets executed
+ inputs = sem.getInputs();
+ outputs = sem.getOutputs();
+ idToTableNameMap = new HashMap<String, String>(sem.getIdToTableNameMap());
+
queryId = makeQueryId();
query = new org.apache.hadoop.hive.ql.plan.api.Query();
query.setQueryId(queryId);
query.putToQueryAttributes("queryString", this.queryString);
- counters = new HashMap<String, Map<String, Long>>();
+ counters = new HashMap<String, HashMap<String, Long>>();
done = new HashSet<String>();
started = new HashSet<String>();
}
@@ -76,10 +99,6 @@
return queryString;
}
- public BaseSemanticAnalyzer getPlan() {
- return plan;
- }
-
public String getQueryId() {
return queryId;
}
@@ -152,7 +171,7 @@
Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
- tasksToVisit.addAll(plan.getRootTasks());
+ tasksToVisit.addAll(rootTasks);
while (tasksToVisit.size() != 0) {
Task<? extends Serializable> task = tasksToVisit.remove();
tasksVisited.add(task);
@@ -273,7 +292,7 @@
private void extractCounters() throws IOException {
Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
- tasksToVisit.addAll(plan.getRootTasks());
+ tasksToVisit.addAll(rootTasks);
while (tasksToVisit.peek() != null) {
Task<? extends Serializable> task = tasksToVisit.remove();
tasksVisited.add(task);
@@ -601,4 +620,44 @@
return done;
}
+ public ArrayList<Task<? extends Serializable>> getRootTasks() {
+ return rootTasks;
+ }
+
+ public void setRootTasks(ArrayList<Task<? extends Serializable>> rootTasks) {
+ this.rootTasks = rootTasks;
+ }
+
+ public FetchTask getFetchTask() {
+ return fetchTask;
+ }
+
+ public void setFetchTask(FetchTask fetchTask) {
+ this.fetchTask = fetchTask;
+ }
+
+ public HashSet<ReadEntity> getInputs() {
+ return inputs;
+ }
+
+ public void setInputs(HashSet<ReadEntity> inputs) {
+ this.inputs = inputs;
+ }
+
+ public HashSet<WriteEntity> getOutputs() {
+ return outputs;
+ }
+
+ public void setOutputs(HashSet<WriteEntity> outputs) {
+ this.outputs = outputs;
+ }
+
+ public HashMap<String, String> getIdToTableNameMap() {
+ return idToTableNameMap;
+ }
+
+ public void setIdToTableNameMap(HashMap<String, String> idToTableNameMap) {
+ this.idToTableNameMap = idToTableNameMap;
+ }
+
}
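
With the QueryPlan.java changes above, the plan owns its task graph, inputs/outputs, and id-to-table map through concrete bean-style accessors (ArrayList/HashSet/HashMap rather than interface types), which is what java.beans.XMLEncoder needs for a getter/setter-driven round trip. The round trip itself is not shown in this diff, but the XMLEncoder/XMLDecoder/EnumDelegate imports added to Driver.java suggest it looks roughly like the sketch below. The class QueryPlanPersistence, its method names, and the exact delegate registrations are assumptions, not part of this commit:

    import java.beans.XMLDecoder;
    import java.beans.XMLEncoder;
    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hive.ql.QueryPlan;
    import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
    import org.apache.hadoop.hive.ql.plan.GroupByDesc;
    import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;

    public class QueryPlanPersistence {

      public static void savePlan(QueryPlan plan, String path) throws IOException {
        XMLEncoder encoder =
            new XMLEncoder(new BufferedOutputStream(new FileOutputStream(path)));
        try {
          // Enum-typed plan properties need custom persistence delegates;
          // the EnumDelegate/ExpressionTypes/GroupByDesc imports added to
          // Driver.java hint at registrations along these lines.
          encoder.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
          encoder.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
          encoder.writeObject(plan);  // walks the public getters/setters
        } finally {
          encoder.close();
        }
      }

      public static QueryPlan loadPlan(String path) throws IOException {
        XMLDecoder decoder =
            new XMLDecoder(new BufferedInputStream(new FileInputStream(path)));
        try {
          return (QueryPlan) decoder.readObject();
        } finally {
          decoder.close();
        }
      }
    }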
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Jan 27 23:16:45 2010
@@ -811,7 +811,7 @@
/**
* populated at runtime from Hadoop counters in the client
*/
- transient protected Map<String, Long> counters;
+ transient protected HashMap<String, Long> counters;
/**
* keeps track of unique ProgressCounter enums used this value is used at
@@ -893,7 +893,7 @@
this.operatorId = operatorId;
}
- public Map<String, Long> getCounters() {
+ public HashMap<String, Long> getCounters() {
return counters;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Wed Jan 27 23:16:45 2010
@@ -56,7 +56,7 @@
transient protected LogHelper console;
transient protected QueryPlan queryPlan;
transient protected TaskHandle taskHandle;
- transient protected Map<String, Long> taskCounters;
+ transient protected HashMap<String, Long> taskCounters;
transient protected DriverContext driverContext;
// Bean methods
@@ -279,7 +279,7 @@
// default, do nothing
}
- public Map<String, Long> getCounters() {
+ public HashMap<String, Long> getCounters() {
return taskCounters;
}
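
The Operator.java and Task.java hunks above narrow the counter accessors from Map<String, Long> to HashMap<String, Long>, and BaseSemanticAnalyzer below does the same for its input/output sets, presumably so that bean-encoded plans name concrete, instantiable collection classes rather than interfaces. A self-contained illustration of such a bean round-tripping through XMLEncoder; BeanCountersDemo is a made-up class, not Hive code:

    import java.beans.XMLEncoder;
    import java.io.ByteArrayOutputStream;
    import java.util.HashMap;

    public class BeanCountersDemo {
      private HashMap<String, Long> counters = new HashMap<String, Long>();

      // Concrete property types let XMLEncoder emit
      // <object class="java.util.HashMap"> elements it can re-instantiate.
      public HashMap<String, Long> getCounters() { return counters; }
      public void setCounters(HashMap<String, Long> counters) {
        this.counters = counters;
      }

      public static void main(String[] args) {
        BeanCountersDemo demo = new BeanCountersDemo();
        demo.getCounters().put("CREATED_FILES", 42L);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        XMLEncoder encoder = new XMLEncoder(out);
        encoder.writeObject(demo);
        encoder.close();
        System.out.println(out.toString());  // the bean as readable XML
      }
    }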
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Wed Jan 27 23:16:45 2010
@@ -22,6 +22,7 @@
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -34,6 +35,7 @@
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -56,8 +58,7 @@
protected final Hive db;
protected final HiveConf conf;
protected List<Task<? extends Serializable>> rootTasks;
- protected Task<? extends Serializable> fetchTask;
- protected boolean fetchTaskInit;
+ protected FetchTask fetchTask;
protected final Log LOG;
protected final LogHelper console;
@@ -67,11 +68,11 @@
/**
* ReadEntitites that are passed to the hooks.
*/
- protected Set<ReadEntity> inputs;
+ protected HashSet<ReadEntity> inputs;
/**
* List of WriteEntities that are passed to the hooks.
*/
- protected Set<WriteEntity> outputs;
+ protected HashSet<WriteEntity> outputs;
protected static final String TEXTFILE_INPUT = TextInputFormat.class
.getName();
@@ -124,7 +125,7 @@
/**
* @return the fetchTask
*/
- public Task<? extends Serializable> getFetchTask() {
+ public FetchTask getFetchTask() {
return fetchTask;
}
@@ -132,18 +133,10 @@
* @param fetchTask
* the fetchTask to set
*/
- public void setFetchTask(Task<? extends Serializable> fetchTask) {
+ public void setFetchTask(FetchTask fetchTask) {
this.fetchTask = fetchTask;
}
- public boolean getFetchTaskInit() {
- return fetchTaskInit;
- }
-
- public void setFetchTaskInit(boolean fetchTaskInit) {
- this.fetchTaskInit = fetchTaskInit;
- }
-
protected void reset() {
rootTasks = new ArrayList<Task<? extends Serializable>>();
}
@@ -293,11 +286,11 @@
return sb.toString();
}
- public Set<ReadEntity> getInputs() {
+ public HashSet<ReadEntity> getInputs() {
return inputs;
}
- public Set<WriteEntity> getOutputs() {
+ public HashSet<WriteEntity> getOutputs() {
return outputs;
}
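
With FetchTask now a first-class field type on BaseSemanticAnalyzer and initialized eagerly in Driver.compile(), the fetchTaskInit flag and its getter/setter become dead code and are removed. The contrast, lifted from the Driver.java hunks earlier in this commit:

    // Old: every consumer had to perform a lazy-init handshake.
    if (!sem.getFetchTaskInit()) {
      sem.setFetchTaskInit(true);
      sem.getFetchTask().initialize(conf, plan, null);
    }

    // New: initialized exactly once, right after the plan is created.
    plan = new QueryPlan(command, sem);
    if (sem.getFetchTask() != null) {
      sem.getFetchTask().initialize(conf, plan, null);
    }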
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Wed Jan 27 23:16:45 2010
@@ -36,6 +36,7 @@
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
@@ -290,7 +291,7 @@
* @param schema
* thrift ddl
*/
- private Task<? extends Serializable> createFetchTask(String schema) {
+ private FetchTask createFetchTask(String schema) {
Properties prop = new Properties();
prop.setProperty(Constants.SERIALIZATION_FORMAT, "9");
@@ -303,7 +304,7 @@
LazySimpleSerDe.class, TextInputFormat.class,
IgnoreKeyTextOutputFormat.class, prop), -1);
fetch.setSerializationNullFormat(" ");
- return TaskFactory.get(fetch, conf);
+ return (FetchTask)TaskFactory.get(fetch, conf);
}
private void analyzeDescribeTable(ASTNode ast) throws SemanticException {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Jan 27 23:16:45 2010
@@ -44,6 +44,7 @@
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -5164,7 +5165,7 @@
private void genMapRedTasks(QB qb) throws SemanticException {
FetchWork fetch = null;
List<Task<? extends Serializable>> mvTask = new ArrayList<Task<? extends Serializable>>();
- Task<? extends Serializable> fetchTask = null;
+ FetchTask fetchTask = null;
QBParseInfo qbParseInfo = qb.getParseInfo();
@@ -5235,7 +5236,7 @@
}
if (noMapRed) {
- fetchTask = TaskFactory.get(fetch, conf);
+ fetchTask = (FetchTask)TaskFactory.get(fetch, conf);
setFetchTask(fetchTask);
// remove root tasks if any
@@ -5262,7 +5263,7 @@
org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
colTypes)), qb.getParseInfo().getOuterQueryLimit());
- fetchTask = TaskFactory.get(fetch, conf);
+ fetchTask = (FetchTask)TaskFactory.get(fetch, conf);
setFetchTask(fetchTask);
} else {
new ArrayList<MoveWork>();