You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/12 19:23:14 UTC

svn commit: r1541190 [4/15] - in /hive/branches/tez: ./ ant/src/org/apache/hadoop/hive/ant/ beeline/ beeline/src/java/org/apache/hive/beeline/ cli/ cli/src/java/org/apache/hadoop/hive/cli/ common/ common/src/java/org/apache/hadoop/hive/common/ common/s...

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Nov 12 18:23:05 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -46,6 +47,7 @@ import org.apache.hadoop.io.BinaryCompar
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
 
 /**
  * Reduce Sink Operator sends output to the reduce stage.
@@ -54,6 +56,7 @@ public class ReduceSinkOperator extends 
     implements Serializable, TopNHash.BinaryCollector {
 
   private static final long serialVersionUID = 1L;
+  protected transient OutputCollector out;
 
   /**
    * The evaluators for the key columns. Key columns decide the sort order on
@@ -91,6 +94,10 @@ public class ReduceSinkOperator extends 
     return inputAlias;
   }
 
+  public void setOutputCollector(OutputCollector _out) {
+    this.out = _out;
+  }
+
   // picks topN K:V pairs from input.
   protected transient TopNHash reducerHash = new TopNHash();
   @Override
@@ -153,8 +160,8 @@ public class ReduceSinkOperator extends 
   transient InspectableObject tempInspectableObject = new InspectableObject();
   protected transient HiveKey keyWritable = new HiveKey();
 
-  transient StructObjectInspector keyObjectInspector;
-  transient StructObjectInspector valueObjectInspector;
+  protected transient ObjectInspector keyObjectInspector;
+  protected transient ObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
   protected transient Object[] cachedValues;
@@ -173,6 +180,7 @@ public class ReduceSinkOperator extends 
    * in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
    * see {@link ExprNodeColumnEvaluator}
    */
+  // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
   protected transient Object[][] cachedKeys;
   boolean firstRow;
   protected transient Random random;
@@ -237,51 +245,41 @@ public class ReduceSinkOperator extends 
             .getOutputValueColumnNames(), rowInspector);
         partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
         int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
-        int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
-          numDistributionKeys;
+        int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : numDistributionKeys;
         cachedKeys = new Object[numKeys][keyLen];
         cachedValues = new Object[valueEval.length];
       }
 
-      // Evaluate the keys
-      for (int i = 0; i < numDistributionKeys; i++) {
-        cachedKeys[0][i] = keyEval[i].evaluate(row);
-      }
+      // Determine distKeyLength (w/o distincts), and then add the first if present.
+      populateCachedDistributionKeys(row, 0);
+      HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
+      int distKeyLength = firstKey.getDistKeyLength();
       if (numDistinctExprs > 0) {
-        // with distinct key(s)
-        for (int i = 0; i < numDistinctExprs; i++) {
-          if (i > 0) {
-            System.arraycopy(cachedKeys[0], 0, cachedKeys[i], 0, numDistributionKeys);
-          }
-          StandardUnion union = (StandardUnion) cachedKeys[i][numDistributionKeys];
-          if (union == null) {
-            cachedKeys[i][numDistributionKeys] =
-              union = new StandardUnion((byte)i, new Object[distinctColIndices.get(i).size()]);
-          }
-          Object[] distinctParameters = (Object[]) union.getObject();
-          for (int j = 0; j < distinctParameters.length; j++) {
-            distinctParameters[j] =
-              keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
-          }
-          union.setTag((byte) i);
-        }
+        populateCachedDistinctKeys(row, 0);
+        firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
       }
 
-      for (int i = 0; i < cachedKeys.length; i++) {
-        // Serialize the keys and append the tag
-        Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
-        setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
-        int topNIndex = reducerHash.tryStoreKey(keyWritable);
-        if (TopNHash.EXCLUDED == topNIndex) continue;
-        int keyHashCode = computeHashCode(row);
-        BytesWritable valueWritable = getValue(row);
-        if (TopNHash.FORWARD == topNIndex) {
-          keyWritable.setHashCode(keyHashCode);
-          collect(keyWritable, valueWritable);
-          continue;
-        }
-        assert topNIndex >= 0;
-        reducerHash.storeValue(topNIndex, valueWritable, keyHashCode, false);
+      // Try to store the first key. If it's not excluded, we will proceed.
+      int firstIndex = reducerHash.tryStoreKey(firstKey);
+      if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
+      // Compute value and hashcode - we'd either store or forward them.
+      BytesWritable value = makeValueWritable(row);
+      int hashCode = computeHashCode(row);
+      if (firstIndex == TopNHash.FORWARD) {
+        firstKey.setHashCode(hashCode);
+        collect(firstKey, value);
+      } else {
+        assert firstIndex >= 0;
+        reducerHash.storeValue(firstIndex, value, hashCode, false);
+      }
+
+      // All other distinct keys will just be forwarded. This could be optimized...
+      for (int i = 1; i < numDistinctExprs; i++) {
+        System.arraycopy(cachedKeys[0], 0, cachedKeys[i], 0, numDistributionKeys);
+        populateCachedDistinctKeys(row, i);
+        HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
+        hiveKey.setHashCode(hashCode);
+        collect(hiveKey, value);
       }
     } catch (HiveException e) {
       throw e;
@@ -290,14 +288,38 @@ public class ReduceSinkOperator extends 
     }
   }
 
+  private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
+    for (int i = 0; i < numDistributionKeys; i++) {
+      cachedKeys[index][i] = keyEval[i].evaluate(row);
+    }
+    if (cachedKeys[0].length > numDistributionKeys) {
+      cachedKeys[index][numDistributionKeys] = null;
+    }
+  }
+
+  /**
+   * Populate distinct keys part of cachedKeys for a particular row.
+   * @param row the row
+   * @param index the cachedKeys index to write to
+   */
+  private void populateCachedDistinctKeys(Object row, int index) throws HiveException {
+    StandardUnion union;
+    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
+          (byte)index, new Object[distinctColIndices.get(index).size()]);
+    Object[] distinctParameters = (Object[]) union.getObject();
+    for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
+      distinctParameters[distinctParamI] =
+          keyEval[distinctColIndices.get(index).get(distinctParamI)].evaluate(row);
+    }
+    union.setTag((byte) index);
+  }
+
   private int computeHashCode(Object row) throws HiveException {
     // Evaluate the HashCode
     int keyHashCode = 0;
     if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide
-      // better
-      // load balance. If the requirement is to have a single reducer, we
-      // should set
+      // If no partition cols, just distribute the data uniformly to provide better
+      // load balance. If the requirement is to have a single reducer, we should set
       // the number of reducers to 1.
       // Use a constant seed to make the code deterministic.
       if (random == null) {
@@ -314,15 +336,19 @@ public class ReduceSinkOperator extends 
     return keyHashCode;
   }
 
-  protected void setKeyWritable(BinaryComparable key, int tag) {
+  // Serialize the keys and append the tag
+  protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException {
+    BinaryComparable key = (BinaryComparable)keySerializer.serialize(obj, keyObjectInspector);
+    int keyLength = key.getLength();
     if (tag == -1) {
-      keyWritable.set(key.getBytes(), 0, key.getLength());
+      keyWritable.set(key.getBytes(), 0, keyLength);
     } else {
-      int keyLength = key.getLength();
       keyWritable.setSize(keyLength + 1);
       System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
       keyWritable.get()[keyLength] = tagByte[0];
     }
+    keyWritable.setDistKeyLength((distLength == null) ? keyLength : distLength);
+    return keyWritable;
   }
 
   public void collect(byte[] key, byte[] value, int hash) throws IOException {
@@ -331,11 +357,6 @@ public class ReduceSinkOperator extends 
     collect(keyWritable, valueWritable);
   }
 
-  protected void collect(byte[] key, Writable valueWritable, int hash) throws IOException {
-    HiveKey keyWritable = new HiveKey(key, hash);
-    collect(keyWritable, valueWritable);
-  }
-
   protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called
@@ -351,7 +372,7 @@ public class ReduceSinkOperator extends 
     }
   }
 
-  private BytesWritable getValue(Object row) throws Exception {
+  private BytesWritable makeValueWritable(Object row) throws Exception {
     // Evaluate the value
     for (int i = 0; i < valueEval.length; i++) {
       cachedValues[i] = valueEval[i].evaluate(row);
@@ -366,6 +387,7 @@ public class ReduceSinkOperator extends 
       reducerHash.flush();
     }
     super.closeOp(abort);
+    out = null;
   }
 
   /**

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Tue Nov 12 18:23:05 2013
@@ -66,9 +66,6 @@ public class StatsTask extends Task<Stat
   private Table table;
   private List<LinkedHashMap<String, String>> dpPartSpecs;
 
-  private static final List<String> collectableStats = StatsSetupConst.getStatsToBeCollected();
-  private static final List<String> supportedStats = StatsSetupConst.getSupportedStats();
-
   public StatsTask() {
     super();
     dpPartSpecs = null;
@@ -84,7 +81,7 @@ public class StatsTask extends Task<Stat
 
     public Statistics() {
       stats = new HashMap<String, LongWritable>();
-      for (String statType : supportedStats) {
+      for (String statType : StatsSetupConst.supportedStats) {
         stats.put(statType, new LongWritable(0L));
       }
     }
@@ -108,7 +105,7 @@ public class StatsTask extends Task<Stat
 
     @Override
     public String toString() {
-      return org.apache.commons.lang.StringUtils.join(supportedStats, ", ");
+      return org.apache.commons.lang.StringUtils.join(StatsSetupConst.supportedStats, ", ");
     }
   }
 
@@ -207,7 +204,7 @@ public class StatsTask extends Task<Stat
 
       boolean tableStatsExist = this.existStats(parameters);
 
-      for (String statType : supportedStats) {
+      for (String statType : StatsSetupConst.supportedStats) {
         if (parameters.containsKey(statType)) {
           tblStats.setStat(statType, Long.parseLong(parameters.get(statType)));
         }
@@ -226,14 +223,14 @@ public class StatsTask extends Task<Stat
         // In case of a non-partitioned table, the key for stats temporary store is "rootDir"
         if (statsAggregator != null) {
           String aggKey = Utilities.getHashedStatsPrefix(work.getAggKey(), maxPrefixLength);
-          updateStats(collectableStats, tblStats, statsAggregator, parameters,
+          updateStats(StatsSetupConst.statsRequireCompute, tblStats, statsAggregator, parameters,
               aggKey, atomic);
           statsAggregator.cleanUp(aggKey);
         }
         // The collectable stats for the aggregator needs to be cleared.
         // For eg. if a file is being loaded, the old number of rows are not valid
         else if (work.isClearAggregatorStats()) {
-          for (String statType : collectableStats) {
+          for (String statType : StatsSetupConst.statsRequireCompute) {
             if (parameters.containsKey(statType)) {
               tblStats.setStat(statType, 0L);
             }
@@ -242,9 +239,10 @@ public class StatsTask extends Task<Stat
 
         // write table stats to metastore
         parameters = tTable.getParameters();
-        for (String statType : collectableStats) {
+        for (String statType : StatsSetupConst.statsRequireCompute) {
           parameters.put(statType, Long.toString(tblStats.getStat(statType)));
         }
+        parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
         tTable.setParameters(parameters);
 
         String tableFullName = table.getDbName() + "." + table.getTableName();
@@ -269,7 +267,7 @@ public class StatsTask extends Task<Stat
           }
 
           Map<String, Long> currentValues = new HashMap<String, Long>();
-          for (String statType : supportedStats) {
+          for (String statType : StatsSetupConst.supportedStats) {
             Long val = parameters.containsKey(statType) ? Long.parseLong(parameters.get(statType))
                 : 0L;
             currentValues.put(statType, val);
@@ -288,11 +286,11 @@ public class StatsTask extends Task<Stat
           LOG.info("Stats aggregator : " + partitionID);
 
           if (statsAggregator != null) {
-            updateStats(collectableStats, newPartStats, statsAggregator,
+            updateStats(StatsSetupConst.statsRequireCompute, newPartStats, statsAggregator,
                 parameters, partitionID, atomic);
             statsAggregator.cleanUp(partitionID);
           } else {
-            for (String statType : collectableStats) {
+            for (String statType : StatsSetupConst.statsRequireCompute) {
               // The collectable stats for the aggregator needs to be cleared.
               // For eg. if a file is being loaded, the old number of rows are not valid
               if (work.isClearAggregatorStats()) {
@@ -320,13 +318,14 @@ public class StatsTask extends Task<Stat
           //
           // update the metastore
           //
-          for (String statType : supportedStats) {
+          for (String statType : StatsSetupConst.supportedStats) {
             long statValue = newPartStats.getStat(statType);
             if (statValue >= 0) {
               parameters.put(statType, Long.toString(newPartStats.getStat(statType)));
             }
           }
 
+          parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
           tPart.setParameters(parameters);
           String tableFullName = table.getDbName() + "." + table.getTableName();
           db.alterPartition(tableFullName, new Partition(table, tPart));
@@ -364,7 +363,7 @@ public class StatsTask extends Task<Stat
         || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
   }
 
-  private void updateStats(List<String> statsList, Statistics stats,
+  private void updateStats(String[] statsList, Statistics stats,
       StatsAggregator statsAggregator, Map<String, String> parameters,
       String aggKey, boolean atomic) throws HiveException {
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Tue Nov 12 18:23:05 2013
@@ -93,6 +93,8 @@ public abstract class Task<T extends Ser
 
   // Bean methods
 
+  protected boolean rootTask;
+
   protected List<Task<? extends Serializable>> childTasks;
   protected List<Task<? extends Serializable>> parentTasks;
   /**
@@ -172,6 +174,14 @@ public abstract class Task<T extends Ser
     return false;
   }
 
+  public boolean isRootTask() {
+    return rootTask;
+  }
+
+  public void setRootTask(boolean rootTask) {
+    this.rootTask = rootTask;
+  }
+
   public void setChildTasks(List<Task<? extends Serializable>> childTasks) {
     this.childTasks = childTasks;
   }
@@ -506,4 +516,8 @@ public abstract class Task<T extends Ser
   void setException(Throwable ex) {
     exception = ex;
   }
+
+  public String toString() {
+    return getId() + ":" + getType();
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Tue Nov 12 18:23:05 2013
@@ -31,6 +31,8 @@ import com.google.common.collect.MinMaxP
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
@@ -51,11 +53,9 @@ public class TopNHash {
     public void collect(byte[] key, byte[] value, int hash) throws IOException;
   }
 
-  public static final int FORWARD = -1;
-  public static final int EXCLUDED = -2;
-  private static final int FLUSH = -3;
-  private static final int DISABLE = -4;
-  private static final int MAY_FORWARD = -5;
+  public static final int FORWARD = -1; // Forward the row to reducer as is.
+  public static final int EXCLUDE = -2; // Discard the row.
+  private static final int MAY_FORWARD = -3; // Vectorized - may forward the row, not sure yet.
 
   private BinaryCollector collector;
   private int topN;
@@ -67,14 +67,17 @@ public class TopNHash {
   private byte[][] keys;
   private byte[][] values;
   private int[] hashes;
+  private int[] distKeyLengths;
   private IndexStore indexes; // The heap over the keys, storing indexes in the array.
 
   private int evicted; // recently evicted index (used for next key/value)
   private int excluded; // count of excluded rows from previous flush
 
-  // temporary stuff used for vectorization
+  // temporary single-batch context used for vectorization
   private int batchNumForwards = 0; // whether current batch has any forwarded keys
   private int[] indexToBatchIndex; // mapping of index (lined up w/keys) to index in the batch
+  private int[] batchIndexToResult; // mapping of index in the batch (linear) to hash result
+  private int batchSize; // Size of the current batch.
 
   private boolean isEnabled = false;
 
@@ -82,7 +85,9 @@ public class TopNHash {
     public int compare(Integer o1, Integer o2) {
       byte[] key1 = keys[o1];
       byte[] key2 = keys[o2];
-      return WritableComparator.compareBytes(key1, 0, key1.length, key2, 0, key2.length);
+      int length1 = distKeyLengths[o1];
+      int length2 = distKeyLengths[o2];
+      return WritableComparator.compareBytes(key1, 0, length1, key2, 0, length2);
     }
   };
 
@@ -107,6 +112,7 @@ public class TopNHash {
     this.keys = new byte[topN + 1][];
     this.values = new byte[topN + 1][];
     this.hashes = new int[topN + 1];
+    this.distKeyLengths = new int[topN + 1];
     this.evicted = topN;
     this.isEnabled = true;
   }
@@ -118,12 +124,12 @@ public class TopNHash {
    *         TopNHash.EXCLUDED if the row should be discarded;
    *         any other number if the row is to be stored; the index should be passed to storeValue.
    */
-  public int tryStoreKey(BytesWritable key) throws HiveException, IOException {
+  public int tryStoreKey(HiveKey key) throws HiveException, IOException {
     if (!isEnabled) {
       return FORWARD; // short-circuit quickly - forward all rows
     }
     if (topN == 0) {
-      return EXCLUDED; // short-circuit quickly - eat all rows
+      return EXCLUDE; // short-circuit quickly - eat all rows
     }
     int index = insertKeyIntoHeap(key);
     if (index >= 0) {
@@ -132,21 +138,8 @@ public class TopNHash {
     }
     // IndexStore is trying to tell us something.
     switch (index) {
-      case DISABLE: {
-        LOG.info("Top-N hash is disabled");
-        flushInternal();
-        isEnabled = false;
-        return FORWARD;
-      }
-      case FLUSH: {
-        LOG.info("Top-N hash is flushed");
-        flushInternal();
-        // we can now retry adding key/value into hash, which is flushed.
-        // but for simplicity, just forward them
-        return FORWARD;
-      }
       case FORWARD:  return FORWARD;
-      case EXCLUDED: return EXCLUDED; // skip the row.
+      case EXCLUDE: return EXCLUDE; // skip the row.
       default: {
         assert false;
         throw new HiveException("Invalid result trying to store the key: " + index);
@@ -157,15 +150,16 @@ public class TopNHash {
 
   /**
    * Perform basic checks and initialize TopNHash for the new vectorized row batch.
+   * @param size batch size
    * @return TopNHash.FORWARD if all rows should be forwarded w/o trying to call TopN;
    *         TopNHash.EXCLUDED if all rows should be discarded w/o trying to call TopN;
    *         any other result means the batch has been started.
    */
-  public int startVectorizedBatch() throws IOException, HiveException {
+  public int startVectorizedBatch(int size) throws IOException, HiveException {
     if (!isEnabled) {
       return FORWARD; // short-circuit quickly - forward all rows
     } else if (topN == 0) {
-      return EXCLUDED; // short-circuit quickly - eat all rows
+      return EXCLUDE; // short-circuit quickly - eat all rows
     }
     // Flush here if the memory usage is too high. After that, we have the entire
     // batch already in memory anyway so we will bypass the memory checks.
@@ -179,8 +173,13 @@ public class TopNHash {
         return FORWARD; // Hash is ineffective, disable.
       }
     }
+    // Started ok; initialize context for new batch.
+    batchSize = size;
+    if (batchIndexToResult == null || batchIndexToResult.length < batchSize) {
+      batchIndexToResult = new int[Math.max(batchSize, VectorizedRowBatch.DEFAULT_SIZE)];
+    }
     if (indexToBatchIndex == null) {
-      indexToBatchIndex = new int[topN + 1]; // for current batch, contains key index in the batch
+      indexToBatchIndex = new int[topN + 1];
     }
     Arrays.fill(indexToBatchIndex, -1);
     batchNumForwards = 0;
@@ -191,33 +190,28 @@ public class TopNHash {
    * Try to put the key from the current vectorized batch into the heap.
    * @param key the key.
    * @param batchIndex The index of the key in the vectorized batch (sequential, not .selected).
-   * @param results The results; the number of elements equivalent to vrg.size, by kindex.
-   *   The result should be the same across the calls for the batch; in then end, for each k-index:
-   *     - TopNHash.EXCLUDED - discard the row.
-   *     - positive index - store the row using storeValue, same as tryStoreRow.
-   *     - negative index - forward the row. getVectorizedKeyToForward called w/this index will
-   *        return the key to use so it doesn't have to be rebuilt.
    */
-  public void tryStoreVectorizedKey(BytesWritable key, int batchIndex, int[] results)
-          throws HiveException, IOException {
+  public void tryStoreVectorizedKey(HiveKey key, int batchIndex)
+      throws HiveException, IOException {
     // Assumption - batchIndex is increasing; startVectorizedBatch was called
     int size = indexes.size();
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+    distKeyLengths[index] = key.getDistKeyLength();
     Integer collisionIndex = indexes.store(index);
     if (null != collisionIndex) {
       // forward conditional on the survival of the corresponding key currently in indexes.
       ++batchNumForwards;
-      results[batchIndex] = MAY_FORWARD - collisionIndex;
+      batchIndexToResult[batchIndex] = MAY_FORWARD - collisionIndex;
       return;
     }
     indexToBatchIndex[index] = batchIndex;
-    results[batchIndex] = index;
+    batchIndexToResult[batchIndex] = index;
     if (size != topN) return;
     evicted = indexes.removeBiggest();  // remove the biggest key
     if (index == evicted) {
       excluded++;
-      results[batchIndex] = EXCLUDED;
+      batchIndexToResult[batchIndex] = EXCLUDE;
       indexToBatchIndex[index] = -1;
       return; // input key is bigger than any of keys in hash
     }
@@ -225,36 +219,54 @@ public class TopNHash {
     int evictedBatchIndex = indexToBatchIndex[evicted];
     if (evictedBatchIndex >= 0) {
       // reset the result for the evicted index
-      results[evictedBatchIndex] = EXCLUDED;
+      batchIndexToResult[evictedBatchIndex] = EXCLUDE;
       indexToBatchIndex[evicted] = -1;
     }
-    // Also evict all results grouped with this index; cannot be current key or before it.
-    if (batchNumForwards > 0) {
-      int evictedForward = (MAY_FORWARD - evicted);
-      boolean forwardRemoved = false;
-      for (int i = evictedBatchIndex + 1; i < batchIndex; ++i) {
-        if (results[i] == evictedForward) {
-          results[i] = EXCLUDED;
-          forwardRemoved = true;
-        }
-      }
-      if (forwardRemoved) {
+    // Evict all results grouped with this index; it cannot be any key further in the batch.
+    // If we evict a key from this batch, the keys grouped with it cannot be earlier than that key.
+    // If we evict a key that is not from this batch, initial i = (-1) + 1 = 0, as intended.
+    int evictedForward = (MAY_FORWARD - evicted);
+    for (int i = evictedBatchIndex + 1; i < batchIndex && (batchNumForwards > 0); ++i) {
+      if (batchIndexToResult[i] == evictedForward) {
+        batchIndexToResult[i] = EXCLUDE;
         --batchNumForwards;
       }
     }
   }
 
   /**
+   * Get vectorized batch result for particular index.
+   * @param batchIndex index of the key in the batch.
+   * @return the result, same as from {@link #tryStoreKey(HiveKey)}
+   */
+  public int getVectorizedBatchResult(int batchIndex) {
+    int result = batchIndexToResult[batchIndex];
+    return (result <= MAY_FORWARD) ? FORWARD : result;
+  }
+
+  /**
    * After vectorized batch is processed, can return the key that caused a particular row
    * to be forwarded. Because the row could only be marked to forward because it has
    * the same key with some row already in the heap (for GBY), we can use that key from the
    * heap to emit the forwarded row.
-   * @param index Negative index from the vectorized result. See tryStoreVectorizedKey.
-   * @return The key corresponding to the row.
+   * @param batchIndex index of the key in the batch.
+   * @return The key corresponding to the index.
    */
-  public byte[] getVectorizedKeyToForward(int index) {
-    assert index <= MAY_FORWARD;
-    return keys[MAY_FORWARD - index];
+  public HiveKey getVectorizedKeyToForward(int batchIndex) {
+    int index = MAY_FORWARD - batchIndexToResult[batchIndex];
+    HiveKey hk = new HiveKey();
+    hk.set(keys[index], 0, keys[index].length);
+    hk.setDistKeyLength(distKeyLengths[index]);
+    return hk;
+  }
+
+  /**
+   * After the vectorized batch is processed, returns the distribution key length of a key.
+   * @param batchIndex index of the key in the batch.
+   * @return The distribution length corresponding to the key.
+   */
+  public int getVectorizedKeyDistLength(int batchIndex) {
+    return distKeyLengths[batchIndexToResult[batchIndex]];
   }
 
   /**
@@ -289,16 +301,22 @@ public class TopNHash {
    * <p/>
    * -1 for FORWARD   : should be forwarded to output collector (for GBY)
    * -2 for EXCLUDED  : not in top-k. ignore it
-   * -3 for FLUSH     : memory is not enough. flush values (keep keys only)
-   * -4 for DISABLE   : hash is not effective. flush and disable it
    */
-  private int insertKeyIntoHeap(BinaryComparable key) {
+  private int insertKeyIntoHeap(HiveKey key) throws IOException, HiveException {
     if (usage > threshold) {
-      return excluded == 0 ? DISABLE : FLUSH;
+      flushInternal();
+      if (excluded == 0) {
+        LOG.info("Top-N hash is disabled");
+        isEnabled = false;
+      }
+      // we can now retry adding key/value into hash, which is flushed.
+      // but for simplicity, just forward them
+      return FORWARD;
     }
     int size = indexes.size();
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+    distKeyLengths[index] = key.getDistKeyLength();
     if (null != indexes.store(index)) {
       // it's only for GBY which should forward all values associated with the key in the range
       // of limit. new value should be attatched with the key but in current implementation,
@@ -310,7 +328,7 @@ public class TopNHash {
       evicted = indexes.removeBiggest();  // remove the biggest key
       if (index == evicted) {
         excluded++;
-        return EXCLUDED;          // input key is bigger than any of keys in hash
+        return EXCLUDE;          // input key is bigger than any of keys in hash
       }
       removed(evicted);
     }
@@ -326,6 +344,7 @@ public class TopNHash {
       values[index] = null;
     }
     hashes[index] = -1;
+    distKeyLengths[index] = -1;
   }
 
   private void flushInternal() throws IOException, HiveException {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Tue Nov 12 18:23:05 2013
@@ -44,7 +44,7 @@ public class UDTFOperator extends Operat
 
   protected final Log LOG = LogFactory.getLog(this.getClass().getName());
 
-  ObjectInspector[] udtfInputOIs = null;
+  StructObjectInspector udtfInputOI = null;
   Object[] objToSendToUDTF = null;
 
   GenericUDTF genericUDTF;
@@ -63,22 +63,16 @@ public class UDTFOperator extends Operat
 
     genericUDTF.setCollector(collector);
 
-    // Make an object inspector [] of the arguments to the UDTF
-    List<? extends StructField> inputFields =
-        ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs();
-
-    udtfInputOIs = new ObjectInspector[inputFields.size()];
-    for (int i = 0; i < inputFields.size(); i++) {
-      udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
-    }
-    objToSendToUDTF = new Object[inputFields.size()];
+    udtfInputOI = (StructObjectInspector) inputObjInspectors[0];
+
+    objToSendToUDTF = new Object[udtfInputOI.getAllStructFieldRefs().size()];
 
     MapredContext context = MapredContext.get();
     if (context != null) {
       context.setup(genericUDTF);
     }
-    StructObjectInspector udtfOutputOI = genericUDTF.initialize(
-        udtfInputOIs);
+    StructObjectInspector udtfOutputOI = genericUDTF.initialize(udtfInputOI);
+
     if (conf.isOuterLV()) {
       outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Nov 12 18:23:05 2013
@@ -100,6 +100,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -126,6 +127,9 @@ import org.apache.hadoop.hive.ql.io.rcfi
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -2031,7 +2035,7 @@ public final class Utilities {
     }
   }
 
-  public static Object INPUT_SUMMARY_LOCK = new Object();
+  private static final Object INPUT_SUMMARY_LOCK = new Object();
 
   /**
    * Calculate the total size of input files.
@@ -2114,26 +2118,47 @@ public final class Utilities {
           // is not correct.
           final Configuration myConf = conf;
           final JobConf myJobConf = jobConf;
+          final Map<String, Operator<?>> aliasToWork = work.getAliasToWork();
+          final Map<String, ArrayList<String>> pathToAlias = work.getPathToAliases();
           final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
               p.toString());
           Runnable r = new Runnable() {
             public void run() {
               try {
-                ContentSummary resultCs;
-
                 Class<? extends InputFormat> inputFormatCls = partDesc
                     .getInputFileFormatClass();
                 InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
                     inputFormatCls, myJobConf);
                 if (inputFormatObj instanceof ContentSummaryInputFormat) {
-                  resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
-                      myJobConf);
-                } else {
+                  ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj;
+                  resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
+                  return;
+                }
+                HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf,
+                    partDesc.getOverlayedProperties().getProperty(
+                    hive_metastoreConstants.META_TABLE_STORAGE));
+                if (handler == null) {
+                  // native table
                   FileSystem fs = p.getFileSystem(myConf);
-                  resultCs = fs.getContentSummary(p);
+                  resultMap.put(pathStr, fs.getContentSummary(p));
+                  return;
+                }
+                if (handler instanceof InputEstimator) {
+                  long total = 0;
+                  TableDesc tableDesc = partDesc.getTableDesc();
+                  InputEstimator estimator = (InputEstimator) handler;
+                  for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) {
+                    JobConf jobConf = new JobConf(myJobConf);
+                    TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias);
+                    Utilities.setColumnNameList(jobConf, scanOp, true);
+                    Utilities.setColumnTypeList(jobConf, scanOp, true);
+                    PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
+                    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
+                    total += estimator.estimate(myJobConf, scanOp, -1).getTotalLength();
+                  }
+                  resultMap.put(pathStr, new ContentSummary(total, -1, -1));
                 }
-                resultMap.put(pathStr, resultCs);
-              } catch (IOException e) {
+              } catch (Exception e) {
                 // We safely ignore this exception for summary data.
                 // We don't update the cache to protect it from polluting other
                 // usages. The worst case is that IOException will always be
@@ -2340,12 +2365,19 @@ public final class Utilities {
   }
 
   public static void setColumnNameList(JobConf jobConf, Operator op) {
+    setColumnNameList(jobConf, op, false);
+  }
+
+  public static void setColumnNameList(JobConf jobConf, Operator op, boolean excludeVCs) {
     RowSchema rowSchema = op.getSchema();
     if (rowSchema == null) {
       return;
     }
     StringBuilder columnNames = new StringBuilder();
     for (ColumnInfo colInfo : rowSchema.getSignature()) {
+      if (excludeVCs && colInfo.getIsVirtualCol()) {
+        continue;
+      }
       if (columnNames.length() > 0) {
         columnNames.append(",");
       }
@@ -2356,12 +2388,19 @@ public final class Utilities {
   }
 
   public static void setColumnTypeList(JobConf jobConf, Operator op) {
+    setColumnTypeList(jobConf, op, false);
+  }
+
+  public static void setColumnTypeList(JobConf jobConf, Operator op, boolean excludeVCs) {
     RowSchema rowSchema = op.getSchema();
     if (rowSchema == null) {
       return;
     }
     StringBuilder columnTypes = new StringBuilder();
     for (ColumnInfo colInfo : rowSchema.getSignature()) {
+      if (excludeVCs && colInfo.getIsVirtualCol()) {
+        continue;
+      }
       if (columnTypes.length() > 0) {
         columnTypes.append(",");
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Nov 12 18:23:05 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.Fe
 import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
 import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.PartitionKeySampler;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -81,6 +82,7 @@ import org.apache.hadoop.mapred.Counters
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.log4j.Appender;
@@ -128,7 +130,6 @@ public class ExecDriver extends Task<Map
   private void initializeFiles(String prop, String files) {
     if (files != null && files.length() > 0) {
       job.set(prop, files);
-      ShimLoader.getHadoopShims().setTmpFiles(prop, files);
     }
   }
 
@@ -543,7 +544,7 @@ public class ExecDriver extends Task<Map
       FetchOperator fetcher = PartitionKeySampler.createSampler(fetchWork, conf, job, ts);
       try {
         ts.initialize(conf, new ObjectInspector[]{fetcher.getOutputObjectInspector()});
-        ts.setOutputCollector(sampler);
+        OperatorUtils.setChildrenCollector(ts.getChildOperators(), sampler);
         while (fetcher.pushRow()) { }
       } finally {
         fetcher.clearFetchContext();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Tue Nov 12 18:23:05 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -158,7 +159,7 @@ public class ExecMapper extends MapReduc
     if (oc == null) {
       oc = output;
       rp = reporter;
-      mo.setOutputCollector(oc);
+      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
       mo.setReporter(rp);
       MapredContext.get().setReporter(reporter);
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Nov 12 18:23:05 2013
@@ -188,7 +188,6 @@ public class ExecReducer extends MapRedu
       // propagate reporter and output collector to all operators
       oc = output;
       rp = reporter;
-      reducer.setOutputCollector(oc);
       reducer.setReporter(rp);
       MapredContext.get().setReporter(reporter);
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Tue Nov 12 18:23:05 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapred.Counters
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapred.TaskReport;
@@ -240,7 +241,7 @@ public class HadoopJobExecHelper {
       } catch (InterruptedException e) {
       }
 
-      if (initializing && ShimLoader.getHadoopShims().isJobPreparing(rj)) {
+      if (initializing && rj.getJobState() == JobStatus.PREP) {
         // No reason to poll untill the job is initialized
         continue;
       } else {
@@ -590,12 +591,6 @@ public class HadoopJobExecHelper {
     List<Integer> reducersRunTimes = new ArrayList<Integer>();
 
     for (TaskCompletionEvent taskCompletion : taskCompletions) {
-      String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(taskCompletion);
-      if (taskJobIds == null) {
-        // Task attempt info is unavailable in this Hadoop version");
-        continue;
-      }
-      String taskId = taskJobIds[0];
       if (!taskCompletion.isMapTask()) {
         reducersRunTimes.add(new Integer(taskCompletion.getTaskRunTime()));
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java Tue Nov 12 18:23:05 2013
@@ -157,20 +157,10 @@ public class JobDebugger implements Runn
         boolean more = true;
         boolean firstError = true;
         for (TaskCompletionEvent t : taskCompletions) {
-          // getTaskJobIDs returns Strings for compatibility with Hadoop versions
-          // without TaskID or TaskAttemptID
-          String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
-
-          if (taskJobIds == null) {
-            console.printError("Task attempt info is unavailable in this Hadoop version");
-            more = false;
-            break;
-          }
-
           // For each task completion event, get the associated task id, job id
           // and the logs
-          String taskId = taskJobIds[0];
-          String jobId = taskJobIds[1];
+          String taskId = t.getTaskAttemptId().getTaskID().toString();
+          String jobId = t.getTaskAttemptId().getJobID().toString();
           if (firstError) {
             console.printError("Examining task ID: " + taskId + " (and more) from job " + jobId);
             firstError = false;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Tue Nov 12 18:23:05 2013
@@ -178,12 +178,7 @@ public class MapRedTask extends ExecDriv
       String isSilent = "true".equalsIgnoreCase(System
           .getProperty("test.silent")) ? "-nolog" : "";
 
-      String jarCmd;
-      if (ShimLoader.getHadoopShims().usesJobShell()) {
-        jarCmd = libJarsOption + hiveJar + " " + ExecDriver.class.getName();
-      } else {
-        jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;
-      }
+      String jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;
 
       String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "
           + planPath.toString() + " " + isSilent + " " + hiveConfArgs;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Tue Nov 12 18:23:05 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
@@ -123,7 +124,7 @@ public class MapRecordProcessor  extends
         }
       }
 
-      mapOp.setOutputCollector(out);
+      OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), out);
       mapOp.setReporter(reporter);
       MapredContext.get().setReporter(reporter);
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Nov 12 18:23:05 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -30,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
@@ -154,6 +156,17 @@ public class ReduceRecordProcessor  exte
         }
       }
 
+      // set output collector for any reduce sink operators in the pipeline.
+      List<Operator<? extends OperatorDesc>> children = new LinkedList<Operator<? extends OperatorDesc>>();
+      children.add(reducer);
+      if (dummyOps != null) {
+        children.addAll(dummyOps);
+      }
+      OperatorUtils.setChildrenCollector(children, out);
+      
+      reducer.setReporter(reporter);
+      MapredContext.get().setReporter(reporter);
+
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
@@ -164,10 +177,6 @@ public class ReduceRecordProcessor  exte
       }
     }
 
-    reducer.setOutputCollector(out);
-    reducer.setReporter(reporter);
-    MapredContext.get().setReporter(reporter);
-
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Tue Nov 12 18:23:05 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.To
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -39,9 +40,11 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+// import org.apache.hadoop.util.StringUtils;
 
 public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
@@ -87,12 +90,6 @@ public class VectorReduceSinkOperator ex
    */
   private transient VectorExpressionWriter[] partitionWriters;
 
-  private transient ObjectInspector keyObjectInspector;
-  private transient ObjectInspector valueObjectInspector;
-  private transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
-
-  private transient int[] hashResult; // the pre-created array for reducerHash results
-
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
@@ -110,7 +107,6 @@ public class VectorReduceSinkOperator ex
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
-
       numDistributionKeys = conf.getNumDistributionKeys();
       distinctColIndices = conf.getDistinctColumnIndices();
       numDistinctExprs = distinctColIndices.size();
@@ -183,7 +179,7 @@ public class VectorReduceSinkOperator ex
         numDistributionKeys;
       cachedKeys = new Object[numKeys][keyLen];
       cachedValues = new Object[valueEval.length];
-      
+
       int tag = conf.getTag();
       tagByte[0] = (byte) tag;
       LOG.info("Using tag = " + tag);
@@ -209,81 +205,84 @@ public class VectorReduceSinkOperator ex
         partitionEval.length));
 
     try {
-
-      for (int i = 0; i < partitionEval.length; i++) {
-        partitionEval[i].evaluate(vrg);
-      }
-
-      // run the vector evaluations
-      for (int i = 0; i < valueEval.length; i++) {
-         valueEval[i].evaluate(vrg);
-      }
       // Evaluate the keys
       for (int i = 0; i < keyEval.length; i++) {
         keyEval[i].evaluate(vrg);
       }
 
-      Object[] distributionKeys = new Object[numDistributionKeys];
-
       // Determine which rows we need to emit based on topN optimization
-      int startResult = reducerHash.startVectorizedBatch();
-      if (startResult == TopNHash.EXCLUDED) {
+      int startResult = reducerHash.startVectorizedBatch(vrg.size);
+      if (startResult == TopNHash.EXCLUDE) {
         return; // TopN wants us to exclude all rows.
       }
-      boolean useTopN = startResult != TopNHash.FORWARD;
-      if (useTopN && (hashResult == null || hashResult.length < vrg.size)) {
-        hashResult = new int[Math.max(vrg.size, VectorizedRowBatch.DEFAULT_SIZE)];
+      // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
+      for (int i = 0; i < partitionEval.length; i++) {
+        partitionEval[i].evaluate(vrg);
+      }
+      // run the vector evaluations
+      for (int i = 0; i < valueEval.length; i++) {
+         valueEval[i].evaluate(vrg);
       }
 
-      for (int j = 0 ; j < vrg.size; ++j) {
-        int rowIndex = j;
+      boolean useTopN = startResult != TopNHash.FORWARD;
+      // Go through the batch once. If we are not using TopN, we forward everything and are done.
+      // If we are using TopN, we make the first key for each row and store/forward it.
+      // Values, hashes and additional distinct rows are handled in the 2nd pass in that case.
+      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+        int rowIndex = batchIndex;
         if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[j];
+          rowIndex = vrg.selected[batchIndex];
         }
-        // First, evaluate the key - the way things stand we'd need it regardless.
-        for (int i = 0; i < keyEval.length; i++) {
-          int batchColumn = keyEval[i].getOutputColumn();
-          ColumnVector vectorColumn = vrg.cols[batchColumn];
-          distributionKeys[i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
+        // First, make distrib key components for this row and determine distKeyLength.
+        populatedCachedDistributionKeys(vrg, rowIndex, 0);
+        HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
+        int distKeyLength = firstKey.getDistKeyLength();
+        // Add first distinct expression, if any.
+        if (numDistinctExprs > 0) {
+          populateCachedDistinctKeys(vrg, rowIndex, 0);
+          firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
         }
-        // no distinct key
-        System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
-        // TopN is not supported for multi-distinct currently. If we have more cachedKeys
-        // than one for every input key horrible things will happen (OOB error on array likely).
-        assert !useTopN || cachedKeys.length <= 1;
-        for (int i = 0; i < cachedKeys.length; i++) {
-          // Serialize the keys and append the tag.
-          Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
-          setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
-          if (useTopN) {
-            reducerHash.tryStoreVectorizedKey(keyWritable, j, hashResult);
-          } else {
-            // No TopN, just forward the key
-            keyWritable.setHashCode(computeHashCode(vrg, rowIndex));
-            collect(keyWritable, makeValueWritable(vrg, rowIndex));
-           }
+
+        if (useTopN) {
+          reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
+        } else {
+        // No TopN, just forward the first key and all others.
+          int hashCode = computeHashCode(vrg, rowIndex);
+          firstKey.setHashCode(hashCode);
+          BytesWritable value = makeValueWritable(vrg, rowIndex);
+          collect(firstKey, value);
+          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
         }
       }
 
       if (!useTopN) return; // All done.
 
       // If we use topN, we have called tryStore on every key now. We can process the results.
-      for (int j = 0 ; j < vrg.size; ++j) {
-        int index = hashResult[j];
-        if (index == TopNHash.EXCLUDED) continue;
-        int rowIndex = j;
+      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+        int result = reducerHash.getVectorizedBatchResult(batchIndex);
+        if (result == TopNHash.EXCLUDE) continue;
+        int rowIndex = batchIndex;
         if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[j];
+          rowIndex = vrg.selected[batchIndex];
         }
-        // Compute everything now - we'd either store it, or forward it.
+        // Compute value and hashcode - we'd either store or forward them.
         int hashCode = computeHashCode(vrg, rowIndex);
         BytesWritable value = makeValueWritable(vrg, rowIndex);
-        if (index < 0) {
-          // Kinda hacky; see getVectorizedKeyToForward javadoc.
-          byte[] key = reducerHash.getVectorizedKeyToForward(index);
-          collect(key, value, hashCode);
+        int distKeyLength = -1;
+        if (result == TopNHash.FORWARD) {
+          HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
+          firstKey.setHashCode(hashCode);
+          distKeyLength = firstKey.getDistKeyLength();
+          collect(firstKey, value);
         } else {
-          reducerHash.storeValue(index, value, hashCode, true);
+          reducerHash.storeValue(result, value, hashCode, true);
+          distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
+        }
+        // Now forward the other rows if there's multi-distinct (but see TODO in forward...).
+        // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
+        if (numDistinctExprs > 1) {
+          populatedCachedDistributionKeys(vrg, rowIndex, 1);
+          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
         }
       }
     } catch (SerDeException e) {
@@ -293,6 +292,74 @@ public class VectorReduceSinkOperator ex
     }
   }
 
+  /**
+   * This function creates and forwards all the additional KVs for the multi-distinct case,
+   * after the first (0th) KV pertaining to the row has already been stored or forwarded.
+   * @param vrg the batch
+   * @param rowIndex the row index in the batch
+   * @param hashCode the partitioning hash code to use; same as for the first KV
+   * @param value the value to use; same as for the first KV
+   * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
+   * @param tag the tag
+   * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
+   */
+  private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode,
+      BytesWritable value, int distKeyLength, int tag, int baseIndex)
+          throws HiveException, SerDeException, IOException {
+    // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
+    //       the first key has already been stored. The keys for different distincts differ by
+    //       only a few bytes, and the value etc. are all the same.
+    //       We could store deltas to regenerate the extra rows when flushing TopN.
+    for (int i = 1; i < numDistinctExprs; i++) {
+      if (i != baseIndex) {
+        System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
+      }
+      populateCachedDistinctKeys(vrg, rowIndex, i);
+      HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
+      hiveKey.setHashCode(hashCode);
+      collect(hiveKey, value);
+    }
+  }
+
+  /**
+   * Populate distribution keys part of cachedKeys for a particular row from the batch.
+   * @param vrg the batch
+   * @param rowIndex the row index in the batch
+   * @param index the cachedKeys index to write to
+   */
+  private void populatedCachedDistributionKeys(
+      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
+    for (int i = 0; i < numDistributionKeys; i++) {
+      int batchColumn = keyEval[i].getOutputColumn();
+      ColumnVector vectorColumn = vrg.cols[batchColumn];
+      cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
+    }
+    if (cachedKeys[index].length > numDistributionKeys) {
+      cachedKeys[index][numDistributionKeys] = null;
+    }
+  }
+
+  /**
+   * Populate distinct keys part of cachedKeys for a particular row from the batch.
+   * @param vrg the batch
+   * @param rowIndex the row index in the batch
+   * @param index the cachedKeys index to write to
+   */
+  private void populateCachedDistinctKeys(
+      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
+    StandardUnion union;
+    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
+        (byte)index, new Object[distinctColIndices.get(index).size()]);
+    Object[] distinctParameters = (Object[]) union.getObject();
+    for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
+      int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
+      int batchColumn = keyEval[distinctColIndex].getOutputColumn();
+      distinctParameters[distinctParamI] =
+          keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
+    }
+    union.setTag((byte) index);
+  }
+
   private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
       throws HiveException, SerDeException {
     for (int i = 0; i < valueEval.length; i++) {
@@ -308,10 +375,8 @@ public class VectorReduceSinkOperator ex
     // Evaluate the HashCode
     int keyHashCode = 0;
     if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide
-      // better
-      // load balance. If the requirement is to have a single reducer, we
-      // should set
+      // If no partition cols, just distribute the data uniformly to provide better
+      // load balance. If the requirement is to have a single reducer, we should set
       // the number of reducers to 1.
       // Use a constant seed to make the code deterministic.
       if (random == null) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Tue Nov 12 18:23:05 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Ex
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.InputExpressionType;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
 import org.apache.hadoop.hive.ql.udf.UDFConv;
 import org.apache.hadoop.hive.ql.udf.UDFHex;
 import org.apache.hadoop.hive.ql.udf.UDFOPNegative;
@@ -556,7 +558,9 @@ public class VectorizationContext {
       List<ExprNodeDesc> childExpr, Mode mode) throws HiveException {
     //First handle special cases
     if (udf instanceof GenericUDFBetween) {
-      return getBetweenFilterExpression(childExpr);
+      return getBetweenFilterExpression(childExpr, mode);
+    } else if (udf instanceof GenericUDFIn) {
+      return getInFilterExpression(childExpr);
     } else if (udf instanceof GenericUDFBridge) {
       VectorExpression v = getGenericUDFBridgeVectorExpression((GenericUDFBridge) udf, childExpr, mode);
       if (v != null) {
@@ -579,6 +583,87 @@ public class VectorizationContext {
   }
 
   /**
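+   * Create a filter expression for column IN ( <list-of-constants> ).
+   * @param childExpr the column expression followed by the IN-list entries
+   * @return the vectorized IN filter expression matching the column's type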
+   */
+  private VectorExpression getInFilterExpression(List<ExprNodeDesc> childExpr)
+      throws HiveException {
+    ExprNodeDesc colExpr = childExpr.get(0);
+    String colType = colExpr.getTypeString();
+
+    // prepare arguments for createVectorExpression
+    List<ExprNodeDesc> childrenForInList =
+        foldConstantsForUnaryExprs(childExpr.subList(1, childExpr.size()));
+
+    /* This method assumes that the IN list has no NULL entries. That is enforced elsewhere,
+     * in the Vectorizer class. If NULL is passed in as a list entry, behavior is not defined.
+     * If in the future, NULL values are allowed in the IN list, be sure to handle 3-valued
+     * logic correctly. E.g. NOT (col IN (null)) should be considered UNKNOWN, so that would
+     * become FALSE in the WHERE clause, and cause the row in question to be filtered out.
+     * See the discussion in Jira HIVE-5583.
+     */
+
+    VectorExpression expr = null;
+
+    // determine class
+    Class<?> cl = null;
+    if (isIntFamily(colType)) {
+      cl = FilterLongColumnInList.class;
+      long[] inVals = new long[childrenForInList.size()];
+      for (int i = 0; i != inVals.length; i++) {
+        inVals[i] = getIntFamilyScalarAsLong((ExprNodeConstantDesc) childrenForInList.get(i));
+      }
+      FilterLongColumnInList f = (FilterLongColumnInList)
+          createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION);
+      f.setInListValues(inVals);
+      expr = f;
+    } else if (colType.equals("timestamp")) {
+      cl = FilterLongColumnInList.class;
+      long[] inVals = new long[childrenForInList.size()];
+      for (int i = 0; i != inVals.length; i++) {
+        inVals[i] = getTimestampScalar(childrenForInList.get(i));
+      }
+      FilterLongColumnInList f = (FilterLongColumnInList)
+          createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION);
+      f.setInListValues(inVals);
+      expr = f;
+    } else if (colType.equals("string")) {
+      cl = FilterStringColumnInList.class;
+      byte[][] inVals = new byte[childrenForInList.size()][];
+      for (int i = 0; i != inVals.length; i++) {
+        inVals[i] = getStringScalarAsByteArray((ExprNodeConstantDesc) childrenForInList.get(i));
+      }
+      FilterStringColumnInList f =(FilterStringColumnInList)
+          createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION);
+      f.setInListValues(inVals);
+      expr = f;
+    } else if (isFloatFamily(colType)) {
+      cl = FilterDoubleColumnInList.class;
+      double[] inValsD = new double[childrenForInList.size()];
+      for (int i = 0; i != inValsD.length; i++) {
+        inValsD[i] = getNumericScalarAsDouble(childrenForInList.get(i));
+      }
+      FilterDoubleColumnInList f = (FilterDoubleColumnInList)
+          createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION);
+      f.setInListValues(inValsD);
+      expr = f;
+    } else {
+      throw new HiveException("Type " + colType + " not supported for IN in vectorized mode");
+    }
+    return expr;
+  }
+
+  private byte[] getStringScalarAsByteArray(ExprNodeConstantDesc exprNodeConstantDesc)
+      throws HiveException {
+    Object o = getScalarValue(exprNodeConstantDesc);
+    if (!(o instanceof byte[])) {
+      throw new HiveException("Expected constant argument of type string");
+    }
+    return (byte[]) o;
+  }
+
+  /**
    * Invoke special handling for expressions that can't be vectorized by regular
    * descriptor based lookup.
    */
@@ -673,9 +758,16 @@ public class VectorizationContext {
    * needs to be done differently than the standard way where all arguments are
    * passed to the VectorExpression constructor.
    */
-  private VectorExpression getBetweenFilterExpression(List<ExprNodeDesc> childExpr)
+  private VectorExpression getBetweenFilterExpression(List<ExprNodeDesc> childExpr, Mode mode)
       throws HiveException {
 
+    if (mode == Mode.PROJECTION) {
+
+      // Projection mode is not yet supported for [NOT] BETWEEN. Return null so Vectorizer
+      // knows to revert to row-at-a-time execution.
+      return null;
+    }
+
     boolean notKeywordPresent = (Boolean) ((ExprNodeConstantDesc) childExpr.get(0)).getValue();
     ExprNodeDesc colExpr = childExpr.get(1);
 
@@ -850,8 +942,38 @@ public class VectorizationContext {
     }
   }
 
-  // Get a timestamp as a long in number of nanos, from a string constant.
+  private long getIntFamilyScalarAsLong(ExprNodeConstantDesc constDesc)
+      throws HiveException {
+    Object o = getScalarValue(constDesc);
+    if (o instanceof Integer) {
+      return (Integer) o;
+    } else if (o instanceof Long) {
+      return (Long) o;
+    }
+    throw new HiveException("Unexpected type when converting to long");
+  }
+
+  private double getNumericScalarAsDouble(ExprNodeDesc constDesc)
+      throws HiveException {
+    Object o = getScalarValue((ExprNodeConstantDesc) constDesc);
+    if (o instanceof Double) {
+      return (Double) o;
+    } else if (o instanceof Float) {
+      return (Float) o;
+    } else if (o instanceof Integer) {
+      return (Integer) o;
+    } else if (o instanceof Long) {
+      return (Long) o;
+    }
+    throw new HiveException("Unexpected type when converting to double");
+  }
+
+  // Get a timestamp as a long in number of nanos, from a string constant or a cast to timestamp.
   private long getTimestampScalar(ExprNodeDesc expr) throws HiveException {
+    if (expr instanceof ExprNodeGenericFuncDesc &&
+        ((ExprNodeGenericFuncDesc) expr).getGenericUDF() instanceof GenericUDFTimestamp) {
+      return evaluateCastToTimestamp(expr);
+    }
     if (!(expr instanceof ExprNodeConstantDesc)) {
       throw new HiveException("Constant timestamp value expected for expression argument. " +
           "Non-constant argument not supported for vectorization.");
@@ -868,25 +990,26 @@ public class VectorizationContext {
       expr2.setChildren(children);
 
       // initialize and evaluate
-      ExprNodeEvaluator evaluator = ExprNodeEvaluatorFactory.get(expr2);
-      ObjectInspector output = evaluator.initialize(null);
-      Object constant = evaluator.evaluate(null);
-      Object java = ObjectInspectorUtils.copyToStandardJavaObject(constant, output);
-
-      if (!(java instanceof Timestamp)) {
-        throw new HiveException("Udf: failed to convert from string to timestamp");
-      }
-      Timestamp ts = (Timestamp) java;
-      long result = ts.getTime();
-      result *= 1000000;    // shift left 6 digits to make room for nanos below ms precision
-      result += ts.getNanos() % 1000000;     // add in nanos, after removing the ms portion
-      return result;
+      return evaluateCastToTimestamp(expr2);
     }
 
     throw new HiveException("Udf: unhandled constant type for scalar argument. "
         + "Expecting string.");
   }
 
+  private long evaluateCastToTimestamp(ExprNodeDesc expr) throws HiveException {
+    ExprNodeGenericFuncDesc expr2 = (ExprNodeGenericFuncDesc) expr;
+    ExprNodeEvaluator evaluator = ExprNodeEvaluatorFactory.get(expr2);
+    ObjectInspector output = evaluator.initialize(null);
+    Object constant = evaluator.evaluate(null);
+    Object java = ObjectInspectorUtils.copyToStandardJavaObject(constant, output);
+
+    if (!(java instanceof Timestamp)) {
+      throw new HiveException("Udf: failed to convert to timestamp");
+    }
+    Timestamp ts = (Timestamp) java;
+    return TimestampUtils.getTimeNanoSec(ts);
+  }
 
   private Constructor<?> getConstructor(Class<?> cl) throws HiveException {
     try {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Tue Nov 12 18:23:05 2013
@@ -341,30 +341,6 @@ public class HiveInputFormat<K extends W
     return result.toArray(new HiveInputSplit[result.size()]);
   }
 
-  public void validateInput(JobConf job) throws IOException {
-
-    init(job);
-
-    Path[] dirs = FileInputFormat.getInputPaths(job);
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
-    JobConf newjob = new JobConf(job);
-
-    // for each dir, get the InputFormat, and do validateInput.
-    for (Path dir : dirs) {
-      PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
-      // create a new InputFormat instance if this is the first time to see this
-      // class
-      InputFormat inputFormat = getInputFormatFromCache(part
-          .getInputFileFormatClass(), job);
-
-      FileInputFormat.setInputPaths(newjob, dir);
-      newjob.setInputFormat(inputFormat.getClass());
-      ShimLoader.getHadoopShims().inputFormatValidateInput(inputFormat, newjob);
-    }
-  }
-
   protected static PartitionDesc getPartitionDescFromPath(
       Map<String, PartitionDesc> pathToPartitionInfo, Path dir)
       throws IOException {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Tue Nov 12 18:23:05 2013
@@ -29,7 +29,10 @@ public class HiveKey extends BytesWritab
 
   private static final int LENGTH_BYTES = 4;
 
-  boolean hashCodeValid;
+  private int hashCode;
+  private boolean hashCodeValid;
+
+  private transient int distKeyLength;
 
   public HiveKey() {
     hashCodeValid = false;
@@ -37,15 +40,13 @@ public class HiveKey extends BytesWritab
 
   public HiveKey(byte[] bytes, int hashcode) {
     super(bytes);
-    myHashCode = hashcode;
+    hashCode = hashcode;
     hashCodeValid = true;
   }
 
-  protected int myHashCode;
-
   public void setHashCode(int myHashCode) {
     hashCodeValid = true;
-    this.myHashCode = myHashCode;
+    hashCode = myHashCode;
   }
 
   @Override
@@ -54,7 +55,15 @@ public class HiveKey extends BytesWritab
       throw new RuntimeException("Cannot get hashCode() from deserialized "
           + HiveKey.class);
     }
-    return myHashCode;
+    return hashCode;
+  }
+
+  public void setDistKeyLength(int distKeyLength) {
+    this.distKeyLength = distKeyLength;
+  }
+
+  public int getDistKeyLength() {
+    return distKeyLength;
   }
 
   /** A Comparator optimized for HiveKey. */

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Tue Nov 12 18:23:05 2013
@@ -41,8 +41,17 @@ public final class FileDump {
         System.out.println("Compression size: " + reader.getCompressionSize());
       }
       System.out.println("Type: " + reader.getObjectInspector().getTypeName());
+      System.out.println("\nStripe Statistics:");
+      Metadata metadata = reader.getMetadata();
+      for (int n = 0; n < metadata.getStripeStatistics().size(); n++) {
+        System.out.println("  Stripe " + (n + 1) + ":");
+        StripeStatistics ss = metadata.getStripeStatistics().get(n);
+        for (int i = 0; i < ss.getColumnStatistics().length; ++i) {
+          System.out.println("    Column " + i + ": " + ss.getColumnStatistics()[i].toString());
+        }
+      }
       ColumnStatistics[] stats = reader.getStatistics();
-      System.out.println("\nStatistics:");
+      System.out.println("\nFile Statistics:");
       for(int i=0; i < stats.length; ++i) {
         System.out.println("  Column " + i + ": " + stats[i].toString());
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Tue Nov 12 18:23:05 2013
@@ -104,7 +104,8 @@ public final class OrcFile {
   public static final String ENABLE_INDEXES = "orc.create.index";
   public static final String BLOCK_PADDING = "orc.block.padding";
 
-  static final long DEFAULT_STRIPE_SIZE = 256 * 1024 * 1024;
+  static final long DEFAULT_STRIPE_SIZE =
+      HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.defaultLongVal;
   static final CompressionKind DEFAULT_COMPRESSION_KIND =
     CompressionKind.ZLIB;
   static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
@@ -138,7 +139,7 @@ public final class OrcFile {
     private final Configuration configuration;
     private FileSystem fileSystemValue = null;
     private ObjectInspector inspectorValue = null;
-    private long stripeSizeValue = DEFAULT_STRIPE_SIZE;
+    private long stripeSizeValue;
     private int rowIndexStrideValue = DEFAULT_ROW_INDEX_STRIDE;
     private int bufferSizeValue = DEFAULT_BUFFER_SIZE;
     private boolean blockPaddingValue = DEFAULT_BLOCK_PADDING;
@@ -149,6 +150,9 @@ public final class OrcFile {
     WriterOptions(Configuration conf) {
       configuration = conf;
       memoryManagerValue = getMemoryManager(conf);
+      stripeSizeValue =
+          conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
+             DEFAULT_STRIPE_SIZE);
       String versionName =
         conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
       if (versionName == null) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Tue Nov 12 18:23:05 2013
@@ -47,10 +47,12 @@ public class OrcSplit extends FileSplit 
       // serialize FileMetaInfo fields
       Text.writeString(out, fileMetaInfo.compressionType);
       WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
+      WritableUtils.writeVInt(out, fileMetaInfo.metadataSize);
 
       // serialize FileMetaInfo field footer
       ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
       footerBuff.reset();
+
       // write length of buffer
       WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
       out.write(footerBuff.array(), footerBuff.position(),
@@ -69,13 +71,14 @@ public class OrcSplit extends FileSplit 
       // deserialize FileMetaInfo fields
       String compressionType = Text.readString(in);
       int bufferSize = WritableUtils.readVInt(in);
+      int metadataSize = WritableUtils.readVInt(in);
 
       // deserialize FileMetaInfo field footer
       int footerBuffSize = WritableUtils.readVInt(in);
       ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
       in.readFully(footerBuff.array(), 0, footerBuffSize);
 
-      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, footerBuff);
+      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, metadataSize, footerBuff);
     }
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Tue Nov 12 18:23:05 2013
@@ -108,6 +108,13 @@ public interface Reader {
   ColumnStatistics[] getStatistics();
 
   /**
+   * Get the metadata information, such as stripe-level column statistics.
+   * @return the metadata information
+   * @throws IOException
+   */
+  Metadata getMetadata() throws IOException;
+
+  /**
    * Get the list of types contained in the file. The root type is the first
    * type in the list.
    * @return the list of flattened types
@@ -122,10 +129,12 @@ public interface Reader {
   class FileMetaInfo{
     final String compressionType;
     final int bufferSize;
+    final int metadataSize;
     final ByteBuffer footerBuffer;
-    FileMetaInfo(String compressionType, int bufferSize, ByteBuffer footerBuffer){
+    FileMetaInfo(String compressionType, int bufferSize, int metadataSize, ByteBuffer footerBuffer){
       this.compressionType = compressionType;
       this.bufferSize = bufferSize;
+      this.metadataSize = metadataSize;
       this.footerBuffer = footerBuffer;
     }
   }