Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/31 00:22:46 UTC

svn commit: r1508669 [5/39] - in /hive/branches/vectorization: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/test/results/clientpositive/ data/files/ eclipse-templates/ hcatalog/build-support/ant/ hcatalog/core/src/main/java/org/apa...

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java Tue Jul 30 22:22:35 2013
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
 import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.Mapper;
 
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapred.Mapper;
  *
  */
 @Explain(displayName = "Partial Scan Statistics")
-public class PartialScanWork extends MapredWork implements Serializable {
+public class PartialScanWork extends MapWork implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -52,9 +52,6 @@ public class PartialScanWork extends Map
     if(this.getPathToPartitionInfo() == null) {
       this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     }
-    if(this.getNumReduceTasks() == null) {
-      this.setNumReduceTasks(0);
-    }
     for(String path: this.inputPaths) {
       this.getPathToPartitionInfo().put(path, partDesc);
     }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java Tue Jul 30 22:22:35 2013
@@ -76,7 +76,7 @@ public class ColumnTruncateMapper extend
   @Override
   public void configure(JobConf job) {
     jc = job;
-    work = (ColumnTruncateWork) Utilities.getMapRedWork(job);
+    work = (ColumnTruncateWork) Utilities.getMapWork(job);
 
     String specPath = work.getOutputDir();
     Path tmpPath = Utilities.toTempPath(specPath);

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java Tue Jul 30 22:22:35 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.mr.Throttle;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -165,7 +166,9 @@ public class ColumnTruncateTask extends 
     try {
       addInputPaths(job, work);
 
-      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+      MapredWork mrWork = new MapredWork();
+      mrWork.setMapWork(work);
+      Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs

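The three files above track the split of MapredWork into separate MapWork and ReduceWork plan objects. A minimal sketch of the wrapping pattern visible in ColumnTruncateTask, assuming only the setters and getters that appear in the hunks (the class and method names below other than those are illustrative, not part of the commit):

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.mapred.JobConf;

final class MapOnlyPlanSketch {
  // A map-only work object (e.g. ColumnTruncateWork, which now extends MapWork)
  // is wrapped in a MapredWork before being serialized into the job conf;
  // the reduce side is simply left unset for map-only jobs.
  static void setPlan(JobConf job, MapWork mapOnlyWork, String tmpFileUri) {
    MapredWork mrWork = new MapredWork();
    mrWork.setMapWork(mapOnlyWork);            // map side of the plan
    // mrWork.getReduceWork() stays null -> no reducers
    Utilities.setMapRedWork(job, mrWork, tmpFileUri);
  }
}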
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java Tue Jul 30 22:22:35 2013
@@ -27,12 +27,12 @@ import org.apache.hadoop.hive.ql.io.rcfi
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.Mapper;
 
 @Explain(displayName = "Column Truncate")
-public class ColumnTruncateWork extends MapredWork implements Serializable {
+public class ColumnTruncateWork extends MapWork implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -64,9 +64,6 @@ public class ColumnTruncateWork extends 
     if(this.getPathToPartitionInfo() == null) {
       this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     }
-    if(this.getNumReduceTasks() == null) {
-      this.setNumReduceTasks(0);
-    }
     this.getPathToPartitionInfo().put(inputDir, partDesc);
   }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Jul 30 22:22:35 2013
@@ -2114,12 +2114,75 @@ private void constructOneLBLocationMap(F
     return false;
   }
 
+  //it is assumed that parent directory of the destf should already exist when this
+  //method is called. when the replace value is true, this method works a little different
+  //from mv command if the destf is a directory, it replaces the destf instead of moving under
+  //the destf. in this case, the replaced destf still preserves the original destf's permission
+  static protected boolean renameFile(HiveConf conf, Path srcf, Path destf, FileSystem fs,
+      boolean replace) throws HiveException {
+    boolean success = false;
+    boolean inheritPerms = HiveConf.getBoolVar(conf,
+        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+    String group = null;
+    String permission = null;
+
+    try {
+      if (inheritPerms || replace) {
+        try{
+          FileStatus deststatus = fs.getFileStatus(destf);
+          if (inheritPerms) {
+            group = deststatus.getGroup();
+            permission= Integer.toString(deststatus.getPermission().toShort(), 8);
+          }
+          //if destf is an existing directory:
+          //if replace is true, delete followed by rename(mv) is equivalent to replace
+          //if replace is false, rename (mv) actually move the src under dest dir
+          //if destf is an existing file, rename is actually a replace, and do not need
+          // to delete the file first
+          if (replace && deststatus.isDir()) {
+            fs.delete(destf, true);
+          }
+        } catch (FileNotFoundException ignore) {
+          //if dest dir does not exist, any re
+          if (inheritPerms) {
+            FileStatus deststatus = fs.getFileStatus(destf.getParent());
+            group = deststatus.getGroup();
+            permission= Integer.toString(deststatus.getPermission().toShort(), 8);
+          }
+        }
+      }
+      success = fs.rename(srcf, destf);
+      LOG.debug((replace ? "Replacing src:" : "Renaming src:") + srcf.toString()
+          + ";dest: " + destf.toString()  + ";Status:" + success);
+    } catch (IOException ioe) {
+      throw new HiveException("Unable to move source" + srcf + " to destination " + destf, ioe);
+    }
+
+    if (success && inheritPerms) {
+      //use FsShell to change group and permissions recursively
+      try {
+        FsShell fshell = new FsShell();
+        fshell.setConf(conf);
+        fshell.run(new String[]{"-chgrp", "-R", group, destf.toString()});
+        fshell.run(new String[]{"-chmod", "-R", permission, destf.toString()});
+      } catch (Exception e) {
+        throw new HiveException("Unable to set permissions of " + destf, e);
+      }
+    }
+    return success;
+  }
+
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs)
       throws HiveException {
+    boolean inheritPerms = HiveConf.getBoolVar(conf,
+        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     try {
       // create the destination if it does not exist
       if (!fs.exists(destf)) {
         fs.mkdirs(destf);
+        if (inheritPerms) {
+          fs.setPermission(destf, fs.getFileStatus(destf.getParent()).getPermission());
+        }
       }
     } catch (IOException e) {
       throw new HiveException(
@@ -2146,7 +2209,7 @@ private void constructOneLBLocationMap(F
     try {
       for (List<Path[]> sdpairs : result) {
         for (Path[] sdpair : sdpairs) {
-          if (!fs.rename(sdpair[0], sdpair[1])) {
+          if (!renameFile(conf, sdpair[0], sdpair[1], fs, false)) {
             throw new IOException("Cannot move " + sdpair[0] + " to " + sdpair[1]);
           }
         }
@@ -2175,6 +2238,8 @@ private void constructOneLBLocationMap(F
       throws HiveException {
     try {
       FileSystem fs = srcf.getFileSystem(conf);
+      boolean inheritPerms = HiveConf.getBoolVar(conf,
+          HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
 
       // check if srcf contains nested sub-directories
       FileStatus[] srcs;
@@ -2189,8 +2254,10 @@ private void constructOneLBLocationMap(F
       }
       List<List<Path[]>> result = checkPaths(conf, fs, srcs, destf, true);
 
-      // point of no return -- delete oldPath
-      if (oldPath != null) {
+      // point of no return -- delete oldPath only if it is not same as destf,
+      // otherwise, the oldPath/destf will be cleaned later just before move
+      if (oldPath != null && (!destf.getFileSystem(conf).equals(oldPath.getFileSystem(conf))
+          || !destf.equals(oldPath))) {
         try {
           FileSystem fs2 = oldPath.getFileSystem(conf);
           if (fs2.exists(oldPath)) {
@@ -2208,27 +2275,30 @@ private void constructOneLBLocationMap(F
       // rename src directory to destf
       if (srcs.length == 1 && srcs[0].isDir()) {
         // rename can fail if the parent doesn't exist
-        if (!fs.exists(destf.getParent())) {
-          fs.mkdirs(destf.getParent());
-        }
-        if (fs.exists(destf)) {
-          fs.delete(destf, true);
+        Path destfp = destf.getParent();
+        if (!fs.exists(destfp)) {
+          boolean success = fs.mkdirs(destfp);
+          if (inheritPerms && success) {
+            fs.setPermission(destfp, fs.getFileStatus(destfp.getParent()).getPermission());
+          }
         }
 
-        boolean b = fs.rename(srcs[0].getPath(), destf);
+        boolean b = renameFile(conf, srcs[0].getPath(), destf, fs, true);
         if (!b) {
           throw new HiveException("Unable to move results from " + srcs[0].getPath()
               + " to destination directory: " + destf);
         }
-        LOG.debug("Renaming:" + srcf.toString() + " to " + destf.toString()  + ",Status:" + b);
       } else { // srcf is a file or pattern containing wildcards
         if (!fs.exists(destf)) {
-          fs.mkdirs(destf);
+          boolean success = fs.mkdirs(destf);
+          if (inheritPerms && success) {
+            fs.setPermission(destf, fs.getFileStatus(destf.getParent()).getPermission());
+          }
         }
         // srcs must be a list of files -- ensured by LoadSemanticAnalyzer
         for (List<Path[]> sdpairs : result) {
           for (Path[] sdpair : sdpairs) {
-            if (!fs.rename(sdpair[0], sdpair[1])) {
+            if (!renameFile(conf, sdpair[0], sdpair[1], fs, true)) {
               throw new IOException("Error moving: " + sdpair[0] + " into: " + sdpair[1]);
             }
           }

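One behavioral point in the new renameFile() above: FileSystem.rename() into an existing directory nests the source under that directory rather than replacing it, which is why the replace == true path deletes an existing destination directory before renaming. A hypothetical helper mirroring just that branch (names are illustrative, not part of the commit):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class ReplaceRenameSketch {
  // If dst is an existing directory, fs.rename(src, dst) would move src
  // *under* it (dst/src-name), so the directory is deleted first to turn
  // the rename into a true replacement, as Hive.renameFile() does.
  static boolean replaceRename(FileSystem fs, Path src, Path dst) throws IOException {
    if (fs.exists(dst) && fs.getFileStatus(dst).isDir()) {
      fs.delete(dst, true);
    }
    return fs.rename(src, dst);
  }
}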
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java Tue Jul 30 22:22:35 2013
@@ -119,27 +119,27 @@ class TextMetaDataFormatter implements M
                               boolean isFormatted, boolean isExt, boolean isPretty)
          throws HiveException {
         try {
+          String output;
           if (colPath.equals(tableName)) {
             List<FieldSchema> partCols = tbl.isPartitioned() ? tbl.getPartCols() : null;
-            outStream.writeBytes(
-              isPretty ?
-                  MetaDataPrettyFormatUtils.getAllColumnsInformation(
-                      cols, partCols, prettyOutputNumCols)
+            output = isPretty ?
+                MetaDataPrettyFormatUtils.getAllColumnsInformation(
+                    cols, partCols, prettyOutputNumCols)
                 :
-                  MetaDataFormatUtils.getAllColumnsInformation(cols, partCols, isFormatted)
-              );
+                MetaDataFormatUtils.getAllColumnsInformation(cols, partCols, isFormatted);
           } else {
-            outStream.writeBytes(
-                MetaDataFormatUtils.getAllColumnsInformation(cols, isFormatted));
+            output = MetaDataFormatUtils.getAllColumnsInformation(cols, isFormatted);
           }
+          outStream.write(output.getBytes());
 
           if (tableName.equals(colPath)) {
             if (isFormatted) {
               if (part != null) {
-                outStream.writeBytes(MetaDataFormatUtils.getPartitionInformation(part));
+                output = MetaDataFormatUtils.getPartitionInformation(part);
               } else {
-                outStream.writeBytes(MetaDataFormatUtils.getTableInformation(tbl));
+                output = MetaDataFormatUtils.getTableInformation(tbl);
               }
+              outStream.write(output.getBytes());
             }
 
           // if extended desc table then show the complete details of the table
@@ -150,7 +150,7 @@ class TextMetaDataFormatter implements M
                 // show partition information
                 outStream.writeBytes("Detailed Partition Information");
                 outStream.write(separator);
-                outStream.writeBytes(part.getTPartition().toString());
+                outStream.write(part.getTPartition().toString().getBytes());
                 outStream.write(separator);
                 // comment column is empty
                 outStream.write(terminator);
@@ -158,7 +158,7 @@ class TextMetaDataFormatter implements M
                 // show table information
                 outStream.writeBytes("Detailed Table Information");
                 outStream.write(separator);
-                outStream.writeBytes(tbl.getTTable().toString());
+                outStream.write(tbl.getTTable().toString().getBytes());
                 outStream.write(separator);
                 outStream.write(terminator);
               }
@@ -444,7 +444,7 @@ class TextMetaDataFormatter implements M
             outStream.writeBytes(database);
             outStream.write(separator);
             if (comment != null) {
-              outStream.writeBytes(comment);
+              outStream.write(comment.getBytes());
             }
             outStream.write(separator);
             if (location != null) {

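The writeBytes() to write(getBytes()) changes above matter for non-ASCII output: DataOutputStream.writeBytes(String) keeps only the low-order byte of each char, while String.getBytes() preserves the full encoding (the Hive code uses the default charset; the standalone illustration below pins UTF-8 and is not Hive code):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

public class WriteBytesDemo {
  public static void main(String[] args) throws Exception {
    String s = "caf\u00e9";                                  // 'é' is two bytes in UTF-8
    ByteArrayOutputStream a = new ByteArrayOutputStream();
    new DataOutputStream(a).writeBytes(s);                   // low byte of each char only
    ByteArrayOutputStream b = new ByteArrayOutputStream();
    new DataOutputStream(b).write(s.getBytes("UTF-8"));      // full multi-byte encoding
    System.out.println(a.size() + " vs " + b.size());        // prints: 4 vs 5
  }
}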
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Tue Jul 30 22:22:35 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Co
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -144,10 +145,10 @@ public class GenMRFileSink1 implements N
               // or for a map-reduce job
               MapredWork currWork = (MapredWork) currTask.getWork();
               boolean mergeMapOnly =
-                  hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReducer() == null;
+                  hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReduceWork() == null;
               boolean mergeMapRed =
                   hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
-                      currWork.getReducer() != null;
+                      currWork.getReduceWork() != null;
               if (mergeMapOnly || mergeMapRed) {
                 chDir = true;
               }
@@ -239,7 +240,10 @@ public class GenMRFileSink1 implements N
 
     // mark the MapredWork and FileSinkOperator for gathering stats
     nd.getConf().setGatherStats(true);
-    mrWork.setGatheringStats(true);
+    mrWork.getMapWork().setGatheringStats(true);
+    if (mrWork.getReduceWork() != null) {
+      mrWork.getReduceWork().setGatheringStats(true);
+    }
     nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
     nd.getConf().setMaxStatsKeyPrefixLength(
         hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
@@ -345,7 +349,8 @@ public class GenMRFileSink1 implements N
     //
     MoveWork dummyMv = new MoveWork(null, null, null,
         new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
-    MapredWork cplan;
+    MapWork cplan;
+    Serializable work;
 
     if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
         fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
@@ -358,6 +363,7 @@ public class GenMRFileSink1 implements N
         LOG.info("RCFile format- Using block level merge");
         cplan = createRCFileMergeTask(fsInputDesc, finalName,
             dpCtx != null && dpCtx.getNumDPCols() > 0);
+        work = cplan;
       } catch (ClassNotFoundException e) {
         String msg = "Illegal input format class: " + inputFormatClass;
         throw new SemanticException(msg);
@@ -365,12 +371,14 @@ public class GenMRFileSink1 implements N
 
     } else {
       cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
+      work = new MapredWork();
+      ((MapredWork)work).setMapWork(cplan);
       // use CombineHiveInputFormat for map-only merging
     }
     cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
     // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
     // know if merge MR2 will be triggered at execution time
-    ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, cplan,
+    ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, work,
         fsInputDesc.getFinalDirName());
 
     // keep the dynamic partition context in conditional task resolver context
@@ -471,7 +479,7 @@ public class GenMRFileSink1 implements N
    *          the last FileSinkOperator in the parent MapReduce work
    * @return the MapredWork
    */
-  private MapredWork createMRWorkForMergingFiles (HiveConf conf,
+  private MapWork createMRWorkForMergingFiles (HiveConf conf,
     Operator<? extends OperatorDesc> topOp,  FileSinkDesc fsDesc) {
 
     ArrayList<String> aliases = new ArrayList<String>();
@@ -480,10 +488,10 @@ public class GenMRFileSink1 implements N
     aliases.add(inputDir); // dummy alias: just use the input path
 
     // constructing the default MapredWork
-    MapredWork cplan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+    MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+    MapWork cplan = cMrPlan.getMapWork();
     cplan.getPathToAliases().put(inputDir, aliases);
     cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
-    cplan.setNumReduceTasks(0);
     cplan.getAliasToWork().put(inputDir, topOp);
     cplan.setMapperCannotSpanPartns(true);
 
@@ -498,7 +506,7 @@ public class GenMRFileSink1 implements N
    * @return MergeWork if table is stored as RCFile,
    *         null otherwise
    */
-  private MapredWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+  private MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
       String finalName, boolean hasDynamicPartitions) throws SemanticException {
 
     String inputDir = fsInputDesc.getFinalDirName();
@@ -561,7 +569,7 @@ public class GenMRFileSink1 implements N
    */
   private ConditionalTask createCondTask(HiveConf conf,
       Task<? extends Serializable> currTask, MoveWork mvWork,
-      MapredWork mergeWork, String inputPath) {
+      Serializable mergeWork, String inputPath) {
 
     // There are 3 options for this ConditionalTask:
     // 1) Merge the partitions

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Tue Jul 30 22:22:35 2013
@@ -77,7 +77,7 @@ public class GenMRRedSink1 implements No
 
     // If the plan for this reducer does not exist, initialize the plan
     if (oldTask == null) {
-      if (currPlan.getReducer() == null) {
+      if (currPlan.getReduceWork() == null) {
         GenMapRedUtils.initPlan(op, ctx);
       } else {
         GenMapRedUtils.splitPlan(op, ctx);

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java Tue Jul 30 22:22:35 2013
@@ -85,13 +85,13 @@ public class GenMRRedSink3 implements No
     // If the plan for this reducer does not exist, initialize the plan
     if (reducerTask == null) {
       // When the reducer is encountered for the first time
-      if (plan.getReducer() == null) {
+      if (plan.getReduceWork() == null) {
         GenMapRedUtils.initUnionPlan(op, union, ctx, unionTask);
         // When union is followed by a multi-table insert
       } else {
         GenMapRedUtils.splitPlan(op, ctx);
       }
-    } else if (plan.getReducer() == reducer) {
+    } else if (plan.getReduceWork() != null && plan.getReduceWork().getReducer() == reducer) {
       // The union is already initialized. However, the union is walked from
       // another input
       // initUnionPlan is idempotent

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Tue Jul 30 22:22:35 2013
@@ -117,7 +117,10 @@ public class GenMRTableScan1 implements 
             handlePartialScanCommand(op, ctx, parseCtx, currTask, parseInfo, statsWork, statsTask);
           }
 
-          currWork.setGatheringStats(true);
+          currWork.getMapWork().setGatheringStats(true);
+          if (currWork.getReduceWork() != null) {
+            currWork.getReduceWork().setGatheringStats(true);
+          }
           // NOTE: here we should use the new partition predicate pushdown API to get a list of pruned list,
           // and pass it to setTaskPlan as the last parameter
           Set<Partition> confirmedPartns = new HashSet<Partition>();

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Tue Jul 30 22:22:35 2013
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -61,12 +62,15 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 
@@ -81,9 +85,9 @@ public final class GenMapRedUtils {
     LOG = LogFactory.getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
   }
 
-  private static boolean needsTagging(Operator<? extends OperatorDesc> reducer) {
-    return (reducer.getClass() == JoinOperator.class ||
-        reducer.getClass() == DemuxOperator.class);
+  private static boolean needsTagging(ReduceWork rWork) {
+    return rWork != null && (rWork.getReducer().getClass() == JoinOperator.class ||
+         rWork.getReducer().getClass() == DemuxOperator.class);
   }
   /**
    * Initialize the current plan by adding it to root tasks.
@@ -106,13 +110,14 @@ public final class GenMapRedUtils {
     Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
 
     opTaskMap.put(reducer, currTask);
-    plan.setReducer(reducer);
+    plan.setReduceWork(new ReduceWork());
+    plan.getReduceWork().setReducer(reducer);
     ReduceSinkDesc desc = op.getConf();
 
-    plan.setNumReduceTasks(desc.getNumReducers());
+    plan.getReduceWork().setNumReduceTasks(desc.getNumReducers());
 
-    if (needsTagging(reducer)) {
-      plan.setNeedsTagging(true);
+    if (needsTagging(plan.getReduceWork())) {
+      plan.getReduceWork().setNeedsTagging(true);
     }
 
     assert currTopOp != null;
@@ -149,13 +154,16 @@ public final class GenMapRedUtils {
         opProcCtx.getOpTaskMap();
 
     opTaskMap.put(reducer, unionTask);
-    plan.setReducer(reducer);
+
+    plan.setReduceWork(new ReduceWork());
+    plan.getReduceWork().setReducer(reducer);
+    plan.getReduceWork().setReducer(reducer);
     ReduceSinkDesc desc = op.getConf();
 
-    plan.setNumReduceTasks(desc.getNumReducers());
+    plan.getReduceWork().setNumReduceTasks(desc.getNumReducers());
 
-    if (needsTagging(reducer)) {
-      plan.setNeedsTagging(true);
+    if (needsTagging(plan.getReduceWork())) {
+      plan.getReduceWork().setNeedsTagging(true);
     }
 
     initUnionPlan(opProcCtx, currUnionOp, unionTask, false);
@@ -189,13 +197,14 @@ public final class GenMapRedUtils {
         for (int pos = 0; pos < size; pos++) {
           String taskTmpDir = taskTmpDirLst.get(pos);
           TableDesc tt_desc = tt_descLst.get(pos);
-          if (plan.getPathToAliases().get(taskTmpDir) == null) {
-            plan.getPathToAliases().put(taskTmpDir,
+          MapWork mWork = plan.getMapWork();
+          if (mWork.getPathToAliases().get(taskTmpDir) == null) {
+            mWork.getPathToAliases().put(taskTmpDir,
                 new ArrayList<String>());
-            plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
-            plan.getPathToPartitionInfo().put(taskTmpDir,
+            mWork.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
+            mWork.getPathToPartitionInfo().put(taskTmpDir,
                 new PartitionDesc(tt_desc, null));
-            plan.getAliasToWork().put(taskTmpDir, topOperators.get(pos));
+            mWork.getAliasToWork().put(taskTmpDir, topOperators.get(pos));
           }
         }
       }
@@ -304,7 +313,8 @@ public final class GenMapRedUtils {
     }
 
     if (oldTask instanceof MapRedTask && currTask instanceof MapRedTask) {
-      ((MapRedTask)currTask).getWork().mergingInto(((MapRedTask) oldTask).getWork());
+      ((MapRedTask)currTask).getWork().getMapWork()
+        .mergingInto(((MapRedTask) oldTask).getWork().getMapWork());
     }
 
     opProcCtx.setCurrTopOp(null);
@@ -357,9 +367,11 @@ public final class GenMapRedUtils {
     Operator<? extends OperatorDesc> reducer = cRS.getChildOperators().get(0);
 
     // Add the reducer
-    childPlan.setReducer(reducer);
+    ReduceWork rWork = new ReduceWork();
+    childPlan.setReduceWork(rWork);
+    rWork.setReducer(reducer);
     ReduceSinkDesc desc = cRS.getConf();
-    childPlan.setNumReduceTasks(new Integer(desc.getNumReducers()));
+    childPlan.getReduceWork().setNumReduceTasks(new Integer(desc.getNumReducers()));
 
     opProcCtx.getOpTaskMap().put(reducer, childTask);
 
@@ -427,7 +439,7 @@ public final class GenMapRedUtils {
   public static void setTaskPlan(String alias_id,
       Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
       GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
-    MapredWork plan = (MapredWork) task.getWork();
+    MapWork plan = ((MapredWork) task.getWork()).getMapWork();
     ParseContext parseCtx = opProcCtx.getParseCtx();
     Set<ReadEntity> inputs = opProcCtx.getInputs();
 
@@ -482,6 +494,15 @@ public final class GenMapRedUtils {
 
     }
 
+    Map<String, String> props = parseCtx.getTopToProps().get(topOp);
+    if (props != null) {
+      Properties target = aliasPartnDesc.getProperties();
+      if (target == null) {
+        aliasPartnDesc.setProperties(target = new Properties());
+      }
+      target.putAll(props);
+    }
+
     plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
 
     long sizeNeeded = Integer.MAX_VALUE;
@@ -602,6 +623,14 @@ public final class GenMapRedUtils {
         tblDesc = Utilities.getTableDesc(part.getTable());
       }
 
+      if (props != null) {
+        Properties target = tblDesc.getProperties();
+        if (target == null) {
+          tblDesc.setProperties(target = new Properties());
+        }
+        target.putAll(props);
+      }
+
       for (Path p : paths) {
         if (p == null) {
           continue;
@@ -693,7 +722,7 @@ public final class GenMapRedUtils {
    *          table descriptor
    */
   public static void setTaskPlan(String path, String alias,
-      Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
+      Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local,
       TableDesc tt_desc) throws SemanticException {
 
     if (path == null || alias == null) {
@@ -732,7 +761,7 @@ public final class GenMapRedUtils {
    * @param topOp
    *          current top operator in the path
    */
-  public static void setKeyAndValueDesc(MapredWork plan,
+  public static void setKeyAndValueDesc(ReduceWork plan,
       Operator<? extends OperatorDesc> topOp) {
     if (topOp == null) {
       return;
@@ -773,12 +802,12 @@ public final class GenMapRedUtils {
       }
     } else if (task instanceof ExecDriver) {
       MapredWork work = (MapredWork) task.getWork();
-      work.deriveExplainAttributes();
+      work.getMapWork().deriveExplainAttributes();
       HashMap<String, Operator<? extends OperatorDesc>> opMap = work
-          .getAliasToWork();
+          .getMapWork().getAliasToWork();
       if (opMap != null && !opMap.isEmpty()) {
         for (Operator<? extends OperatorDesc> op : opMap.values()) {
-          setKeyAndValueDesc(work, op);
+          setKeyAndValueDesc(work.getReduceWork(), op);
         }
       }
     }
@@ -799,7 +828,7 @@ public final class GenMapRedUtils {
    */
   public static MapredWork getMapRedWork(ParseContext parseCtx) {
     MapredWork work = getMapRedWorkFromConf(parseCtx.getConf());
-    work.setNameToSplitSample(parseCtx.getNameToSplitSample());
+    work.getMapWork().setNameToSplitSample(parseCtx.getNameToSplitSample());
     return work;
   }
 
@@ -810,7 +839,8 @@ public final class GenMapRedUtils {
    * @return the new plan
    */
   public static MapredWork getMapRedWorkFromConf(HiveConf conf) {
-    MapredWork work = new MapredWork();
+    MapredWork mrWork = new MapredWork();
+    MapWork work = mrWork.getMapWork();
 
     boolean mapperCannotSpanPartns =
         conf.getBoolVar(
@@ -819,11 +849,9 @@ public final class GenMapRedUtils {
     work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
     work.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     work.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
-    work.setTagToValueDesc(new ArrayList<TableDesc>());
-    work.setReducer(null);
     work.setHadoopSupportsSplittable(
         conf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE));
-    return work;
+    return mrWork;
   }
 
   /**
@@ -928,22 +956,22 @@ public final class GenMapRedUtils {
 
     Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
 
-    if (needsTagging(reducer)) {
+    if (needsTagging(cplan.getReduceWork())) {
       String origStreamDesc;
       streamDesc = "$INTNAME";
       origStreamDesc = streamDesc;
       int pos = 0;
-      while (cplan.getAliasToWork().get(streamDesc) != null) {
+      while (cplan.getMapWork().getAliasToWork().get(streamDesc) != null) {
         streamDesc = origStreamDesc.concat(String.valueOf(++pos));
       }
 
       // TODO: Allocate work to remove the temporary files and make that
       // dependent on the redTask
-      cplan.setNeedsTagging(true);
+      cplan.getReduceWork().setNeedsTagging(true);
     }
 
     // Add the path to alias mapping
-    setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, false, tt_desc);
+    setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan.getMapWork(), false, tt_desc);
     opProcCtx.setCurrTopOp(null);
     opProcCtx.setCurrAliasId(null);
     opProcCtx.setCurrTask(childTask);

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Tue Jul 30 22:22:35 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -77,7 +78,7 @@ public final class MapJoinFactory {
    */
   private static class TableScanMapJoinProcessor implements NodeProcessor {
 
-    public static void setupBucketMapJoinInfo(MapredWork plan,
+    public static void setupBucketMapJoinInfo(MapWork plan,
         AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
       if (currMapJoinOp != null) {
         Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
@@ -214,7 +215,7 @@ public final class MapJoinFactory {
       // the first time.
       boolean local = pos != mapJoin.getConf().getPosBigTable();
       if (oldTask == null) {
-        assert currPlan.getReducer() == null;
+        assert currPlan.getReduceWork() == null;
         initMapJoinPlan(mapJoin, currTask, ctx, local);
       } else {
         // The current plan can be thrown away after being merged with the
@@ -223,7 +224,7 @@ public final class MapJoinFactory {
         ctx.setCurrTask(currTask = oldTask);
       }
       MapredWork plan = (MapredWork) currTask.getWork();
-      setupBucketMapJoinInfo(plan, mapJoin);
+      setupBucketMapJoinInfo(plan.getMapWork(), mapJoin);
 
       mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrAliasId()));
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Tue Jul 30 22:22:35 2013
@@ -134,7 +134,7 @@ public class MapJoinProcessor implements
         new LinkedHashMap<String, FetchWork>());
 
     for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
-      newWork.getAliasToWork().entrySet()) {
+      newWork.getMapWork().getAliasToWork().entrySet()) {
       String alias = entry.getKey();
       Operator<? extends OperatorDesc> op = entry.getValue();
 
@@ -162,7 +162,7 @@ public class MapJoinProcessor implements
       smallTableAliasList.add(alias);
       // get input path and remove this alias from pathToAlias
       // because this file will be fetched by fetch operator
-      LinkedHashMap<String, ArrayList<String>> pathToAliases = newWork.getPathToAliases();
+      LinkedHashMap<String, ArrayList<String>> pathToAliases = newWork.getMapWork().getPathToAliases();
 
       // keep record all the input path for this alias
       HashSet<String> pathSet = new HashSet<String>();
@@ -193,7 +193,7 @@ public class MapJoinProcessor implements
       List<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
 
       for (String tablePath : pathSet) {
-        PartitionDesc partitionDesc = newWork.getPathToPartitionInfo().get(tablePath);
+        PartitionDesc partitionDesc = newWork.getMapWork().getPathToPartitionInfo().get(tablePath);
         // create fetchwork for non partitioned table
         if (partitionDesc.getPartSpec() == null || partitionDesc.getPartSpec().size() == 0) {
           fetchWork = new FetchWork(tablePath, partitionDesc.getTableDesc());
@@ -205,7 +205,7 @@ public class MapJoinProcessor implements
       }
       // create fetchwork for partitioned table
       if (fetchWork == null) {
-        TableDesc table = newWork.getAliasToPartnInfo().get(alias).getTableDesc();
+        TableDesc table = newWork.getMapWork().getAliasToPartnInfo().get(alias).getTableDesc();
         fetchWork = new FetchWork(partDir, partDesc, table);
       }
       // set alias to fetch work
@@ -213,13 +213,13 @@ public class MapJoinProcessor implements
     }
     // remove small table ailias from aliasToWork;Avoid concurrent modification
     for (String alias : smallTableAliasList) {
-      newWork.getAliasToWork().remove(alias);
+      newWork.getMapWork().getAliasToWork().remove(alias);
     }
 
     // set up local work
-    newWork.setMapLocalWork(newLocalWork);
+    newWork.getMapWork().setMapLocalWork(newLocalWork);
     // remove reducer
-    newWork.setReducer(null);
+    newWork.setReduceWork(null);
     // return the big table alias
     if (bigTableAlias == null) {
       throw new SemanticException("Big Table Alias is null");
@@ -240,8 +240,8 @@ public class MapJoinProcessor implements
   public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
       throws SemanticException {
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
-        newWork.getOpParseCtxMap();
-    QBJoinTree newJoinTree = newWork.getJoinTree();
+        newWork.getMapWork().getOpParseCtxMap();
+    QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
     // generate the map join operator; already checked the map join
     MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
         newJoinTree, mapJoinPos, true, false);
@@ -256,8 +256,8 @@ public class MapJoinProcessor implements
       String bigTableAlias = MapJoinProcessor
           .genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
       // clean up the mapred work
-      newWork.setOpParseCtxMap(null);
-      newWork.setJoinTree(null);
+      newWork.getMapWork().setOpParseCtxMap(null);
+      newWork.getMapWork().setJoinTree(null);
 
       return bigTableAlias;
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java Tue Jul 30 22:22:35 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 
 /**
  * Common iteration methods for converting joins and sort-merge joins.
@@ -119,7 +120,7 @@ public abstract class AbstractJoinTaskDi
     }
   }
 
-  public long getTotalKnownInputSize(Context context, MapredWork currWork,
+  public long getTotalKnownInputSize(Context context, MapWork currWork,
       Map<String, ArrayList<String>> pathToAliases,
       HashMap<String, Long> aliasToSize) throws SemanticException {
     try {

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java Tue Jul 30 22:22:35 2013
@@ -86,13 +86,13 @@ public class BucketingSortingInferenceOp
         continue;
       }
 
-      Operator<? extends OperatorDesc> reducer = mapRedTask.getWork().getReducer();
-      if (reducer == null) {
+      if (mapRedTask.getWork().getReduceWork() == null) {
         continue;
       }
+      Operator<? extends OperatorDesc> reducer = mapRedTask.getWork().getReduceWork().getReducer();
 
       // uses sampling, which means it's not bucketed
-      boolean disableBucketing = mapRedTask.getWork().getSamplingType() > 0;
+      boolean disableBucketing = mapRedTask.getWork().getMapWork().getSamplingType() > 0;
       BucketingSortingCtx bCtx = new BucketingSortingCtx(disableBucketing);
 
       // RuleRegExp rules are used to match operators anywhere in the tree
@@ -145,8 +145,8 @@ public class BucketingSortingInferenceOp
       topNodes.add(reducer);
       ogw.startWalking(topNodes, null);
 
-      mapRedTask.getWork().getBucketedColsByDirectory().putAll(bCtx.getBucketedColsByDirectory());
-      mapRedTask.getWork().getSortedColsByDirectory().putAll(bCtx.getSortedColsByDirectory());
+      mapRedTask.getWork().getMapWork().getBucketedColsByDirectory().putAll(bCtx.getBucketedColsByDirectory());
+      mapRedTask.getWork().getMapWork().getSortedColsByDirectory().putAll(bCtx.getSortedColsByDirectory());
     }
   }
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Tue Jul 30 22:22:35 2013
@@ -50,10 +50,12 @@ import org.apache.hadoop.hive.ql.plan.Co
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 
 /*
  * Convert tasks involving JOIN into MAPJOIN.
@@ -108,7 +110,7 @@ public class CommonJoinTaskDispatcher ex
   }
 
   // Get the position of the big table for this join operator and the given alias
-  private int getPosition(MapredWork work, Operator<? extends OperatorDesc> joinOp,
+  private int getPosition(MapWork work, Operator<? extends OperatorDesc> joinOp,
       String alias) {
     Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
 
@@ -127,9 +129,9 @@ public class CommonJoinTaskDispatcher ex
    */
   private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
     MapRedTask childTask = (MapRedTask) task.getChildTasks().get(0);
-    MapredWork work = task.getWork();
+    MapWork work = task.getWork().getMapWork();
     MapredLocalWork localWork = work.getMapLocalWork();
-    MapredWork childWork = childTask.getWork();
+    MapWork childWork = childTask.getWork().getMapWork();
     MapredLocalWork childLocalWork = childWork.getMapLocalWork();
 
     // Can this be merged
@@ -205,21 +207,27 @@ public class CommonJoinTaskDispatcher ex
       }
     }
 
+    // Merge the 2 trees - remove the FileSinkOperator from the first tree pass it to the
+    // top of the second
     Operator<? extends Serializable> childAliasOp =
         childWork.getAliasToWork().values().iterator().next();
     if (fop.getParentOperators().size() > 1) {
       return;
     }
-
-    // Merge the 2 trees - remove the FileSinkOperator from the first tree pass it to the
-    // top of the second
     Operator<? extends Serializable> parentFOp = fop.getParentOperators().get(0);
-    parentFOp.getChildOperators().remove(fop);
-    parentFOp.getChildOperators().add(childAliasOp);
-    List<Operator<? extends OperatorDesc>> parentOps =
-        new ArrayList<Operator<? extends OperatorDesc>>();
-    parentOps.add(parentFOp);
-    childAliasOp.setParentOperators(parentOps);
+    // remove the unnecessary TableScan
+    if (childAliasOp instanceof TableScanOperator) {
+      TableScanOperator tso = (TableScanOperator)childAliasOp;
+      if (tso.getNumChild() != 1) {
+        // shouldn't happen
+        return;
+      }
+      childAliasOp = tso.getChildOperators().get(0);
+      childAliasOp.replaceParent(tso, parentFOp);
+    } else {
+      childAliasOp.setParentOperators(Utilities.makeList(parentFOp));
+    }
+    parentFOp.replaceChild(fop, childAliasOp);
 
     work.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
     for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
@@ -256,19 +264,26 @@ public class CommonJoinTaskDispatcher ex
    * @param childTask
    */
   private void copyReducerConf(MapRedTask task, MapRedTask childTask) {
-    MapredWork childWork = childTask.getWork();
+    MapredWork mrChildWork = childTask.getWork();
+    ReduceWork childWork = childTask.getWork().getReduceWork();
+    if (childWork == null) {
+      return;
+    }
+
     Operator childReducer = childWork.getReducer();
     MapredWork work = task.getWork();
     if (childReducer == null) {
       return;
     }
-    work.setReducer(childReducer);
-    work.setNumReduceTasks(childWork.getNumReduceTasks());
-    work.setJoinTree(childWork.getJoinTree());
-    work.setNeedsTagging(childWork.getNeedsTagging());
+    ReduceWork rWork = new ReduceWork();
+    work.setReduceWork(rWork);
+    rWork.setReducer(childReducer);
+    rWork.setNumReduceTasks(childWork.getNumReduceTasks());
+    work.getMapWork().setJoinTree(mrChildWork.getMapWork().getJoinTree());
+    rWork.setNeedsTagging(childWork.getNeedsTagging());
 
     // Make sure the key configuration is correct, clear and regenerate.
-    work.getTagToValueDesc().clear();
+    rWork.getTagToValueDesc().clear();
     GenMapRedUtils.setKeyAndValueDescForTaskTree(task);
   }
 
@@ -303,10 +318,9 @@ public class CommonJoinTaskDispatcher ex
       return;
     }
     MapRedTask childTask = (MapRedTask) firstChildTask;
-    MapredWork mapJoinWork = mapJoinTask.getWork();
+    MapWork mapJoinWork = mapJoinTask.getWork().getMapWork();
     MapredWork childWork = childTask.getWork();
-    Operator childReducer = childWork.getReducer();
-    if (childReducer == null) {
+    if (childWork.getReduceWork() == null) {
       // Not a MR job, nothing to merge.
       return;
     }
@@ -316,7 +330,7 @@ public class CommonJoinTaskDispatcher ex
     if (aliasToWork.size() > 1) {
       return;
     }
-    Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
+    Map<String, ArrayList<String>> childPathToAliases = childWork.getMapWork().getPathToAliases();
     if (childPathToAliases.size() > 1) {
       return;
     }
@@ -347,7 +361,7 @@ public class CommonJoinTaskDispatcher ex
     }
 
     MapredLocalWork mapJoinLocalWork = mapJoinWork.getMapLocalWork();
-    MapredLocalWork childLocalWork = childWork.getMapLocalWork();
+    MapredLocalWork childLocalWork = childWork.getMapWork().getMapLocalWork();
 
     // Either of them should not be bucketed
     if ((mapJoinLocalWork != null && mapJoinLocalWork.getBucketMapjoinContext() != null) ||
@@ -355,12 +369,12 @@ public class CommonJoinTaskDispatcher ex
       return;
     }
 
-    if (childWork.getAliasToWork().size() > 1) {
+    if (childWork.getMapWork().getAliasToWork().size() > 1) {
       return;
     }
 
     Operator<? extends Serializable> childAliasOp =
-        childWork.getAliasToWork().values().iterator().next();
+        childWork.getMapWork().getAliasToWork().values().iterator().next();
     if (mapJoinTaskFileSinkOperator.getParentOperators().size() > 1) {
       return;
     }
@@ -387,10 +401,10 @@ public class CommonJoinTaskDispatcher ex
     parentOps.add(parentFOp);
     childAliasOp.setParentOperators(parentOps);
 
-    mapJoinWork.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
-    for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
+    mapJoinWork.getAliasToPartnInfo().putAll(childWork.getMapWork().getAliasToPartnInfo());
+    for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getMapWork().getPathToPartitionInfo()
         .entrySet()) {
-      if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
+      if (childWork.getMapWork().getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
         mapJoinWork.getPathToPartitionInfo()
             .put(childWorkEntry.getKey(), childWorkEntry.getValue());
       }
@@ -444,7 +458,7 @@ public class CommonJoinTaskDispatcher ex
     }
     currTask.setTaskTag(Task.COMMON_JOIN);
 
-    MapredWork currWork = currTask.getWork();
+    MapWork currWork = currTask.getWork().getMapWork();
 
     // create conditional work list and task list
     List<Serializable> listWorks = new ArrayList<Serializable>();
@@ -535,7 +549,7 @@ public class CommonJoinTaskDispatcher ex
 
       if (convertJoinMapJoin) {
         // create map join task and set big table as bigTablePosition
-        MapRedTask newTask = convertTaskToMapJoinTask(currWork, bigTablePosition).getFirst();
+        MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition).getFirst();
 
         newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
         replaceTask(currTask, newTask, physicalContext);
@@ -571,9 +585,9 @@ public class CommonJoinTaskDispatcher ex
         }
         // deep copy a new mapred work from xml
         // Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan
-        String xml = currWork.toXML();
+        String xml = currTask.getWork().toXML();
         InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-        MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+        MapredWork newWork = Utilities.deserializeObject(in);
 
         // create map join task and set big table as i
         ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);
@@ -653,14 +667,15 @@ public class CommonJoinTaskDispatcher ex
   }
 
   private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
-    MapredWork work = task.getWork();
-    if (work == null) {
+    MapWork mWork = task.getWork().getMapWork();
+    ReduceWork rWork = task.getWork().getReduceWork();
+    if (rWork == null) {
       return null;
     }
-    Operator<? extends OperatorDesc> reducerOp = work.getReducer();
+    Operator<? extends OperatorDesc> reducerOp = rWork.getReducer();
     if (reducerOp instanceof JoinOperator) {
       /* Is any operator present, which prevents the conversion */
-      Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
+      Map<String, Operator<? extends OperatorDesc>> aliasToWork = mWork.getAliasToWork();
       for (Operator<? extends OperatorDesc> op : aliasToWork.values()) {
         if (!checkOperatorOKMapJoinConversion(op)) {
           return null;
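
This hunk and the ones above it show the recurring shape of the change in this commit: MapredWork is now a container whose reduce side may be absent, so code that used to call work.getReducer() directly must first fetch the ReduceWork and null-check it. A minimal sketch of that lookup, using only accessors that appear in these hunks (import packages follow the imports added elsewhere in this commit; this is not the verbatim Hive method):

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
    import org.apache.hadoop.hive.ql.plan.MapredWork;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;
    import org.apache.hadoop.hive.ql.plan.ReduceWork;

    final class ReducerLookupSketch {
      /** Returns the reducer of a MapRedTask, or null when the plan is map-only. */
      static Operator<? extends OperatorDesc> findReducer(MapRedTask task) {
        MapredWork work = task.getWork();
        ReduceWork rWork = work.getReduceWork();   // null for map-only plans
        return (rWork == null) ? null : rWork.getReducer();
      }
    }

In getJoinOp() above this is exactly the first step: return null when there is no ReduceWork, otherwise check whether the reducer is a JoinOperator.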

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Tue Jul 30 22:22:35 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
 import java.io.ByteArrayInputStream;
-import java.io.File;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
@@ -50,6 +49,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -104,6 +104,7 @@ public final class GenMRSkewJoinProcesso
    * https://issues.apache.org/jira/browse/HIVE-964.
    *
    */
+  @SuppressWarnings("unchecked")
   public static void processSkewJoin(JoinOperator joinOp,
       Task<? extends Serializable> currTask, ParseContext parseCtx)
       throws SemanticException {
@@ -151,7 +152,7 @@ public final class GenMRSkewJoinProcesso
     List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
     MapredWork currPlan = (MapredWork) currTask.getWork();
 
-    TableDesc keyTblDesc = (TableDesc) currPlan.getKeyDesc().clone();
+    TableDesc keyTblDesc = (TableDesc) currPlan.getReduceWork().getKeyDesc().clone();
     List<String> joinKeys = Utilities
         .getColumnNames(keyTblDesc.getProperties());
     List<String> joinKeyTypes = Utilities.getColumnTypes(keyTblDesc
@@ -232,7 +233,7 @@ public final class GenMRSkewJoinProcesso
 
     for (int i = 0; i < numAliases - 1; i++) {
       Byte src = tags[i];
-      MapredWork newPlan = PlanUtils.getMapRedWork();
+      MapWork newPlan = PlanUtils.getMapRedWork().getMapWork();
 
       // This code has been only added for testing
       boolean mapperCannotSpanPartns =
@@ -246,7 +247,7 @@ public final class GenMRSkewJoinProcesso
         StringBuilder sb = new StringBuilder(xmlPlan);
         ByteArrayInputStream bis;
         bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
-        clonePlan = Utilities.deserializeMapRedWork(bis, parseCtx.getConf());
+        clonePlan = Utilities.deserializeObject(bis);
       } catch (UnsupportedEncodingException e) {
         throw new SemanticException(e);
       }
@@ -276,7 +277,7 @@ public final class GenMRSkewJoinProcesso
       newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part);
       newPlan.getAliasToPartnInfo().put(alias, part);
 
-      Operator<? extends OperatorDesc> reducer = clonePlan.getReducer();
+      Operator<? extends OperatorDesc> reducer = clonePlan.getReduceWork().getReducer();
       assert reducer instanceof JoinOperator;
       JoinOperator cloneJoinOp = (JoinOperator) reducer;
 
@@ -328,16 +329,18 @@ public final class GenMRSkewJoinProcesso
       newPlan
           .setMinSplitSize(HiveConf.getLongVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
       newPlan.setInputformat(HiveInputFormat.class.getName());
-      Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(
-          newPlan, jc);
+
+      MapredWork w = new MapredWork();
+      w.setMapWork(newPlan);
+
+      Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(w, jc);
       bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask);
       listWorks.add(skewJoinMapJoinTask.getWork());
       listTasks.add(skewJoinMapJoinTask);
     }
 
     ConditionalWork cndWork = new ConditionalWork(listWorks);
-    ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork,
-        parseCtx.getConf());
+    ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
     cndTsk.setListTasks(listTasks);
     cndTsk.setResolver(new ConditionalResolverSkewJoin());
     cndTsk
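
TaskFactory and the MapReduce execution path still expect a MapredWork, so the map-only skew-join plan built above has to be wrapped before a task can be created from it; that is what the new "MapredWork w" / "w.setMapWork(newPlan)" lines do. A condensed sketch of the wrapping step (the helper name and HiveConf parameter are illustrative; TaskFactory.get is called with a work object and a configuration, as at the call sites in this hunk):

    import java.io.Serializable;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.plan.MapredWork;

    final class MapOnlyTaskSketch {
      /** Package a map-only plan for TaskFactory, which still takes a MapredWork. */
      static Task<? extends Serializable> mapOnlyTask(MapWork mapPlan, HiveConf conf) {
        MapredWork wrapper = new MapredWork();
        wrapper.setMapWork(mapPlan);   // reduce side intentionally left null
        return TaskFactory.get(wrapper, conf);
      }
    }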

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java Tue Jul 30 22:22:35 2013
@@ -48,8 +48,7 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
-import
-  org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
@@ -98,14 +97,14 @@ public class MapJoinResolver implements 
         ConditionalTask conditionalTask) throws SemanticException {
       // get current mapred work and its local work
       MapredWork mapredWork = (MapredWork) currTask.getWork();
-      MapredLocalWork localwork = mapredWork.getMapLocalWork();
+      MapredLocalWork localwork = mapredWork.getMapWork().getMapLocalWork();
       if (localwork != null) {
         // get the context info and set up the shared tmp URI
         Context ctx = physicalContext.getContext();
         String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId());
         localwork.setTmpFileURI(tmpFileURI);
         String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId());
-        mapredWork.setTmpHDFSFileURI(hdfsTmpURI);
+        mapredWork.getMapWork().setTmpHDFSFileURI(hdfsTmpURI);
         // create a task for this local work; right now, this local work is shared
         // by the original MapredTask and this new generated MapredLocalTask.
         MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork, physicalContext
@@ -134,7 +133,7 @@ public class MapJoinResolver implements 
         newLocalWork.setTmpFileURI(tmpFileURI);
         newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive());
         newLocalWork.setBucketMapjoinContext(localwork.copyPartSpecMappingOnly());
-        mapredWork.setMapLocalWork(newLocalWork);
+        mapredWork.getMapWork().setMapLocalWork(newLocalWork);
         // get all parent tasks
         List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
         currTask.setParentTasks(null);

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java Tue Jul 30 22:22:35 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -188,7 +189,7 @@ public class MetadataOnlyOptimizer imple
       physicalContext = context;
     }
 
-    private String getAliasForTableScanOperator(MapredWork work,
+    private String getAliasForTableScanOperator(MapWork work,
         TableScanOperator tso) {
 
       for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
@@ -211,7 +212,7 @@ public class MetadataOnlyOptimizer imple
       return desc;
     }
 
-    private List<String> getPathsForAlias(MapredWork work, String alias) {
+    private List<String> getPathsForAlias(MapWork work, String alias) {
       List<String> paths = new ArrayList<String>();
 
       for (Map.Entry<String, ArrayList<String>> entry : work.getPathToAliases().entrySet()) {
@@ -223,7 +224,7 @@ public class MetadataOnlyOptimizer imple
       return paths;
     }
 
-    private void processAlias(MapredWork work, String alias) {
+    private void processAlias(MapWork work, String alias) {
       // Change the alias partition desc
       PartitionDesc aliasPartn = work.getAliasToPartnInfo().get(alias);
       changePartitionToMetadataOnly(aliasPartn);
@@ -247,12 +248,6 @@ public class MetadataOnlyOptimizer imple
       return partSpec.toString().replaceAll("[:/#\\?]", "_");
     }
 
-    private void convertToMetadataOnlyQuery(MapredWork work,
-        TableScanOperator tso) {
-      String alias = getAliasForTableScanOperator(work, tso);
-      processAlias(work, alias);
-    }
-
     @Override
     public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
         throws SemanticException {
@@ -305,8 +300,10 @@ public class MetadataOnlyOptimizer imple
 
       while (iterator.hasNext()) {
         TableScanOperator tso = iterator.next();
-        LOG.info("Metadata only table scan for " + tso.getConf().getAlias());
-        convertToMetadataOnlyQuery((MapredWork) task.getWork(), tso);
+        MapWork work = ((MapredWork) task.getWork()).getMapWork();
+        String alias = getAliasForTableScanOperator(work, tso);
+        LOG.info("Metadata only table scan for " + alias);
+        processAlias(work, alias);
       }
 
       return null;
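
With the private convertToMetadataOnlyQuery() helper removed, the dispatcher above now pulls the MapWork out of the task itself, resolves the alias that owns the chosen TableScanOperator, and rewrites that alias's partitions to be metadata-only. One simple way to recover an alias from a MapWork, sketched here with an identity comparison (the body of the real getAliasForTableScanOperator() is not shown in full in this hunk, so treat this as illustrative only):

    import java.util.Map;

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    final class AliasLookupSketch {
      /** Find the alias whose operator tree is rooted at the given operator, if any. */
      static String aliasFor(MapWork work, Operator<? extends OperatorDesc> root) {
        for (Map.Entry<String, Operator<? extends OperatorDesc>> entry
            : work.getAliasToWork().entrySet()) {
          if (entry.getValue() == root) {   // identity check on the root operator
            return entry.getKey();
          }
        }
        return null;
      }
    }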

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java Tue Jul 30 22:22:35 2013
@@ -27,7 +27,9 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 
 /**
  * Mark final MapredWork for ORDER BY to use sampling and set number of reduce task as -1
@@ -39,12 +41,16 @@ public class SamplingOptimizer implement
       if (!(task instanceof MapRedTask) || !((MapRedTask)task).getWork().isFinalMapRed()) {
         continue; // this could be replaced by bucketing on RS + bucketed fetcher for next MR
       }
-      MapredWork mapreWork = ((MapRedTask) task).getWork();
-      if (mapreWork.getNumReduceTasks() != 1 || mapreWork.getAliasToWork().size() != 1 ||
-          mapreWork.getSamplingType() > 0 || mapreWork.getReducer() == null) {
+      MapredWork mrWork = ((MapRedTask) task).getWork();
+      MapWork mapWork = mrWork.getMapWork();
+      ReduceWork reduceWork = mrWork.getReduceWork();
+
+      if (reduceWork == null || reduceWork.getNumReduceTasks() != 1
+          || mapWork.getAliasToWork().size() != 1 || mapWork.getSamplingType() > 0
+          || reduceWork.getReducer() == null) {
         continue;
       }
-      Operator<?> operator = mapreWork.getAliasToWork().values().iterator().next();
+      Operator<?> operator = mapWork.getAliasToWork().values().iterator().next();
       if (!(operator instanceof TableScanOperator)) {
         continue;
       }
@@ -55,8 +61,8 @@ public class SamplingOptimizer implement
         continue;
       }
       child.getConf().setNumReducers(-1);
-      mapreWork.setNumReduceTasks(-1);
-      mapreWork.setSamplingType(MapredWork.SAMPLING_ON_START);
+      reduceWork.setNumReduceTasks(-1);
+      mapWork.setSamplingType(MapWork.SAMPLING_ON_START);
     }
     return pctx;
   }
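
The rewritten guard and the final assignments illustrate how settings that used to live on one MapredWork are now split across its halves: the reducer count belongs to the ReduceWork, while the sampling type (and the alias bookkeeping checked above) belongs to the MapWork. A condensed sketch of the final toggle, assuming the caller has already verified that getReduceWork() is non-null, as the guard above does:

    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.plan.MapredWork;
    import org.apache.hadoop.hive.ql.plan.ReduceWork;

    final class SamplingToggleSketch {
      /** Switch a single-reducer final ORDER BY stage over to start-time sampling. */
      static void enableSampling(MapredWork mrWork) {
        ReduceWork reduceWork = mrWork.getReduceWork();
        reduceWork.setNumReduceTasks(-1);   // let sampling determine the reducer count
        mrWork.getMapWork().setSamplingType(MapWork.SAMPLING_ON_START);
      }
    }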

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java Tue Jul 30 22:22:35 2013
@@ -74,7 +74,7 @@ public class SkewJoinResolver implements
       Task<? extends Serializable> task = (Task<? extends Serializable>) nd;
 
       if (!task.isMapRedTask() || task instanceof ConditionalTask
-          || ((MapredWork) task.getWork()).getReducer() == null) {
+          || ((MapredWork) task.getWork()).getReduceWork() == null) {
         return null;
       }
 
@@ -94,7 +94,9 @@ public class SkewJoinResolver implements
 
       // iterator the reducer operator tree
       ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.add(((MapredWork) task.getWork()).getReducer());
+      if (((MapredWork)task.getWork()).getReduceWork() != null) {
+        topNodes.add(((MapredWork) task.getWork()).getReduceWork().getReducer());
+      }
       ogw.startWalking(topNodes, null);
       return null;
     }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Tue Jul 30 22:22:35 2013
@@ -52,10 +52,12 @@ import org.apache.hadoop.hive.ql.plan.Co
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 
 /**
@@ -72,7 +74,7 @@ public class SortMergeJoinTaskDispatcher
   // Convert the work in the SMB plan to a regular join
   // Note that the operator tree is not fixed, only the path/alias mappings in the
   // plan are fixed. The operator tree will still contain the SMBJoinOperator
-  private void genSMBJoinWork(MapredWork currWork, SMBMapJoinOperator smbJoinOp) {
+  private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) {
     // Remove the paths which are not part of aliasToPartitionInfo
     Map<String, PartitionDesc> aliasToPartitionInfo = currWork.getAliasToPartnInfo();
     List<String> removePaths = new ArrayList<String>();
@@ -150,7 +152,7 @@ public class SortMergeJoinTaskDispatcher
 
       // deep copy a new mapred work
       InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-      MapredWork currJoinWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+      MapredWork currJoinWork = Utilities.deserializeObject(in);
       SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
 
       // Add the row resolver for the new operator
@@ -158,7 +160,7 @@ public class SortMergeJoinTaskDispatcher
           physicalContext.getParseContext().getOpParseCtx();
       opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
       // change the newly created map-red plan as if it was a join operator
-      genSMBJoinWork(currJoinWork, newSMBJoinOp);
+      genSMBJoinWork(currJoinWork.getMapWork(), newSMBJoinOp);
       return currJoinWork;
     } catch (Exception e) {
       e.printStackTrace();
@@ -174,24 +176,25 @@ public class SortMergeJoinTaskDispatcher
       throws UnsupportedEncodingException, SemanticException {
     // deep copy a new mapred work from xml
     InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-    MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+    MapredWork newWork = Utilities.deserializeObject(in);
     // create a mapred task for this work
     MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
         .getParseContext().getConf());
     // generate the map join operator; already checked the map join
     MapJoinOperator newMapJoinOp =
         getMapJoinOperator(newTask, newWork, smbJoinOp, joinTree, bigTablePosition);
+
     // The reducer needs to be restored - Consider a query like:
     // select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key;
     // The reducer contains a groupby, which needs to be restored.
-    Operator<? extends OperatorDesc> reducer = newWork.getReducer();
+    ReduceWork rWork = newWork.getReduceWork();
 
     // create the local work for this plan
     String bigTableAlias =
         MapJoinProcessor.genLocalWorkForMapJoin(newWork, newMapJoinOp, bigTablePosition);
 
     // restore the reducer
-    newWork.setReducer(reducer);
+    newWork.setReduceWork(rWork);
     return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
   }
 
@@ -259,10 +262,10 @@ public class SortMergeJoinTaskDispatcher
     MapredWork currJoinWork = convertSMBWorkToJoinWork(currWork, originalSMBJoinOp);
     SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
 
-    currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
-    currWork.setJoinTree(joinTree);
-    currJoinWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
-    currJoinWork.setJoinTree(joinTree);
+    currWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
+    currWork.getMapWork().setJoinTree(joinTree);
+    currJoinWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
+    currJoinWork.getMapWork().setJoinTree(joinTree);
 
     // create conditional work list and task list
     List<Serializable> listWorks = new ArrayList<Serializable>();
@@ -272,7 +275,7 @@ public class SortMergeJoinTaskDispatcher
     HashMap<String, Task<? extends Serializable>> aliasToTask =
         new HashMap<String, Task<? extends Serializable>>();
     // Note that pathToAlias will behave as if the original plan was a join plan
-    HashMap<String, ArrayList<String>> pathToAliases = currJoinWork.getPathToAliases();
+    HashMap<String, ArrayList<String>> pathToAliases = currJoinWork.getMapWork().getPathToAliases();
 
     // generate a map join task for the big table
     SMBJoinDesc originalSMBJoinDesc = originalSMBJoinOp.getConf();
@@ -289,7 +292,7 @@ public class SortMergeJoinTaskDispatcher
     HashMap<String, Long> aliasToSize = new HashMap<String, Long>();
     Configuration conf = context.getConf();
     try {
-      long aliasTotalKnownInputSize = getTotalKnownInputSize(context, currJoinWork,
+      long aliasTotalKnownInputSize = getTotalKnownInputSize(context, currJoinWork.getMapWork(),
           pathToAliases, aliasToSize);
 
       String xml = currJoinWork.toXML();
@@ -339,8 +342,8 @@ public class SortMergeJoinTaskDispatcher
     listWorks.add(currTask.getWork());
     listTasks.add(currTask);
     // clear JoinTree and OP Parse Context
-    currWork.setOpParseCtxMap(null);
-    currWork.setJoinTree(null);
+    currWork.getMapWork().setOpParseCtxMap(null);
+    currWork.getMapWork().setJoinTree(null);
 
     // create conditional task and insert conditional task into task tree
     ConditionalWork cndWork = new ConditionalWork(listWorks);
@@ -417,9 +420,9 @@ public class SortMergeJoinTaskDispatcher
   }
 
   private SMBMapJoinOperator getSMBMapJoinOp(MapredWork work) throws SemanticException {
-    if (work != null) {
-      Operator<? extends OperatorDesc> reducer = work.getReducer();
-      for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+    if (work != null && work.getReduceWork() != null) {
+      Operator<? extends OperatorDesc> reducer = work.getReduceWork().getReducer();
+      for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
         SMBMapJoinOperator smbMapJoinOp = getSMBMapJoinOp(op, reducer);
         if (smbMapJoinOp != null) {
           return smbMapJoinOp;
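
Several of the dispatchers touched by this commit deep-copy a plan by serializing it to XML and reading it back; the only change here is the deserialization entry point (Utilities.deserializeObject instead of the removed deserializeMapRedWork). A minimal sketch of that clone idiom, using the calls exactly as they appear in these hunks (per the comment in CommonJoinTaskDispatcher above, HIVE-4396 is expected to provide a cheaper clone later):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.io.UnsupportedEncodingException;

    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.plan.MapredWork;

    final class PlanCloneSketch {
      /** Deep-copy a plan by round-tripping it through its XML serialization. */
      static MapredWork clonePlan(MapredWork work) throws UnsupportedEncodingException {
        String xml = work.toXML();
        InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
        MapredWork copy = Utilities.deserializeObject(in);
        return copy;
      }
    }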

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java Tue Jul 30 22:22:35 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -162,7 +163,7 @@ public class IndexWhereProcessor impleme
       HiveIndexQueryContext queryContext = queryContexts.get(chosenIndex);
 
       // prepare the map reduce job to use indexing
-      MapredWork work = currentTask.getWork();
+      MapWork work = currentTask.getWork().getMapWork();
       work.setInputformat(queryContext.getIndexInputFormat());
       work.addIndexIntermediateFile(queryContext.getIndexIntermediateFile());
       // modify inputs based on index query
@@ -204,7 +205,7 @@ public class IndexWhereProcessor impleme
 
     // check the size
     try {
-      ContentSummary inputSummary = Utilities.getInputSummary(pctx.getContext(), task.getWork(), null);
+      ContentSummary inputSummary = Utilities.getInputSummary(pctx.getContext(), task.getWork().getMapWork(), null);
       long inputSize = inputSummary.getLength();
       if (!indexHandler.checkQuerySize(inputSize, pctx.getConf())) {
         queryContext.setQueryTasks(null);

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java Tue Jul 30 22:22:35 2013
@@ -93,7 +93,7 @@ public class IndexWhereTaskDispatcher im
     GraphWalker ogw = new DefaultGraphWalker(dispatcher);
     ArrayList<Node> topNodes = new ArrayList<Node>();
     if (task.getWork() instanceof MapredWork) {
-      topNodes.addAll(((MapredWork)task.getWork()).getAliasToWork().values());
+      topNodes.addAll(((MapredWork)task.getWork()).getMapWork().getAliasToWork().values());
     } else {
       return null;
     }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java Tue Jul 30 22:22:35 2013
@@ -201,7 +201,7 @@ public class PartitionPruner implements 
             // are on non-partition columns.
             unkn_parts.addAll(Hive.get().getPartitions(tab));
           } else {
-            String message = Utilities.checkJDOPushDown(tab, compactExpr);
+            String message = Utilities.checkJDOPushDown(tab, compactExpr, null);
             if (message == null) {
               String filter = compactExpr.getExprString();
               String oldFilter = prunerExpr.getExprString();
@@ -217,7 +217,7 @@ public class PartitionPruner implements 
               LOG.info(ErrorMsg.INVALID_JDO_FILTER_EXPRESSION.getMsg("by condition '"
                   + message + "'"));
               pruneBySequentialScan(tab, true_parts, unkn_parts, denied_parts,
-                  prunerExpr, rowObjectInspector);
+                  prunerExpr, rowObjectInspector, conf);
             }
           }
         }
@@ -300,10 +300,11 @@ public class PartitionPruner implements 
    * @param denied_parts pruned out partitions.
    * @param prunerExpr the SQL predicate that involves partition columns.
    * @param rowObjectInspector object inspector used by the evaluator
+   * @param conf Hive configuration object; must not be null.
    * @throws Exception
    */
   static private void pruneBySequentialScan(Table tab, Set<Partition> true_parts, Set<Partition> unkn_parts,
-      Set<Partition> denied_parts, ExprNodeDesc prunerExpr, StructObjectInspector rowObjectInspector)
+      Set<Partition> denied_parts, ExprNodeDesc prunerExpr, StructObjectInspector rowObjectInspector, HiveConf conf)
       throws Exception {
 
     List<String> trueNames = null;
@@ -320,6 +321,7 @@ public class PartitionPruner implements 
     List<String> partCols = new ArrayList<String>(pCols.size());
     List<String> values = new ArrayList<String>(pCols.size());
     Object[] objectWithPart = new Object[2];
+    String defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
 
     for (FieldSchema pCol : pCols) {
       partCols.add(pCol.getName());
@@ -344,11 +346,17 @@ public class PartitionPruner implements 
       Boolean r = (Boolean) PartExprEvalUtils.evaluateExprOnPart(handle, objectWithPart);
 
       if (r == null) {
-        if (unknNames == null) {
-          unknNames = new LinkedList<String>();
+        // Reject default partitions when we cannot determine whether they should be included.
+        // Note that the predicate only contains the partition-column parts of the original predicate.
+        if (values.contains(defaultPartitionName)) {
+          LOG.debug("skipping default/bad partition: " + partName);
+        } else {
+          if (unknNames == null) {
+            unknNames = new LinkedList<String>();
+          }
+          unknNames.add(partName);
+          LOG.debug("retained unknown partition: " + partName);
         }
-        unknNames.add(partName);
-        LOG.debug("retained unknown partition: " + partName);
       } else if (Boolean.TRUE.equals(r)) {
         if (trueNames == null) {
           trueNames = new LinkedList<String>();
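
The new branch changes what happens when the partition-only predicate evaluates to unknown (null) for a partition: it is still retained as "unknown", unless one of its values is the configured default partition name (hive.exec.default.partition.name, "__HIVE_DEFAULT_PARTITION__" by default), in which case it is now skipped instead of being scanned. A self-contained sketch of just that decision, with illustrative names standing in for the surrounding pruner state:

    import java.util.List;

    final class DefaultPartitionFilterSketch {
      /**
       * Decide whether a partition whose predicate result is unknown (null)
       * should still be retained; values are the partition's column values.
       */
      static boolean retainUnknownPartition(List<String> values, String defaultPartitionName) {
        // Default/bad partitions are rejected outright when the predicate
        // cannot be evaluated, instead of being kept as "unknown".
        return !values.contains(defaultPartitionName);
      }
    }

Previously such partitions always landed in unknNames; with this change, a predicate that only involves partition columns no longer pulls in the default partition when its truth value cannot be determined.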

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Tue Jul 30 22:22:35 2013
@@ -156,8 +156,8 @@ public class DDLSemanticAnalyzer extends
   }
 
   public static String getTypeName(int token) throws SemanticException {
-    // date and datetime types aren't currently supported
-    if (token == HiveParser.TOK_DATE || token == HiveParser.TOK_DATETIME) {
+    // datetime type isn't currently supported
+    if (token == HiveParser.TOK_DATETIME) {
       throw new SemanticException(ErrorMsg.UNSUPPORTED_TYPE.getMsg());
     }
     return TokenToTypeName.get(token);

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g Tue Jul 30 22:22:35 2013
@@ -176,8 +176,8 @@ tableSample
 tableSource
 @init { gParent.msgs.push("table source"); }
 @after { gParent.msgs.pop(); }
-    : tabname=tableName (ts=tableSample)? (KW_AS? alias=identifier)?
-    -> ^(TOK_TABREF $tabname $ts? $alias?)
+    : tabname=tableName (props=tableProperties)? (ts=tableSample)? (KW_AS? alias=Identifier)?
+    -> ^(TOK_TABREF $tabname $props? $ts? $alias?)
     ;
 
 tableName

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Tue Jul 30 22:22:35 2013
@@ -106,6 +106,7 @@ TOK_BOOLEAN;
 TOK_FLOAT;
 TOK_DOUBLE;
 TOK_DATE;
+TOK_DATELITERAL;
 TOK_DATETIME;
 TOK_TIMESTAMP;
 TOK_STRING;