Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/31 00:22:46 UTC

svn commit: r1508669 [4/39] - in /hive/branches/vectorization: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/test/results/clientpositive/ data/files/ eclipse-templates/ hcatalog/build-support/ant/ hcatalog/core/src/main/java/org/apa...

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Jul 30 22:22:35 2013
@@ -22,6 +22,7 @@ import java.beans.DefaultPersistenceDele
 import java.beans.Encoder;
 import java.beans.ExceptionListener;
 import java.beans.Expression;
+import java.beans.PersistenceDelegate;
 import java.beans.Statement;
 import java.beans.XMLDecoder;
 import java.beans.XMLEncoder;
@@ -50,12 +51,14 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.SQLTransientException;
+import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -111,6 +114,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -118,20 +122,24 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.Adjacency;
 import org.apache.hadoop.hive.ql.plan.api.Graph;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -166,6 +174,8 @@ public final class Utilities {
    */
 
   public static String HADOOP_LOCAL_FS = "file:///";
+  public static String MAP_PLAN_NAME = "map.xml";
+  public static String REDUCE_PLAN_NAME = "reduce.xml";
 
   /**
    * ReduceField:
@@ -188,56 +198,85 @@ public final class Utilities {
     // prevent instantiation
   }
 
-  private static Map<String, MapredWork> gWorkMap = Collections
-      .synchronizedMap(new HashMap<String, MapredWork>());
+  private static Map<Path, BaseWork> gWorkMap = Collections
+      .synchronizedMap(new HashMap<Path, BaseWork>());
   private static final Log LOG = LogFactory.getLog(Utilities.class.getName());
 
-  public static void clearMapRedWork(Configuration job) {
+  public static void clearWork(Configuration conf) {
+    Path mapPath = getPlanPath(conf, MAP_PLAN_NAME);
+    Path reducePath = getPlanPath(conf, REDUCE_PLAN_NAME);
+
+    // if the plan path hasn't been initialized just return, nothing to clean.
+    if (mapPath == null || reducePath == null) {
+      return;
+    }
+
     try {
-      Path planPath = new Path(HiveConf.getVar(job, HiveConf.ConfVars.PLAN));
-      FileSystem fs = planPath.getFileSystem(job);
-      if (fs.exists(planPath)) {
-        try {
-          fs.delete(planPath, true);
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
+      FileSystem fs = mapPath.getFileSystem(conf);
+      if (fs.exists(mapPath)) {
+        fs.delete(mapPath, true);
       }
+      if (fs.exists(reducePath)) {
+        fs.delete(reducePath, true);
+      }
+
     } catch (Exception e) {
+      LOG.warn("Failed to clean-up tmp directories.", e);
     } finally {
       // where a single process works with multiple plans - we must clear
       // the cache before working with the next plan.
-      String jobID = getHiveJobID(job);
-      if (jobID != null) {
-        gWorkMap.remove(jobID);
+      if (mapPath != null) {
+        gWorkMap.remove(mapPath);
+      }
+      if (reducePath != null) {
+        gWorkMap.remove(reducePath);
       }
     }
   }
 
-  public static MapredWork getMapRedWork(Configuration job) {
-    MapredWork gWork = null;
+  public static MapredWork getMapRedWork(Configuration conf) {
+    MapredWork w = new MapredWork();
+    w.setMapWork(getMapWork(conf));
+    w.setReduceWork(getReduceWork(conf));
+    return w;
+  }
+
+  public static MapWork getMapWork(Configuration conf) {
+    return (MapWork) getBaseWork(conf, MAP_PLAN_NAME);
+  }
+
+  public static ReduceWork getReduceWork(Configuration conf) {
+    return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
+  }
+
+  public static BaseWork getBaseWork(Configuration conf, String name) {
+    BaseWork gWork = null;
+    Path path = null;
     try {
-      String jobID = getHiveJobID(job);
-      assert jobID != null;
-      gWork = gWorkMap.get(jobID);
+      path = getPlanPath(conf, name);
+      assert path != null;
+      gWork = gWorkMap.get(path);
       if (gWork == null) {
-        String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job);
-        String path;
+        String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf);
+        Path localPath;
         if (jtConf.equals("local")) {
-          String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
-          path = new Path(planPath).toUri().getPath();
+          localPath = path;
         } else {
-          path = "HIVE_PLAN" + jobID;
+          localPath = new Path(name);
         }
-        InputStream in = new FileInputStream(path);
-        MapredWork ret = deserializeMapRedWork(in, job);
+        InputStream in = new FileInputStream(localPath.toUri().getPath());
+        BaseWork ret = deserializeObject(in);
         gWork = ret;
-        gWork.initialize();
-        gWorkMap.put(jobID, gWork);
+        gWorkMap.put(path, gWork);
       }
-      return (gWork);
+      return gWork;
+    } catch (FileNotFoundException fnf) {
+      // This can happen, e.g. when there is no reduce work.
+      LOG.debug("No plan file found: "+path);
+      return null;
     } catch (Exception e) {
       e.printStackTrace();
+      LOG.error("Failed to load plan: "+path, e);
       throw new RuntimeException(e);
     }
   }
@@ -376,47 +415,115 @@ public final class Utilities {
 
   }
 
-  public static void setMapRedWork(Configuration job, MapredWork w, String hiveScratchDir) {
+  /**
+   * DatePersistenceDelegate. Needed to serialize java.util.Date
+   * since it is not serialization friendly.
+   * Also works for java.sql.Date since it derives from java.util.Date.
+   */
+  public static class DatePersistenceDelegate extends PersistenceDelegate {
+
+    @Override
+    protected Expression instantiate(Object oldInstance, Encoder out) {
+      Date dateVal = (Date)oldInstance;
+      Object[] args = { dateVal.getTime() };
+      return new Expression(dateVal, dateVal.getClass(), "new", args);
+    }
+
+    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+      if (oldInstance == null || newInstance == null) {
+        return false;
+      }
+      return oldInstance.getClass() == newInstance.getClass();
+    }
+  }
+
+  /**
+   * TimestampPersistenceDelegate. Needed to serialize java.sql.Timestamp since
+   * it is not serialization friendly.
+   */
+  public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
+    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+      Timestamp ts = (Timestamp)oldInstance;
+      Object[] args = { ts.getNanos() };
+      Statement stmt = new Statement(oldInstance, "setNanos", args);
+      out.writeStatement(stmt);
+    }
+  }
+
+  public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) {
+    setMapWork(conf, w.getMapWork(), hiveScratchDir);
+    if (w.getReduceWork() != null) {
+      setReduceWork(conf, w.getReduceWork(), hiveScratchDir);
+    }
+  }
+
+  public static void setMapWork(Configuration conf, MapWork w, String hiveScratchDir) {
+    setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME);
+  }
+
+  public static void setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir) {
+    setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME);
+  }
+
+  private static void setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name) {
     try {
+      setPlanPath(conf, hiveScratchDir);
 
-      // this is the unique job ID, which is kept in JobConf as part of the plan file name
-      String jobID = UUID.randomUUID().toString();
-      Path planPath = new Path(hiveScratchDir, jobID);
-      HiveConf.setVar(job, HiveConf.ConfVars.PLAN, planPath.toUri().toString());
+      Path planPath = getPlanPath(conf, name);
 
-      // use the default file system of the job
-      FileSystem fs = planPath.getFileSystem(job);
+      // use the default file system of the conf
+      FileSystem fs = planPath.getFileSystem(conf);
       FSDataOutputStream out = fs.create(planPath);
-      serializeMapRedWork(w, out);
+      serializeObject(w, out);
 
       // Serialize the plan to the default hdfs instance
       // Except for hadoop local mode execution where we should be
       // able to get the plan directly from the cache
-      if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
+      if (!ShimLoader.getHadoopShims().isLocalMode(conf)) {
         // Set up distributed cache
-        DistributedCache.createSymlink(job);
-        String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN" + jobID;
-        DistributedCache.addCacheFile(new URI(uriWithLink), job);
+        if (!DistributedCache.getSymlink(conf)) {
+          DistributedCache.createSymlink(conf);
+        }
+        String uriWithLink = planPath.toUri().toString() + "#" + name;
+        DistributedCache.addCacheFile(new URI(uriWithLink), conf);
 
         // set replication of the plan file to a high number. we use the same
         // replication factor as used by the hadoop jobclient for job.xml etc.
-        short replication = (short) job.getInt("mapred.submit.replication", 10);
+        short replication = (short) conf.getInt("mapred.submit.replication", 10);
         fs.setReplication(planPath, replication);
       }
 
       // Cache the plan in this process
-      w.initialize();
-      gWorkMap.put(jobID, w);
+      gWorkMap.put(planPath, w);
     } catch (Exception e) {
       e.printStackTrace();
       throw new RuntimeException(e);
     }
   }
 
-  public static String getHiveJobID(Configuration job) {
-    String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
-    if (planPath != null && !planPath.isEmpty()) {
-      return (new Path(planPath)).getName();
+  private static Path getPlanPath(Configuration conf, String name) {
+    Path planPath = getPlanPath(conf);
+    if (planPath == null) {
+      return null;
+    }
+    return new Path(planPath, name);
+  }
+
+  private static void setPlanPath(Configuration conf, String hiveScratchDir) throws IOException {
+    if (getPlanPath(conf) == null) {
+      // this is a unique plan ID, which is kept in JobConf as part of the plan path
+      String jobID = UUID.randomUUID().toString();
+      Path planPath = new Path(hiveScratchDir, jobID);
+      FileSystem fs = planPath.getFileSystem(conf);
+      fs.mkdirs(planPath);
+      HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, planPath.toUri().toString());
+    }
+  }
+
+  private static Path getPlanPath(Configuration conf) {
+    String plan = HiveConf.getVar(conf, HiveConf.ConfVars.PLAN);
+    if (plan != null && !plan.isEmpty()) {
+      return new Path(plan);
     }
     return null;
   }
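
The two persistence delegates above exist because java.beans.XMLEncoder cannot round-trip
java.util.Date / java.sql.Timestamp values on its own: the encoder has to be told to re-create
the value from its epoch-millisecond time and, for Timestamp, to replay the nanosecond field.
A minimal standalone sketch of the same pattern (class names below are illustrative, not Hive
code), round-tripping a Timestamp through XMLEncoder/XMLDecoder:

import java.beans.Encoder;
import java.beans.Expression;
import java.beans.PersistenceDelegate;
import java.beans.Statement;
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.sql.Timestamp;
import java.util.Date;

public class TimestampXmlRoundTrip {

  // Same idea as Utilities.DatePersistenceDelegate: rebuild the Date from its
  // epoch-millisecond value instead of relying on bean introspection.
  static class DateDelegate extends PersistenceDelegate {
    @Override
    protected Expression instantiate(Object oldInstance, Encoder out) {
      Date d = (Date) oldInstance;
      return new Expression(d, d.getClass(), "new", new Object[] { d.getTime() });
    }
    @Override
    protected boolean mutatesTo(Object oldInstance, Object newInstance) {
      return oldInstance != null && newInstance != null
          && oldInstance.getClass() == newInstance.getClass();
    }
  }

  // Same idea as Utilities.TimestampPersistenceDelegate: also replay the
  // nanosecond field, which the millisecond constructor cannot preserve.
  static class TimestampDelegate extends DateDelegate {
    @Override
    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
      Timestamp ts = (Timestamp) oldInstance;
      out.writeStatement(new Statement(oldInstance, "setNanos", new Object[] { ts.getNanos() }));
    }
  }

  public static void main(String[] args) {
    Timestamp original = new Timestamp(System.currentTimeMillis());
    original.setNanos(123456789);

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    XMLEncoder enc = new XMLEncoder(bos);
    enc.setPersistenceDelegate(java.sql.Date.class, new DateDelegate());
    enc.setPersistenceDelegate(Timestamp.class, new TimestampDelegate());
    enc.writeObject(original);
    enc.close();

    XMLDecoder dec = new XMLDecoder(new ByteArrayInputStream(bos.toByteArray()), null, null);
    Timestamp copy = (Timestamp) dec.readObject();
    dec.close();

    System.out.println("round-trip ok: " + original.equals(copy));
  }
}
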
@@ -424,6 +531,8 @@ public final class Utilities {
   public static String serializeExpression(ExprNodeDesc expr) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     XMLEncoder encoder = new XMLEncoder(baos);
+    encoder.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
+    encoder.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
     try {
       encoder.writeObject(expr);
     } finally {
@@ -455,26 +564,6 @@ public final class Utilities {
     }
   }
 
-  /**
-   * Serialize a single Task.
-   */
-  public static void serializeTasks(Task<? extends Serializable> t, OutputStream out) {
-    XMLEncoder e = null;
-    try {
-      e = new XMLEncoder(out);
-      // workaround for java 1.5
-      e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
-      e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-      e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
-
-      e.writeObject(t);
-    } finally {
-      if (null != e) {
-        e.close();
-      }
-    }
-  }
-
   public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
     @Override
     protected Expression instantiate(Object oldInstance, Encoder out) {
@@ -491,102 +580,40 @@ public final class Utilities {
   }
 
   /**
-   * Serialize the whole query plan.
+   * Serialize the object. This helper function mainly makes sure that enums,
+   * counters, etc are handled properly.
    */
-  public static void serializeQueryPlan(QueryPlan plan, OutputStream out) {
+  public static void serializeObject(Object plan, OutputStream out) {
     XMLEncoder e = new XMLEncoder(out);
     e.setExceptionListener(new ExceptionListener() {
       public void exceptionThrown(Exception e) {
         LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
-        throw new RuntimeException("Cannot serialize the query plan", e);
+        throw new RuntimeException("Cannot serialize object", e);
       }
     });
     // workaround for java 1.5
     e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
     e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
     e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
+    e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
+    e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
 
-    e.setPersistenceDelegate(org.datanucleus.sco.backed.Map.class, new MapDelegate());
-    e.setPersistenceDelegate(org.datanucleus.sco.backed.List.class, new ListDelegate());
+    e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate());
+    e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate());
 
     e.writeObject(plan);
     e.close();
   }
 
   /**
-   * Deserialize the whole query plan.
-   */
-  public static QueryPlan deserializeQueryPlan(InputStream in, Configuration conf) {
-    XMLDecoder d = null;
-    try {
-      d = new XMLDecoder(in, null, null);
-      QueryPlan ret = (QueryPlan) d.readObject();
-      return (ret);
-    } finally {
-      if (null != d) {
-        d.close();
-      }
-    }
-  }
-
-  /**
-   * Serialize the mapredWork object to an output stream. DO NOT use this to write to standard
-   * output since it closes the output stream. DO USE mapredWork.toXML() instead.
-   */
-  public static void serializeMapRedWork(MapredWork w, OutputStream out) {
-    XMLEncoder e = null;
-    try {
-      e = new XMLEncoder(out);
-      // workaround for java 1.5
-      e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
-      e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-      e.writeObject(w);
-    } finally {
-      if (null != e) {
-        e.close();
-      }
-    }
-
-  }
-
-  public static MapredWork deserializeMapRedWork(InputStream in, Configuration conf) {
-    XMLDecoder d = null;
-    try {
-      d = new XMLDecoder(in, null, null);
-      MapredWork ret = (MapredWork) d.readObject();
-      return (ret);
-    } finally {
-      if (null != d) {
-        d.close();
-      }
-    }
-  }
-
-  /**
-   * Serialize the mapredLocalWork object to an output stream. DO NOT use this to write to standard
-   * output since it closes the output stream. DO USE mapredWork.toXML() instead.
+   * De-serialize an object. This helper function mainly makes sure that enums,
+   * counters, etc are handled properly.
    */
-  public static void serializeMapRedLocalWork(MapredLocalWork w, OutputStream out) {
-    XMLEncoder e = null;
-    try {
-      e = new XMLEncoder(out);
-      // workaround for java 1.5
-      e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
-      e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-      e.writeObject(w);
-    } finally {
-      if (null != e) {
-        e.close();
-      }
-    }
-  }
-
-  public static MapredLocalWork deserializeMapRedLocalWork(InputStream in, Configuration conf) {
+  public static <T> T deserializeObject(InputStream in) {
     XMLDecoder d = null;
     try {
       d = new XMLDecoder(in, null, null);
-      MapredLocalWork ret = (MapredLocalWork) d.readObject();
-      return (ret);
+      return (T) d.readObject();
     } finally {
       if (null != d) {
         d.close();
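
The serializeObject/deserializeObject pair above is essentially a thin, generic wrapper around
XMLEncoder/XMLDecoder with an unchecked cast on the read side. A standalone sketch of that
pattern, assuming a hypothetical Plan bean in place of BaseWork (nothing below is Hive API):

import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

public class XmlBeanRoundTrip {

  // hypothetical bean standing in for BaseWork: public no-arg constructor plus getters/setters
  public static class Plan {
    private String name;
    private int numTasks;
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getNumTasks() { return numTasks; }
    public void setNumTasks(int numTasks) { this.numTasks = numTasks; }
  }

  static void serializeObject(Object o, OutputStream out) {
    XMLEncoder e = new XMLEncoder(out);   // note: close() also closes the underlying stream
    e.writeObject(o);
    e.close();
  }

  @SuppressWarnings("unchecked")
  static <T> T deserializeObject(InputStream in) {
    XMLDecoder d = new XMLDecoder(in, null, null);
    try {
      return (T) d.readObject();
    } finally {
      d.close();
    }
  }

  public static void main(String[] args) {
    Plan p = new Plan();
    p.setName("map.xml");
    p.setNumTasks(4);

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    serializeObject(p, bos);

    Plan copy = deserializeObject(new ByteArrayInputStream(bos.toByteArray()));
    System.out.println(copy.getName() + " / " + copy.getNumTasks());
  }
}
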
@@ -1765,7 +1792,7 @@ public final class Utilities {
    * @return the summary of all the input paths.
    * @throws IOException
    */
-  public static ContentSummary getInputSummary(Context ctx, MapredWork work, PathFilter filter)
+  public static ContentSummary getInputSummary(Context ctx, MapWork work, PathFilter filter)
       throws IOException {
 
     long[] summary = {0, 0, 0};
@@ -2129,17 +2156,45 @@ public final class Utilities {
 
   /**
    * Check if a function can be pushed down to JDO.
-   * Now only {=, AND, OR} are supported.
+   * Now only {=, !=, AND, OR} are supported.
    * @param func a generic function.
    * @return true if this function can be pushed down to JDO filter.
    */
   private static boolean supportedJDOFuncs(GenericUDF func) {
+    // TODO: we might also want to add "not" and "between" here in future.
+    // TODO: change to GenericUDFBaseCompare once DN is upgraded
+    //       (see HIVE-2609 - in DN 2.0, substrings do not work in MySQL).
     return func instanceof GenericUDFOPEqual ||
+           func instanceof GenericUDFOPNotEqual ||
            func instanceof GenericUDFOPAnd ||
            func instanceof GenericUDFOPOr;
   }
 
   /**
+   * Check if a function can be pushed down to JDO for integral types.
+   * Only {=, !=} are supported. lt/gt/etc. to be dealt with in HIVE-4888.
+   * @param func a generic function.
+   * @return true iff this function can be pushed down to JDO filter for integral types.
+   */
+  private static boolean doesJDOFuncSupportIntegral(GenericUDF func) {
+    // AND, OR etc. don't need to be specified here.
+    return func instanceof GenericUDFOPEqual ||
+           func instanceof GenericUDFOPNotEqual;
+  }
+
+  /**
+   * @param type the type name to check.
+   * @return true iff type is an integral type.
+   */
+  private static boolean isIntegralType(String type) {
+    return type.equals(serdeConstants.TINYINT_TYPE_NAME) ||
+           type.equals(serdeConstants.SMALLINT_TYPE_NAME) ||
+           type.equals(serdeConstants.INT_TYPE_NAME) ||
+           type.equals(serdeConstants.BIGINT_TYPE_NAME);
+  }
+
+  /**
    * Check if the partition pruning expression can be pushed down to JDO filtering.
    * The partition expression contains only partition columns.
    * The criteria that an expression can be pushed down are that:
@@ -2149,32 +2204,47 @@ public final class Utilities {
    *     restriction by the current JDO filtering implementation.
    * @param tab The table that contains the partition columns.
    * @param expr the partition pruning expression
+   * @param parent the parent UDF of expr, if expr is an argument of a UDF; otherwise null.
    * @return null if the partition pruning expression can be pushed down to JDO filtering.
    */
-  public static String checkJDOPushDown(Table tab, ExprNodeDesc expr) {
-    if (expr instanceof ExprNodeConstantDesc) {
-      // JDO filter now only support String typed literal -- see Filter.g and ExpressionTree.java
+  public static String checkJDOPushDown(
+      Table tab, ExprNodeDesc expr, GenericUDF parent) {
+    boolean isConst = expr instanceof ExprNodeConstantDesc;
+    boolean isCol = !isConst && (expr instanceof ExprNodeColumnDesc);
+    boolean isIntegralSupported = (parent != null) && (isConst || isCol)
+        && doesJDOFuncSupportIntegral(parent);
+
+    // JDO filter now only support String typed literals, as well as integers
+    // for some operators; see Filter.g and ExpressionTree.java.
+    if (isConst) {
       Object value = ((ExprNodeConstantDesc)expr).getValue();
       if (value instanceof String) {
         return null;
       }
-      return "Constant " + value + " is not string type";
-    } else if (expr instanceof ExprNodeColumnDesc) {
-      // JDO filter now only support String typed literal -- see Filter.g and ExpressionTree.java
+      if (isIntegralSupported && isIntegralType(expr.getTypeInfo().getTypeName())) {
+        return null;
+      }
+      return "Constant " + value + " is not string "
+        + (isIntegralSupported ? "or integral ": "") + "type: " + expr.getTypeInfo().getTypeName();
+    } else if (isCol) {
       TypeInfo type = expr.getTypeInfo();
-      if (type.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+      if (type.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)
+          || (isIntegralSupported && isIntegralType(type.getTypeName()))) {
         String colName = ((ExprNodeColumnDesc)expr).getColumn();
         for (FieldSchema fs: tab.getPartCols()) {
           if (fs.getName().equals(colName)) {
-            if (fs.getType().equals(serdeConstants.STRING_TYPE_NAME)) {
+            if (fs.getType().equals(serdeConstants.STRING_TYPE_NAME)
+                || (isIntegralSupported && isIntegralType(fs.getType()))) {
               return null;
             }
-            return "Partition column " + fs.getName() + " is not string type";
+            return "Partition column " + fs.getName() + " is not string "
+              + (isIntegralSupported ? "or integral ": "") + "type: " + fs.getType();
           }
         }
         assert(false); // cannot find the partition column!
      } else {
-        return "Column " + expr.getExprString() + " is not string type";
+        return "Column " + expr.getExprString() + " is not string "
+          + (isIntegralSupported ? "or integral ": "") + "type: " + type.getTypeName();
      }
     } else if (expr instanceof ExprNodeGenericFuncDesc) {
       ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) expr;
@@ -2188,7 +2258,7 @@ public final class Utilities {
         if (!(child instanceof ExprNodeConstantDesc)) {
           allChildrenConstant = false;
         }
-        String message = checkJDOPushDown(tab, child);
+        String message = checkJDOPushDown(tab, child, func);
         if (message != null) {
           return message;
         }
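
Taken together, the checks above mean a filter operand is JDO-pushable if it is a string, or if
it is an integral type sitting directly under an = or != comparison. A small standalone sketch
of that rule with illustrative names (no Hive classes are used):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class JdoPushDownRule {

  enum Func { EQUAL, NOT_EQUAL, AND, OR }

  private static final Set<String> INTEGRAL_TYPES =
      new HashSet<String>(Arrays.asList("tinyint", "smallint", "int", "bigint"));

  // mirrors doesJDOFuncSupportIntegral: only = and != allow integral operands
  static boolean integralSupported(Func parent) {
    return parent == Func.EQUAL || parent == Func.NOT_EQUAL;
  }

  static boolean operandPushable(String typeName, Func parent) {
    if ("string".equals(typeName)) {
      return true;                                  // strings were already supported
    }
    return INTEGRAL_TYPES.contains(typeName) && parent != null && integralSupported(parent);
  }

  public static void main(String[] args) {
    System.out.println(operandPushable("string", Func.EQUAL));   // true (already supported)
    System.out.println(operandPushable("int", Func.NOT_EQUAL));  // true (newly supported)
    System.out.println(operandPushable("int", null));            // false: no enclosing comparison
    System.out.println(operandPushable("double", Func.EQUAL));   // false: not an integral type
  }
}
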
@@ -2226,7 +2296,7 @@ public final class Utilities {
       try {
         MapredWork mapredWork = ((MapRedTask) task).getWork();
         Set<Class<? extends InputFormat>> reworkInputFormats = new HashSet<Class<? extends InputFormat>>();
-        for (PartitionDesc part : mapredWork.getPathToPartitionInfo().values()) {
+        for (PartitionDesc part : mapredWork.getMapWork().getPathToPartitionInfo().values()) {
           Class<? extends InputFormat> inputFormatCls = part
               .getInputFileFormatClass();
           if (ReworkMapredInputFormat.class.isAssignableFrom(inputFormatCls)) {
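
The net effect of the Utilities changes is that a query plan is no longer a single MapredWork
file: map.xml and reduce.xml are written separately under a per-query plan directory, cached
in-process by their Path, and deleted together by clearWork(). A simplified standalone sketch of
that lifecycle, using java.nio.file.Path and a String payload as stand-ins for Hadoop's Path and
Hive's BaseWork (all names below are illustrative):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class PlanCacheSketch {

  static final String MAP_PLAN_NAME = "map.xml";
  static final String REDUCE_PLAN_NAME = "reduce.xml";

  // one cache keyed by plan file path, shared by map and reduce plans
  private static final Map<Path, String> gWorkMap =
      Collections.synchronizedMap(new HashMap<Path, String>());

  private static Path planDir;   // stands in for HiveConf.ConfVars.PLAN

  static Path setPlanPath(Path scratchDir) throws IOException {
    if (planDir == null) {
      planDir = scratchDir.resolve(UUID.randomUUID().toString());
      Files.createDirectories(planDir);
    }
    return planDir;
  }

  static void setWork(Path scratchDir, String name, String work) throws IOException {
    Path planPath = setPlanPath(scratchDir).resolve(name);
    Files.write(planPath, work.getBytes(StandardCharsets.UTF_8));  // "serialize" the plan
    gWorkMap.put(planPath, work);                                  // and cache it in-process
  }

  static String getWork(String name) throws IOException {
    Path planPath = planDir.resolve(name);
    String work = gWorkMap.get(planPath);
    if (work == null && Files.exists(planPath)) {   // cache miss: re-read the serialized file
      work = new String(Files.readAllBytes(planPath), StandardCharsets.UTF_8);
      gWorkMap.put(planPath, work);
    }
    return work;                                    // null, e.g. when there is no reduce plan
  }

  static void clearWork() throws IOException {
    for (String name : new String[] { MAP_PLAN_NAME, REDUCE_PLAN_NAME }) {
      Path planPath = planDir.resolve(name);
      Files.deleteIfExists(planPath);
      gWorkMap.remove(planPath);
    }
  }

  public static void main(String[] args) throws IOException {
    Path scratch = Files.createTempDirectory("hive-scratch");
    setWork(scratch, MAP_PLAN_NAME, "<mapwork/>");  // map-only job: no reduce.xml written
    System.out.println("map plan:    " + getWork(MAP_PLAN_NAME));
    System.out.println("reduce plan: " + getWork(REDUCE_PLAN_NAME));
    clearWork();
  }
}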

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Jul 30 22:22:35 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.ErrorMs
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
 import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -64,7 +65,6 @@ import org.apache.hadoop.hive.ql.exec.Pa
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExecMapper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
@@ -78,10 +78,12 @@ import org.apache.hadoop.hive.ql.io.OneN
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -107,7 +109,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.varia.NullAppender;
 
 /**
- * ExecDriver is the central class in co-ordinating execution of any map-reduce task. 
+ * ExecDriver is the central class in co-ordinating execution of any map-reduce task.
 * Its main responsibilities are:
  *
  * - Converting the plan (MapredWork) into a MR Job (JobConf)
@@ -201,13 +203,13 @@ public class ExecDriver extends Task<Map
    * @return true if fatal errors happened during job execution, false otherwise.
    */
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
-    for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+    for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
       if (op.checkFatalErrors(ctrs, errMsg)) {
         return true;
       }
     }
-    if (work.getReducer() != null) {
-      if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
+    if (work.getReduceWork() != null) {
+      if (work.getReduceWork().getReducer().checkFatalErrors(ctrs, errMsg)) {
         return true;
       }
     }
@@ -216,18 +218,18 @@ public class ExecDriver extends Task<Map
 
   protected void createTmpDirs() throws IOException {
     // fix up outputs
-    Map<String, ArrayList<String>> pa = work.getPathToAliases();
+    Map<String, ArrayList<String>> pa = work.getMapWork().getPathToAliases();
     if (pa != null) {
       List<Operator<? extends OperatorDesc>> opList =
         new ArrayList<Operator<? extends OperatorDesc>>();
 
-      if (work.getReducer() != null) {
-        opList.add(work.getReducer());
+      if (work.getReduceWork() != null) {
+        opList.add(work.getReduceWork().getReducer());
       }
 
       for (List<String> ls : pa.values()) {
         for (String a : ls) {
-          opList.add(work.getAliasToWork().get(a));
+          opList.add(work.getMapWork().getAliasToWork().get(a));
 
           while (!opList.isEmpty()) {
             Operator<? extends OperatorDesc> op = opList.remove(0);
@@ -256,6 +258,7 @@ public class ExecDriver extends Task<Map
    /**
    * Execute a query plan using Hadoop.
    */
+  @SuppressWarnings("deprecation")
   @Override
   public int execute(DriverContext driverContext) {
 
@@ -264,16 +267,14 @@ public class ExecDriver extends Task<Map
 
     boolean success = true;
 
-    String invalidReason = work.isInvalid();
-    if (invalidReason != null) {
-      throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
-    }
-
     Context ctx = driverContext.getCtx();
     boolean ctxCreated = false;
     String emptyScratchDirStr;
     Path emptyScratchDir;
 
+    MapWork mWork = work.getMapWork();
+    ReduceWork rWork = work.getReduceWork();
+
     try {
       if (ctx == null) {
         ctx = new Context(job);
@@ -322,27 +323,27 @@ public class ExecDriver extends Task<Map
       throw new RuntimeException(e.getMessage());
     }
 
-    if (work.getNumMapTasks() != null) {
-      job.setNumMapTasks(work.getNumMapTasks().intValue());
+    if (mWork.getNumMapTasks() != null) {
+      job.setNumMapTasks(mWork.getNumMapTasks().intValue());
     }
 
-    if (work.getMaxSplitSize() != null) {
-      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, work.getMaxSplitSize().longValue());
+    if (mWork.getMaxSplitSize() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, mWork.getMaxSplitSize().longValue());
     }
 
-    if (work.getMinSplitSize() != null) {
-      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
+    if (mWork.getMinSplitSize() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, mWork.getMinSplitSize().longValue());
     }
 
-    if (work.getMinSplitSizePerNode() != null) {
-      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, work.getMinSplitSizePerNode().longValue());
+    if (mWork.getMinSplitSizePerNode() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, mWork.getMinSplitSizePerNode().longValue());
     }
 
-    if (work.getMinSplitSizePerRack() != null) {
-      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, work.getMinSplitSizePerRack().longValue());
+    if (mWork.getMinSplitSizePerRack() != null) {
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, mWork.getMinSplitSizePerRack().longValue());
     }
 
-    job.setNumReduceTasks(work.getNumReduceTasks().intValue());
+    job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
     job.setReducerClass(ExecReducer.class);
 
     // set input format information if necessary
@@ -359,7 +360,7 @@ public class ExecDriver extends Task<Map
       inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
     }
 
-    if (getWork().isUseBucketizedHiveInputFormat()) {
+    if (mWork.isUseBucketizedHiveInputFormat()) {
       inpFormat = BucketizedHiveInputFormat.class.getName();
     }
 
@@ -408,11 +409,11 @@ public class ExecDriver extends Task<Map
     }
 
     try{
-      MapredLocalWork localwork = work.getMapLocalWork();
+      MapredLocalWork localwork = mWork.getMapLocalWork();
       if (localwork != null) {
         if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
           Path localPath = new Path(localwork.getTmpFileURI());
-          Path hdfsPath = new Path(work.getTmpHDFSFileURI());
+          Path hdfsPath = new Path(mWork.getTmpHDFSFileURI());
 
           FileSystem hdfs = hdfsPath.getFileSystem(job);
           FileSystem localFS = localPath.getFileSystem(job);
@@ -450,17 +451,17 @@ public class ExecDriver extends Task<Map
         }
       }
       work.configureJobConf(job);
-      addInputPaths(job, work, emptyScratchDirStr, ctx);
+      addInputPaths(job, mWork, emptyScratchDirStr, ctx);
 
       Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
 
-      if (work.getSamplingType() > 0 && work.getNumReduceTasks() > 1) {
+      if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
         try {
-          handleSampling(driverContext, work, job, new HiveConf(conf));
+          handleSampling(driverContext, mWork, job, conf);
           job.setPartitionerClass(HiveTotalOrderPartitioner.class);
         } catch (Exception e) {
           console.printInfo("Not enough sampling data.. Rolling back to single reducer task");
-          work.setNumReduceTasks(1);
+          rWork.setNumReduceTasks(1);
           job.setNumReduceTasks(1);
         }
       }
@@ -475,7 +476,7 @@ public class ExecDriver extends Task<Map
      // make this client wait if the job tracker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
 
-      if (work.isGatheringStats()) {
+      if (mWork.isGatheringStats() || (rWork != null && rWork.isGatheringStats())) {
         // initialize stats publishing table
         StatsPublisher statsPublisher;
         String statsImplementationClass = HiveConf.getVar(job, HiveConf.ConfVars.HIVESTATSDBCLASS);
@@ -517,7 +518,7 @@ public class ExecDriver extends Task<Map
       success = false;
       returnVal = 1;
     } finally {
-      Utilities.clearMapRedWork(job);
+      Utilities.clearWork(job);
       try {
         if (ctxCreated) {
           ctx.clear();
@@ -538,13 +539,13 @@ public class ExecDriver extends Task<Map
     try {
       if (rj != null) {
         JobCloseFeedBack feedBack = new JobCloseFeedBack();
-        if (work.getAliasToWork() != null) {
-          for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+        if (mWork.getAliasToWork() != null) {
+          for (Operator<? extends OperatorDesc> op : mWork.getAliasToWork().values()) {
             op.jobClose(job, success, feedBack);
           }
         }
-        if (work.getReducer() != null) {
-          work.getReducer().jobClose(job, success, feedBack);
+        if (rWork != null) {
+          rWork.getReducer().jobClose(job, success, feedBack);
         }
       }
     } catch (Exception e) {
@@ -562,7 +563,7 @@ public class ExecDriver extends Task<Map
 
   private boolean validateVectorPath() {
     LOG.debug("Validating if vectorized execution is applicable");
-    MapredWork thePlan = this.getWork();
+    MapWork thePlan = this.getWork().getMapWork();
 
     for (String path : thePlan.getPathToPartitionInfo().keySet()) {
       PartitionDesc pd = thePlan.getPathToPartitionInfo().get(path);
@@ -613,16 +614,16 @@ public class ExecDriver extends Task<Map
     }
   }
 
-  private void handleSampling(DriverContext context, MapredWork work, JobConf job, HiveConf conf)
+  private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf)
       throws Exception {
-    assert work.getAliasToWork().keySet().size() == 1;
+    assert mWork.getAliasToWork().keySet().size() == 1;
 
-    String alias = work.getAliases().get(0);
-    Operator<?> topOp = work.getAliasToWork().get(alias);
-    PartitionDesc partDesc = work.getAliasToPartnInfo().get(alias);
+    String alias = mWork.getAliases().get(0);
+    Operator<?> topOp = mWork.getAliasToWork().get(alias);
+    PartitionDesc partDesc = mWork.getAliasToPartnInfo().get(alias);
 
-    ArrayList<String> paths = work.getPaths();
-    ArrayList<PartitionDesc> parts = work.getPartitionDescs();
+    ArrayList<String> paths = mWork.getPaths();
+    ArrayList<PartitionDesc> parts = mWork.getPartitionDescs();
 
     Path onePath = new Path(paths.get(0));
     String tmpPath = context.getCtx().getExternalTmpFileURI(onePath.toUri());
@@ -632,7 +633,7 @@ public class ExecDriver extends Task<Map
 
     PartitionKeySampler sampler = new PartitionKeySampler();
 
-    if (work.getSamplingType() == MapredWork.SAMPLING_ON_PREV_MR) {
+    if (mWork.getSamplingType() == MapWork.SAMPLING_ON_PREV_MR) {
       console.printInfo("Use sampling data created in previous MR");
        // merges sampling data from previous MR and makes partition keys for total sort
       for (String path : paths) {
@@ -642,7 +643,7 @@ public class ExecDriver extends Task<Map
           sampler.addSampleFile(status.getPath(), job);
         }
       }
-    } else if (work.getSamplingType() == MapredWork.SAMPLING_ON_START) {
+    } else if (mWork.getSamplingType() == MapWork.SAMPLING_ON_START) {
       console.printInfo("Creating sampling data..");
       assert topOp instanceof TableScanOperator;
       TableScanOperator ts = (TableScanOperator) topOp;
@@ -666,7 +667,7 @@ public class ExecDriver extends Task<Map
         fetcher.clearFetchContext();
       }
     } else {
-      throw new IllegalArgumentException("Invalid sampling type " + work.getSamplingType());
+      throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
     }
     sampler.writePartitionKeys(partitionFile, job);
   }
@@ -675,16 +676,17 @@ public class ExecDriver extends Task<Map
    * Set hive input format, and input format file if necessary.
    */
   protected void setInputAttributes(Configuration conf) {
-    if (work.getInputformat() != null) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, work.getInputformat());
-    }
-    if (work.getIndexIntermediateFile() != null) {
-      conf.set("hive.index.compact.file", work.getIndexIntermediateFile());
-      conf.set("hive.index.blockfilter.file", work.getIndexIntermediateFile());
+    MapWork mWork = work.getMapWork();
+    if (mWork.getInputformat() != null) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+    }
+    if (mWork.getIndexIntermediateFile() != null) {
+      conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());
+      conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile());
     }
 
     // Intentionally overwrites anything the user may have put here
-    conf.setBoolean("hive.input.format.sorted", work.isInputFormatSorted());
+    conf.setBoolean("hive.input.format.sorted", mWork.isInputFormatSorted());
   }
 
   public boolean mapStarted() {
@@ -831,12 +833,12 @@ public class ExecDriver extends Task<Map
     int ret;
     if (localtask) {
       memoryMXBean = ManagementFactory.getMemoryMXBean();
-      MapredLocalWork plan = Utilities.deserializeMapRedLocalWork(pathData, conf);
+      MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
       MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
       ret = ed.executeFromChildJVM(new DriverContext());
 
     } else {
-      MapredWork plan = Utilities.deserializeMapRedWork(pathData, conf);
+      MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
       ExecDriver ed = new ExecDriver(plan, conf, isSilent);
       ret = ed.execute(new DriverContext());
     }
@@ -897,19 +899,19 @@ public class ExecDriver extends Task<Map
 
   @Override
   public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
-    return getWork().getAliasToWork().values();
+    return getWork().getMapWork().getAliasToWork().values();
   }
 
   @Override
   public boolean hasReduce() {
     MapredWork w = getWork();
-    return w.getReducer() != null;
+    return w.getReduceWork() != null;
   }
 
   /**
    * Handle a empty/null path for a given alias.
    */
-  private static int addInputPath(String path, JobConf job, MapredWork work, String hiveScratchDir,
+  private static int addInputPath(String path, JobConf job, MapWork work, String hiveScratchDir,
       int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
     // either the directory does not exist or it is empty
     assert path == null || isEmptyPath;
@@ -993,7 +995,7 @@ public class ExecDriver extends Task<Map
     return numEmptyPaths;
   }
 
-  public static void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir, Context ctx)
+  public static void addInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
       throws Exception {
     int numEmptyPaths = 0;
 
@@ -1076,11 +1078,11 @@ public class ExecDriver extends Task<Map
 
   @Override
   public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
-    for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+    for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
       op.updateCounters(ctrs);
     }
-    if (work.getReducer() != null) {
-      work.getReducer().updateCounters(ctrs);
+    if (work.getReduceWork() != null) {
+      work.getReduceWork().getReducer().updateCounters(ctrs);
     }
   }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Tue Jul 30 22:22:35 2013
@@ -30,11 +30,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -45,10 +45,10 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.StringUtils;
 
 /**
- * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is 
+ * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is
  * the bridge between the map-reduce framework and the Hive operator pipeline at
 * execution time. Its main responsibilities are:
- * 
+ *
  * - Load and setup the operator pipeline from XML
  * - Run the pipeline by transforming key value pairs to records and forwarding them to the operators
  * - Stop execution when the "limit" is reached
@@ -96,7 +96,7 @@ public class ExecMapper extends MapReduc
       jc = job;
       execContext.setJc(jc);
       // create map and fetch operators
-      MapredWork mrwork = Utilities.getMapRedWork(job);
+      MapWork mrwork = Utilities.getMapWork(job);
       mo = new MapOperator();
       mo.setConf(mrwork);
       // initialize map operator

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Jul 30 22:22:35 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
@@ -112,7 +112,7 @@ public class ExecReducer extends MapRedu
       l4j.info("cannot get classpath: " + e.getMessage());
     }
     jc = job;
-    MapredWork gWork = Utilities.getMapRedWork(job);
+    ReduceWork gWork = Utilities.getReduceWork(job);
     reducer = gWork.getReducer();
     reducer.setParentOperators(null); // clear out any parents as reducer is the
     // root

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Tue Jul 30 22:22:35 2013
@@ -40,8 +40,10 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
@@ -101,7 +103,7 @@ public class MapRedTask extends ExecDriv
           conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
 
         if (inputSummary == null) {
-          inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+          inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
         }
 
         // set the values of totalInputFileSize and totalInputNumFiles, estimating them
@@ -109,7 +111,7 @@ public class MapRedTask extends ExecDriv
         estimateInputSize();
 
         // at this point the number of reducers is precisely defined in the plan
-        int numReducers = work.getNumReduceTasks();
+        int numReducers = work.getReduceWork() == null ? 0 : work.getReduceWork().getNumReduceTasks();
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Task: " + getId() + ", Summary: " +
@@ -177,7 +179,7 @@ public class MapRedTask extends ExecDriv
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
-      Utilities.serializeMapRedWork(plan, out);
+      Utilities.serializeObject(plan, out);
 
       String isSilent = "true".equalsIgnoreCase(System
           .getProperty("test.silent")) ? "-nolog" : "";
@@ -383,26 +385,26 @@ public class MapRedTask extends ExecDriv
    * Set the number of reducers for the mapred work.
    */
   private void setNumberOfReducers() throws IOException {
+    ReduceWork rWork = work.getReduceWork();
     // this is a temporary hack to fix things that are not fixed in the compiler
-    Integer numReducersFromWork = work.getNumReduceTasks();
+    Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();
 
-    if (work.getReducer() == null) {
+    if (rWork == null) {
       console
           .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
-      work.setNumReduceTasks(Integer.valueOf(0));
     } else {
       if (numReducersFromWork >= 0) {
         console.printInfo("Number of reduce tasks determined at compile time: "
-            + work.getNumReduceTasks());
+            + rWork.getNumReduceTasks());
       } else if (job.getNumReduceTasks() > 0) {
         int reducers = job.getNumReduceTasks();
-        work.setNumReduceTasks(reducers);
+        rWork.setNumReduceTasks(reducers);
         console
             .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
             + reducers);
       } else {
         int reducers = estimateNumberOfReducers();
-        work.setNumReduceTasks(reducers);
+        rWork.setNumReduceTasks(reducers);
         console
             .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
             + reducers);
@@ -437,7 +439,7 @@ public class MapRedTask extends ExecDriv
 
     if(inputSummary == null) {
       // compute the summary and stash it away
-      inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work, null);
+      inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
     }
 
     // if all inputs are sampled, we should shrink the size of reducers accordingly.
@@ -459,7 +461,7 @@ public class MapRedTask extends ExecDriv
     // and the user has configured Hive to do this, make sure the number of reducers is a
     // power of two
     if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
-        work.isFinalMapRed() && !work.getBucketedColsByDirectory().isEmpty()) {
+        work.isFinalMapRed() && !work.getMapWork().getBucketedColsByDirectory().isEmpty()) {
 
       int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
       int reducersPowerTwo = (int)Math.pow(2, reducersLog);
@@ -497,11 +499,13 @@ public class MapRedTask extends ExecDriv
       return;
     }
 
+    MapWork mWork = work.getMapWork();
+
     // Initialize the values to be those taken from the input summary
     totalInputFileSize = inputSummary.getLength();
     totalInputNumFiles = inputSummary.getFileCount();
 
-    if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+    if (mWork.getNameToSplitSample() == null || mWork.getNameToSplitSample().isEmpty()) {
       // If percentage block sampling wasn't used, we don't need to do any estimation
       inputSizeEstimated = true;
       return;
@@ -510,10 +514,10 @@ public class MapRedTask extends ExecDriv
     // if all inputs are sampled, we should shrink the size of the input accordingly
     double highestSamplePercentage = 0;
     boolean allSample = false;
-    for (String alias : work.getAliasToWork().keySet()) {
-      if (work.getNameToSplitSample().containsKey(alias)) {
+    for (String alias : mWork.getAliasToWork().keySet()) {
+      if (mWork.getNameToSplitSample().containsKey(alias)) {
         allSample = true;
-        Double rate = work.getNameToSplitSample().get(alias).getPercent();
+        Double rate = mWork.getNameToSplitSample().get(alias).getPercent();
         if (rate != null && rate > highestSamplePercentage) {
           highestSamplePercentage = rate;
         }
@@ -580,7 +584,7 @@ public class MapRedTask extends ExecDriv
 
   @Override
   public Operator<? extends OperatorDesc> getReducer() {
-    return getWork().getReducer();
+    return getWork().getReduceWork() == null ? null : getWork().getReduceWork().getReducer();
   }
 
   @Override

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Tue Jul 30 22:22:35 2013
@@ -141,7 +141,7 @@ public class MapredLocalTask extends Tas
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredLocalWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
-      Utilities.serializeMapRedLocalWork(plan, out);
+      Utilities.serializeObject(plan, out);
 
       String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java Tue Jul 30 22:22:35 2013
@@ -28,10 +28,10 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -93,7 +93,7 @@ public class VectorExecMapper extends Ma
       mo.setChildren(job);
       l4j.info(mo.dump(0));
       // initialize map local work
-      localWork = mrwork.getMapLocalWork();
+      localWork = mrwork.getMapWork().getMapLocalWork();
       execContext.setLocalWork(localWork);
 
       mo.setExecContext(execContext);
@@ -205,7 +205,7 @@ public class VectorExecMapper extends Ma
       }
 
       if (fetchOperators != null) {
-        MapredLocalWork localWork = mo.getConf().getMapLocalWork();
+        MapredLocalWork localWork = mo.getConf().getMapWork().getMapLocalWork();
         for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
           Operator<? extends OperatorDesc> forwardOp = localWork
               .getAliasToWork().get(entry.getKey());

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Tue Jul 30 22:22:35 2013
@@ -33,8 +33,8 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -242,7 +242,7 @@ public class VectorMapOperator extends O
           throws HiveException,
       ClassNotFoundException, InstantiationException, IllegalAccessException,
       SerDeException {
-    PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
+    PartitionDesc pd = conf.getMapWork().getPathToPartitionInfo().get(onefile);
     LinkedHashMap<String, String> partSpec = pd.getPartSpec();
     // Use tblProps in case of unpartitioned tables
     Properties partProps =
@@ -337,8 +337,8 @@ public class VectorMapOperator extends O
     Set<TableDesc> identityConverterTableDesc = new HashSet<TableDesc>();
     try
     {
-      for (String onefile : conf.getPathToAliases().keySet()) {
-        PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
+      for (String onefile : conf.getMapWork().getPathToAliases().keySet()) {
+        PartitionDesc pd = conf.getMapWork().getPathToPartitionInfo().get(onefile);
         TableDesc tableDesc = pd.getTableDesc();
         Properties tblProps = tableDesc.getProperties();
         // If the partition does not exist, use table properties
@@ -416,7 +416,7 @@ public class VectorMapOperator extends O
         new HashMap<String, Operator<? extends OperatorDesc>>();
 
     try {
-      for (String onefile : conf.getPathToAliases().keySet()) {
+      for (String onefile : conf.getMapWork().getPathToAliases().keySet()) {
         MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile, convertedOI);
         //Create columnMap
         Map<String, Integer> columnMap = new HashMap<String, Integer>();
@@ -433,12 +433,12 @@ public class VectorMapOperator extends O
         }
 
         Path onepath = new Path(new Path(onefile).toUri().getPath());
-        List<String> aliases = conf.getPathToAliases().get(onefile);
+        List<String> aliases = conf.getMapWork().getPathToAliases().get(onefile);
 
         vectorizationContext  = new VectorizationContext(columnMap, columnCount);
 
         for (String onealias : aliases) {
-          Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(
+          Operator<? extends OperatorDesc> op = conf.getMapWork().getAliasToWork().get(
             onealias);
           LOG.info("Adding alias " + onealias + " to work list for file "
             + onefile);
@@ -640,14 +640,14 @@ public class VectorMapOperator extends O
     Path fpath = new Path((new Path(this.getExecContext().getCurrentInputFile()))
         .toUri().getPath());
 
-    for (String onefile : conf.getPathToAliases().keySet()) {
+    for (String onefile : conf.getMapWork().getPathToAliases().keySet()) {
       Path onepath = new Path(new Path(onefile).toUri().getPath());
       // check for the operators who will process rows coming to this Map
       // Operator
       if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
-        String onealias = conf.getPathToAliases().get(onefile).get(0);
+        String onealias = conf.getMapWork().getPathToAliases().get(onefile).get(0);
         Operator<? extends OperatorDesc> op =
-            conf.getAliasToWork().get(onealias);
+            conf.getMapWork().getAliasToWork().get(onealias);
 
         LOG.info("Processing alias " + onealias + " for file " + onefile);
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Jul 30 22:22:35 2013
@@ -118,7 +118,7 @@ public class VectorizedRowBatchCtx {
       IllegalAccessException, HiveException {
 
     Map<String, PartitionDesc> pathToPartitionInfo = Utilities
-        .getMapRedWork(hiveConf).getPathToPartitionInfo();
+        .getMapRedWork(hiveConf).getMapWork().getPathToPartitionInfo();
 
     PartitionDesc part = HiveFileFormatUtils
         .getPartitionDescFromPathRecursively(pathToPartitionInfo,

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java Tue Jul 30 22:22:35 2013
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -188,7 +189,7 @@ public class CompactIndexHandler extends
 
     if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
       // For now, only works if the predicate is a single condition
-      MapredWork work = null;
+      MapWork work = null;
       String originalInputFormat = null;
       for (Task task : driver.getPlan().getRootTasks()) {
         // The index query should have one and only one map reduce task in the root tasks
@@ -202,7 +203,9 @@ public class CompactIndexHandler extends
             work.setInputFormatSorted(false);
             break;
           }
-          work = (MapredWork)task.getWork();
+          if (task.getWork() != null) {
+            work = ((MapredWork)task.getWork()).getMapWork();
+          }
           String inputFormat = work.getInputformat();
           originalInputFormat = inputFormat;
           if (inputFormat == null) {

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Tue Jul 30 22:22:35 2013
@@ -95,7 +95,7 @@ public class CombineHiveInputFormat<K ex
       this.inputSplitShim = inputSplitShim;
       if (job != null) {
         Map<String, PartitionDesc> pathToPartitionInfo = Utilities
-            .getMapRedWork(job).getPathToPartitionInfo();
+            .getMapWork(job).getPathToPartitionInfo();
 
         // extract all the inputFormatClass names for each chunk in the
         // CombinedSplit.
@@ -200,7 +200,7 @@ public class CombineHiveInputFormat<K ex
 
       if (inputFormatClassName == null) {
         Map<String, PartitionDesc> pathToPartitionInfo = Utilities
-            .getMapRedWork(getJob()).getPathToPartitionInfo();
+            .getMapWork(getJob()).getPathToPartitionInfo();
 
         // extract all the inputFormatClass names for each chunk in the
         // CombinedSplit.

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Tue Jul 30 22:22:35 2013
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -249,10 +249,10 @@ public class HiveInputFormat<K extends W
   }
 
   protected Map<String, PartitionDesc> pathToPartitionInfo;
-  MapredWork mrwork = null;
+  MapWork mrwork = null;
 
   protected void init(JobConf job) {
-    mrwork = Utilities.getMapRedWork(job);
+    mrwork = Utilities.getMapWork(job);
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java Tue Jul 30 22:22:35 2013
@@ -37,10 +37,10 @@ import org.apache.hadoop.mapred.TextInpu
 public class SymbolicInputFormat implements ReworkMapredInputFormat {
 
   public void rework(HiveConf job, MapredWork work) throws IOException {
-    Map<String, PartitionDesc> pathToParts = work.getPathToPartitionInfo();
+    Map<String, PartitionDesc> pathToParts = work.getMapWork().getPathToPartitionInfo();
     List<String> toRemovePaths = new ArrayList<String>();
     Map<String, PartitionDesc> toAddPathToPart = new HashMap<String, PartitionDesc>();
-    Map<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+    Map<String, ArrayList<String>> pathToAliases = work.getMapWork().getPathToAliases();
 
     for (Map.Entry<String, PartitionDesc> pathPartEntry : pathToParts
         .entrySet()) {

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java Tue Jul 30 22:22:35 2013
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hive.ql.io.avro;
 
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
@@ -29,18 +33,17 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.*;
-
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 
 /**
  * RecordReader optimized against Avro GenericRecords that returns to record
@@ -67,7 +70,9 @@ public class AvroGenericRecordReader imp
 
     GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>();
 
-    if(latest != null) gdr.setExpected(latest);
+    if(latest != null) {
+      gdr.setExpected(latest);
+    }
 
     this.reader = new DataFileReader<GenericRecord>(new FsInput(split.getPath(), job), gdr);
     this.reader.sync(split.getStart());
@@ -86,11 +91,11 @@ public class AvroGenericRecordReader imp
     FileSystem fs = split.getPath().getFileSystem(job);
     // Inside of a MR job, we can pull out the actual properties
     if(AvroSerdeUtils.insideMRJob(job)) {
-      MapredWork mapRedWork = Utilities.getMapRedWork(job);
+      MapWork mapWork = Utilities.getMapWork(job);
 
       // Iterate over the Path -> Partition descriptions to find the partition
       // that matches our input split.
-      for (Map.Entry<String,PartitionDesc> pathsAndParts: mapRedWork.getPathToPartitionInfo().entrySet()){
+      for (Map.Entry<String,PartitionDesc> pathsAndParts: mapWork.getPathToPartitionInfo().entrySet()){
         String partitionPath = pathsAndParts.getKey();
         if(pathIsInPartition(split.getPath(), partitionPath)) {
           if(LOG.isInfoEnabled()) {
@@ -101,11 +106,15 @@ public class AvroGenericRecordReader imp
           Properties props = pathsAndParts.getValue().getProperties();
           if(props.containsKey(AvroSerdeUtils.SCHEMA_LITERAL) || props.containsKey(AvroSerdeUtils.SCHEMA_URL)) {
             return AvroSerdeUtils.determineSchemaOrThrowException(props);
-          } else
+          }
+          else {
             return null; // If it's not in this property, it won't be in any others
+          }
         }
       }
-      if(LOG.isInfoEnabled()) LOG.info("Unable to match filesplit " + split + " with a partition.");
+      if(LOG.isInfoEnabled()) {
+        LOG.info("Unable to match filesplit " + split + " with a partition.");
+      }
     }
 
     // In "select * from table" situations (non-MR), we can add things to the job

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java Tue Jul 30 22:22:35 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 
@@ -543,6 +544,97 @@ class ColumnStatisticsImpl implements Co
     }
   }
 
+  private static final class DateStatisticsImpl extends ColumnStatisticsImpl
+      implements DateColumnStatistics {
+    private DateWritable minimum = null;
+    private DateWritable maximum = null;
+
+    DateStatisticsImpl() {
+    }
+
+    DateStatisticsImpl(OrcProto.ColumnStatistics stats) {
+      super(stats);
+      OrcProto.DateStatistics dateStats = stats.getDateStatistics();
+      // min,max values serialized/deserialized as int (days since epoch)
+      if (dateStats.hasMaximum()) {
+        maximum = new DateWritable(dateStats.getMaximum());
+      }
+      if (dateStats.hasMinimum()) {
+        minimum = new DateWritable(dateStats.getMinimum());
+      }
+    }
+
+    @Override
+    void reset() {
+      super.reset();
+      minimum = null;
+      maximum = null;
+    }
+
+    @Override
+    void updateDate(DateWritable value) {
+      if (minimum == null) {
+        minimum = value;
+        maximum = value;
+      } else if (minimum.compareTo(value) > 0) {
+        minimum = value;
+      } else if (maximum.compareTo(value) < 0) {
+        maximum = value;
+      }
+    }
+
+    @Override
+    void merge(ColumnStatisticsImpl other) {
+      super.merge(other);
+      DateStatisticsImpl dateStats = (DateStatisticsImpl) other;
+      if (minimum == null) {
+        minimum = dateStats.minimum;
+        maximum = dateStats.maximum;
+      } else if (dateStats.minimum != null) {
+        if (minimum.compareTo(dateStats.minimum) > 0) {
+          minimum = dateStats.minimum;
+        } else if (maximum.compareTo(dateStats.maximum) < 0) {
+          maximum = dateStats.maximum;
+        }
+      }
+    }
+
+    @Override
+    OrcProto.ColumnStatistics.Builder serialize() {
+      OrcProto.ColumnStatistics.Builder result = super.serialize();
+      OrcProto.DateStatistics.Builder dateStats =
+          OrcProto.DateStatistics.newBuilder();
+      if (getNumberOfValues() != 0) {
+        dateStats.setMinimum(minimum.getDays());
+        dateStats.setMaximum(maximum.getDays());
+      }
+      result.setDateStatistics(dateStats);
+      return result;
+    }
+
+    @Override
+    public DateWritable getMinimum() {
+      return minimum;
+    }
+
+    @Override
+    public DateWritable getMaximum() {
+      return maximum;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder(super.toString());
+      if (getNumberOfValues() != 0) {
+        buf.append(" min: ");
+        buf.append(minimum);
+        buf.append(" max: ");
+        buf.append(maximum);
+      }
+      return buf.toString();
+    }
+  }
+
   private long count = 0;
 
   ColumnStatisticsImpl(OrcProto.ColumnStatistics stats) {
@@ -578,6 +670,10 @@ class ColumnStatisticsImpl implements Co
     throw new UnsupportedOperationException("Can't update decimal");
   }
 
+  void updateDate(DateWritable value) {
+    throw new UnsupportedOperationException("Can't update date");
+  }
+
   void merge(ColumnStatisticsImpl stats) {
     count += stats.count;
   }
@@ -621,6 +717,8 @@ class ColumnStatisticsImpl implements Co
             return new StringStatisticsImpl();
           case DECIMAL:
             return new DecimalStatisticsImpl();
+          case DATE:
+            return new DateStatisticsImpl();
           default:
             return new ColumnStatisticsImpl();
         }
@@ -640,6 +738,8 @@ class ColumnStatisticsImpl implements Co
       return new StringStatisticsImpl(stats);
     } else if (stats.hasDecimalStatistics()) {
       return new DecimalStatisticsImpl(stats);
+    } else if (stats.hasDateStatistics()) {
+      return new DateStatisticsImpl(stats);
     } else {
       return new ColumnStatisticsImpl(stats);
     }

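[Editor's note] The new DateStatisticsImpl keeps its min/max as DateWritable values and serializes them as day counts. A small sketch of that round trip, restricted to methods that appear in the hunk above (DateWritable(int), getDays(), OrcProto.DateStatistics.Builder); variable names are illustrative:

    // Sketch only: DATE min/max are persisted as int days since 1970-01-01.
    OrcProto.DateStatistics.Builder dateStats = OrcProto.DateStatistics.newBuilder();
    dateStats.setMinimum(new DateWritable(15916).getDays());   // 2013-07-30
    dateStats.setMaximum(new DateWritable(15917).getDays());   // 2013-07-31
    // ...and back, as in DateStatisticsImpl(OrcProto.ColumnStatistics):
    DateWritable minimum = new DateWritable(dateStats.getMinimum());
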
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Tue Jul 30 22:22:35 2013
@@ -475,6 +475,8 @@ final class OrcStruct implements Writabl
             return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
           case TIMESTAMP:
             return PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
+          case DATE:
+            return PrimitiveObjectInspectorFactory.javaDateObjectInspector;
           case DECIMAL:
             return PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector;
           default:
@@ -519,6 +521,8 @@ final class OrcStruct implements Writabl
         return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
       case TIMESTAMP:
         return PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
+      case DATE:
+        return PrimitiveObjectInspectorFactory.javaDateObjectInspector;
       case DECIMAL:
         return PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector;
       case STRUCT:

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Jul 30 22:22:35 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.io.BooleanWritable;
@@ -862,6 +864,50 @@ class RecordReaderImpl implements Record
     }
   }
 
+  private static class DateTreeReader extends TreeReader{
+    private RunLengthIntegerReader reader = null;
+
+    DateTreeReader(Path path, int columnId) {
+      super(path, columnId);
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      StreamName name = new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA);
+      reader = new RunLengthIntegerReader(streams.get(name), true);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      reader.seek(index[columnId]);
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      Date result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new Date(0);
+        } else {
+          result = (Date) previous;
+        }
+        result.setTime(DateWritable.daysToMillis((int) reader.next()));
+      }
+      return result;
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      reader.skip(countNonNulls(items));
+    }
+  }
+
   private static class DecimalTreeReader extends TreeReader{
     private InStream valueStream;
     private RunLengthIntegerReader scaleStream;
@@ -1453,6 +1499,8 @@ class RecordReaderImpl implements Record
         return new BinaryTreeReader(path, columnId);
       case TIMESTAMP:
         return new TimestampTreeReader(path, columnId);
+      case DATE:
+        return new DateTreeReader(path, columnId);
       case DECIMAL:
         return new DecimalTreeReader(path, columnId);
       case STRUCT:

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Jul 30 22:22:35 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -41,12 +42,13 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
@@ -988,6 +990,46 @@ class WriterImpl implements Writer, Memo
     }
   }
 
+  private static class DateTreeWriter extends TreeWriter {
+    private final RunLengthIntegerWriter writer;
+
+    DateTreeWriter(int columnId,
+                   ObjectInspector inspector,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, inspector, writer, nullable);
+      PositionedOutputStream out = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.writer = new RunLengthIntegerWriter(out, true);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void write(Object obj) throws IOException {
+      super.write(obj);
+      if (obj != null) {
+        // Using the Writable here as it's used directly for writing as well as for stats.
+        DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
+        indexStatistics.updateDate(val);
+        writer.write(val.getDays());
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+  }
+
   private static class DecimalTreeWriter extends TreeWriter {
     private final PositionedOutputStream valueStream;
     private final RunLengthIntegerWriter scaleStream;
@@ -1262,6 +1304,9 @@ class WriterImpl implements Writer, Memo
           case TIMESTAMP:
             return new TimestampTreeWriter(streamFactory.getNextColumnId(),
                 inspector, streamFactory, nullable);
+          case DATE:
+            return new DateTreeWriter(streamFactory.getNextColumnId(),
+                inspector, streamFactory, nullable);
           case DECIMAL:
             return new DecimalTreeWriter(streamFactory.getNextColumnId(),
                 inspector, streamFactory,  nullable);
@@ -1324,6 +1369,9 @@ class WriterImpl implements Writer, Memo
           case TIMESTAMP:
             type.setKind(OrcProto.Type.Kind.TIMESTAMP);
             break;
+          case DATE:
+            type.setKind(OrcProto.Type.Kind.DATE);
+            break;
           case DECIMAL:
             type.setKind(OrcProto.Type.Kind.DECIMAL);
             break;

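[Editor's note] The data path uses the same encoding: DateTreeWriter emits getDays() through a signed RunLengthIntegerWriter, and DateTreeReader maps the stored day count back to a java.sql.Date via DateWritable.daysToMillis(). A self-contained sketch of that round trip follows; the class name is made up for illustration and is not part of the patch:

    import org.apache.hadoop.hive.serde2.io.DateWritable;

    // Hypothetical demo, not part of this commit.
    public final class OrcDateRoundTrip {
      public static void main(String[] args) {
        DateWritable value = new DateWritable(15916);        // 2013-07-30 as days since the epoch
        int days = value.getDays();                          // what DateTreeWriter.write() stores
        long millis = DateWritable.daysToMillis(days);       // what DateTreeReader.next() applies
        System.out.println(new java.sql.Date(millis));       // should print the same calendar day
      }
    }
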
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Tue Jul 30 22:22:35 2013
@@ -192,7 +192,7 @@ public class BlockMergeTask extends Task
     try {
       addInputPaths(job, work);
 
-      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+      Utilities.setMapWork(job, work, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java Tue Jul 30 22:22:35 2013
@@ -33,14 +33,14 @@ import org.apache.hadoop.hive.ql.io.Comb
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.Mapper;
 
 @Explain(displayName = "Block level merge")
-public class MergeWork extends MapredWork implements Serializable {
+public class MergeWork extends MapWork implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -70,9 +70,6 @@ public class MergeWork extends MapredWor
     if(this.getPathToPartitionInfo() == null) {
       this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     }
-    if(this.getNumReduceTasks() == null) {
-      this.setNumReduceTasks(0);
-    }
     for(String path: this.inputPaths) {
       this.getPathToPartitionInfo().put(path, partDesc);
     }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java Tue Jul 30 22:22:35 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -181,7 +182,9 @@ public class PartialScanTask extends Tas
     try {
       addInputPaths(job, work);
 
-      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+      MapredWork mrWork = new MapredWork();
+      mrWork.setMapWork(work);
+      Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs