You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/01/28 00:16:46 UTC

svn commit: r903901 - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/parse/

Author: namit
Date: Wed Jan 27 23:16:45 2010
New Revision: 903901

URL: http://svn.apache.org/viewvc?rev=903901&view=rev
Log:
HIVE-1108. Make QueryPlan serializable
(Zheng Shao via namit)


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Jan 27 23:16:45 2010
@@ -82,6 +82,9 @@
     HIVE-898. Hive ODBC build fails on OSX.
     (Carl Steinbach and Ning Zhang via zshao)
 
+    HIVE-1108. Make QueryPlan serializable
+    (Zheng Shao via namit)
+
 Release 0.5.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Jan 27 23:16:45 2010
@@ -18,26 +18,38 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
 import java.io.DataInput;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Random;
 import java.util.Set;
 import java.util.Vector;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
@@ -48,6 +60,7 @@
 import org.apache.hadoop.hive.ql.exec.TaskResult;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.PostExecute;
 import org.apache.hadoop.hive.ql.hooks.PreExecute;
@@ -59,11 +72,15 @@
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.processors.CommandProcessor;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -81,6 +98,8 @@
   private DataInput resStream;
   private Context ctx;
   private QueryPlan plan;
+  private Schema schema;
+  
   private String errorMessage;
   private String SQLState;
 
@@ -137,58 +156,59 @@
     return cs;
   }
 
+  
+  public Schema getSchema() {
+    return schema;
+  }
+  
   /**
    * Get a Schema with fields represented with native Hive types
    */
-  public Schema getSchema() throws Exception {
+  public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
     Schema schema = null;
-    try {
-      // If we have a plan, prefer its logical result schema if it's
-      // available; otherwise, try digging out a fetch task; failing that,
-      // give up.
-      if (plan == null) {
-        // can't get any info without a plan
-      } else if (plan.getPlan().getResultSchema() != null) {
-        List<FieldSchema> lst = plan.getPlan().getResultSchema();
-        schema = new Schema(lst, null);
-      } else if (plan.getPlan().getFetchTask() != null) {
-        BaseSemanticAnalyzer sem = plan.getPlan();
-
-        if (!sem.getFetchTaskInit()) {
-          sem.setFetchTaskInit(true);
-          sem.getFetchTask().initialize(conf, plan, null);
-        }
-        FetchTask ft = (FetchTask) sem.getFetchTask();
-
-        TableDesc td = ft.getTblDesc();
-        // partitioned tables don't have tableDesc set on the FetchTask. Instead
-        // they have a list of PartitionDesc objects, each with a table desc.
-        // Let's
-        // try to fetch the desc for the first partition and use it's
-        // deserializer.
-        if (td == null && ft.getWork() != null
-            && ft.getWork().getPartDesc() != null) {
-          if (ft.getWork().getPartDesc().size() > 0) {
-            td = ft.getWork().getPartDesc().get(0).getTableDesc();
-          }
-        }
-
-        if (td == null) {
-          throw new Exception("No table description found for fetch task: "
-              + ft);
+    
+    // If we have a plan, prefer its logical result schema if it's
+    // available; otherwise, try digging out a fetch task; failing that,
+    // give up.
+    if (sem == null) {
+      // can't get any info without a plan
+    } else if (sem.getResultSchema() != null) {
+      List<FieldSchema> lst = sem.getResultSchema();
+      schema = new Schema(lst, null);
+    } else if (sem.getFetchTask() != null) {
+      FetchTask ft = (FetchTask) sem.getFetchTask();
+      TableDesc td = ft.getTblDesc();
+      // partitioned tables don't have tableDesc set on the FetchTask. Instead
+      // they have a list of PartitionDesc objects, each with a table desc.
+      // Let's
+      // try to fetch the desc for the first partition and use its
+      // deserializer.
+      if (td == null && ft.getWork() != null
+          && ft.getWork().getPartDesc() != null) {
+        if (ft.getWork().getPartDesc().size() > 0) {
+          td = ft.getWork().getPartDesc().get(0).getTableDesc();
         }
+      }
 
+      if (td == null) {
+        LOG.info("No returning schema.");
+      } else {
         String tableName = "result";
-        List<FieldSchema> lst = MetaStoreUtils.getFieldsFromDeserializer(
-            tableName, td.getDeserializer());
-        schema = new Schema(lst, null);
-      }
-      if (schema == null) {
-        schema = new Schema();
+        List<FieldSchema> lst = null;
+        try {
+          lst = MetaStoreUtils.getFieldsFromDeserializer(
+              tableName, td.getDeserializer());
+        } catch (Exception e) {
+          LOG.warn("Error getting schema: " + 
+              org.apache.hadoop.util.StringUtils.stringifyException(e));
+        }
+        if (lst != null) {
+          schema = new Schema(lst, null);
+        }
       }
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw e;
+    }
+    if (schema == null) {
+      schema = new Schema();
     }
     LOG.info("Returning Hive schema: " + schema);
     return schema;
@@ -302,7 +322,14 @@
       sem.validate();
 
       plan = new QueryPlan(command, sem);
-
+      // initialize FetchTask right here
+      if (sem.getFetchTask() != null) {
+        sem.getFetchTask().initialize(conf, plan, null);
+      }
+      
+      // get the output schema
+      schema = getSchema(sem, conf);
+      
       return (0);
     } catch (SemanticException e) {
       errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
@@ -459,16 +486,14 @@
       }
       resStream = null;
 
-      BaseSemanticAnalyzer sem = plan.getPlan();
-
       // Get all the pre execution hooks and execute them.
       for (PreExecute peh : getPreExecHooks()) {
-        peh.run(SessionState.get(), sem.getInputs(), sem.getOutputs(),
+        peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
             UnixUserGroupInformation.readFromConf(conf,
                 UnixUserGroupInformation.UGI_PROPERTY_NAME));
       }
 
-      int jobs = countJobs(sem.getRootTasks());
+      int jobs = countJobs(plan.getRootTasks());
       if (jobs > 0) {
         console.printInfo("Total MapReduce jobs = " + jobs);
       }
@@ -476,7 +501,7 @@
         SessionState.get().getHiveHistory().setQueryProperty(queryId,
             Keys.QUERY_NUM_TASKS, String.valueOf(jobs));
         SessionState.get().getHiveHistory().setIdToTableMap(
-            sem.getIdToTableNameMap());
+            plan.getIdToTableNameMap());
       }
       String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
 
@@ -493,7 +518,7 @@
 
       // Add root Tasks to runnable
 
-      for (Task<? extends Serializable> tsk : sem.getRootTasks()) {
+      for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
         driverCxt.addToRunnable(tsk);
       }
 
@@ -542,7 +567,7 @@
 
       // Get all the post execution hooks and execute them.
       for (PostExecute peh : getPostExecHooks()) {
-        peh.run(SessionState.get(), sem.getInputs(), sem.getOutputs(),
+        peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
             UnixUserGroupInformation.readFromConf(conf,
                 UnixUserGroupInformation.UGI_PROPERTY_NAME));
       }
@@ -580,6 +605,7 @@
       }
     }
     console.printInfo("OK");
+    
     return (0);
   }
 
@@ -677,13 +703,8 @@
   }
 
   public boolean getResults(Vector<String> res) throws IOException {
-    if (plan != null && plan.getPlan().getFetchTask() != null) {
-      BaseSemanticAnalyzer sem = plan.getPlan();
-      if (!sem.getFetchTaskInit()) {
-        sem.setFetchTaskInit(true);
-        sem.getFetchTask().initialize(conf, plan, null);
-      }
-      FetchTask ft = (FetchTask) sem.getFetchTask();
+    if (plan != null && plan.getFetchTask() != null) {
+      FetchTask ft = (FetchTask) plan.getFetchTask();
       ft.setMaxRows(maxRows);
       return ft.fetch(res);
     }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Wed Jan 27 23:16:45 2010
@@ -36,8 +36,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
 import org.apache.hadoop.hive.ql.plan.api.NodeType;
@@ -47,27 +50,47 @@
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TMemoryBuffer;
 
+/**
+ * QueryPlan can be serialized to disk so that we can restart/resume its
+ * progress in the future, either within or outside of the current
+ * JVM.
+ */
 public class QueryPlan implements Serializable {
   private static final long serialVersionUID = 1L;
 
   static final private Log LOG = LogFactory.getLog(QueryPlan.class.getName());
 
   private final String queryString;
-  private final BaseSemanticAnalyzer plan;
+  
+  private ArrayList<Task<? extends Serializable>> rootTasks;
+  private FetchTask fetchTask;
+  private HashSet<ReadEntity> inputs;
+  private HashSet<WriteEntity> outputs;
+
+  private HashMap<String, String> idToTableNameMap;
+  
   private final String queryId;
   private final org.apache.hadoop.hive.ql.plan.api.Query query;
-  private final Map<String, Map<String, Long>> counters;
-  private final Set<String> done;
-  private final Set<String> started;
+  private final HashMap<String, HashMap<String, Long>> counters;
+  private final HashSet<String> done;
+  private final HashSet<String> started;
 
-  public QueryPlan(String queryString, BaseSemanticAnalyzer plan) {
+  public QueryPlan(String queryString, BaseSemanticAnalyzer sem) {
     this.queryString = queryString;
-    this.plan = plan;
+    
+    rootTasks = new ArrayList<Task<? extends Serializable>>();
+    rootTasks.addAll(sem.getRootTasks());
+    fetchTask = sem.getFetchTask();
+    // Note that inputs and outputs can be changed when the query gets executed
+    inputs = sem.getInputs();
+    outputs = sem.getOutputs();
+    idToTableNameMap = new HashMap<String, String>(sem.getIdToTableNameMap());
+    
     queryId = makeQueryId();
     query = new org.apache.hadoop.hive.ql.plan.api.Query();
     query.setQueryId(queryId);
     query.putToQueryAttributes("queryString", this.queryString);
-    counters = new HashMap<String, Map<String, Long>>();
+    counters = new HashMap<String, HashMap<String, Long>>();
     done = new HashSet<String>();
     started = new HashSet<String>();
   }
@@ -76,10 +99,6 @@
     return queryString;
   }
 
-  public BaseSemanticAnalyzer getPlan() {
-    return plan;
-  }
-
   public String getQueryId() {
     return queryId;
   }
@@ -152,7 +171,7 @@
 
     Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
     Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
-    tasksToVisit.addAll(plan.getRootTasks());
+    tasksToVisit.addAll(rootTasks);
     while (tasksToVisit.size() != 0) {
       Task<? extends Serializable> task = tasksToVisit.remove();
       tasksVisited.add(task);
@@ -273,7 +292,7 @@
   private void extractCounters() throws IOException {
     Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
     Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
-    tasksToVisit.addAll(plan.getRootTasks());
+    tasksToVisit.addAll(rootTasks);
     while (tasksToVisit.peek() != null) {
       Task<? extends Serializable> task = tasksToVisit.remove();
       tasksVisited.add(task);
@@ -601,4 +620,44 @@
     return done;
   }
 
+  public ArrayList<Task<? extends Serializable>> getRootTasks() {
+    return rootTasks;
+  }
+
+  public void setRootTasks(ArrayList<Task<? extends Serializable>> rootTasks) {
+    this.rootTasks = rootTasks;
+  }
+
+  public FetchTask getFetchTask() {
+    return fetchTask;
+  }
+
+  public void setFetchTask(FetchTask fetchTask) {
+    this.fetchTask = fetchTask;
+  }
+
+  public HashSet<ReadEntity> getInputs() {
+    return inputs;
+  }
+
+  public void setInputs(HashSet<ReadEntity> inputs) {
+    this.inputs = inputs;
+  }
+
+  public HashSet<WriteEntity> getOutputs() {
+    return outputs;
+  }
+
+  public void setOutputs(HashSet<WriteEntity> outputs) {
+    this.outputs = outputs;
+  }
+
+  public HashMap<String, String> getIdToTableNameMap() {
+    return idToTableNameMap;
+  }
+
+  public void setIdToTableNameMap(HashMap<String, String> idToTableNameMap) {
+    this.idToTableNameMap = idToTableNameMap;
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Jan 27 23:16:45 2010
@@ -811,7 +811,7 @@
   /**
    * populated at runtime from hadoop counters at run time in the client
    */
-  transient protected Map<String, Long> counters;
+  transient protected HashMap<String, Long> counters;
 
   /**
    * keeps track of unique ProgressCounter enums used this value is used at
@@ -893,7 +893,7 @@
     this.operatorId = operatorId;
   }
 
-  public Map<String, Long> getCounters() {
+  public HashMap<String, Long> getCounters() {
     return counters;
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Wed Jan 27 23:16:45 2010
@@ -56,7 +56,7 @@
   transient protected LogHelper console;
   transient protected QueryPlan queryPlan;
   transient protected TaskHandle taskHandle;
-  transient protected Map<String, Long> taskCounters;
+  transient protected HashMap<String, Long> taskCounters;
   transient protected DriverContext driverContext;
   // Bean methods
 
@@ -279,7 +279,7 @@
     // default, do nothing
   }
 
-  public Map<String, Long> getCounters() {
+  public HashMap<String, Long> getCounters() {
     return taskCounters;
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Wed Jan 27 23:16:45 2010
@@ -22,6 +22,7 @@
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -56,8 +58,7 @@
   protected final Hive db;
   protected final HiveConf conf;
   protected List<Task<? extends Serializable>> rootTasks;
-  protected Task<? extends Serializable> fetchTask;
-  protected boolean fetchTaskInit;
+  protected FetchTask fetchTask;
   protected final Log LOG;
   protected final LogHelper console;
 
@@ -67,11 +68,11 @@
   /**
    * ReadEntitites that are passed to the hooks.
    */
-  protected Set<ReadEntity> inputs;
+  protected HashSet<ReadEntity> inputs;
   /**
    * List of WriteEntities that are passed to the hooks.
    */
-  protected Set<WriteEntity> outputs;
+  protected HashSet<WriteEntity> outputs;
 
   protected static final String TEXTFILE_INPUT = TextInputFormat.class
       .getName();
@@ -124,7 +125,7 @@
   /**
    * @return the fetchTask
    */
-  public Task<? extends Serializable> getFetchTask() {
+  public FetchTask getFetchTask() {
     return fetchTask;
   }
 
@@ -132,18 +133,10 @@
    * @param fetchTask
    *          the fetchTask to set
    */
-  public void setFetchTask(Task<? extends Serializable> fetchTask) {
+  public void setFetchTask(FetchTask fetchTask) {
     this.fetchTask = fetchTask;
   }
 
-  public boolean getFetchTaskInit() {
-    return fetchTaskInit;
-  }
-
-  public void setFetchTaskInit(boolean fetchTaskInit) {
-    this.fetchTaskInit = fetchTaskInit;
-  }
-
   protected void reset() {
     rootTasks = new ArrayList<Task<? extends Serializable>>();
   }
@@ -293,11 +286,11 @@
     return sb.toString();
   }
 
-  public Set<ReadEntity> getInputs() {
+  public HashSet<ReadEntity> getInputs() {
     return inputs;
   }
 
-  public Set<WriteEntity> getOutputs() {
+  public HashSet<WriteEntity> getOutputs() {
     return outputs;
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Wed Jan 27 23:16:45 2010
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
@@ -290,7 +291,7 @@
    * @param schema
    *          thrift ddl
    */
-  private Task<? extends Serializable> createFetchTask(String schema) {
+  private FetchTask createFetchTask(String schema) {
     Properties prop = new Properties();
 
     prop.setProperty(Constants.SERIALIZATION_FORMAT, "9");
@@ -303,7 +304,7 @@
         LazySimpleSerDe.class, TextInputFormat.class,
         IgnoreKeyTextOutputFormat.class, prop), -1);
     fetch.setSerializationNullFormat(" ");
-    return TaskFactory.get(fetch, conf);
+    return (FetchTask)TaskFactory.get(fetch, conf);
   }
 
   private void analyzeDescribeTable(ASTNode ast) throws SemanticException {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=903901&r1=903900&r2=903901&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Jan 27 23:16:45 2010
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -5164,7 +5165,7 @@
   private void genMapRedTasks(QB qb) throws SemanticException {
     FetchWork fetch = null;
     List<Task<? extends Serializable>> mvTask = new ArrayList<Task<? extends Serializable>>();
-    Task<? extends Serializable> fetchTask = null;
+    FetchTask fetchTask = null;
 
     QBParseInfo qbParseInfo = qb.getParseInfo();
 
@@ -5235,7 +5236,7 @@
       }
 
       if (noMapRed) {
-        fetchTask = TaskFactory.get(fetch, conf);
+        fetchTask = (FetchTask)TaskFactory.get(fetch, conf);
         setFetchTask(fetchTask);
 
         // remove root tasks if any
@@ -5262,7 +5263,7 @@
                   org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
                   colTypes)), qb.getParseInfo().getOuterQueryLimit());
 
-      fetchTask = TaskFactory.get(fetch, conf);
+      fetchTask = (FetchTask)TaskFactory.get(fetch, conf);
       setFetchTask(fetchTask);
     } else {
       new ArrayList<MoveWork>();