Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/16 03:22:02 UTC

svn commit: r1514554 [5/18] - in /hive/branches/vectorization: ./ beeline/src/java/org/apache/hive/beeline/ cli/src/java/org/apache/hadoop/hive/cli/ cli/src/test/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src...

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Aug 16 01:21:54 2013
@@ -63,6 +63,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -108,6 +109,7 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -124,6 +126,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
@@ -131,12 +134,10 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.Adjacency;
 import org.apache.hadoop.hive.ql.plan.api.Graph;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
@@ -150,9 +151,11 @@ import org.apache.hadoop.hive.shims.Shim
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -429,6 +432,7 @@ public final class Utilities {
       return new Expression(dateVal, dateVal.getClass(), "new", args);
     }
 
+    @Override
     protected boolean mutatesTo(Object oldInstance, Object newInstance) {
       if (oldInstance == null || newInstance == null) {
         return false;
@@ -442,6 +446,7 @@ public final class Utilities {
    * it is not serialization friendly.
    */
   public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
+    @Override
     protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
       Timestamp ts = (Timestamp)oldInstance;
       Object[] args = { ts.getNanos() };
@@ -451,21 +456,21 @@ public final class Utilities {
   }
 
   public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) {
-    setMapWork(conf, w.getMapWork(), hiveScratchDir);
+    setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
     if (w.getReduceWork() != null) {
-      setReduceWork(conf, w.getReduceWork(), hiveScratchDir);
+      setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
     }
   }
 
-  public static void setMapWork(Configuration conf, MapWork w, String hiveScratchDir) {
-    setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME);
+  public static Path setMapWork(Configuration conf, MapWork w, String hiveScratchDir, boolean useCache) {
+    return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache);
   }
 
-  public static void setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir) {
-    setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME);
+  public static Path setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir, boolean useCache) {
+    return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache);
   }
 
-  private static void setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name) {
+  private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name, boolean useCache) {
     try {
       setPlanPath(conf, hiveScratchDir);
 
@@ -479,7 +484,7 @@ public final class Utilities {
       // Serialize the plan to the default hdfs instance
       // Except for hadoop local mode execution where we should be
       // able to get the plan directly from the cache
-      if (!ShimLoader.getHadoopShims().isLocalMode(conf)) {
+      if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
         // Set up distributed cache
         if (!DistributedCache.getSymlink(conf)) {
           DistributedCache.createSymlink(conf);
@@ -495,6 +500,8 @@ public final class Utilities {
 
       // Cache the plan in this process
       gWorkMap.put(planPath, w);
+
+      return planPath;
     } catch (Exception e) {
       e.printStackTrace();
       throw new RuntimeException(e);
@@ -609,6 +616,7 @@ public final class Utilities {
    * De-serialize an object. This helper function mainly makes sure that enums,
    * counters, etc are handled properly.
    */
+  @SuppressWarnings("unchecked")
   public static <T> T deserializeObject(InputStream in) {
     XMLDecoder d = null;
     try {
@@ -1778,7 +1786,7 @@ public final class Utilities {
     }
   }
 
-  public static Object getInputSummaryLock = new Object();
+  public static Object INPUT_SUMMARY_LOCK = new Object();
 
   /**
    * Calculate the total size of input files.
@@ -1801,7 +1809,7 @@ public final class Utilities {
 
     // Since multiple threads could call this method concurrently, locking
     // this method will avoid number of threads out of control.
-    synchronized (getInputSummaryLock) {
+    synchronized (INPUT_SUMMARY_LOCK) {
       // For each input path, calculate the total size.
       for (String path : work.getPathToAliases().keySet()) {
         Path p = new Path(path);
@@ -1912,7 +1920,7 @@ public final class Utilities {
                 throw new IOException(e);
               }
             } while (!executorDone);
-    }
+          }
           executor.shutdown();
         }
         HiveInterruptUtils.checkInterrupted();
@@ -1936,7 +1944,7 @@ public final class Utilities {
     }
   }
 
-  public static boolean isEmptyPath(JobConf job, String dirPath, Context ctx)
+  public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx)
       throws Exception {
     ContentSummary cs = ctx.getCS(dirPath);
     if (cs != null) {
@@ -1946,8 +1954,7 @@ public final class Utilities {
     } else {
       LOG.info("Content Summary not cached for " + dirPath);
     }
-    Path p = new Path(dirPath);
-    return isEmptyPath(job, p);
+    return isEmptyPath(job, dirPath);
   }
 
   public static boolean isEmptyPath(JobConf job, Path dirPath) throws Exception {
@@ -2532,5 +2539,415 @@ public final class Utilities {
 
     return sb.toString();
   }
+
+  /**
+   * Estimate the number of reducers needed for this job, based on job input,
+   * and configuration parameters.
+   *
+   * The output of this method should only be used if the output of this
+   * MapRedTask is not being used to populate a bucketed table and the user
+   * has not specified the number of reducers to use.
+   *
+   * @return the number of reducers.
+   */
+  public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary,
+                                             MapWork work, boolean finalMapRed) throws IOException {
+    long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+    int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+    double samplePercentage = getHighestSamplePercentage(work);
+    long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage);
+
+    // if all inputs are sampled, we should shrink the size of reducers accordingly.
+    if (totalInputFileSize != inputSummary.getLength()) {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
+    } else {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+    }
+
+    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+    reducers = Math.max(1, reducers);
+    reducers = Math.min(maxReducers, reducers);
+
+    // If this map reduce job writes final data to a table and bucketing is being inferred,
+    // and the user has configured Hive to do this, make sure the number of reducers is a
+    // power of two
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
+        finalMapRed && !work.getBucketedColsByDirectory().isEmpty()) {
+
+      int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
+      int reducersPowerTwo = (int)Math.pow(2, reducersLog);
+
+      // If the original number of reducers was a power of two, use that
+      if (reducersPowerTwo / 2 == reducers) {
+        return reducers;
+      } else if (reducersPowerTwo > maxReducers) {
+        // If the next power of two greater than the original number of reducers is greater
+        // than the max number of reducers, use the preceding power of two, which is strictly
+        // less than the original number of reducers and hence the max
+        reducers = reducersPowerTwo / 2;
+      } else {
+        // Otherwise use the smallest power of two greater than the original number of reducers
+        reducers = reducersPowerTwo;
+      }
+    }
+
+    return reducers;
+  }
+
+  /**
+   * Computes the total input file size. If block sampling was used it will scale this
+   * value by the highest sample percentage (as an estimate for input).
+   *
+   * @param inputSummary
+   * @param work
+   * @param highestSamplePercentage
+   * @return estimated total input size for job
+   */
+  public static long getTotalInputFileSize (ContentSummary inputSummary, MapWork work,
+      double highestSamplePercentage) {
+    long totalInputFileSize = inputSummary.getLength();
+    if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+      // If percentage block sampling wasn't used, we don't need to do any estimation
+      return totalInputFileSize;
+    }
+
+    if (highestSamplePercentage >= 0) {
+      totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
+          , totalInputFileSize);
+    }
+    return totalInputFileSize;
+  }
+
+  /**
+   * Computes the total number of input files. If block sampling was used it will scale this
+   * value by the highest sample percentage (as an estimate for # input files).
+   *
+   * @param inputSummary
+   * @param work
+   * @param highestSamplePercentage
+   * @return estimated total number of input files for the job
+   */
+  public static long getTotalInputNumFiles (ContentSummary inputSummary, MapWork work,
+      double highestSamplePercentage) {
+    long totalInputNumFiles = inputSummary.getFileCount();
+    if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+      // If percentage block sampling wasn't used, we don't need to do any estimation
+      return totalInputNumFiles;
+    }
+
+    if (highestSamplePercentage >= 0) {
+      totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
+          , totalInputNumFiles);
+    }
+    return totalInputNumFiles;
+  }
+
+  /**
+   * Returns the highest sample percentage of any alias in the given MapWork,
+   * or -1 if any alias is not sampled.
+   */
+  public static double getHighestSamplePercentage (MapWork work) {
+    double highestSamplePercentage = 0;
+    for (String alias : work.getAliasToWork().keySet()) {
+      if (work.getNameToSplitSample().containsKey(alias)) {
+        Double rate = work.getNameToSplitSample().get(alias).getPercent();
+        if (rate != null && rate > highestSamplePercentage) {
+          highestSamplePercentage = rate;
+        }
+      } else {
+        highestSamplePercentage = -1;
+        break;
+      }
+    }
+
+    return highestSamplePercentage;
+  }
+
+  /**
+   * Computes a list of all input paths needed to compute the given MapWork. All aliases
+   * are considered and a merged list of input paths is returned. If any input path points
+   * to an empty table or partition, a dummy file in the scratch dir is created instead and
+   * added to the list. This is needed to avoid special-casing the operator pipeline for
+   * these cases.
+   *
+   * @param job JobConf used to run the job
+   * @param work MapWork encapsulating the info about the task
+   * @param hiveScratchDir The tmp dir used to create dummy files if needed
+   * @param ctx Context object
+   * @return List of paths to process for the given MapWork
+   * @throws Exception
+   */
+  public static List<Path> getInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
+      throws Exception {
+    int sequenceNumber = 0;
+
+    Set<Path> pathsProcessed = new HashSet<Path>();
+    List<Path> pathsToAdd = new LinkedList<Path>();
+    // AliasToWork contains all the aliases
+    for (String alias : work.getAliasToWork().keySet()) {
+      LOG.info("Processing alias " + alias);
+
+      // The alias may not have any path
+      Path path = null;
+      for (String file : new LinkedList<String>(work.getPathToAliases().keySet())) {
+        List<String> aliases = work.getPathToAliases().get(file);
+        if (aliases.contains(alias)) {
+          path = new Path(file);
+
+          // Multiple aliases can point to the same path - it should be
+          // processed only once
+          if (pathsProcessed.contains(path)) {
+            continue;
+          }
+
+          pathsProcessed.add(path);
+
+          LOG.info("Adding input file " + path);
+          if (isEmptyPath(job, path, ctx)) {
+            path = createDummyFileForEmptyPartition(path, job, work,
+                 hiveScratchDir, alias, sequenceNumber++);
+
+          }
+          pathsToAdd.add(path);
+        }
+      }
+
+      // If the query references non-existent partitions,
+      // we need to add an empty file; it is not acceptable to change the
+      // operator tree.
+      // Consider the query:
+      // select * from (select count(1) from T union all select count(1) from
+      // T2) x;
+      // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
+      // rows)
+      if (path == null) {
+        path = createDummyFileForEmptyTable(job, work, hiveScratchDir,
+            alias, sequenceNumber++);
+        pathsToAdd.add(path);
+      }
+    }
+    return pathsToAdd;
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private static Path createEmptyFile(String hiveScratchDir,
+      Class<? extends HiveOutputFormat> outFileFormat, JobConf job,
+      int sequenceNumber, Properties props, boolean dummyRow)
+          throws IOException, InstantiationException, IllegalAccessException {
+
+    // create a dummy empty file in a new directory
+    String newDir = hiveScratchDir + File.separator + sequenceNumber;
+    Path newPath = new Path(newDir);
+    FileSystem fs = newPath.getFileSystem(job);
+    fs.mkdirs(newPath);
+    // Qualify the path against the file system. The user-configured path might contain the default port, which is
+    // skipped in the file status. This makes sure that all paths that go into PathToPartitionInfo always match the
+    // listed file status paths.
+    newPath = fs.makeQualified(newPath);
+    String newFile = newDir + File.separator + "emptyFile";
+    Path newFilePath = new Path(newFile);
+
+    String onefile = newPath.toString();
+    RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+        Text.class, false, props, null);
+    if (dummyRow) {
+      // Empty files are omitted by CombineHiveInputFormat.
+      // For a metadata-only query, that effectively makes partition columns disappear.
+      // This could be fixed by other means, but this seemed the easiest (HIVE-2955).
+      recWriter.write(new Text("empty"));  // written via HiveIgnoreKeyTextOutputFormat
+    }
+    recWriter.close(false);
+
+    return newPath;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work,
+      String hiveScratchDir, String alias, int sequenceNumber)
+          throws IOException, InstantiationException, IllegalAccessException {
+
+    String strPath = path.toString();
+
+    // The input file does not exist; replace it with an empty file
+    PartitionDesc partDesc = work.getPathToPartitionInfo().get(strPath);
+    boolean nonNative = partDesc.getTableDesc().isNonNative();
+    boolean oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class;
+    Properties props = partDesc.getProperties();
+    Class<? extends HiveOutputFormat> outFileFormat = partDesc.getOutputFileFormatClass();
+
+    if (nonNative) {
+      // if this isn't a hive table we can't create an empty file for it.
+      return path;
+    }
+
+    Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
+        sequenceNumber, props, oneRow);
+
+
+    LOG.info("Changed input file to " + newPath);
+
+    // update the work
+    String strNewPath = newPath.toString();
+
+    LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+    pathToAliases.put(strNewPath, pathToAliases.get(strPath));
+    pathToAliases.remove(strPath);
+
+    work.setPathToAliases(pathToAliases);
+
+    LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+    pathToPartitionInfo.put(strNewPath, pathToPartitionInfo.get(strPath));
+    pathToPartitionInfo.remove(strPath);
+    work.setPathToPartitionInfo(pathToPartitionInfo);
+
+    return newPath;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static Path createDummyFileForEmptyTable(JobConf job, MapWork work,
+      String hiveScratchDir, String alias, int sequenceNumber)
+          throws IOException, InstantiationException, IllegalAccessException {
+
+    TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
+    Properties props = tableDesc.getProperties();
+    boolean nonNative = tableDesc.isNonNative();
+    Class<? extends HiveOutputFormat> outFileFormat = tableDesc.getOutputFileFormatClass();
+
+    if (nonNative) {
+      // if this isn't a hive table we can't create an empty file for it.
+      return null;
+    }
+
+    Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
+        sequenceNumber, props, false);
+
+
+    LOG.info("Changed input file to " + newPath.toString());
+
+    // update the work
+
+    LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+    ArrayList<String> newList = new ArrayList<String>();
+    newList.add(alias);
+    pathToAliases.put(newPath.toUri().toString(), newList);
+
+    work.setPathToAliases(pathToAliases);
+
+    LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+    PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
+    pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
+    work.setPathToPartitionInfo(pathToPartitionInfo);
+
+    return newPath;
+  }
+
+  /**
+   * setInputPaths adds all the paths in the provided list to the job conf object
+   * as input paths for the job.
+   *
+   * @param job
+   * @param pathsToAdd
+   */
+  public static void setInputPaths(JobConf job, List<Path> pathsToAdd) {
+
+    Path[] addedPaths = FileInputFormat.getInputPaths(job);
+    if (addedPaths == null) {
+      addedPaths = new Path[0];
+    }
+
+    Path[] combined = new Path[addedPaths.length + pathsToAdd.size()];
+    System.arraycopy(addedPaths, 0, combined, 0, addedPaths.length);
+
+    int i = 0;
+    for(Path p: pathsToAdd) {
+      combined[addedPaths.length + (i++)] = p;
+    }
+    FileInputFormat.setInputPaths(job, combined);
+  }
+
+  /**
+   * Set hive input format, and input format file if necessary.
+   */
+  public static void setInputAttributes(Configuration conf, MapWork mWork) {
+    if (mWork.getInputformat() != null) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+    }
+    if (mWork.getIndexIntermediateFile() != null) {
+      conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());
+      conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile());
+    }
+
+    // Intentionally overwrites anything the user may have put here
+    conf.setBoolean("hive.input.format.sorted", mWork.isInputFormatSorted());
+  }
+
+  /**
+   * Hive uses tmp directories to capture the output of each FileSinkOperator.
+   * This method creates all necessary tmp directories for FileSinks in the MapWork.
+   *
+   * @param conf Used to get the right FileSystem
+   * @param mWork Used to find FileSinkOperators
+   * @throws IOException
+   */
+  public static void createTmpDirs(Configuration conf, MapWork mWork)
+      throws IOException {
+
+    Map<String, ArrayList<String>> pa = mWork.getPathToAliases();
+    if (pa != null) {
+      List<Operator<? extends OperatorDesc>> ops =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+      for (List<String> ls : pa.values()) {
+        for (String a : ls) {
+          ops.add(mWork.getAliasToWork().get(a));
+        }
+      }
+      createTmpDirs(conf, ops);
+    }
+  }
+
+  /**
+   * Hive uses tmp directories to capture the output of each FileSinkOperator.
+   * This method creates all necessary tmp directories for FileSinks in the ReduceWork.
+   *
+   * @param conf Used to get the right FileSystem
+   * @param rWork Used to find FileSinkOperators
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static void createTmpDirs(Configuration conf, ReduceWork rWork)
+      throws IOException {
+    if (rWork == null) {
+      return;
+    }
+    List<Operator<? extends OperatorDesc>> ops
+      = new LinkedList<Operator<? extends OperatorDesc>>();
+    ops.add(rWork.getReducer());
+    createTmpDirs(conf, ops);
+  }
+
+  private static void createTmpDirs(Configuration conf,
+      List<Operator<? extends OperatorDesc>> ops) throws IOException {
+
+    while (!ops.isEmpty()) {
+      Operator<? extends OperatorDesc> op = ops.remove(0);
+
+      if (op instanceof FileSinkOperator) {
+        FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+        String tempDir = fdesc.getDirName();
+
+        if (tempDir != null) {
+          Path tempPath = Utilities.toTempPath(new Path(tempDir));
+          FileSystem fs = tempPath.getFileSystem(conf);
+          fs.mkdirs(tempPath);
+        }
+      }
+
+      if (op.getChildOperators() != null) {
+        ops.addAll(op.getChildOperators());
+      }
+    }
+  }
 }
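
A note for readers of this refactor: the arithmetic performed by the relocated estimateNumberOfReducers can be summarized in a small standalone sketch. The parameters below stand in for the HiveConf lookups (BYTESPERREDUCER, MAXREDUCERS) and for the inferred-bucketing check, so this is an illustration of the logic above rather than the method itself.

    // Minimal sketch of the reducer estimate above; parameter values are
    // stand-ins for the HiveConf lookups, not defaults from the patch.
    static int sketchEstimateReducers(long totalInputFileSize, long bytesPerReducer,
        int maxReducers, boolean roundToPowerOfTwo) {
      // One reducer per bytesPerReducer of input, clamped to [1, maxReducers].
      int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
      reducers = Math.max(1, reducers);
      reducers = Math.min(maxReducers, reducers);
      if (roundToPowerOfTwo) {
        // Round up to the next power of two; fall back to the previous power
        // of two if rounding up would exceed maxReducers.
        int reducersLog = (int) (Math.log(reducers) / Math.log(2)) + 1;
        int reducersPowerTwo = (int) Math.pow(2, reducersLog);
        if (reducersPowerTwo / 2 == reducers) {
          return reducers;  // already a power of two
        }
        reducers = (reducersPowerTwo > maxReducers) ? reducersPowerTwo / 2 : reducersPowerTwo;
      }
      return reducers;
    }

For example, 10 GB of input at 1 GB per reducer with a cap of 999 gives 10 reducers, which the power-of-two option rounds up to 16. When block sampling is in play, getTotalInputFileSize first scales the summary length by the highest sample percentage, so the estimate shrinks accordingly.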
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java Fri Aug 16 01:21:54 2013
@@ -50,5 +50,17 @@ public @interface WindowFunctionDescript
 	 * for all the rows.
 	 */
 	boolean pivotResult() default false;
+
+	/**
+	 * Used in the translation process to validate arguments
+	 * @return true if this is a ranking function
+	 */
+	boolean rankingFunction() default false;
+
+	/**
+	 * Used in analytic functions to specify that the UDF implies an ordering
+	 * @return true if the function implies order
+	 */
+	boolean impliesOrder() default false;
 }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java Fri Aug 16 01:21:54 2013
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import org.apache.hadoop.hive.ql.exec.FunctionInfo;
-import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
 
 @SuppressWarnings("deprecation")
@@ -27,6 +25,7 @@ public class WindowFunctionInfo
 {
 	boolean supportsWindow = true;
 	boolean pivotResult = false;
+	boolean impliesOrder = false;
 	FunctionInfo fInfo;
 
 	WindowFunctionInfo(FunctionInfo fInfo)
@@ -39,6 +38,7 @@ public class WindowFunctionInfo
 		{
 			supportsWindow = def.supportsWindow();
 			pivotResult = def.pivotResult();
+			impliesOrder = def.impliesOrder();
 		}
 	}
 
@@ -52,6 +52,9 @@ public class WindowFunctionInfo
 		return pivotResult;
 	}
 
+	public boolean isImpliesOrder(){
+	  return impliesOrder;
+	}
 	public FunctionInfo getfInfo()
 	{
 		return fInfo;
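
The isImpliesOrder() accessor above is what exposes the new annotation attribute to the translator. A hypothetical check of the kind it enables might look like the sketch below; the helper name and the validation rule are illustrative assumptions, not part of this patch.

    // Hypothetical translator-side validation (illustrative only): a function
    // that implies an ordering should only be accepted when its windowing
    // specification actually carries an ORDER BY.
    static boolean isValidWindowingSpec(WindowFunctionInfo info, boolean specHasOrderBy) {
      return !info.isImpliesOrder() || specHasOrderBy;
    }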

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Fri Aug 16 01:21:54 2013
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.mr;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -30,13 +29,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -56,8 +50,6 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
 import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -71,20 +63,16 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
-import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -95,13 +83,11 @@ import org.apache.hadoop.hive.shims.Shim
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
 import org.apache.log4j.Appender;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.FileAppender;
@@ -216,49 +202,10 @@ public class ExecDriver extends Task<Map
     return false;
   }
 
-  protected void createTmpDirs() throws IOException {
-    // fix up outputs
-    Map<String, ArrayList<String>> pa = work.getMapWork().getPathToAliases();
-    if (pa != null) {
-      List<Operator<? extends OperatorDesc>> opList =
-        new ArrayList<Operator<? extends OperatorDesc>>();
-
-      if (work.getReduceWork() != null) {
-        opList.add(work.getReduceWork().getReducer());
-      }
-
-      for (List<String> ls : pa.values()) {
-        for (String a : ls) {
-          opList.add(work.getMapWork().getAliasToWork().get(a));
-
-          while (!opList.isEmpty()) {
-            Operator<? extends OperatorDesc> op = opList.remove(0);
-
-            if (op instanceof FileSinkOperator) {
-              FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
-              String tempDir = fdesc.getDirName();
-
-              if (tempDir != null) {
-                Path tempPath = Utilities.toTempPath(new Path(tempDir));
-                LOG.info("Making Temp Directory: " + tempDir);
-                FileSystem fs = tempPath.getFileSystem(job);
-                fs.mkdirs(tempPath);
-              }
-            }
-
-            if (op.getChildOperators() != null) {
-              opList.addAll(op.getChildOperators());
-            }
-          }
-        }
-      }
-    }
-  }
-
    /**
    * Execute a query plan using Hadoop.
    */
-  @SuppressWarnings("deprecation")
+  @SuppressWarnings({"deprecation", "unchecked"})
   @Override
   public int execute(DriverContext driverContext) {
 
@@ -451,7 +398,8 @@ public class ExecDriver extends Task<Map
         }
       }
       work.configureJobConf(job);
-      addInputPaths(job, mWork, emptyScratchDirStr, ctx);
+      List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDirStr, ctx);
+      Utilities.setInputPaths(job, inputPaths);
 
       Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
 
@@ -491,7 +439,8 @@ public class ExecDriver extends Task<Map
         }
       }
 
-      this.createTmpDirs();
+      Utilities.createTmpDirs(job, mWork);
+      Utilities.createTmpDirs(job, rWork);
 
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
@@ -629,7 +578,7 @@ public class ExecDriver extends Task<Map
     String tmpPath = context.getCtx().getExternalTmpFileURI(onePath.toUri());
 
     Path partitionFile = new Path(tmpPath, ".partitions");
-    TotalOrderPartitioner.setPartitionFile(job, partitionFile);
+    ShimLoader.getHadoopShims().setTotalOrderPartitionFile(job, partitionFile);
 
     PartitionKeySampler sampler = new PartitionKeySampler();
 
@@ -732,6 +681,7 @@ public class ExecDriver extends Task<Map
     }
   }
 
+  @SuppressWarnings("unchecked")
   public static void main(String[] args) throws IOException, HiveException {
 
     String planFileName = null;
@@ -739,6 +689,7 @@ public class ExecDriver extends Task<Map
     boolean noLog = false;
     String files = null;
     boolean localtask = false;
+    String hadoopAuthToken = null;
     try {
       for (int i = 0; i < args.length; i++) {
         if (args[i].equals("-plan")) {
@@ -751,6 +702,9 @@ public class ExecDriver extends Task<Map
           files = args[++i];
         } else if (args[i].equals("-localtask")) {
           localtask = true;
+        } else if (args[i].equals("-hadooptoken")) {
+          // set by HS2 when running in secure mode with doAs
+          hadoopAuthToken = args[++i];
         }
       }
     } catch (IndexOutOfBoundsException e) {
@@ -772,6 +726,9 @@ public class ExecDriver extends Task<Map
     if (files != null) {
       conf.set("tmpfiles", files);
     }
+    if(hadoopAuthToken != null){
+      conf.set("mapreduce.job.credentials.binary", hadoopAuthToken);
+    }
 
     boolean isSilent = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESESSIONSILENT);
 
@@ -908,164 +865,6 @@ public class ExecDriver extends Task<Map
     return w.getReduceWork() != null;
   }
 
-  /**
-   * Handle a empty/null path for a given alias.
-   */
-  private static int addInputPath(String path, JobConf job, MapWork work, String hiveScratchDir,
-      int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
-    // either the directory does not exist or it is empty
-    assert path == null || isEmptyPath;
-
-    // The input file does not exist, replace it by a empty file
-    Class<? extends HiveOutputFormat> outFileFormat = null;
-    boolean nonNative = true;
-    boolean oneRow = false;
-    Properties props;
-    if (isEmptyPath) {
-      PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
-      props = partDesc.getProperties();
-      outFileFormat = partDesc.getOutputFileFormatClass();
-      nonNative = partDesc.getTableDesc().isNonNative();
-      oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class;
-    } else {
-      TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
-      props = tableDesc.getProperties();
-      outFileFormat = tableDesc.getOutputFileFormatClass();
-      nonNative = tableDesc.isNonNative();
-    }
-
-    if (nonNative) {
-      FileInputFormat.addInputPaths(job, path);
-      LOG.info("Add a non-native table " + path);
-      return numEmptyPaths;
-    }
-
-    // create a dummy empty file in a new directory
-    String newDir = hiveScratchDir + File.separator + (++numEmptyPaths);
-    Path newPath = new Path(newDir);
-    FileSystem fs = newPath.getFileSystem(job);
-    fs.mkdirs(newPath);
-    //Qualify the path against the filesystem. The user configured path might contain default port which is skipped
-    //in the file status. This makes sure that all paths which goes into PathToPartitionInfo are always listed status
-    //filepath.
-    newPath = fs.makeQualified(newPath);
-    String newFile = newDir + File.separator + "emptyFile";
-    Path newFilePath = new Path(newFile);
-
-    LOG.info("Changed input file to " + newPath.toString());
-
-    // toggle the work
-
-    LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
-
-    if (isEmptyPath) {
-      assert path != null;
-      pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path));
-      pathToAliases.remove(path);
-    } else {
-      assert path == null;
-      ArrayList<String> newList = new ArrayList<String>();
-      newList.add(alias);
-      pathToAliases.put(newPath.toUri().toString(), newList);
-    }
-
-    work.setPathToAliases(pathToAliases);
-
-    LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
-    if (isEmptyPath) {
-      pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
-      pathToPartitionInfo.remove(path);
-    } else {
-      PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
-      pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
-    }
-    work.setPathToPartitionInfo(pathToPartitionInfo);
-
-    String onefile = newPath.toString();
-    RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
-        Text.class, false, props, null);
-    if (oneRow) {
-      // empty files are ommited at CombineHiveInputFormat.
-      // for metadata only query, it effectively makes partition columns disappear..
-      // this could be fixed by other methods, but this seemed to be the most easy (HIVEV-2955)
-      recWriter.write(new Text("empty"));  // written via HiveIgnoreKeyTextOutputFormat
-    }
-    recWriter.close(false);
-    FileInputFormat.addInputPaths(job, onefile);
-    return numEmptyPaths;
-  }
-
-  public static void addInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
-      throws Exception {
-    int numEmptyPaths = 0;
-
-    Set<String> pathsProcessed = new HashSet<String>();
-    List<String> pathsToAdd = new LinkedList<String>();
-    // AliasToWork contains all the aliases
-    for (String oneAlias : work.getAliasToWork().keySet()) {
-      LOG.info("Processing alias " + oneAlias);
-      List<String> emptyPaths = new ArrayList<String>();
-
-      // The alias may not have any path
-      String path = null;
-      for (String onefile : work.getPathToAliases().keySet()) {
-        List<String> aliases = work.getPathToAliases().get(onefile);
-        if (aliases.contains(oneAlias)) {
-          path = onefile;
-
-          // Multiple aliases can point to the same path - it should be
-          // processed only once
-          if (pathsProcessed.contains(path)) {
-            continue;
-          }
-
-          pathsProcessed.add(path);
-
-          LOG.info("Adding input file " + path);
-          if (Utilities.isEmptyPath(job, path, ctx)) {
-            emptyPaths.add(path);
-          } else {
-            pathsToAdd.add(path);
-          }
-        }
-      }
-
-      // Create a empty file if the directory is empty
-      for (String emptyPath : emptyPaths) {
-        numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true,
-            oneAlias);
-      }
-
-      // If the query references non-existent partitions
-      // We need to add a empty file, it is not acceptable to change the
-      // operator tree
-      // Consider the query:
-      // select * from (select count(1) from T union all select count(1) from
-      // T2) x;
-      // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
-      // rows)
-      if (path == null) {
-        numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false,
-            oneAlias);
-      }
-    }
-    setInputPaths(job, pathsToAdd);
-  }
-
-  private static void setInputPaths(JobConf job, List<String> pathsToAdd) {
-    Path[] addedPaths = FileInputFormat.getInputPaths(job);
-    List<Path> toAddPathList = new ArrayList<Path>();
-    if(addedPaths != null) {
-      for(Path added: addedPaths) {
-        toAddPathList.add(added);
-      }
-    }
-    for(String toAdd: pathsToAdd) {
-      toAddPathList.add(new Path(toAdd));
-    }
-    FileInputFormat.setInputPaths(job, toAddPathList.toArray(new Path[0]));
-  }
-
   @Override
   public StageType getType() {
     return StageType.MAPRED;
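
Taken together, the hunks above reduce the job-submission path in execute() to a handful of calls into the new Utilities helpers. A condensed paraphrase follows; job, work, mWork, rWork, ctx, emptyScratchDirStr, jc and rj are the variables already in scope in ExecDriver.

    // Condensed paraphrase of the refactored submission path shown above.
    List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDirStr, ctx);
    Utilities.setInputPaths(job, inputPaths);    // appends to any existing input paths
    Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
    Utilities.createTmpDirs(job, mWork);         // FileSink tmp dirs for the map side
    Utilities.createTmpDirs(job, rWork);         // and for the reduce side (no-op if rWork is null)
    rj = jc.submitJob(job);                      // finally submit the job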

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Fri Aug 16 01:21:54 2013
@@ -108,7 +108,9 @@ public class MapRedTask extends ExecDriv
 
         // set the values of totalInputFileSize and totalInputNumFiles, estimating them
         // if percentage block sampling is being used
-        estimateInputSize();
+        double samplePercentage = Utilities.getHighestSamplePercentage(work.getMapWork());
+        totalInputFileSize = Utilities.getTotalInputFileSize(inputSummary, work.getMapWork(), samplePercentage);
+        totalInputNumFiles = Utilities.getTotalInputNumFiles(inputSummary, work.getMapWork(), samplePercentage);
 
         // at this point the number of reducers is precisely defined in the plan
         int numReducers = work.getReduceWork() == null ? 0 : work.getReduceWork().getNumReduceTasks();
@@ -403,7 +405,11 @@ public class MapRedTask extends ExecDriv
             .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
             + reducers);
       } else {
-        int reducers = estimateNumberOfReducers();
+        if (inputSummary == null) {
+          inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
+        }
+        int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), 
+                                                          work.isFinalMapRed());
         rWork.setNumReduceTasks(reducers);
         console
             .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
@@ -424,123 +430,6 @@ public class MapRedTask extends ExecDriv
   }
 
   /**
-   * Estimate the number of reducers needed for this job, based on job input,
-   * and configuration parameters.
-   *
-   * The output of this method should only be used if the output of this
-   * MapRedTask is not being used to populate a bucketed table and the user
-   * has not specified the number of reducers to use.
-   *
-   * @return the number of reducers.
-   */
-  private int estimateNumberOfReducers() throws IOException {
-    long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
-    int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-
-    if(inputSummary == null) {
-      // compute the summary and stash it away
-      inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
-    }
-
-    // if all inputs are sampled, we should shrink the size of reducers accordingly.
-    estimateInputSize();
-
-    if (totalInputFileSize != inputSummary.getLength()) {
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
-    } else {
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
-    }
-
-    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
-    reducers = Math.max(1, reducers);
-    reducers = Math.min(maxReducers, reducers);
-
-    // If this map reduce job writes final data to a table and bucketing is being inferred,
-    // and the user has configured Hive to do this, make sure the number of reducers is a
-    // power of two
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
-        work.isFinalMapRed() && !work.getMapWork().getBucketedColsByDirectory().isEmpty()) {
-
-      int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
-      int reducersPowerTwo = (int)Math.pow(2, reducersLog);
-
-      // If the original number of reducers was a power of two, use that
-      if (reducersPowerTwo / 2 == reducers) {
-        return reducers;
-      } else if (reducersPowerTwo > maxReducers) {
-        // If the next power of two greater than the original number of reducers is greater
-        // than the max number of reducers, use the preceding power of two, which is strictly
-        // less than the original number of reducers and hence the max
-        reducers = reducersPowerTwo / 2;
-      } else {
-        // Otherwise use the smallest power of two greater than the original number of reducers
-        reducers = reducersPowerTwo;
-      }
-    }
-
-    return reducers;
-  }
-
-  /**
-   * Sets the values of totalInputFileSize and totalInputNumFiles.  If percentage
-   * block sampling is used, these values are estimates based on the highest
-   * percentage being used for sampling multiplied by the value obtained from the
-   * input summary.  Otherwise, these values are set to the exact value obtained
-   * from the input summary.
-   *
-   * Once the function completes, inputSizeEstimated is set so that the logic is
-   * never run more than once.
-   */
-  private void estimateInputSize() {
-    if (inputSizeEstimated) {
-      // If we've already run this function, return
-      return;
-    }
-
-    MapWork mWork = work.getMapWork();
-
-    // Initialize the values to be those taken from the input summary
-    totalInputFileSize = inputSummary.getLength();
-    totalInputNumFiles = inputSummary.getFileCount();
-
-    if (mWork.getNameToSplitSample() == null || mWork.getNameToSplitSample().isEmpty()) {
-      // If percentage block sampling wasn't used, we don't need to do any estimation
-      inputSizeEstimated = true;
-      return;
-    }
-
-    // if all inputs are sampled, we should shrink the size of the input accordingly
-    double highestSamplePercentage = 0;
-    boolean allSample = false;
-    for (String alias : mWork.getAliasToWork().keySet()) {
-      if (mWork.getNameToSplitSample().containsKey(alias)) {
-        allSample = true;
-        Double rate = mWork.getNameToSplitSample().get(alias).getPercent();
-        if (rate != null && rate > highestSamplePercentage) {
-          highestSamplePercentage = rate;
-        }
-      } else {
-        allSample = false;
-        break;
-      }
-    }
-    if (allSample) {
-      // This is a little bit dangerous if inputs turns out not to be able to be sampled.
-      // In that case, we significantly underestimate the input.
-      // It's the same as estimateNumberOfReducers(). It's just our best
-      // guess and there is no guarantee.
-      totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
-          , totalInputFileSize);
-      totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
-          , totalInputNumFiles);
-    }
-
-    inputSizeEstimated = true;
-  }
-
-  /**
    * Find out if a job can be run in local mode based on it's characteristics
    *
    * @param conf Hive Configuration

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Fri Aug 16 01:21:54 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.exec.Bu
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SecureCmdDoAs;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -70,13 +71,12 @@ import org.apache.hadoop.hive.shims.Shim
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
-
 /**
  * MapredLocalTask represents any local work (i.e.: client side work) that hive needs to
  * execute. E.g.: This is used for generating Hashtables for Mapjoins on the client
  * before the Join is executed on the cluster.
- * 
- * MapRedLocalTask does not actually execute the work in process, but rather generates 
+ *
+ * MapRedLocalTask does not actually execute the work in process, but rather generates
  * a command using ExecDriver. ExecDriver is what will finally drive processing the records.
  */
 public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
@@ -174,8 +174,6 @@ public class MapredLocalTask extends Tas
         }
       }
 
-      LOG.info("Executing: " + cmdLine);
-
       // Inherit Java system variables
       String hadoopOpts;
       StringBuilder sb = new StringBuilder();
@@ -231,14 +229,29 @@ public class MapredLocalTask extends Tas
         MapRedTask.configureDebugVariablesForChildJVM(variables);
       }
 
+
+      if(ShimLoader.getHadoopShims().isSecurityEnabled() &&
+          conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) == true
+          ){
+        // If Kerberos security is enabled and HS2 doAs is enabled,
+        // additional params need to be set so that the command is run as the
+        // intended user
+        SecureCmdDoAs secureDoAs = new SecureCmdDoAs(conf);
+        cmdLine = secureDoAs.addArg(cmdLine);
+        secureDoAs.addEnv(variables);
+      }
+
       env = new String[variables.size()];
       int pos = 0;
       for (Map.Entry<String, String> entry : variables.entrySet()) {
         String name = entry.getKey();
         String value = entry.getValue();
         env[pos++] = name + "=" + value;
+        LOG.debug("Setting env: " + env[pos-1]);
       }
 
+      LOG.info("Executing: " + cmdLine);
+
       // Run ExecDriver in another JVM
       executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Fri Aug 16 01:21:54 2013
@@ -34,9 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -44,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Jo
 import org.apache.hadoop.hive.ql.exec.Stat;
 import org.apache.hadoop.hive.ql.exec.TerminalOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -699,7 +698,7 @@ public class VectorFileSinkOperator exte
     List<String> skewedCols = lbCtx.getSkewedColNames();
     List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
     List<String> skewedValsCandidate = null;
-    Map<SkewedValueList, String> locationMap = lbCtx.getLbLocationMap();
+    Map<List<String>, String> locationMap = lbCtx.getLbLocationMap();
 
     /* Convert input row to standard objects. */
     ObjectInspectorUtils.copyToStandardObject(standObjs, row,
@@ -717,14 +716,14 @@ public class VectorFileSinkOperator exte
     if (allSkewedVals.contains(skewedValsCandidate)) {
       /* matches skewed values. */
       lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
-      locationMap.put(new SkewedValueList(skewedValsCandidate), lbDirName);
+      locationMap.put(skewedValsCandidate, lbDirName);
     } else {
       /* create default directory. */
       lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
           lbCtx.getDefaultDirName());
       List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
       if (!locationMap.containsKey(defaultKey)) {
-        locationMap.put(new SkewedValueList(defaultKey), lbDirName);
+        locationMap.put(defaultKey, lbDirName);
       }
     }
     return lbDirName;
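
The substance of this change is that the list-bucketing location map is now keyed by plain List<String> values rather than SkewedValueList wrappers; java.util lists already hash and compare element-wise, which is all the map lookup needs. A standalone sketch with made-up values (assumes the usual java.util imports):

    // Illustration only: List<String> keys compare by element values, so a
    // freshly built list with the same skewed values finds the same directory.
    Map<List<String>, String> locationMap = new HashMap<List<String>, String>();
    locationMap.put(Arrays.asList("ca", "2013"), "col1=ca/col2=2013");
    assert "col1=ca/col2=2013".equals(locationMap.get(Arrays.asList("ca", "2013")));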

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Fri Aug 16 01:21:54 2013
@@ -18,55 +18,21 @@
 
 package org.apache.hadoop.hive.ql.history;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
+
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.Counters.Group;
 
 /**
- * HiveHistory.
- *
+ * HiveHistory. Logs information such as the query, query plan, and runtime statistics
+ * into a file.
+ * Each session uses a new object, which creates a new file.
  */
-public class HiveHistory {
-
-  PrintWriter histStream; // History File stream
-
-  String histFileName; // History file name
-
-  private static final Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory");
-
-  private LogHelper console;
-
-  private Map<String, String> idToTableMap = null;
-
-  // Job Hash Map
-  private final HashMap<String, QueryInfo> queryInfoMap = new HashMap<String, QueryInfo>();
-
-  // Task Hash Map
-  private final HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
-
-  private static final String DELIMITER = " ";
+public interface HiveHistory {
 
   /**
    * RecordTypes.
@@ -105,20 +71,11 @@ public class HiveHistory {
     ROWS_INSERTED
   };
 
-  private static final String KEY = "(\\w+)";
-  private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""
-  private static final String ROW_COUNT_PATTERN = "TABLE_ID_(\\d+)_ROWCOUNT";
-
-  private static final Pattern pattern = Pattern.compile(KEY + "=" + "\""
-      + VALUE + "\"");
-
-  private static final Pattern rowCountPattern = Pattern.compile(ROW_COUNT_PATTERN);
-
-  // temp buffer for parsed dataa
-  private static Map<String, String> parseBuffer = new HashMap<String, String>();
-
   /**
-   * Listner interface Parser will call handle function for each record type.
+   * Listener interface.
+   * The parser will call the handle function for each history record row,
+   * specifying the record type and its values.
+   *
    */
   public static interface Listener {
 
@@ -126,63 +83,6 @@ public class HiveHistory {
   }
 
   /**
-   * Parses history file and calls call back functions.
-   *
-   * @param path
-   * @param l
-   * @throws IOException
-   */
-  public static void parseHiveHistory(String path, Listener l) throws IOException {
-    FileInputStream fi = new FileInputStream(path);
-    BufferedReader reader = new BufferedReader(new InputStreamReader(fi));
-    try {
-      String line = null;
-      StringBuilder buf = new StringBuilder();
-      while ((line = reader.readLine()) != null) {
-        buf.append(line);
-        // if it does not end with " then it is line continuation
-        if (!line.trim().endsWith("\"")) {
-          continue;
-        }
-        parseLine(buf.toString(), l);
-        buf = new StringBuilder();
-      }
-    } finally {
-      try {
-        reader.close();
-      } catch (IOException ex) {
-      }
-    }
-  }
-
-  /**
-   * Parse a single line of history.
-   *
-   * @param line
-   * @param l
-   * @throws IOException
-   */
-  private static void parseLine(String line, Listener l) throws IOException {
-    // extract the record type
-    int idx = line.indexOf(' ');
-    String recType = line.substring(0, idx);
-    String data = line.substring(idx + 1, line.length());
-
-    Matcher matcher = pattern.matcher(data);
-
-    while (matcher.find()) {
-      String tuple = matcher.group(0);
-      String[] parts = tuple.split("=");
-
-      parseBuffer.put(parts[0], parts[1].substring(1, parts[1].length() - 1));
-    }
-
-    l.handle(RecordTypes.valueOf(recType), parseBuffer);
-
-    parseBuffer.clear();
-  }
-
-  /**
    * Info.
    *
    */
@@ -216,122 +116,25 @@ public class HiveHistory {
 
   };
 
-  /**
-   * Construct HiveHistory object an open history log file.
-   *
-   * @param ss
-   */
-  public HiveHistory(SessionState ss) {
-
-    try {
-      console = new LogHelper(LOG);
-      String conf_file_loc = ss.getConf().getVar(
-          HiveConf.ConfVars.HIVEHISTORYFILELOC);
-      if ((conf_file_loc == null) || conf_file_loc.length() == 0) {
-        console.printError("No history file location given");
-        return;
-      }
-
-      // Create directory
-      File f = new File(conf_file_loc);
-      if (!f.exists()) {
-        if (!f.mkdirs()) {
-          console.printError("Unable to create log directory " + conf_file_loc);
-          return;
-        }
-      }
-      Random randGen = new Random();
-      do {
-        histFileName = conf_file_loc + "/hive_job_log_" + ss.getSessionId() + "_"
-          + Math.abs(randGen.nextInt()) + ".txt";
-      } while (new File(histFileName).exists());
-      console.printInfo("Hive history file=" + histFileName);
-      histStream = new PrintWriter(histFileName);
-
-      HashMap<String, String> hm = new HashMap<String, String>();
-      hm.put(Keys.SESSION_ID.name(), ss.getSessionId());
-      log(RecordTypes.SessionStart, hm);
-    } catch (FileNotFoundException e) {
-      console.printError("FAILED: Failed to open Query Log : " + histFileName
-          + " " + e.getMessage(), "\n"
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-
-  }
 
   /**
    * @return historyFileName
    */
-  public String getHistFileName() {
-    return histFileName;
-  }
-
-  /**
-   * Write the a history record to history file.
-   *
-   * @param rt
-   * @param keyValMap
-   */
-  void log(RecordTypes rt, Map<String, String> keyValMap) {
-
-    if (histStream == null) {
-      return;
-    }
-
-    StringBuilder sb = new StringBuilder();
-    sb.append(rt.name());
-
-    for (Map.Entry<String, String> ent : keyValMap.entrySet()) {
-
-      sb.append(DELIMITER);
-      String key = ent.getKey();
-      String val = ent.getValue();
-      if(val != null) {
-        val = val.replace("\r","").replace("\n", " ");
-      }
-      sb.append(key + "=\"" + val + "\"");
-
-    }
-    sb.append(DELIMITER);
-    sb.append(Keys.TIME.name() + "=\"" + System.currentTimeMillis() + "\"");
-    histStream.println(sb);
-    histStream.flush();
-
-  }
+  public String getHistFileName();
 
   /**
-   * Called at the start of job Driver.execute().
+   * Called at the start of query execution in Driver.execute().
    */
-  public void startQuery(String cmd, String id) {
-    SessionState ss = SessionState.get();
-    if (ss == null) {
-      return;
-    }
-    QueryInfo ji = new QueryInfo();
-
-    ji.hm.put(Keys.QUERY_ID.name(), id);
-    ji.hm.put(Keys.QUERY_STRING.name(), cmd);
-
-    queryInfoMap.put(id, ji);
-
-    log(RecordTypes.QueryStart, ji.hm);
-
-  }
+  public void startQuery(String cmd, String id);
 
   /**
-   * Used to set job status and other attributes of a job.
+   * Used to set the status and other attributes of a query.
    *
    * @param queryId
    * @param propName
    * @param propValue
    */
-  public void setQueryProperty(String queryId, Keys propName, String propValue) {
-    QueryInfo ji = queryInfoMap.get(queryId);
-    if (ji == null) {
-      return;
-    }
-    ji.hm.put(propName.name(), propValue);
-  }
+  public void setQueryProperty(String queryId, Keys propName, String propValue);
 
   /**
    * Used to set task properties.
@@ -341,14 +144,7 @@ public class HiveHistory {
    * @param propValue
    */
   public void setTaskProperty(String queryId, String taskId, Keys propName,
-      String propValue) {
-    String id = queryId + ":" + taskId;
-    TaskInfo ti = taskInfoMap.get(id);
-    if (ti == null) {
-      return;
-    }
-    ti.hm.put(propName.name(), propValue);
-  }
+      String propValue);
 
   /**
    * Serialize the task counters and set as a task property.
@@ -357,190 +153,62 @@ public class HiveHistory {
    * @param taskId
    * @param ctrs
    */
-  public void setTaskCounters(String queryId, String taskId, Counters ctrs) {
-    String id = queryId + ":" + taskId;
-    QueryInfo ji = queryInfoMap.get(queryId);
-    StringBuilder sb1 = new StringBuilder("");
-    TaskInfo ti = taskInfoMap.get(id);
-    if ((ti == null) || (ctrs == null)) {
-      return;
-    }
-    StringBuilder sb = new StringBuilder("");
-    try {
-
-      boolean first = true;
-      for (Group group : ctrs) {
-        for (Counter counter : group) {
-          if (first) {
-            first = false;
-          } else {
-            sb.append(',');
-          }
-          sb.append(group.getDisplayName());
-          sb.append('.');
-          sb.append(counter.getDisplayName());
-          sb.append(':');
-          sb.append(counter.getCounter());
-          String tab = getRowCountTableName(counter.getDisplayName());
-          if (tab != null) {
-            if (sb1.length() > 0) {
-              sb1.append(",");
-            }
-            sb1.append(tab);
-            sb1.append('~');
-            sb1.append(counter.getCounter());
-            ji.rowCountMap.put(tab, counter.getCounter());
-
-          }
-        }
-      }
-
-    } catch (Exception e) {
-      LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-    if (sb1.length() > 0) {
-      taskInfoMap.get(id).hm.put(Keys.ROWS_INSERTED.name(), sb1.toString());
-      queryInfoMap.get(queryId).hm.put(Keys.ROWS_INSERTED.name(), sb1
-          .toString());
-    }
-    if (sb.length() > 0) {
-      taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
-    }
-  }
+  public void setTaskCounters(String queryId, String taskId, Counters ctrs);
 
-  public void printRowCount(String queryId) {
-    QueryInfo ji = queryInfoMap.get(queryId);
-    if (ji == null) {
-      return;
-    }
-    for (String tab : ji.rowCountMap.keySet()) {
-      console.printInfo(ji.rowCountMap.get(tab) + " Rows loaded to " + tab);
-    }
-  }
+  public void printRowCount(String queryId);
 
   /**
-   * Called at the end of Job. A Job is sql query.
+   * Called at the end of a query.
    *
    * @param queryId
    */
-  public void endQuery(String queryId) {
-    QueryInfo ji = queryInfoMap.get(queryId);
-    if (ji == null) {
-      return;
-    }
-    log(RecordTypes.QueryEnd, ji.hm);
-    queryInfoMap.remove(queryId);
-  }
+  public void endQuery(String queryId);
 
   /**
-   * Called at the start of a task. Called by Driver.run() A Job can have
+   * Called at the start of a task. Called by Driver.run(). A query can have
    * multiple tasks. Tasks will have multiple operator.
    *
    * @param task
    */
   public void startTask(String queryId, Task<? extends Serializable> task,
-      String taskName) {
-    SessionState ss = SessionState.get();
-    if (ss == null) {
-      return;
-    }
-    TaskInfo ti = new TaskInfo();
-
-    ti.hm.put(Keys.QUERY_ID.name(), ss.getQueryId());
-    ti.hm.put(Keys.TASK_ID.name(), task.getId());
-    ti.hm.put(Keys.TASK_NAME.name(), taskName);
-
-    String id = queryId + ":" + task.getId();
-    taskInfoMap.put(id, ti);
-
-    log(RecordTypes.TaskStart, ti.hm);
-
-  }
+      String taskName);
 
   /**
    * Called at the end of a task.
    *
    * @param task
    */
-  public void endTask(String queryId, Task<? extends Serializable> task) {
-    String id = queryId + ":" + task.getId();
-    TaskInfo ti = taskInfoMap.get(id);
-
-    if (ti == null) {
-      return;
-    }
-    log(RecordTypes.TaskEnd, ti.hm);
-    taskInfoMap.remove(id);
-  }
+  public void endTask(String queryId, Task<? extends Serializable> task);
 
   /**
-   * Called at the end of a task.
+   * Logs the progress of a task if ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS
+   * is set to true.
    *
    * @param task
    */
-  public void progressTask(String queryId, Task<? extends Serializable> task) {
-    String id = queryId + ":" + task.getId();
-    TaskInfo ti = taskInfoMap.get(id);
-    if (ti == null) {
-      return;
-    }
-    log(RecordTypes.TaskProgress, ti.hm);
+  public void progressTask(String queryId, Task<? extends Serializable> task);
 
-  }
 
   /**
-   * write out counters.
+   * Logs the current plan state.
+   * @param plan
+   * @throws IOException
    */
-  static ThreadLocal<Map<String,String>> ctrMapFactory =
-      new ThreadLocal<Map<String, String>>() {
-        @Override
-        protected Map<String,String> initialValue() {
-          return new HashMap<String,String>();
-        }
-      };
-
-  public void logPlanProgress(QueryPlan plan) throws IOException {
-    Map<String,String> ctrmap = ctrMapFactory.get();
-    ctrmap.put("plan", plan.toString());
-    log(RecordTypes.Counters, ctrmap);
-  }
+  public void logPlanProgress(QueryPlan plan) throws IOException;
+
 
   /**
-   * Set the table to id map.
+   * Set the id-to-table-name map.
    *
    * @param map
    */
-  public void setIdToTableMap(Map<String, String> map) {
-    idToTableMap = map;
-  }
+  public void setIdToTableMap(Map<String, String> map);
 
   /**
-   * Returns table name for the counter name.
-   *
-   * @param name
-   * @return tableName
+   * Close the log file stream.
    */
-  String getRowCountTableName(String name) {
-    if (idToTableMap == null) {
-      return null;
-    }
-    Matcher m = rowCountPattern.matcher(name);
-
-    if (m.find()) {
-      String tuple = m.group(1);
-      return idToTableMap.get(tuple);
-    }
-    return null;
+  public void closeStream();
 
-  }
 
-  public void closeStream() {
-    IOUtils.cleanup(LOG, histStream);
-  }
 
-  @Override
-  public void finalize() throws Throwable {
-    closeStream();
-    super.finalize();
-  }
 }

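The rewrite above turns HiveHistory into a pure interface; the file-writing logic that used to live here presumably moves to a concrete implementation elsewhere in this commit. As a rough sketch only, a no-op implementation written against the method signatures visible in this hunk could look like the following (the class name is made up for illustration and is not part of the patch):

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.QueryPlan;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.history.HiveHistory;
    import org.apache.hadoop.mapred.Counters;

    /** Discards every history record; a stand-in for when history logging is disabled. */
    public class NoOpHiveHistory implements HiveHistory {
      public String getHistFileName() { return null; }
      public void startQuery(String cmd, String id) { }
      public void setQueryProperty(String queryId, Keys propName, String propValue) { }
      public void setTaskProperty(String queryId, String taskId, Keys propName,
          String propValue) { }
      public void setTaskCounters(String queryId, String taskId, Counters ctrs) { }
      public void printRowCount(String queryId) { }
      public void endQuery(String queryId) { }
      public void startTask(String queryId, Task<? extends Serializable> task,
          String taskName) { }
      public void endTask(String queryId, Task<? extends Serializable> task) { }
      public void progressTask(String queryId, Task<? extends Serializable> task) { }
      public void logPlanProgress(QueryPlan plan) throws IOException { }
      public void setIdToTableMap(Map<String, String> map) { }
      public void closeStream() { }
    }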
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java Fri Aug 16 01:21:54 2013
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Listener;
 import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
@@ -35,8 +37,8 @@ import org.apache.hadoop.hive.ql.history
 public class HiveHistoryViewer implements Listener {
 
   String historyFile;
-
   String sessionId;
+  private static final Log LOG = LogFactory.getLog(HiveHistoryViewer.class);
 
   // Job Hash Map
   private final HashMap<String, QueryInfo> jobInfoMap = new HashMap<String, QueryInfo>();
@@ -65,19 +67,18 @@ public class HiveHistoryViewer implement
    * Parse history files.
    */
   void init() {
-
     try {
-      HiveHistory.parseHiveHistory(historyFile, this);
+      HiveHistoryUtil.parseHiveHistory(historyFile, this);
     } catch (IOException e) {
-      // TODO Auto-generated catch block
+      // TODO pass on this exception
       e.printStackTrace();
+      LOG.error("Error parsing hive history log file", e);
     }
-
   }
 
   /**
-   * Implementation Listner interface function.
-   * 
+   * Implementation of the Listener interface function.
+   *
    * @see org.apache.hadoop.hive.ql.history.HiveHistory.Listener#handle(org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes,
    *      java.util.Map)
    */

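HiveHistoryViewer now delegates file parsing to HiveHistoryUtil.parseHiveHistory, i.e. the parsing code removed from HiveHistory above. To illustrate the Listener contract referenced in this hunk, here is a hedged sketch of a standalone listener that just counts record types (the class is hypothetical, and it assumes handle() is declared to throw IOException, as the removed parser code suggests):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.history.HiveHistory.Listener;
    import org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes;

    /** Counts how many records of each type appear in a history file. */
    public class RecordTypeCounter implements Listener {

      private final Map<RecordTypes, Integer> counts =
          new HashMap<RecordTypes, Integer>();

      public void handle(RecordTypes recordType, Map<String, String> values)
          throws IOException {
        Integer old = counts.get(recordType);
        counts.put(recordType, old == null ? 1 : old + 1);
      }

      public Map<RecordTypes, Integer> getCounts() {
        return counts;
      }
    }

Such a listener would be driven the same way HiveHistoryViewer.init() does it: HiveHistoryUtil.parseHiveHistory(historyFile, new RecordTypeCounter()).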
Copied: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (from r1513659, hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java)
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?p2=hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java&p1=hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java&r1=1513659&r2=1514554&rev=1514554&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Fri Aug 16 01:21:54 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
  * Interface for reading integers.
@@ -52,4 +53,12 @@ interface IntegerReader {
    * @throws IOException
    */
   long next() throws IOException;
+
+  /**
+   * Fill the given column vector with the next available values.
+   * @param previous the column vector to reuse and fill
+   * @throws IOException
+   */
+   void nextVector(LongColumnVector previous, long previousLen)
+      throws IOException;
 }

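The new nextVector() method lets ORC tree readers pull a whole batch of longs into a LongColumnVector instead of looping over next(). A hedged sketch of a caller, assuming it lives in the same org.apache.hadoop.hive.ql.io.orc package (IntegerReader is package-private) and that the reader is already positioned; the class and method names are illustrative only:

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    /** Batch-reads longs through nextVector() instead of a per-row next() loop. */
    class VectorizedLongColumnReader {

      private final IntegerReader reader;

      VectorizedLongColumnReader(IntegerReader reader) {
        this.reader = reader;
      }

      /** Fills up to batchSize values into the (possibly reused) column vector. */
      LongColumnVector readBatch(LongColumnVector previous, long batchSize)
          throws IOException {
        LongColumnVector result =
            (previous != null) ? previous : new LongColumnVector();
        reader.nextVector(result, batchSize);
        return result;
      }
    }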
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Fri Aug 16 01:21:54 2013
@@ -97,7 +97,7 @@ public final class OrcFile {
                                     CompressionKind compress,
                                     int bufferSize,
                                     int rowIndexStride) throws IOException {
-    return new WriterImpl(fs, path, inspector, stripeSize, compress,
+    return new WriterImpl(fs, path, conf, inspector, stripeSize, compress,
       bufferSize, rowIndexStride, getMemoryManager(conf));
   }
 

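The OrcFile change simply threads the caller's Configuration through to WriterImpl, alongside the memory manager already derived from it. A hedged usage sketch against the createWriter(fs, path, conf, inspector, stripeSize, compress, bufferSize, rowIndexStride) overload this hunk implies; the row class, path, and numeric settings below are illustrative and not verified against this branch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Writer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

    public class OrcWriterSketch {

      static class Row {
        public String name;
        public int value;
        Row(String name, int value) { this.name = name; this.value = value; }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path path = new Path("/tmp/example.orc");
        ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
            Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        // conf now reaches WriterImpl, so writer-related settings in the
        // Configuration take effect when the file is written.
        Writer writer = OrcFile.createWriter(fs, path, conf, inspector,
            64L * 1024 * 1024, CompressionKind.ZLIB, 256 * 1024, 10000);
        writer.addRow(new Row("hello", 1));
        writer.close();
      }
    }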
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java Fri Aug 16 01:21:54 2013
@@ -76,11 +76,7 @@ class OutStream extends PositionedOutput
   }
 
   public void clear() throws IOException {
-    uncompressedBytes = 0;
-    compressedBytes = 0;
-    compressed = null;
-    overflow = null;
-    current = null;
+    flush();
     suppress = false;
   }
 
@@ -246,7 +242,10 @@ class OutStream extends PositionedOutput
       receiver.output(compressed);
       compressed = null;
     }
-    clear();
+    uncompressedBytes = 0;
+    compressedBytes = 0;
+    overflow = null;
+    current = null;
   }
 
   @Override