Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/03 20:33:14 UTC

svn commit: r1519788 [1/2] - in /hive/branches/vectorization/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mr/ java/org/apache/hadoop/hive/ql/exec/vector/ java/org/apache/hadoop/hive/ql/exec/vector/expressions/ java/o...

Author: hashutosh
Date: Tue Sep  3 18:33:13 2013
New Revision: 1519788

URL: http://svn.apache.org/r1519788
Log:
HIVE-4959 : Vectorized plan generation should be added as an optimization transform. (Jitendra Nath Pandey via Ashutosh Chauhan)

Added:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
Removed:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
Modified:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java

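For context: the net effect of this patch is to move the vectorization decision out of job submission (the checks removed from ExecDriver below) and into the physical optimizer, where the new Vectorizer runs as a plan transform. What follows is only a minimal, hypothetical sketch of how such a resolver can plug into the physical-optimizer framework; it is not the committed Vectorizer.java, and the PhysicalContext.getConf() accessor is an assumption.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
    import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    // Hypothetical sketch only; not the Vectorizer.java added by this commit.
    public class VectorizerSketch implements PhysicalPlanResolver {
      @Override
      public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
        // pctx.getConf() is assumed to expose the job's HiveConf.
        if (!HiveConf.getBoolVar(pctx.getConf(),
            HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
          return pctx; // vectorization disabled: leave the plan untouched
        }
        // A real resolver would validate input formats and expressions here and,
        // where possible, replace row-mode operators with their Vector* versions,
        // presumably via the OperatorFactory.getVectorOperator() method added below.
        return pctx;
      }
    }
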
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Sep  3 18:33:13 2013
@@ -87,7 +87,7 @@ public class FileSinkOperator extends Te
   protected transient int maxPartitions;
   protected transient ListBucketingCtx lbCtx;
   protected transient boolean isSkewedStoredAsSubDirectories;
-  private transient boolean statsCollectRawDataSize;
+  protected transient boolean statsCollectRawDataSize;
 
 
   private static final transient String[] FATAL_ERR_MSG = {
@@ -220,6 +220,10 @@ public class FileSinkOperator extends Te
         }
       }
     }
+
+    public Stat getStat() {
+      return stat;
+    }
   } // class FSPaths
 
   private static final long serialVersionUID = 1L;
@@ -227,7 +231,7 @@ public class FileSinkOperator extends Te
   protected transient Serializer serializer;
   protected transient BytesWritable commonKey = new BytesWritable();
   protected transient TableIdEnum tabIdEnum = null;
-  private transient LongWritable row_count;
+  protected transient LongWritable row_count;
   private transient boolean isNativeTable = true;
 
   /**
@@ -236,17 +240,17 @@ public class FileSinkOperator extends Te
    * each reducer can write 10 files - this way we effectively get 1000 files.
    */
   private transient ExprNodeEvaluator[] partitionEval;
-  private transient int totalFiles;
+  protected transient int totalFiles;
   private transient int numFiles;
-  private transient boolean multiFileSpray;
-  private transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+  protected transient boolean multiFileSpray;
+  protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
 
   private transient ObjectInspector[] partitionObjectInspectors;
-  private transient HivePartitioner<HiveKey, Object> prtner;
-  private transient final HiveKey key = new HiveKey();
+  protected transient HivePartitioner<HiveKey, Object> prtner;
+  protected transient final HiveKey key = new HiveKey();
   private transient Configuration hconf;
-  private transient FSPaths fsp;
-  private transient boolean bDynParts;
+  protected transient FSPaths fsp;
+  protected transient boolean bDynParts;
   private transient SubStructObjectInspector subSetOI;
   private transient int timeOut; // JT timeout in msec.
   private transient long lastProgressReport = System.currentTimeMillis();
@@ -278,7 +282,7 @@ public class FileSinkOperator extends Te
   Class<? extends Writable> outputClass;
   String taskId;
 
-  private boolean filesCreated = false;
+  protected boolean filesCreated = false;
 
   private void initializeSpecPath() {
     // For a query of the type:
@@ -431,7 +435,7 @@ public class FileSinkOperator extends Te
     }
   }
 
-  private void createBucketFiles(FSPaths fsp) throws HiveException {
+  protected void createBucketFiles(FSPaths fsp) throws HiveException {
     try {
       int filesIdx = 0;
       Set<Integer> seenBuckets = new HashSet<Integer>();
@@ -543,7 +547,7 @@ public class FileSinkOperator extends Te
    *
    * @return true if a new progress update is reported, false otherwise.
    */
-  private boolean updateProgress() {
+  protected boolean updateProgress() {
     if (reporter != null &&
         (System.currentTimeMillis() - lastProgressReport) > timeOut) {
       reporter.progress();
@@ -554,7 +558,7 @@ public class FileSinkOperator extends Te
     }
   }
 
-  Writable recordValue;
+  protected Writable recordValue;
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
@@ -660,7 +664,7 @@ public class FileSinkOperator extends Te
    * @return
    * @throws HiveException
    */
-  private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
+  protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
     FSPaths fsp2 = valToPaths.get(lbDirName);
     if (fsp2 == null) {
       fsp2 = createNewPaths(lbDirName);
@@ -698,7 +702,7 @@ public class FileSinkOperator extends Te
    * @param row row to process.
    * @return directory name.
    */
-  private String generateListBucketingDirName(Object row) {
+  protected String generateListBucketingDirName(Object row) {
     if (!this.isSkewedStoredAsSubDirectories) {
       return null;
     }
@@ -739,7 +743,7 @@ public class FileSinkOperator extends Te
     return lbDirName;
   }
 
-  private FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
+  protected FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
 
     FSPaths fp;
 

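The FileSinkOperator changes above only widen member visibility from private to protected and add an FSPaths.getStat() accessor, so that the slimmed-down VectorFileSinkOperator later in this patch can subclass it instead of carrying a private copy of the code. A hypothetical subclass (illustration only; the class name and the logging are assumptions, not part of this commit) showing the kind of access those changes enable:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.stats.StatsSetupConst;

    // Illustration only; assumes valToPaths is protected in FileSinkOperator.
    public class LoggingFileSinkOperator extends FileSinkOperator {
      private static final long serialVersionUID = 1L;
      private static final Log LOG =
          LogFactory.getLog(LoggingFileSinkOperator.class.getName());

      @Override
      public void closeOp(boolean abort) throws HiveException {
        // FSPaths.getStat(), added in this patch, lets subclasses inspect
        // per-path statistics without touching the field directly.
        for (FSPaths paths : valToPaths.values()) {
          LOG.info("rows written: "
              + paths.getStat().getStat(StatsSetupConst.ROW_COUNT));
        }
        super.closeOp(abort);
      }
    }
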
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Sep  3 18:33:13 2013
@@ -22,12 +22,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
-import org.apache.hadoop.hive.ql.plan.MuxDesc;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
-import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
@@ -39,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.La
 import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MuxDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -106,6 +112,38 @@ public final class OperatorFactory {
         MuxOperator.class));
   }
 
+  public static ArrayList<OpTuple> vectorOpvec;
+  static {
+    vectorOpvec = new ArrayList<OpTuple>();
+    vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
+    vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
+    vectorOpvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class,
+        VectorReduceSinkOperator.class));
+    vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));
+    vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class));
+  }
+
+  public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
+      VectorizationContext vContext) {
+    Class<T> descClass = (Class<T>) conf.getClass();
+    for (OpTuple o : vectorOpvec) {
+      if (o.descClass == descClass) {
+        try {
+          Operator<T> op = (Operator<T>) o.opClass.getDeclaredConstructor(
+              VectorizationContext.class, OperatorDesc.class).newInstance(
+              vContext, conf);
+          op.initializeCounters();
+          return op;
+        } catch (Exception e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    throw new RuntimeException("No vector operator for descriptor class "
+        + descClass.getName());
+  }
+
   public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
 
     for (OpTuple o : opvec) {

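The new getVectorOperator() is the factory hook for plan-time vectorization: it matches the descriptor class against vectorOpvec and reflectively invokes the operator's (VectorizationContext, OperatorDesc) constructor. A minimal usage sketch, assuming the FilterDesc and the VectorizationContext are supplied by the planner (the wrapper class and method name here are hypothetical):

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorFactory;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
    import org.apache.hadoop.hive.ql.plan.FilterDesc;

    public class VectorOperatorFactoryExample {
      // Both arguments are assumed to come from the planner / the vectorization transform.
      public static Operator<FilterDesc> vectorizeFilter(FilterDesc filterDesc,
          VectorizationContext vContext) {
        // Matches FilterDesc in vectorOpvec and reflectively calls
        // VectorFilterOperator(VectorizationContext, OperatorDesc).
        return OperatorFactory.getVectorOperator(filterDesc, vContext);
      }
    }
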
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Sep  3 18:33:13 2013
@@ -25,7 +25,6 @@ import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -57,10 +56,6 @@ import org.apache.hadoop.hive.ql.exec.Pa
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.VectorExecMapper;
-import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
@@ -243,22 +238,7 @@ public class ExecDriver extends Task<Map
     //See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput()
     job.setOutputFormat(HiveOutputFormatImpl.class);
 
-
-    boolean vectorPath = HiveConf.getBoolVar(job,
-        HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
-
-    if (vectorPath) {
-      if (validateVectorPath()) {
-        LOG.info("Going down the vectorization path");
-        job.setMapperClass(VectorExecMapper.class);
-      } else {
-        //fall back to non-vector mode
-        HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
-        job.setMapperClass(ExecMapper.class);
-      }
-    } else {
-      job.setMapperClass(ExecMapper.class);
-    }
+    job.setMapperClass(ExecMapper.class);
 
     job.setMapOutputKeyClass(HiveKey.class);
     job.setMapOutputValueClass(BytesWritable.class);
@@ -510,59 +490,6 @@ public class ExecDriver extends Task<Map
     return (returnVal);
   }
 
-  private boolean validateVectorPath() {
-    LOG.debug("Validating if vectorized execution is applicable");
-    MapWork thePlan = this.getWork().getMapWork();
-
-    for (String path : thePlan.getPathToPartitionInfo().keySet()) {
-      PartitionDesc pd = thePlan.getPathToPartitionInfo().get(path);
-      List<Class<?>> interfaceList =
-          Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
-      if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
-        LOG.debug("Input format: " + pd.getInputFileFormatClassName()
-            + ", doesn't provide vectorized input");
-        return false;
-      }
-    }
-    VectorizationContext vc = new VectorizationContext(null, 0);
-    for (String onefile : thePlan.getPathToAliases().keySet()) {
-      List<String> aliases = thePlan.getPathToAliases().get(onefile);
-      for (String onealias : aliases) {
-        Operator<? extends OperatorDesc> op = thePlan.getAliasToWork().get(
-            onealias);
-        Operator<? extends OperatorDesc> vectorOp = null;
-        try {
-          vectorOp = VectorMapOperator.vectorizeOperator(op, vc);
-        } catch (Exception e) {
-          LOG.debug("Cannot vectorize the plan", e);
-          return false;
-        }
-        if (vectorOp == null) {
-          LOG.debug("Cannot vectorize the plan");
-          return false;
-        }
-        //verify the expressions contained in the operators
-        try {
-          validateVectorOperator(vectorOp);
-        } catch (HiveException e) {
-          LOG.debug("Cannot vectorize the plan", e);
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  private void validateVectorOperator(Operator<? extends OperatorDesc> vectorOp)
-      throws HiveException {
-    vectorOp.initialize(job, null);
-    if (vectorOp.getChildOperators() != null) {
-      for (Operator<? extends OperatorDesc> vop : vectorOp.getChildOperators()) {
-        validateVectorOperator(vop);
-      }
-    }
-  }
-
   private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf)
       throws Exception {
     assert mWork.getAliasToWork().keySet().size() == 1;

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Sep  3 18:33:13 2013
@@ -19,553 +19,40 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
-import org.apache.hadoop.hive.ql.exec.Stat;
-import org.apache.hadoop.hive.ql.exec.TerminalOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.HivePartitioner;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * File Sink operator implementation.
  **/
-public class VectorFileSinkOperator extends TerminalOperator<FileSinkDesc> implements
-    Serializable {
+public class VectorFileSinkOperator extends FileSinkOperator {
 
-  protected transient HashMap<String, FSPaths> valToPaths;
-  protected transient int numDynParts;
-  protected transient List<String> dpColNames;
-  protected transient DynamicPartitionCtx dpCtx;
-  protected transient boolean isCompressed;
-  protected transient Path parent;
-  protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
-  protected transient Path specPath;
-  protected transient String childSpecPathDynLinkedPartitions;
-  protected transient int dpStartCol; // start column # for DP columns
-  protected transient List<String> dpVals; // array of values corresponding to DP columns
-  protected transient List<Object> dpWritables;
-  protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
-  protected transient int maxPartitions;
-  protected transient ListBucketingCtx lbCtx;
-  protected transient boolean isSkewedStoredAsSubDirectories;
-  private transient boolean statsCollectRawDataSize;
-
-  private static final transient String[] FATAL_ERR_MSG = {
-      null, // counter value 0 means no error
-      "Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions" +
-          ".pernode."
-  };
-  private final VectorizationContext vContext;
+  private static final long serialVersionUID = 1L;
 
   public VectorFileSinkOperator(VectorizationContext context,
       OperatorDesc conf) {
-    this.vContext = context;
+    super();
     this.conf = (FileSinkDesc) conf;
   }
 
-  public class FSPaths implements Cloneable {
-    Path tmpPath;
-    Path taskOutputTempPath;
-    Path[] outPaths;
-    Path[] finalPaths;
-    RecordWriter[] outWriters;
-    Stat stat;
-
-    public FSPaths() {
-    }
-
-    public FSPaths(Path specPath) {
-      tmpPath = Utilities.toTempPath(specPath);
-      taskOutputTempPath = Utilities.toTaskTempPath(specPath);
-      outPaths = new Path[numFiles];
-      finalPaths = new Path[numFiles];
-      outWriters = new RecordWriter[numFiles];
-      stat = new Stat();
-    }
-
-    /**
-     * Update OutPath according to tmpPath.
-     */
-    public Path getTaskOutPath(String taskId) {
-      return getOutPath(taskId, this.taskOutputTempPath);
-    }
-
-
-    /**
-     * Update OutPath according to tmpPath.
-     */
-    public Path getOutPath(String taskId) {
-      return getOutPath(taskId, this.tmpPath);
-    }
-
-    /**
-     * Update OutPath according to tmpPath.
-     */
-    public Path getOutPath(String taskId, Path tmp) {
-      return new Path(tmp, Utilities.toTempPath(taskId));
-    }
-
-    /**
-     * Update the final paths according to tmpPath.
-     */
-    public Path getFinalPath(String taskId) {
-      return getFinalPath(taskId, this.tmpPath, null);
-    }
-
-    /**
-     * Update the final paths according to tmpPath.
-     */
-    public Path getFinalPath(String taskId, Path tmpPath, String extension) {
-      if (extension != null) {
-        return new Path(tmpPath, taskId + extension);
-      } else {
-        return new Path(tmpPath, taskId);
-      }
-    }
-
-    public void setOutWriters(RecordWriter[] out) {
-      outWriters = out;
-    }
-
-    public RecordWriter[] getOutWriters() {
-      return outWriters;
-    }
-
-    public void closeWriters(boolean abort) throws HiveException {
-      for (int idx = 0; idx < outWriters.length; idx++) {
-        if (outWriters[idx] != null) {
-          try {
-            outWriters[idx].close(abort);
-            updateProgress();
-          } catch (IOException e) {
-            throw new HiveException(e);
-          }
-        }
-      }
-    }
-
-    private void commit(FileSystem fs) throws HiveException {
-      for (int idx = 0; idx < outPaths.length; ++idx) {
-        try {
-          if ((bDynParts || isSkewedStoredAsSubDirectories)
-              && !fs.exists(finalPaths[idx].getParent())) {
-            fs.mkdirs(finalPaths[idx].getParent());
-          }
-          if (!fs.rename(outPaths[idx], finalPaths[idx])) {
-            throw new HiveException("Unable to rename output from: " +
-                outPaths[idx] + " to: " + finalPaths[idx]);
-          }
-          updateProgress();
-        } catch (IOException e) {
-          throw new HiveException("Unable to rename output from: " +
-              outPaths[idx] + " to: " + finalPaths[idx], e);
-        }
-      }
-    }
-
-    public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
-      for (int idx = 0; idx < outWriters.length; idx++) {
-        if (outWriters[idx] != null) {
-          try {
-            outWriters[idx].close(abort);
-            if (delete) {
-              fs.delete(outPaths[idx], true);
-            }
-            updateProgress();
-          } catch (IOException e) {
-            throw new HiveException(e);
-          }
-        }
-      }
-    }
-  } // class FSPaths
-
-  private static final long serialVersionUID = 1L;
-  protected transient FileSystem fs;
-  protected transient Serializer serializer;
-  protected transient BytesWritable commonKey = new BytesWritable();
-  protected transient TableIdEnum tabIdEnum = null;
-  private transient LongWritable row_count;
-  private transient boolean isNativeTable = true;
-
-  /**
-   * The evaluators for the multiFile sprayer. If the table under consideration has 1000 buckets,
-   * it is not a good idea to start so many reducers - if the maximum number of reducers is 100,
-   * each reducer can write 10 files - this way we effectively get 1000 files.
-   */
-  private transient ExprNodeEvaluator[] partitionEval;
-  private transient int totalFiles;
-  private transient int numFiles;
-  private transient boolean multiFileSpray;
-  private transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
-
-  private transient ObjectInspector[] partitionObjectInspectors;
-  private transient HivePartitioner<HiveKey, Object> prtner;
-  private transient final HiveKey key = new HiveKey();
-  private transient Configuration hconf;
-  private transient FSPaths fsp;
-  private transient boolean bDynParts;
-  private transient SubStructObjectInspector subSetOI;
-  private transient int timeOut; // JT timeout in msec.
-  private transient long lastProgressReport = System.currentTimeMillis();
-
-  /**
-   * TableIdEnum.
-   *
-   */
-  public static enum TableIdEnum {
-    TABLE_ID_1_ROWCOUNT,
-    TABLE_ID_2_ROWCOUNT,
-    TABLE_ID_3_ROWCOUNT,
-    TABLE_ID_4_ROWCOUNT,
-    TABLE_ID_5_ROWCOUNT,
-    TABLE_ID_6_ROWCOUNT,
-    TABLE_ID_7_ROWCOUNT,
-    TABLE_ID_8_ROWCOUNT,
-    TABLE_ID_9_ROWCOUNT,
-    TABLE_ID_10_ROWCOUNT,
-    TABLE_ID_11_ROWCOUNT,
-    TABLE_ID_12_ROWCOUNT,
-    TABLE_ID_13_ROWCOUNT,
-    TABLE_ID_14_ROWCOUNT,
-    TABLE_ID_15_ROWCOUNT;
-  }
-
-  protected transient boolean autoDelete = false;
-  protected transient JobConf jc;
-  Class<? extends Writable> outputClass;
-  String taskId;
-
-  private boolean filesCreated = false;
-
-  private void initializeSpecPath() {
-    // For a query of the type:
-    // insert overwrite table T1
-    // select * from (subq1 union all subq2)u;
-    // subQ1 and subQ2 write to directories Parent/Child_1 and
-    // Parent/Child_2 respectively, and union is removed.
-    // The movetask that follows subQ1 and subQ2 tasks moves the directory
-    // 'Parent'
-
-    // However, if the above query contains dynamic partitions, subQ1 and
-    // subQ2 have to write to directories: Parent/DynamicPartition/Child_1
-    // and Parent/DynamicPartition/Child_1 respectively.
-    // The movetask that follows subQ1 and subQ2 tasks still moves the directory
-    // 'Parent'
-    if ((!conf.isLinkedFileSink()) || (dpCtx == null)) {
-      specPath = new Path(conf.getDirName());
-      childSpecPathDynLinkedPartitions = null;
-      return;
-    }
-
-    specPath = new Path(conf.getParentDir());
-    childSpecPathDynLinkedPartitions = Utilities.getFileNameFromDirName(conf.getDirName());
-  }
-
-  @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    try {
-      this.hconf = hconf;
-      filesCreated = false;
-      isNativeTable = !conf.getTableInfo().isNonNative();
-      multiFileSpray = conf.isMultiFileSpray();
-      totalFiles = conf.getTotalFiles();
-      numFiles = conf.getNumFiles();
-      dpCtx = conf.getDynPartCtx();
-      lbCtx = conf.getLbCtx();
-      valToPaths = new HashMap<String, FSPaths>();
-      taskId = Utilities.getTaskId(hconf);
-      initializeSpecPath();
-      fs = specPath.getFileSystem(hconf);
-      hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
-      isCompressed = conf.getCompressed();
-      parent = Utilities.toTempPath(conf.getDirName());
-      statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
-
-      serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
-      serializer.initialize(null, conf.getTableInfo().getProperties());
-      outputClass = serializer.getSerializedClass();
-
-      // Timeout is chosen to make sure that even if one iteration takes more than
-      // half of the script.timeout but less than script.timeout, we will still
-      // be able to report progress.
-      timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2;
-
-      if (hconf instanceof JobConf) {
-        jc = (JobConf) hconf;
-      } else {
-        // test code path
-        jc = new JobConf(hconf, ExecDriver.class);
-      }
-
-      if (multiFileSpray) {
-        partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
-        int i = 0;
-        for (ExprNodeDesc e : conf.getPartitionCols()) {
-          partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
-        }
-
-        partitionObjectInspectors = initEvaluators(partitionEval, outputObjInspector);
-        prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
-            jc.getPartitionerClass(), null);
-      }
-      int id = conf.getDestTableId();
-      if ((id != 0) && (id <= TableIdEnum.values().length)) {
-        String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
-        tabIdEnum = TableIdEnum.valueOf(enumName);
-        row_count = new LongWritable();
-        statsMap.put(tabIdEnum, row_count);
-      }
-
-      if (dpCtx != null) {
-        dpSetup();
-      }
-
-      if (lbCtx != null) {
-        lbSetup();
-      }
-
-      if (!bDynParts) {
-        fsp = new FSPaths(specPath);
-
-        // Create all the files - this is required because empty files need to be created for
-        // empty buckets
-        // createBucketFiles(fsp);
-        if (!this.isSkewedStoredAsSubDirectories) {
-          valToPaths.put("", fsp); // special entry for non-DP case
-        }
-      }
-
-      initializeChildren(hconf);
-    } catch (HiveException e) {
-      throw e;
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new HiveException(e);
-    }
-  }
-
-  /**
-   * Initialize list bucketing information
-   */
-  private void lbSetup() {
-    this.isSkewedStoredAsSubDirectories =  ((lbCtx == null) ? false : lbCtx.isSkewedStoredAsDir());
-  }
-
-  /**
-   * Set up for dynamic partitioning including a new ObjectInspector for the output row.
-   */
-  private void dpSetup() {
-
-    this.bDynParts = false;
-    this.numDynParts = dpCtx.getNumDPCols();
-    this.dpColNames = dpCtx.getDPColNames();
-    this.maxPartitions = dpCtx.getMaxPartitionsPerNode();
-
-    assert numDynParts == dpColNames.size() : "number of dynamic paritions should be the same as the size of DP mapping";
-
-    if (dpColNames != null && dpColNames.size() > 0) {
-      this.bDynParts = true;
-      assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has "
-          + inputObjInspectors.length;
-      StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0];
-      // remove the last dpMapping.size() columns from the OI
-      List<? extends StructField> fieldOI = soi.getAllStructFieldRefs();
-      ArrayList<ObjectInspector> newFieldsOI = new ArrayList<ObjectInspector>();
-      ArrayList<String> newFieldsName = new ArrayList<String>();
-      this.dpStartCol = 0;
-      for (StructField sf : fieldOI) {
-        String fn = sf.getFieldName();
-        if (!dpCtx.getInputToDPCols().containsKey(fn)) {
-          newFieldsOI.add(sf.getFieldObjectInspector());
-          newFieldsName.add(sf.getFieldName());
-          this.dpStartCol++;
-        }
-      }
-      assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty";
-
-      this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol);
-      this.dpVals = new ArrayList<String>(numDynParts);
-      this.dpWritables = new ArrayList<Object>(numDynParts);
-    }
-  }
-
-  private void createBucketFiles(FSPaths fsp) throws HiveException {
-    try {
-      int filesIdx = 0;
-      Set<Integer> seenBuckets = new HashSet<Integer>();
-      for (int idx = 0; idx < totalFiles; idx++) {
-        if (this.getExecContext() != null && this.getExecContext().getFileId() != null) {
-          LOG.info("replace taskId from execContext ");
-
-          taskId = Utilities.replaceTaskIdFromFilename(taskId, this.getExecContext().getFileId());
-
-          LOG.info("new taskId: FS " + taskId);
-
-          assert !multiFileSpray;
-          assert totalFiles == 1;
-        }
-
-        if (multiFileSpray) {
-          key.setHashCode(idx);
-
-          // Does this hashcode belong to this reducer
-          int numReducers = totalFiles / numFiles;
-
-          if (numReducers > 1) {
-            int currReducer = Integer.valueOf(Utilities.getTaskIdFromFilename(Utilities
-                .getTaskId(hconf)));
-
-            int reducerIdx = prtner.getPartition(key, null, numReducers);
-            if (currReducer != reducerIdx) {
-              continue;
-            }
-          }
-
-          int bucketNum = prtner.getBucket(key, null, totalFiles);
-          if (seenBuckets.contains(bucketNum)) {
-            continue;
-          }
-          seenBuckets.add(bucketNum);
-
-          bucketMap.put(bucketNum, filesIdx);
-          taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
-        }
-        if (isNativeTable) {
-          fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
-          LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
-          fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
-          LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
-        } else {
-          fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
-        }
-        try {
-          // The reason to keep these instead of using
-          // OutputFormat.getRecordWriter() is that
-          // getRecordWriter does not give us enough control over the file name that
-          // we create.
-          String extension = Utilities.getFileExtension(jc, isCompressed,
-              hiveOutputFormat);
-          if (!bDynParts && !this.isSkewedStoredAsSubDirectories) {
-            fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension);
-          } else {
-            fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
-          }
-
-        } catch (Exception e) {
-          e.printStackTrace();
-          throw new HiveException(e);
-        }
-        LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
-
-        if (isNativeTable) {
-          try {
-            // in recent hadoop versions, use deleteOnExit to clean tmp files.
-            autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(
-                fs, fsp.outPaths[filesIdx]);
-          } catch (IOException e) {
-            throw new HiveException(e);
-          }
-        }
-
-        Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
-        // only create bucket files only if no dynamic partitions,
-        // buckets of dynamic partitions will be created for each newly created partition
-        fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
-            jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
-            reporter);
-        // increment the CREATED_FILES counter
-        if (reporter != null) {
-          reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
-        }
-        filesIdx++;
-      }
-      assert filesIdx == numFiles;
-
-      // in recent hadoop versions, use deleteOnExit to clean tmp files.
-      if (isNativeTable) {
-        autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, fsp.outPaths[0]);
-      }
-    } catch (HiveException e) {
-      throw e;
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new HiveException(e);
-    }
-
-    filesCreated = true;
-  }
+  public VectorFileSinkOperator() {
 
-  /**
-   * Report status to JT so that JT won't kill this task if closing takes too long
-   * due to too many files to close and the NN is overloaded.
-   *
-   * @return true if a new progress update is reported, false otherwise.
-   */
-  private boolean updateProgress() {
-    if (reporter != null &&
-        (System.currentTimeMillis() - lastProgressReport) > timeOut) {
-      reporter.progress();
-      lastProgressReport = System.currentTimeMillis();
-      return true;
-    } else {
-      return false;
-    }
   }
 
-  Writable recordValue;
-
   @Override
   public void processOp(Object data, int tag) throws HiveException {
 
@@ -652,15 +139,15 @@ public class VectorFileSinkOperator exte
         }
       }
 
-      rowOutWriters = fpaths.outWriters;
+      rowOutWriters = fpaths.getOutWriters();
       if (conf.isGatherStats()) {
         if (statsCollectRawDataSize) {
           SerDeStats stats = serializer.getSerDeStats();
           if (stats != null) {
-            fpaths.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+            fpaths.getStat().addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
           }
         }
-        fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1);
+        fpaths.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1);
       }
 
 
@@ -682,417 +169,4 @@ public class VectorFileSinkOperator exte
     }
     }
   }
-
-  /**
-   * Generate list bucketing directory name from a row.
-   * @param row row to process.
-   * @return directory name.
-   */
-  private String generateListBucketingDirName(Object row) {
-    if (!this.isSkewedStoredAsSubDirectories) {
-      return null;
-    }
-
-    String lbDirName = null;
-    List<Object> standObjs = new ArrayList<Object>();
-    List<String> skewedCols = lbCtx.getSkewedColNames();
-    List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
-    List<String> skewedValsCandidate = null;
-    Map<List<String>, String> locationMap = lbCtx.getLbLocationMap();
-
-    /* Convert input row to standard objects. */
-    ObjectInspectorUtils.copyToStandardObject(standObjs, row,
-        (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
-
-    assert (standObjs.size() >= skewedCols.size()) :
-      "The row has less number of columns than no. of skewed column.";
-
-    skewedValsCandidate = new ArrayList<String>(skewedCols.size());
-    for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) {
-      skewedValsCandidate.add(posPair.getSkewColPosition(),
-          standObjs.get(posPair.getTblColPosition()).toString());
-    }
-    /* The row matches skewed column names. */
-    if (allSkewedVals.contains(skewedValsCandidate)) {
-      /* matches skewed values. */
-      lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
-      locationMap.put(skewedValsCandidate, lbDirName);
-    } else {
-      /* create default directory. */
-      lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
-          lbCtx.getDefaultDirName());
-      List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
-      if (!locationMap.containsKey(defaultKey)) {
-        locationMap.put(defaultKey, lbDirName);
-      }
-    }
-    return lbDirName;
-  }
-
-  /**
-   * Lookup list bucketing path.
-   * @param lbDirName
-   * @return
-   * @throws HiveException
-   */
-  private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
-    FSPaths fsp2 = valToPaths.get(lbDirName);
-    if (fsp2 == null) {
-      fsp2 = createNewPaths(lbDirName);
-    }
-    return fsp2;
-  }
-
-  /**
-   * create new path.
-   *
-   * @param dirName
-   * @return
-   * @throws HiveException
-   */
-  private FSPaths createNewPaths(String dirName) throws HiveException {
-    FSPaths fsp2 = new FSPaths(specPath);
-    if (childSpecPathDynLinkedPartitions != null) {
-      fsp2.tmpPath = new Path(fsp2.tmpPath,
-          dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
-      fsp2.taskOutputTempPath =
-        new Path(fsp2.taskOutputTempPath,
-            dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
-    } else {
-      fsp2.tmpPath = new Path(fsp2.tmpPath, dirName);
-      fsp2.taskOutputTempPath =
-        new Path(fsp2.taskOutputTempPath, dirName);
-    }
-    createBucketFiles(fsp2);
-    valToPaths.put(dirName, fsp2);
-    return fsp2;
-  }
-
-  private FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
-
-    FSPaths fp;
-
-    // get the path corresponding to the dynamic partition columns,
-    String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
-
-    if (dpDir != null) {
-      dpDir = appendListBucketingDirName(lbDirName, dpDir);
-      FSPaths fsp2 = valToPaths.get(dpDir);
-
-      if (fsp2 == null) {
-        // check # of dp
-        if (valToPaths.size() > maxPartitions) {
-          // throw fatal error
-          incrCounter(fatalErrorCntr, 1);
-          fatalError = true;
-          LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions");
-        }
-        fsp2 = createNewPaths(dpDir);
-      }
-      fp = fsp2;
-    } else {
-      fp = fsp;
-    }
-    return fp;
-  }
-
-  /**
-   * Append list bucketing dir name to original dir name.
-   * Skewed columns cannot be partitioned columns.
-   * @param lbDirName
-   * @param dpDir
-   * @return
-   */
-  private String appendListBucketingDirName(String lbDirName, String dpDir) {
-    StringBuilder builder = new StringBuilder(dpDir);
-    dpDir = (lbDirName == null) ? dpDir : builder.append(Path.SEPARATOR).append(lbDirName)
-          .toString();
-    return dpDir;
-  }
-
-  // given the current input row, the mapping for input col info to dp columns, and # of dp cols,
-  // return the relative path corresponding to the row.
-  // e.g., ds=2008-04-08/hr=11
-  private String getDynPartDirectory(List<String> row, List<String> dpColNames, int numDynParts) {
-    assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns";
-    return FileUtils.makePartName(dpColNames, row);
-  }
-
-  @Override
-  protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) {
-    errMsg.append("Operator ").append(getOperatorId()).append(" (id=").append(id).append("): ");
-    errMsg.append(counterCode > FATAL_ERR_MSG.length - 1 ?
-        "fatal error." :
-          FATAL_ERR_MSG[(int) counterCode]);
-    // number of partitions exceeds limit, list all the partition names
-    if (counterCode > 0) {
-      errMsg.append(lsDir());
-    }
-  }
-
-  // sample the partitions that are generated so that users have a sense of what's causing the error
-  private String lsDir() {
-    String specPath = conf.getDirName();
-    // need to get a JobConf here because it's not passed through at client side
-    JobConf jobConf = new JobConf(ExecDriver.class);
-    Path tmpPath = Utilities.toTempPath(specPath);
-    StringBuilder sb = new StringBuilder("\n");
-    try {
-      DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
-      int numDP = dpCtx.getNumDPCols();
-      FileSystem fs = tmpPath.getFileSystem(jobConf);
-      int level = numDP;
-      if (conf.isLinkedFileSink()) {
-        level++;
-      }
-      FileStatus[] status = Utilities.getFileStatusRecurse(tmpPath, level, fs);
-      sb.append("Sample of ")
-        .append(Math.min(status.length, 100))
-        .append(" partitions created under ")
-        .append(tmpPath.toString())
-        .append(":\n");
-      for (int i = 0; i < status.length; ++i) {
-        sb.append("\t.../");
-        sb.append(getPartitionSpec(status[i].getPath(), numDP))
-          .append("\n");
-      }
-      sb.append("...\n");
-    } catch (Exception e) {
-      // cannot get the subdirectories, just return the root directory
-      sb.append(tmpPath).append("...\n").append(e.getMessage());
-      e.printStackTrace();
-    } finally {
-      return sb.toString();
-    }
-  }
-
-  private String getPartitionSpec(Path path, int level) {
-    Stack<String> st = new Stack<String>();
-    Path p = path;
-    for (int i = 0; i < level; ++i) {
-      st.push(p.getName());
-      p = p.getParent();
-    }
-    StringBuilder sb = new StringBuilder();
-    while (!st.empty()) {
-      sb.append(st.pop());
-    }
-    return sb.toString();
-  }
-
-  @Override
-  public void closeOp(boolean abort) throws HiveException {
-    if (!bDynParts && !filesCreated) {
-      createBucketFiles(fsp);
-    }
-
-    lastProgressReport = System.currentTimeMillis();
-    if (!abort) {
-      for (FSPaths fsp : valToPaths.values()) {
-        fsp.closeWriters(abort);
-        if (isNativeTable) {
-          fsp.commit(fs);
-        }
-      }
-      // Only publish stats if this operator's flag was set to gather stats
-      if (conf.isGatherStats()) {
-        publishStats();
-      }
-    } else {
-      // Will come here if an Exception was thrown in map() or reduce().
-      // Hadoop always call close() even if an Exception was thrown in map() or
-      // reduce().
-      for (FSPaths fsp : valToPaths.values()) {
-        fsp.abortWriters(fs, abort, !autoDelete && isNativeTable);
-      }
-    }
-  }
-
-  /**
-   * @return the name of the operator
-   */
-  @Override
-  public String getName() {
-    return getOperatorName();
-  }
-
-  static public String getOperatorName() {
-    return "FS";
-  }
-
-  @Override
-  public void jobCloseOp(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
-      throws HiveException {
-    try {
-      if ((conf != null) && isNativeTable) {
-        String specPath = conf.getDirName();
-        DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
-        if (conf.isLinkedFileSink() && (dpCtx != null)) {
-          specPath = conf.getParentDir();
-        }
-        Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf,
-            reporter);
-      }
-    } catch (IOException e) {
-      throw new HiveException(e);
-    }
-    super.jobCloseOp(hconf, success, feedBack);
-  }
-
-  @Override
-  public OperatorType getType() {
-    return OperatorType.FILESINK;
-  }
-
-  @Override
-  public void augmentPlan() {
-    PlanUtils.configureOutputJobPropertiesForStorageHandler(
-        getConf().getTableInfo());
-  }
-
-  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
-    if (hiveOutputFormat == null) {
-      try {
-        hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
-      } catch (Exception ex) {
-        throw new IOException(ex);
-      }
-    }
-    Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), job);
-
-    if (conf.getTableInfo().isNonNative()) {
-      //check the ouput specs only if it is a storage handler (native tables's outputformats does
-      //not set the job's output properties correctly)
-      try {
-        hiveOutputFormat.checkOutputSpecs(ignored, job);
-      } catch (NoSuchMethodError e) {
-        //For BC, ignore this for now, but leave a log message
-        LOG.warn("HiveOutputFormat should implement checkOutputSpecs() method`");
-      }
-    }
-  }
-
-  private void publishStats() throws HiveException {
-    boolean isStatsReliable = conf.isStatsReliable();
-
-    // Initializing a stats publisher
-    StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc);
-
-    if (statsPublisher == null) {
-      // just return, stats gathering should not block the main query
-      LOG.error("StatsPublishing error: StatsPublisher is not initialized.");
-      if (isStatsReliable) {
-        throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
-      }
-      return;
-    }
-
-    if (!statsPublisher.connect(hconf)) {
-      // just return, stats gathering should not block the main query
-      LOG.error("StatsPublishing error: cannot connect to database");
-      if (isStatsReliable) {
-        throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
-      }
-      return;
-    }
-
-    String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf));
-    String spSpec = conf.getStaticSpec() != null ? conf.getStaticSpec() : "";
-
-    for (String fspKey : valToPaths.keySet()) {
-      FSPaths fspValue = valToPaths.get(fspKey);
-      String key;
-
-      // construct the key(fileID) to insert into the intermediate stats table
-      if (fspKey == "") {
-        // for non-partitioned/static partitioned table, the key for temp storage is
-        // common key prefix + static partition spec + taskID
-        String keyPrefix = Utilities.getHashedStatsPrefix(
-            conf.getStatsAggPrefix() + spSpec, conf.getMaxStatsKeyPrefixLength());
-        key = keyPrefix + taskID;
-      } else {
-        // for partitioned table, the key is
-        // common key prefix + static partition spec + DynamicPartSpec + taskID
-        key = createKeyForStatsPublisher(taskID, spSpec, fspKey);
-      }
-      Map<String, String> statsToPublish = new HashMap<String, String>();
-      for (String statType : fspValue.stat.getStoredStats()) {
-        statsToPublish.put(statType, Long.toString(fspValue.stat.getStat(statType)));
-      }
-      if (!statsPublisher.publishStat(key, statsToPublish)) {
-        // The original exception is lost.
-        // Not changing the interface to maintain backward compatibility
-        if (isStatsReliable) {
-          throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg());
-        }
-      }
-    }
-    if (!statsPublisher.closeConnection()) {
-      // The original exception is lost.
-      // Not changing the interface to maintain backward compatibility
-      if (isStatsReliable) {
-        throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg());
-      }
-    }
-  }
-
-  /**
-   * This is server side code to create key in order to save statistics to stats database.
-   * Client side will read it via StatsTask.java aggregateStats().
-   * Client side reads it via db query prefix which is based on partition spec.
-   * Since store-as-subdir information is not part of partition spec, we have to
-   * remove store-as-subdir information from variable "keyPrefix" calculation.
-   * But we have to keep store-as-subdir information in variable "key" calculation
-   * since each skewed value has a row in stats db and "key" is db key,
-   * otherwise later value overwrites previous value.
-   * Performance impact due to string handling is minimum since this method is
-   * only called once in FileSinkOperator closeOp().
-   * For example,
-   * create table test skewed by (key, value) on (('484','val_484') stored as DIRECTORIES;
-   * skewedValueDirList contains 2 elements:
-   * 1. key=484/value=val_484
-   * 2. HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
-   * Case #1: Static partition with store-as-sub-dir
-   * spSpec has SP path
-   * fspKey has either
-   * key=484/value=val_484 or
-   * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
-   * After filter, fspKey is empty, storedAsDirPostFix has either
-   * key=484/value=val_484 or
-   * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
-   * so, at the end, "keyPrefix" doesnt have subdir information but "key" has
-   * Case #2: Dynamic partition with store-as-sub-dir. Assume dp part is hr
-   * spSpec has SP path
-   * fspKey has either
-   * hr=11/key=484/value=val_484 or
-   * hr=11/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
-   * After filter, fspKey is hr=11, storedAsDirPostFix has either
-   * key=484/value=val_484 or
-   * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
-   * so, at the end, "keyPrefix" doesn't have subdir information from skewed but "key" has
-   * @param taskID
-   * @param spSpec
-   * @param fspKey
-   * @return
-   */
-  private String createKeyForStatsPublisher(String taskID, String spSpec, String fspKey) {
-    String key;
-    String newFspKey = fspKey;
-    String storedAsDirPostFix = "";
-    if (isSkewedStoredAsSubDirectories) {
-      List<String> skewedValueDirList = this.lbCtx.getSkewedValuesDirNames();
-      for (String dir : skewedValueDirList) {
-        newFspKey = newFspKey.replace(dir, "");
-        if (!newFspKey.equals(fspKey)) {
-          storedAsDirPostFix = dir;
-          break;
-        }
-      }
-    }
-    String keyPrefix = Utilities.getHashedStatsPrefix(
-        conf.getStatsAggPrefix() + spSpec + newFspKey + Path.SEPARATOR,
-        conf.getMaxStatsKeyPrefixLength());
-    key = keyPrefix + storedAsDirPostFix + taskID;
-    return key;
-  }
 }

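VectorFileSinkOperator is now a thin subclass of FileSinkOperator: it keeps a no-argument constructor plus the (VectorizationContext, OperatorDesc) constructor that OperatorFactory.getVectorOperator() invokes reflectively, and overrides only the batch-oriented processOp(). A hypothetical example of the same pattern applied to another operator (not part of this commit; it would also need an entry in OperatorFactory.vectorOpvec to be reachable):

    import org.apache.hadoop.hive.ql.exec.LimitOperator;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
    import org.apache.hadoop.hive.ql.plan.LimitDesc;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    // Hypothetical; shown only to illustrate the constructor contract that
    // getVectorOperator() relies on.
    public class VectorLimitOperator extends LimitOperator {
      private static final long serialVersionUID = 1L;

      public VectorLimitOperator() {
        super();
      }

      // Signature expected by getVectorOperator()'s reflective lookup; the
      // context is ignored here, as in VectorFileSinkOperator above.
      public VectorLimitOperator(VectorizationContext vContext, OperatorDesc conf) {
        this();
        this.conf = (LimitDesc) conf;
      }
    }
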
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java Tue Sep  3 18:33:13 2013
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -48,31 +49,47 @@ public class VectorFilterOperator extend
   }
 
   private final transient LongWritable filtered_count, passed_count;
-  private transient VectorExpression conditionEvaluator;
+  private VectorExpression conditionEvaluator = null;
   transient int heartbeatInterval;
-  private final VectorizationContext vContext;
 
-  public VectorFilterOperator(VectorizationContext ctxt, OperatorDesc conf) {
+  // filterMode is 1 if condition is always true, -1 if always false
+  // and 0 if condition needs to be computed.
+  transient private int filterMode = 0;
+
+  public VectorFilterOperator(VectorizationContext vContext, OperatorDesc conf)
+      throws HiveException {
+    this();
+    vContext.setOperatorType(OperatorType.FILTER);
+    ExprNodeDesc oldExpression = ((FilterDesc) conf).getPredicate();
+    conditionEvaluator = vContext.getVectorExpression(oldExpression);
+  }
+
+  public VectorFilterOperator() {
     super();
-    this.vContext = ctxt;
     filtered_count = new LongWritable();
     passed_count = new LongWritable();
     this.conf = (FilterDesc) conf;
   }
 
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
-      ExprNodeDesc oldExpression = conf.getPredicate();
-      vContext.setOperatorType(OperatorType.FILTER);
-      conditionEvaluator = vContext.getVectorExpression(oldExpression);
       statsMap.put(Counter.FILTERED, filtered_count);
       statsMap.put(Counter.PASSED, passed_count);
     } catch (Throwable e) {
       throw new HiveException(e);
     }
+    if (conditionEvaluator instanceof ConstantVectorExpression) {
+      ConstantVectorExpression cve = (ConstantVectorExpression) this.conditionEvaluator;
+      if (cve.getLongValue() == 1) {
+        filterMode = 1;
+      } else {
+        filterMode = -1;
+      }
+    }
     initializeChildren(hconf);
   }
 
@@ -86,7 +103,18 @@ public class VectorFilterOperator extend
     VectorizedRowBatch vrg = (VectorizedRowBatch) row;
     //Evaluate the predicate expression
     //The selected vector represents selected rows.
-    conditionEvaluator.evaluate(vrg);
+    switch (filterMode) {
+      case 0:
+        conditionEvaluator.evaluate(vrg);
+        break;
+      case -1:
+        // All will be filtered out
+        vrg.size = 0;
+        break;
+      case 1:
+      default:
+        // All are selected, do nothing
+    }
     if (vrg.size > 0) {
       forward(vrg, null);
     }
@@ -108,4 +136,12 @@ public class VectorFilterOperator extend
   public OperatorType getType() {
     return OperatorType.FILTER;
   }
+
+  public VectorExpression getConditionEvaluator() {
+    return conditionEvaluator;
+  }
+
+  public void setConditionEvaluator(VectorExpression conditionEvaluator) {
+    this.conditionEvaluator = conditionEvaluator;
+  }
 }

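The filterMode change above folds a constant predicate once, at operator setup, so the per-batch path can skip expression evaluation entirely: a constant-false predicate empties the batch, a constant-true predicate forwards it untouched, and only a genuinely data-dependent predicate is evaluated. Below is a minimal standalone sketch of that idea; the types are invented for illustration and are not Hive's VectorExpression classes.

    // Illustrative only: mirrors the filterMode optimization from the patch,
    // using made-up types rather than Hive's vectorized expression classes.
    interface Predicate {
      // Evaluates the predicate over a batch, shrinking batch.size to the
      // number of rows that pass.
      void evaluate(Batch batch);
    }

    final class ConstantPredicate implements Predicate {
      private final boolean value;
      ConstantPredicate(boolean value) { this.value = value; }
      boolean getValue() { return value; }
      @Override public void evaluate(Batch batch) {
        if (!value) {
          batch.size = 0;
        }
      }
    }

    final class Batch {
      int size;
      Batch(int size) { this.size = size; }
    }

    class ConstantFoldingFilter {
      // 0 = evaluate per batch, 1 = constant true, -1 = constant false.
      private final int filterMode;
      private final Predicate predicate;

      ConstantFoldingFilter(Predicate predicate) {
        this.predicate = predicate;
        if (predicate instanceof ConstantPredicate) {
          filterMode = ((ConstantPredicate) predicate).getValue() ? 1 : -1;
        } else {
          filterMode = 0;
        }
      }

      void process(Batch batch) {
        switch (filterMode) {
          case 0:  predicate.evaluate(batch); break; // data-dependent predicate
          case -1: batch.size = 0; break;            // constant false: drop every row
          default: break;                            // constant true: pass batch through
        }
        if (batch.size > 0) {
          forward(batch);
        }
      }

      private void forward(Batch batch) {
        // Hand the surviving rows to the downstream operator.
      }
    }

The real operator can make the same decision in initializeOp() because, after this patch, the compiled conditionEvaluator is built in the constructor at plan-generation time rather than during task-side initialization.
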
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Tue Sep  3 18:33:13 2013
@@ -57,21 +57,19 @@ public class VectorGroupByOperator exten
   private static final Log LOG = LogFactory.getLog(
       VectorGroupByOperator.class.getName());
 
-  private final VectorizationContext vContext;
-
   /**
    * This is the vector of aggregators. They are stateless and only implement
    * the algorithm of how to compute the aggregation. state is kept in the
    * aggregation buffers and is our responsibility to match the proper state for each key.
    */
-  private transient VectorAggregateExpression[] aggregators;
+  private VectorAggregateExpression[] aggregators;
 
   /**
    * Key vector expressions.
    */
-  private transient VectorExpression[] keyExpressions;
+  private VectorExpression[] keyExpressions;
 
-  private VectorExpressionWriter[] keyOutputWriters;
+  private transient VectorExpressionWriter[] keyOutputWriters;
 
   /**
    * The aggregation buffers to use for the current batch.
@@ -141,10 +139,24 @@ public class VectorGroupByOperator exten
 
   private static final long serialVersionUID = 1L;
 
-  public VectorGroupByOperator(VectorizationContext ctxt, OperatorDesc conf) {
+  public VectorGroupByOperator(VectorizationContext vContext, OperatorDesc conf)
+      throws HiveException {
+    this();
+    GroupByDesc desc = (GroupByDesc) conf;
+    this.conf = desc;
+    vContext.setOperatorType(OperatorType.GROUPBY);
+    List<ExprNodeDesc> keysDesc = desc.getKeys();
+    keyExpressions = vContext.getVectorExpressions(keysDesc);
+    ArrayList<AggregationDesc> aggrDesc = desc.getAggregators();
+    aggregators = new VectorAggregateExpression[aggrDesc.size()];
+    for (int i = 0; i < aggrDesc.size(); ++i) {
+      AggregationDesc aggDesc = aggrDesc.get(i);
+      aggregators[i] = vContext.getAggregatorExpression(aggDesc);
+    }
+  }
+
+  public VectorGroupByOperator() {
     super();
-    this.vContext = ctxt;
-    this.conf = (GroupByDesc) conf;
   }
 
   @Override
@@ -152,11 +164,8 @@ public class VectorGroupByOperator exten
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
+    List<ExprNodeDesc> keysDesc = conf.getKeys();
     try {
-      vContext.setOperatorType(OperatorType.GROUPBY);
-
-      List<ExprNodeDesc> keysDesc = conf.getKeys();
-      keyExpressions = vContext.getVectorExpressions(keysDesc);
 
       keyOutputWriters = new VectorExpressionWriter[keyExpressions.length];
 
@@ -166,11 +175,8 @@ public class VectorGroupByOperator exten
         objectInspectors.add(keyOutputWriters[i].getObjectInspector());
       }
 
-      ArrayList<AggregationDesc> aggrDesc = conf.getAggregators();
-      aggregators = new VectorAggregateExpression[aggrDesc.size()];
-      for (int i = 0; i < aggrDesc.size(); ++i) {
-        AggregationDesc desc = aggrDesc.get(i);
-        aggregators[i] = vContext.getAggregatorExpression (desc);
+      for (int i = 0; i < aggregators.length; ++i) {
+        aggregators[i].init(conf.getAggregators().get(i));
         objectInspectors.add(aggregators[i].getOutputObjectInspector());
       }
 
@@ -215,13 +221,15 @@ public class VectorGroupByOperator exten
 
     maxHashTblMemory = (int)(maxMemory * memoryThreshold);
 
-    LOG.info(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)",
-        maxHashTblMemory/1024/1024,
-        maxMemory/1024/1024,
-        memoryThreshold,
-        fixedHashEntrySize,
-        keyWrappersBatch.getKeysFixedSize(),
-        aggregationBatchInfo.getAggregatorsFixedSize()));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)",
+          maxHashTblMemory/1024/1024,
+          maxMemory/1024/1024,
+          memoryThreshold,
+          fixedHashEntrySize,
+          keyWrappersBatch.getKeysFixedSize(),
+          aggregationBatchInfo.getAggregatorsFixedSize()));
+    }
 
   }
 
@@ -264,15 +272,16 @@ public class VectorGroupByOperator exten
       (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH);
     int entriesFlushed = 0;
 
-    LOG.info(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)",
-        entriesToFlush, all ? "(all)" : "",
-        numEntriesHashTable, fixedHashEntrySize, avgVariableSize,
-        numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024,
-        maxHashTblMemory/1024/1024));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)",
+          entriesToFlush, all ? "(all)" : "",
+          numEntriesHashTable, fixedHashEntrySize, avgVariableSize,
+          numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024,
+          maxHashTblMemory/1024/1024));
+    }
 
     Object[] forwardCache = new Object[keyExpressions.length + aggregators.length];
     if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) {
-
       // if this is a global aggregation (no keys) and empty set, must still emit NULLs
       VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer();
       for (int i = 0; i < aggregators.length; ++i) {
@@ -280,7 +289,6 @@ public class VectorGroupByOperator exten
       }
       forward(forwardCache, outputObjInspector);
     } else {
-
       /* Iterate the global (keywrapper,aggregationbuffers) map and emit
        a row for each key */
       Iterator<Map.Entry<KeyWrapper, VectorAggregationBufferRow>> iter =
@@ -297,8 +305,10 @@ public class VectorGroupByOperator exten
           forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue()
               .getAggregationBuffer(i));
         }
-        LOG.debug(String.format("forwarding keys: %s: %s",
-            pair.getKey().toString(), Arrays.toString(forwardCache)));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("forwarding keys: %s: %s",
+              pair.getKey().toString(), Arrays.toString(forwardCache)));
+        }
         forward(forwardCache, outputObjInspector);
 
         if (!all) {
@@ -441,5 +451,21 @@ public class VectorGroupByOperator exten
     return OperatorType.GROUPBY;
   }
 
+  public VectorExpression[] getKeyExpressions() {
+    return keyExpressions;
+  }
+
+  public void setKeyExpressions(VectorExpression[] keyExpressions) {
+    this.keyExpressions = keyExpressions;
+  }
+
+  public VectorAggregateExpression[] getAggregators() {
+    return aggregators;
+  }
+
+  public void setAggregators(VectorAggregateExpression[] aggregators) {
+    this.aggregators = aggregators;
+  }
+
 }
 

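The logging changes in VectorGroupByOperator downgrade the per-flush messages from info to debug and wrap them in LOG.isDebugEnabled(), so the String.format (and the Arrays.toString of each forwarded row) is only paid when debug logging is actually on. A short self-contained example of the same guard, using the commons-logging API the operator already uses (the class and method names here are illustrative):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class DebugGuardExample {
      private static final Log LOG = LogFactory.getLog(DebugGuardExample.class);

      void logFlush(int entriesToFlush, long usedBytes, long maxBytes) {
        // The guard skips the String.format entirely when debug is disabled,
        // which matters on a hot per-flush path.
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("Flush %d entries (used:%dMb max:%dMb)",
              entriesToFlush, usedBytes / 1024 / 1024, maxBytes / 1024 / 1024));
        }
      }
    }
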
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Tue Sep  3 18:33:13 2013
@@ -52,26 +52,24 @@ public class VectorReduceSinkOperator ex
 
   private static final long serialVersionUID = 1L;
 
-  private final VectorizationContext vContext;
-
   /**
    * The evaluators for the key columns. Key columns decide the sort order on
    * the reducer side. Key columns are passed to the reducer in the "key".
    */
-  protected transient VectorExpression[] keyEval;
-  
+  protected VectorExpression[] keyEval;
+
   /**
    * The key value writers. These know how to write the necessary writable type
    * based on key column metadata, from the primitive vector type.
    */
   protected transient VectorExpressionWriter[] keyWriters;
-  
+
   /**
    * The evaluators for the value columns. Value columns are passed to reducer
    * in the "value".
    */
-  protected transient VectorExpression[] valueEval;
-  
+  protected VectorExpression[] valueEval;
+
   /**
    * The output value writers. These know how to write the necessary writable type
    * based on value column metadata, from the primitive vector type.
@@ -83,19 +81,19 @@ public class VectorReduceSinkOperator ex
    * Hive language). Partition columns decide the reducer that the current row
    * goes to. Partition columns are not passed to reducer.
    */
-  protected transient VectorExpression[] partitionEval;
-  
+  protected VectorExpression[] partitionEval;
+
   /**
    * The partition value writers. These know how to write the necessary writable type
    * based on partition column metadata, from the primitive vector type.
-   */  
+   */
   protected transient VectorExpressionWriter[] partitionWriters;
 
-  private int numDistributionKeys;
+  private transient int numDistributionKeys;
 
-  private List<List<Integer>> distinctColIndices;
+  private transient List<List<Integer>> distinctColIndices;
 
-  private int numDistinctExprs;
+  private transient int numDistinctExprs;
 
   transient HiveKey keyWritable = new HiveKey();
   transient Writable value;
@@ -115,14 +113,24 @@ public class VectorReduceSinkOperator ex
   transient ObjectInspector[] partitionObjectInspectors;
   transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
 
+  public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
+      throws HiveException {
+    this();
+    ReduceSinkDesc desc = (ReduceSinkDesc) conf;
+    this.conf = desc;
+    vContext.setOperatorType(OperatorType.REDUCESINK);
+    keyEval = vContext.getVectorExpressions(desc.getKeyCols());
+    valueEval = vContext.getVectorExpressions(desc.getValueCols());
+    partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
+  }
+
+  public VectorReduceSinkOperator() {
+    super();
+  }
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
-      vContext.setOperatorType(OperatorType.REDUCESINK);
-      keyEval = vContext.getVectorExpressions(conf.getKeyCols());
-      valueEval = vContext.getVectorExpressions(conf.getValueCols());
-      partitionEval = vContext.getVectorExpressions(conf.getPartitionCols());
 
       numDistributionKeys = conf.getNumDistributionKeys();
       distinctColIndices = conf.getDistinctColumnIndices();
@@ -133,12 +141,12 @@ public class VectorReduceSinkOperator ex
           .newInstance();
       keySerializer.initialize(null, keyTableDesc.getProperties());
       keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-      
+
       /*
-       * Compute and assign the key writers and the key object inspector 
+       * Compute and assign the key writers and the key object inspector
        */
       VectorExpressionWriterFactory.processVectorExpressions(
-          conf.getKeyCols(), 
+          conf.getKeyCols(),
           conf.getOutputKeyColumnNames(),
           new VectorExpressionWriterFactory.Closure() {
             @Override
@@ -148,7 +156,7 @@ public class VectorReduceSinkOperator ex
               keyObjectInspector = objectInspector;
             }
           });
-      
+
       String colNames = "";
       for(String colName : conf.getOutputKeyColumnNames()) {
         colNames = String.format("%s %s", colNames, colName);
@@ -160,12 +168,12 @@ public class VectorReduceSinkOperator ex
           colNames));
 
       partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
-      
+
       TableDesc valueTableDesc = conf.getValueSerializeInfo();
       valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
           .newInstance();
       valueSerializer.initialize(null, valueTableDesc.getProperties());
-      
+
       /*
        * Compute and assign the value writers and the value object inspector
        */
@@ -323,13 +331,6 @@ public class VectorReduceSinkOperator ex
     }
   }
 
-  public VectorReduceSinkOperator (
-      VectorizationContext context,
-      OperatorDesc conf) {
-    this.vContext = context;
-    this.conf = (ReduceSinkDesc) conf;
-  }
-
   /**
    * @return the name of the operator
    */
@@ -352,4 +353,28 @@ public class VectorReduceSinkOperator ex
     return false;
   }
 
+  public VectorExpression[] getPartitionEval() {
+    return partitionEval;
+  }
+
+  public void setPartitionEval(VectorExpression[] partitionEval) {
+    this.partitionEval = partitionEval;
+  }
+
+  public VectorExpression[] getValueEval() {
+    return valueEval;
+  }
+
+  public void setValueEval(VectorExpression[] valueEval) {
+    this.valueEval = valueEval;
+  }
+
+  public VectorExpression[] getKeyEval() {
+    return keyEval;
+  }
+
+  public void setKeyEval(VectorExpression[] keyEval) {
+    this.keyEval = keyEval;
+  }
+
 }

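The ReduceSink changes follow the same pattern as the other operators: the compiled key, value, and partition expressions are built in the new VectorizationContext-taking constructor, stored in non-transient fields, and exposed through getters and setters, while purely runtime state (serializers, writers, distinct-column bookkeeping) stays transient and is rebuilt in initializeOp(). A minimal sketch of that split, with invented types standing in for the Hive classes:

    import java.io.Serializable;

    // Illustrative only: shows the transient/non-transient split used by the
    // vectorized operators, not Hive's actual ReduceSink implementation.
    public class VectorSinkSketch implements Serializable {
      private static final long serialVersionUID = 1L;

      // Built at plan-generation time; must survive serialization of the
      // operator tree to the task, so the fields are not transient and have
      // bean-style accessors.
      private CompiledExpr[] keyEval;
      private CompiledExpr[] valueEval;

      // Runtime-only state, recreated on the task side during initialization.
      private transient Object keySerializer;
      private transient int numDistributionKeys;

      public CompiledExpr[] getKeyEval() { return keyEval; }
      public void setKeyEval(CompiledExpr[] keyEval) { this.keyEval = keyEval; }
      public CompiledExpr[] getValueEval() { return valueEval; }
      public void setValueEval(CompiledExpr[] valueEval) { this.valueEval = valueEval; }
    }

    // Stand-in for a compiled vector expression; it must itself be
    // serializable for the plan to round-trip.
    class CompiledExpr implements Serializable {
      private static final long serialVersionUID = 1L;
    }
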
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java Tue Sep  3 18:33:13 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
@@ -43,17 +44,27 @@ public class VectorSelectOperator extend
 
   private static final long serialVersionUID = 1L;
 
-  protected transient VectorExpression[] vExpressions;
+  protected VectorExpression[] vExpressions = null;
 
-  private final VectorizationContext vContext;
+  private transient int [] projectedColumns = null;
 
-  private int [] projectedColumns = null;
+  private transient VectorExpressionWriter [] valueWriters = null;
 
-  private VectorExpressionWriter [] valueWriters = null;
-
-  public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
-    this.vContext = ctxt;
+  public VectorSelectOperator(VectorizationContext vContext, OperatorDesc conf)
+      throws HiveException {
     this.conf = (SelectDesc) conf;
+    List<ExprNodeDesc> colList = this.conf.getColList();
+    vContext.setOperatorType(OperatorType.SELECT);
+    vExpressions = new VectorExpression[colList.size()];
+    for (int i = 0; i < colList.size(); i++) {
+      vExpressions[i] = vContext.getVectorExpression(colList.get(i));
+      String columnName = this.conf.getOutputColumnNames().get(i);
+      // Update column map with output column names
+      vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
+    }
+  }
+
+  public VectorSelectOperator() {
   }
 
   @Override
@@ -67,14 +78,6 @@ public class VectorSelectOperator extend
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
     List<ExprNodeDesc> colList = conf.getColList();
-    vContext.setOperatorType(OperatorType.SELECT);
-    vExpressions = new VectorExpression[colList.size()];
-    for (int i = 0; i < colList.size(); i++) {
-      vExpressions[i] = vContext.getVectorExpression(colList.get(i));
-      String columnName = conf.getOutputColumnNames().get(i);
-      // Update column map with output column names
-      vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
-    }
     valueWriters = VectorExpressionWriterFactory.getExpressionWriters(colList);
     for (VectorExpressionWriter vew : valueWriters) {
       objectInspectors.add(vew.getObjectInspector());
@@ -141,4 +144,21 @@ public class VectorSelectOperator extend
   public OperatorType getType() {
     return OperatorType.SELECT;
   }
+
+  @Explain (displayName = "vector expressions")
+  public VectorExpression[] getvExpressions() {
+    return vExpressions;
+  }
+
+  public VectorExpression[] getVExpressions() {
+    return vExpressions;
+  }
+
+  public void setvExpressions(VectorExpression[] vExpressions) {
+    this.vExpressions = vExpressions;
+  }
+
+  public void setVExpressions(VectorExpression[] vExpressions) {
+    this.vExpressions = vExpressions;
+  }
 }
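
Finally, the new @Explain-annotated getter on VectorSelectOperator is what lets the plan printer show the compiled vector expressions under a "vector expressions" heading in EXPLAIN output. The sketch below shows, in a self-contained way, how an annotation-driven printer can discover such a getter by reflection; the @Describe annotation and the classes here are invented for illustration and are not Hive's @Explain machinery:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import java.lang.reflect.Method;
    import java.util.Arrays;

    // Invented stand-in for an explain-style annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Describe {
      String displayName();
    }

    class SelectSketch {
      private final String[] expressions = {"col1 + 1", "upper(col2)"};

      @Describe(displayName = "vector expressions")
      public String[] getExpressions() {
        return expressions;
      }
    }

    public class ExplainSketch {
      public static void main(String[] args) throws Exception {
        SelectSketch op = new SelectSketch();
        // Walk the public methods and print the value of any getter that
        // carries the annotation, keyed by its display name.
        for (Method m : op.getClass().getMethods()) {
          Describe d = m.getAnnotation(Describe.class);
          if (d != null) {
            System.out.println(d.displayName() + ": "
                + Arrays.toString((Object[]) m.invoke(op)));
          }
        }
      }
    }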