Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 00:26:58 UTC

svn commit: r1629544 [8/33] - in /hive/branches/spark-new: ./ accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/type/ co...

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Sun Oct  5 22:26:43 2014
@@ -18,99 +18,30 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.PTFTopNHash;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TopNHash;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-// import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
-  private static final Log LOG = LogFactory.getLog(
-      VectorReduceSinkOperator.class.getName());
-
   private static final long serialVersionUID = 1L;
 
-  /**
-   * The evaluators for the key columns. Key columns decide the sort order on
-   * the reducer side. Key columns are passed to the reducer in the "key".
-   */
-  private VectorExpression[] keyEval;
-
-  /**
-   * The key value writers. These know how to write the necessary writable type
-   * based on key column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] keyWriters;
-
-  /**
-   * The evaluators for the value columns. Value columns are passed to reducer
-   * in the "value".
-   */
-  private VectorExpression[] valueEval;
-
-  /**
-   * The output value writers. These know how to write the necessary writable type
-   * based on value column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] valueWriters;
-
-  /**
-   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
-   * Hive language). Partition columns decide the reducer that the current row
-   * goes to. Partition columns are not passed to reducer.
-   */
-  private VectorExpression[] partitionEval;
-
-  /**
-  * Evaluators for bucketing columns. This is used to compute bucket number.
-  */
-  private VectorExpression[] bucketEval;
-  private int buckColIdxInKey;
-
-  /**
-   * The partition value writers. These know how to write the necessary writable type
-   * based on partition column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] partitionWriters;
-  private transient VectorExpressionWriter[] bucketWriters = null;
-
-  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+  // Writers for producing a row from the input batch.
+  private VectorExpressionWriter[] rowWriters;
+  
+  protected transient Object[] singleRow;
 
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
     ReduceSinkDesc desc = (ReduceSinkDesc) conf;
     this.conf = desc;
-    keyEval = vContext.getVectorExpressions(desc.getKeyCols());
-    valueEval = vContext.getVectorExpressions(desc.getValueCols());
-    partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
-    bucketEval = null;
-    if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
-      bucketEval = vContext.getVectorExpressions(desc.getBucketCols());
-      buckColIdxInKey = desc.getPartitionCols().size();
-    }
   }
 
   public VectorReduceSinkOperator() {
@@ -119,399 +50,49 @@ public class VectorReduceSinkOperator ex
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-    try {
-      numDistributionKeys = conf.getNumDistributionKeys();
-      distinctColIndices = conf.getDistinctColumnIndices();
-      numDistinctExprs = distinctColIndices.size();
-
-      TableDesc keyTableDesc = conf.getKeySerializeInfo();
-      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
-          .newInstance();
-      keySerializer.initialize(null, keyTableDesc.getProperties());
-      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
-      /*
-       * Compute and assign the key writers and the key object inspector
-       */
-      VectorExpressionWriterFactory.processVectorExpressions(
-          conf.getKeyCols(),
-          conf.getOutputKeyColumnNames(),
-          new VectorExpressionWriterFactory.SingleOIDClosure() {
-            @Override
-            public void assign(VectorExpressionWriter[] writers,
-              ObjectInspector objectInspector) {
-              keyWriters = writers;
-              keyObjectInspector = objectInspector;
-            }
-          });
-
-      String colNames = "";
-      for(String colName : conf.getOutputKeyColumnNames()) {
-        colNames = String.format("%s %s", colNames, colName);
-      }
-
-      if (isDebugEnabled) {
-        LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
-          keyObjectInspector.getClass(),
-          keyObjectInspector,
-          colNames));
-      }
-
-      partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
-      if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
-        bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols());
-      }
-
-      TableDesc valueTableDesc = conf.getValueSerializeInfo();
-      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
-          .newInstance();
-      valueSerializer.initialize(null, valueTableDesc.getProperties());
-
-      /*
-       * Compute and assign the value writers and the value object inspector
-       */
-      VectorExpressionWriterFactory.processVectorExpressions(
-          conf.getValueCols(),
-          conf.getOutputValueColumnNames(),
-          new VectorExpressionWriterFactory.SingleOIDClosure() {
-            @Override
-            public void assign(VectorExpressionWriter[] writers,
-                ObjectInspector objectInspector) {
-                valueWriters = writers;
-                valueObjectInspector = objectInspector;
+    // We need an input object inspector for the row we will extract out of the
+    // vectorized row batch, not, for example, the original inspector for an ORC table, etc.
+    VectorExpressionWriterFactory.processVectorInspector(
+            (StructObjectInspector) inputObjInspectors[0],
+            new VectorExpressionWriterFactory.SingleOIDClosure() {
+              @Override
+              public void assign(VectorExpressionWriter[] writers,
+                  ObjectInspector objectInspector) {
+                rowWriters = writers;
+                inputObjInspectors[0] = objectInspector;
               }
-          });
-
-      if (isDebugEnabled) {
-        colNames = "";
-        for(String colName : conf.getOutputValueColumnNames()) {
-          colNames = String.format("%s %s", colNames, colName);
-        }
-      }
-
-      if (isDebugEnabled) {
-        LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
-            valueObjectInspector.getClass(),
-            valueObjectInspector,
-            colNames));
-      }
+            });
+    singleRow = new Object[rowWriters.length];
 
-      int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
-      int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
-        numDistributionKeys;
-      cachedKeys = new Object[numKeys][keyLen];
-      cachedValues = new Object[valueEval.length];
-
-      int tag = conf.getTag();
-      tagByte[0] = (byte) tag;
-      LOG.info("Using tag = " + tag);
-
-      int limit = conf.getTopN();
-      float memUsage = conf.getTopNMemoryUsage();
-      if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
-        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
-      }
-
-      autoParallel = conf.isAutoParallel();
-
-    } catch(Exception e) {
-      throw new HiveException(e);
-    }
+    // Call ReduceSinkOperator with new input inspector.
+    super.initializeOp(hconf);
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
-
-    if (isDebugEnabled) {
-      LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
-          vrg.size,
-          valueEval.length,
-          keyEval.length,
-          partitionEval.length));
-    }
-
-    try {
-      // Evaluate the keys
-      for (int i = 0; i < keyEval.length; i++) {
-        keyEval[i].evaluate(vrg);
-      }
-
-      // Determine which rows we need to emit based on topN optimization
-      int startResult = reducerHash.startVectorizedBatch(vrg.size);
-      if (startResult == TopNHash.EXCLUDE) {
-        return; // TopN wants us to exclude all rows.
-      }
-      // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
-      for (int i = 0; i < partitionEval.length; i++) {
-        partitionEval[i].evaluate(vrg);
-      }
-      if (bucketEval != null) {
-        for (int i = 0; i < bucketEval.length; i++) {
-          bucketEval[i].evaluate(vrg);
-        }
-      }
-      // run the vector evaluations
-      for (int i = 0; i < valueEval.length; i++) {
-         valueEval[i].evaluate(vrg);
-      }
-
-      boolean useTopN = startResult != TopNHash.FORWARD;
-      // Go thru the batch once. If we are not using TopN, we will forward all things and be done.
-      // If we are using topN, we will make the first key for each row and store/forward it.
-      // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case.
-      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
-        int rowIndex = batchIndex;
-        if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[batchIndex];
-        }
-        // First, make distrib key components for this row and determine distKeyLength.
-        populatedCachedDistributionKeys(vrg, rowIndex, 0);
-
-        // replace bucketing columns with hashcode % numBuckets
-        int buckNum = -1;
-        if (bucketEval != null) {
-          buckNum = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets());
-          cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
-        }
-        HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
-        int distKeyLength = firstKey.getDistKeyLength();
-        // Add first distinct expression, if any.
-        if (numDistinctExprs > 0) {
-          populateCachedDistinctKeys(vrg, rowIndex, 0);
-          firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
-        }
-
-        final int hashCode;
-
-        // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
-        if (autoParallel && partitionEval.length > 0) {
-          hashCode = computeMurmurHash(firstKey);
-        } else {
-          hashCode = computeHashCode(vrg, rowIndex, buckNum);
-        }
-
-        firstKey.setHashCode(hashCode);
-
-        if (useTopN) {
-          /*
-           * in case of TopN for windowing, we need to distinguish between 
-           * rows with null partition keys and rows with value 0 for partition keys.
-           */
-          boolean partkeysNull = conf.isPTFReduceSink() && partitionKeysAreNull(vrg, rowIndex);
-          reducerHash.tryStoreVectorizedKey(firstKey, partkeysNull, batchIndex);
-        } else {
-          // No TopN, just forward the first key and all others.
-          BytesWritable value = makeValueWritable(vrg, rowIndex);
-          collect(firstKey, value);
-          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
-        }
-      }
-
-      if (!useTopN) return; // All done.
-
-      // If we use topN, we have called tryStore on every key now. We can process the results.
-      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
-        int result = reducerHash.getVectorizedBatchResult(batchIndex);
-        if (result == TopNHash.EXCLUDE) continue;
-        int rowIndex = batchIndex;
-        if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[batchIndex];
-        }
-        // Compute value and hashcode - we'd either store or forward them.
-        BytesWritable value = makeValueWritable(vrg, rowIndex);
-        int distKeyLength = -1;
-        int hashCode;
-        if (result == TopNHash.FORWARD) {
-          HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
-          distKeyLength = firstKey.getDistKeyLength();
-          hashCode = firstKey.hashCode();
-          collect(firstKey, value);
-        } else {
-          hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex);
-          reducerHash.storeValue(result, hashCode, value, true);
-          distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
-        }
-        // Now forward other the rows if there's multi-distinct (but see TODO in forward...).
-        // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
-        if (numDistinctExprs > 1) {
-          populatedCachedDistributionKeys(vrg, rowIndex, 1);
-          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
-        }
-      }
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    } catch (IOException e) {
-      throw new HiveException(e);
-    }
-  }
-
-  /**
-   * This function creates and forwards all the additional KVs for the multi-distinct case,
-   * after the first (0th) KV pertaining to the row has already been stored or forwarded.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param hashCode the partitioning hash code to use; same as for the first KV
-   * @param value the value to use; same as for the first KV
-   * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
-   * @param tag the tag
-   * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
-   */
-  private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode,
-      BytesWritable value, int distKeyLength, int tag, int baseIndex)
-          throws HiveException, SerDeException, IOException {
-    // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
-    //       the first key has already been stored. There's few bytes difference between keys
-    //       for different distincts, and the value/etc. are all the same.
-    //       We could store deltas to re-gen extra rows when flushing TopN.
-    for (int i = 1; i < numDistinctExprs; i++) {
-      if (i != baseIndex) {
-        System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
-      }
-      populateCachedDistinctKeys(vrg, rowIndex, i);
-      HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
-      hiveKey.setHashCode(hashCode);
-      collect(hiveKey, value);
-    }
-  }
-
-  /**
-   * Populate distribution keys part of cachedKeys for a particular row from the batch.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param index the cachedKeys index to write to
-   */
-  private void populatedCachedDistributionKeys(
-      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
-    for (int i = 0; i < numDistributionKeys; i++) {
-      int batchColumn = keyEval[i].getOutputColumn();
-      ColumnVector vectorColumn = vrg.cols[batchColumn];
-      cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
-    }
-    if (cachedKeys[index].length > numDistributionKeys) {
-      cachedKeys[index][numDistributionKeys] = null;
-    }
-  }
-
-  /**
-   * Populate distinct keys part of cachedKeys for a particular row from the batch.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param index the cachedKeys index to write to
-   */
-  private void populateCachedDistinctKeys(
-      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
-    StandardUnion union;
-    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
-        (byte)index, new Object[distinctColIndices.get(index).size()]);
-    Object[] distinctParameters = (Object[]) union.getObject();
-    for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
-      int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
-      int batchColumn = keyEval[distinctColIndex].getOutputColumn();
-      distinctParameters[distinctParamI] =
-          keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
-    }
-    union.setTag((byte) index);
-  }
+  public void processOp(Object data, int tag) throws HiveException {
+    VectorizedRowBatch vrg = (VectorizedRowBatch) data;
 
-  private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
-      throws HiveException, SerDeException {
-    for (int i = 0; i < valueEval.length; i++) {
-      int batchColumn = valueEval[i].getOutputColumn();
-      ColumnVector vectorColumn = vrg.cols[batchColumn];
-      cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
+    for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+      Object row = getRowObject(vrg, batchIndex);
+      super.processOp(row, tag);
     }
-    // Serialize the value
-    return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
   }
 
-  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException {
-    // Evaluate the HashCode
-    int keyHashCode = 0;
-    if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide better
-      // load balance. If the requirement is to have a single reducer, we should set
-      // the number of reducers to 1.
-      // Use a constant seed to make the code deterministic.
-      if (random == null) {
-        random = new Random(12345);
-      }
-      keyHashCode = random.nextInt();
-    } else {
-      for (int p = 0; p < partitionEval.length; p++) {
-        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-        Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
-        keyHashCode = keyHashCode
-            * 31
-            + ObjectInspectorUtils.hashCode(
-                partitionValue,
-                partitionWriters[p].getObjectInspector());
-      }
-    }
-    return buckNum < 0  ? keyHashCode : keyHashCode * 31 + buckNum;
-  }
-
-  private boolean partitionKeysAreNull(VectorizedRowBatch vrg, int rowIndex)
+  private Object[] getRowObject(VectorizedRowBatch vrg, int rowIndex)
       throws HiveException {
-    if (partitionEval.length != 0) {
-      for (int p = 0; p < partitionEval.length; p++) {
-        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-        Object partitionValue = partitionWriters[p].writeValue(columnVector,
-            rowIndex);
-        if (partitionValue != null) {
-          return false;
-        }
+    int batchIndex = rowIndex;
+    if (vrg.selectedInUse) {
+      batchIndex = vrg.selected[rowIndex];
+    }
+    for (int i = 0; i < vrg.projectionSize; i++) {
+      ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
+      if (vectorColumn != null) {
+        singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex);
+      } else {
+        // Some columns from tables are not used.
+        singleRow[i] = null;
       }
-      return true;
-    }
-    return false;
-  }
-
-  private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
-    int bucketNum = 0;
-    for (int p = 0; p < bucketEval.length; p++) {
-      ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()];
-      Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex);
-      bucketNum = bucketNum
-          * 31
-          + ObjectInspectorUtils.hashCode(
-              bucketValue,
-              bucketWriters[p].getObjectInspector());
     }
-
-    if (bucketNum < 0) {
-      bucketNum = -1 * bucketNum;
-    }
-
-    return bucketNum % numBuckets;
-  }
-
-  static public String getOperatorName() {
-    return "RS";
-  }
-
-  public VectorExpression[] getPartitionEval() {
-    return partitionEval;
-  }
-
-  public void setPartitionEval(VectorExpression[] partitionEval) {
-    this.partitionEval = partitionEval;
-  }
-
-  public VectorExpression[] getValueEval() {
-    return valueEval;
-  }
-
-  public void setValueEval(VectorExpression[] valueEval) {
-    this.valueEval = valueEval;
-  }
-
-  public VectorExpression[] getKeyEval() {
-    return keyEval;
-  }
-
-  public void setKeyEval(VectorExpression[] keyEval) {
-    this.keyEval = keyEval;
+    return singleRow;
   }
 }
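
The rewritten operator above no longer serializes keys and values itself; it unwraps each VectorizedRowBatch into ordinary Object[] rows and defers to the row-mode ReduceSinkOperator. A minimal sketch of the same batch-iteration idiom, assuming a batch whose first projected column is a LongColumnVector (the class and method names below are illustrative, not part of the patch):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    public class BatchIterationSketch {
      // Sums the first projected column of a batch, honoring the selected-rows indirection.
      static long sumFirstColumn(VectorizedRowBatch batch) {
        LongColumnVector col = (LongColumnVector) batch.cols[batch.projectedColumns[0]];
        long sum = 0;
        for (int i = 0; i < batch.size; i++) {
          // When selectedInUse is true, only the indices listed in selected[] are live rows.
          int row = batch.selectedInUse ? batch.selected[i] : i;
          if (col.noNulls || !col.isNull[row]) {
            // A repeating vector stores its single value at index 0.
            sum += col.vector[col.isRepeating ? 0 : row];
          }
        }
        return sum;
      }
    }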

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Sun Oct  5 22:26:43 2014
@@ -1889,47 +1889,47 @@ public class VectorizationContext {
   // TODO:   And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used..  Right now they are conservatively
   //         marked map-side (HASH).
   static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFMinLong.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFMinDouble.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                          VectorUDAFMinString.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFMinDecimal.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFMaxLong.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFMaxDouble.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                          VectorUDAFMaxString.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFMaxDecimal.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.NONE,          GroupByDesc.Mode.HASH,         VectorUDAFCountStar.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFSumLong.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFSumDouble.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFSumDecimal.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFAvgLong.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFAvgDouble.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFAvgDecimal.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_samp",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarSampLong.class));
-    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarSampDouble.class));
-    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarSampDecimal.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdSampLong.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdSampDouble.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdSampDecimal.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    null,                          VectorUDAFMinLong.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,                          VectorUDAFMinDouble.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          null,                          VectorUDAFMinString.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,                          VectorUDAFMinDecimal.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    null,                          VectorUDAFMaxLong.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,                          VectorUDAFMaxDouble.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          null,                          VectorUDAFMaxString.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,                          VectorUDAFMaxDecimal.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.NONE,                   GroupByDesc.Mode.HASH,         VectorUDAFCountStar.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,             GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.STRING_FAMILY,          GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,             null,                          VectorUDAFSumLong.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           null,                          VectorUDAFSumDouble.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                null,                          VectorUDAFSumDecimal.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFAvgLong.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFAvgDouble.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFAvgDecimal.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_samp",    VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFVarSampLong.class));
+    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFVarSampDouble.class));
+    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFVarSampDecimal.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY,   GroupByDesc.Mode.HASH,         VectorUDAFStdSampLong.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,           GroupByDesc.Mode.HASH,         VectorUDAFStdSampDouble.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL,                GroupByDesc.Mode.HASH,         VectorUDAFStdSampDecimal.class));
   }};
 
   public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce)
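
The aggregatesDefinition table above keys each vectorized UDAF class on the aggregate name, the argument type family, and (optionally) the GroupByDesc mode; a null mode entry appears to apply to both map-side and reduce-side execution. A simplified, hypothetical sketch of how such a definition list can be consulted (the Def/resolve names below are illustrative, not Hive's actual getAggregatorExpression code):

    import java.util.ArrayList;
    import java.util.List;

    public class AggregateLookupSketch {
      // Hypothetical, simplified stand-in for Hive's AggregateDefinition entries.
      static class Def {
        final String name; final String argTypeFamily; final String mode; final Class<?> udafClass;
        Def(String name, String argTypeFamily, String mode, Class<?> udafClass) {
          this.name = name; this.argTypeFamily = argTypeFamily;
          this.mode = mode; this.udafClass = udafClass;
        }
      }

      static Class<?> resolve(List<Def> defs, String name, String argTypeFamily, String mode) {
        for (Def d : defs) {
          boolean modeOk = (d.mode == null) || d.mode.equals(mode); // null matches any mode
          if (d.name.equalsIgnoreCase(name) && d.argTypeFamily.equals(argTypeFamily) && modeOk) {
            return d.udafClass;
          }
        }
        return null; // not vectorizable; the caller falls back to row-mode execution
      }

      public static void main(String[] args) {
        List<Def> defs = new ArrayList<Def>();
        defs.add(new Def("min", "INT_DATETIME_FAMILY", null, Long.class));        // placeholder class
        defs.add(new Def("count", "INT_DATETIME_FAMILY", "HASH", Integer.class)); // placeholder class
        System.out.println(resolve(defs, "min", "INT_DATETIME_FAMILY", "HASH"));  // class java.lang.Long
      }
    }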

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Sun Oct  5 22:26:43 2014
@@ -140,6 +140,20 @@ public class VectorizedRowBatchCtx {
   
 
   /**
+   * Initializes the VectorizedRowBatch context based on a scratch column type map and
+   * object inspector.
+   * @param columnTypeMap
+   * @param rowOI
+   *          Object inspector that shapes the column types
+   */
+  public void init(Map<Integer, String> columnTypeMap,
+      StructObjectInspector rowOI) {
+    this.columnTypeMap = columnTypeMap;
+    this.rowOI= rowOI;
+    this.rawRowOI = rowOI;
+  }
+
+  /**
    * Initializes VectorizedRowBatch context based on the
    * split and Hive configuration (Job conf with hive Plan).
    *
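
A short illustrative use of the new init(columnTypeMap, rowOI) overload: build a struct inspector that shapes the row columns, then register any scratch column types by batch index. The two-column row shape, the scratch index 2, and the no-arg VectorizedRowBatchCtx constructor are assumptions for illustration:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class RowBatchCtxInitSketch {
      static VectorizedRowBatchCtx makeCtx() {
        // Row shape: column 0 is a bigint "a", column 1 is a string "b".
        StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("a", "b"),
            Arrays.<ObjectInspector>asList(
                PrimitiveObjectInspectorFactory.javaLongObjectInspector,
                PrimitiveObjectInspectorFactory.javaStringObjectInspector));

        // Scratch column 2 (the first index past the row columns) holds bigint intermediates.
        Map<Integer, String> scratchTypes = new HashMap<Integer, String>();
        scratchTypes.put(2, "bigint");

        VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(); // no-arg constructor assumed
        ctx.init(scratchTypes, rowOI);
        return ctx;
      }
    }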

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Sun Oct  5 22:26:43 2014
@@ -1060,6 +1060,31 @@ public final class VectorExpressionWrite
     closure.assign(writers, oids);
   }
 
+  /**
+   * Creates the value writers for a struct object inspector.
+   * Creates an appropriate output object inspector.
+   */
+  public static void processVectorInspector(
+      StructObjectInspector structObjInspector,
+      SingleOIDClosure closure)
+      throws HiveException {
+    List<? extends StructField> fields = structObjInspector.getAllStructFieldRefs();
+    VectorExpressionWriter[] writers = new VectorExpressionWriter[fields.size()];
+    List<ObjectInspector> oids = new ArrayList<ObjectInspector>(writers.length);
+    ArrayList<String> columnNames = new ArrayList<String>();
+    int i = 0;
+    for(StructField field : fields) {
+      ObjectInspector fieldObjInsp = field.getFieldObjectInspector();
+      writers[i] = VectorExpressionWriterFactory.
+                genVectorExpressionWritable(fieldObjInsp);
+      columnNames.add(field.getFieldName());
+      oids.add(writers[i].getObjectInspector());
+      i++;
+    }
+    ObjectInspector objectInspector = ObjectInspectorFactory.
+        getStandardStructObjectInspector(columnNames,oids);
+    closure.assign(writers, objectInspector);
+  }
 
   /**
    * Returns {@link VectorExpressionWriter} objects for the fields in the given
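
The closure-based contract mirrors the existing processVectorExpressions overloads: the caller hands in a struct inspector and receives back one writer per field plus a standard-struct output inspector, exactly as VectorReduceSinkOperator.initializeOp does above. A minimal usage sketch (the column names and types are illustrative):

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class ProcessVectorInspectorSketch {
      private VectorExpressionWriter[] writers;
      private ObjectInspector outputOI;

      void build() throws HiveException {
        StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("key", "value"),
            Arrays.<ObjectInspector>asList(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaLongObjectInspector));

        VectorExpressionWriterFactory.processVectorInspector(rowOI,
            new VectorExpressionWriterFactory.SingleOIDClosure() {
              @Override
              public void assign(VectorExpressionWriter[] w, ObjectInspector oi) {
                writers = w;   // one writer per struct field
                outputOI = oi; // standard struct OI built from the writers' inspectors
              }
            });
      }
    }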

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java Sun Oct  5 22:26:43 2014
@@ -147,7 +147,7 @@ public class HookContext {
  }
 
   public String getOperationName() {
-    return SessionState.get().getHiveOperation().name();
+    return queryPlan.getOperationName();
   }
 
   public String getUserName() {

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Sun Oct  5 22:26:43 2014
@@ -40,6 +40,8 @@ import java.util.regex.Pattern;
  * are used by the compactor and cleaner and thus must be format agnostic.
  */
 public class AcidUtils {
+  // This key will be put in the conf file when planning an acid operation
+  public static final String CONF_ACID_KEY = "hive.doing.acid";
   public static final String BASE_PREFIX = "base_";
   public static final String DELTA_PREFIX = "delta_";
   public static final PathFilter deltaFileFilter = new PathFilter() {
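
CONF_ACID_KEY is a marker flag: the planner sets it in the job Configuration when an ACID operation is being compiled, and readers such as OrcInputFormat.shouldSkipCombine (changed further below) only test for its presence. A small sketch of that contract; the literal "true" value is illustrative, since only presence is checked:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class AcidFlagSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Planning side: mark the conf when compiling an ACID operation.
        conf.set(AcidUtils.CONF_ACID_KEY, "true");

        // Read side: any non-null value means the query touches ACID tables,
        // so combining input splits is skipped.
        boolean doingAcid = conf.get(AcidUtils.CONF_ACID_KEY) != null;
        System.out.println("doing acid = " + doingAcid);
      }
    }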

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Sun Oct  5 22:26:43 2014
@@ -161,10 +161,11 @@ public abstract class HiveContextAwareRe
   }
 
   public IOContext getIOContext() {
-    return IOContext.get();
+    return IOContext.get(jobConf.get(Utilities.INPUT_NAME));
   }
 
-  public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
+  private void initIOContext(long startPos, boolean isBlockPointer,
+      Path inputPath) {
     ioCxtRef = this.getIOContext();
     ioCxtRef.currentBlockStart = startPos;
     ioCxtRef.isBlockPointer = isBlockPointer;
@@ -183,7 +184,7 @@ public abstract class HiveContextAwareRe
 
     boolean blockPointer = false;
     long blockStart = -1;
-    FileSplit fileSplit = (FileSplit) split;
+    FileSplit fileSplit = split;
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(job);
     if (inputFormatClass.getName().contains("SequenceFile")) {
@@ -202,12 +203,15 @@ public abstract class HiveContextAwareRe
       blockStart = in.getPosition();
       in.close();
     }
+    this.jobConf = job;
     this.initIOContext(blockStart, blockPointer, path.makeQualified(fs));
 
     this.initIOContextSortedProps(split, recordReader, job);
   }
 
   public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) {
+    this.jobConf = job;
+
     this.getIOContext().resetSortingValues();
     this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false);
 

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Sun Oct  5 22:26:43 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -253,7 +254,14 @@ public class HiveInputFormat<K extends W
   }
 
   protected void init(JobConf job) {
-    mrwork = Utilities.getMapWork(job);
+    if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+      mrwork = (MapWork) Utilities.getMergeWork(job);
+      if (mrwork == null) {
+        mrwork = Utilities.getMapWork(job);
+      }
+    } else {
+      mrwork = Utilities.getMapWork(job);
+    }
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
 
@@ -420,6 +428,9 @@ public class HiveInputFormat<K extends W
 
   public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
 
+    // ensure filters are not set from previous pushFilters
+    jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
+    jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
     TableScanDesc scanDesc = tableScan.getConf();
     if (scanDesc == null) {
       return;
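
Unsetting the two filter properties first matters because the same JobConf can be reused when filters are pushed for successive table scans; without the reset, a scan with no predicate of its own would silently inherit the previous scan's filter. A sketch of the reset-then-set pattern (the helper method and the filterText argument are illustrative):

    import org.apache.hadoop.hive.ql.plan.TableScanDesc;
    import org.apache.hadoop.mapred.JobConf;

    public class PushFiltersSketch {
      // Clear any filter left over from a previous scan, then push the new one, if any.
      static void pushFilterText(JobConf jobConf, String filterText) {
        jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
        jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
        if (filterText != null) {
          jobConf.set(TableScanDesc.FILTER_TEXT_CONF_STR, filterText);
        }
      }
    }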

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Sun Oct  5 22:26:43 2014
@@ -18,7 +18,14 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
+import org.apache.hadoop.hive.ql.session.SessionState;
 
 
 /**
@@ -31,14 +38,25 @@ import org.apache.hadoop.fs.Path;
  */
 public class IOContext {
 
-
   private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
     @Override
     protected synchronized IOContext initialValue() { return new IOContext(); }
  };
 
-  public static IOContext get() {
-    return IOContext.threadLocal.get();
+  private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
+  private static IOContext ioContext = new IOContext();
+
+  public static Map<String, IOContext> getMap() {
+    return inputNameIOContextMap;
+  }
+
+  public static IOContext get(String inputName) {
+    if (inputNameIOContextMap.containsKey(inputName) == false) {
+      IOContext ioContext = new IOContext();
+      inputNameIOContextMap.put(inputName, ioContext);
+    }
+
+    return inputNameIOContextMap.get(inputName);
   }
 
   public static void clear() {
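
With the thread-local replaced by a map keyed on input name, each input of a task gets its own IOContext, and HiveContextAwareRecordReader (above) looks it up via the Utilities.INPUT_NAME property in the JobConf. An illustrative lookup; the "Map 1" name in the comment is a placeholder:

    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.io.IOContext;
    import org.apache.hadoop.mapred.JobConf;

    public class IOContextLookupSketch {
      static IOContext contextFor(JobConf job) {
        // The execution framework is expected to have set the input name, e.g. "Map 1".
        String inputName = job.get(Utilities.INPUT_NAME);
        return IOContext.get(inputName); // created on first access for this input
      }
    }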

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Sun Oct  5 22:26:43 2014
@@ -132,7 +132,7 @@ public class OrcInputFormat  implements 
   @Override
   public boolean shouldSkipCombine(Path path,
                                    Configuration conf) throws IOException {
-    return AcidUtils.isAcid(path, conf);
+    return (conf.get(AcidUtils.CONF_ACID_KEY) != null) || AcidUtils.isAcid(path, conf);
   }
 
   private static class OrcRecordReader

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Sun Oct  5 22:26:43 2014
@@ -118,13 +118,11 @@ public class OrcNewInputFormat extends I
   public List<InputSplit> getSplits(JobContext jobContext)
       throws IOException, InterruptedException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
-    Configuration conf =
-        ShimLoader.getHadoopShims().getConfiguration(jobContext);
     List<OrcSplit> splits =
         OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
         .getConfiguration(jobContext));
-    List<InputSplit> result = new ArrayList<InputSplit>();
-    for(OrcSplit split: OrcInputFormat.generateSplitsInfo(conf)) {
+    List<InputSplit> result = new ArrayList<InputSplit>(splits.size());
+    for(OrcSplit split: splits) {
       result.add(new OrcNewSplit(split));
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java Sun Oct  5 22:26:43 2014
@@ -418,138 +418,120 @@ class RunLengthIntegerWriterV2 implement
 
   private void determineEncoding() {
 
-    int idx = 0;
+    // we need to compute zigzag values for DIRECT encoding if we decide to
+    // break early for delta overflows or for shorter runs
+    computeZigZagLiterals();
 
-    // for identifying monotonic sequences
-    boolean isIncreasing = false;
-    int increasingCount = 1;
-    boolean isDecreasing = false;
-    int decreasingCount = 1;
+    zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
 
-    // for identifying type of delta encoding
-    min = literals[0];
-    long max = literals[0];
-    isFixedDelta = true;
-    long currDelta = 0;
+    // not a big win for shorter runs to determine encoding
+    if (numLiterals <= MIN_REPEAT) {
+      encoding = EncodingType.DIRECT;
+      return;
+    }
 
-    min = literals[0];
-    long deltaMax = 0;
+    // DELTA encoding check
 
-    // populate all variables to identify the encoding type
-    if (numLiterals >= 1) {
-      currDelta = literals[1] - literals[0];
-      for(int i = 0; i < numLiterals; i++) {
-        if (i > 0 && literals[i] >= max) {
-          max = literals[i];
-          increasingCount++;
-        }
+    // for identifying monotonic sequences
+    boolean isIncreasing = true;
+    boolean isDecreasing = true;
+    this.isFixedDelta = true;
 
-        if (i > 0 && literals[i] <= min) {
-          min = literals[i];
-          decreasingCount++;
-        }
+    this.min = literals[0];
+    long max = literals[0];
+    final long initialDelta = literals[1] - literals[0];
+    long currDelta = initialDelta;
+    long deltaMax = initialDelta;
+    this.adjDeltas[0] = initialDelta;
+
+    for (int i = 1; i < numLiterals; i++) {
+      final long l1 = literals[i];
+      final long l0 = literals[i - 1];
+      currDelta = l1 - l0;
+      min = Math.min(min, l1);
+      max = Math.max(max, l1);
+
+      isIncreasing &= (l0 <= l1);
+      isDecreasing &= (l0 >= l1);
+
+      isFixedDelta &= (currDelta == initialDelta);
+      if (i > 1) {
+        adjDeltas[i - 1] = Math.abs(currDelta);
+        deltaMax = Math.max(deltaMax, adjDeltas[i - 1]);
+      }
+    }
 
-        // if delta doesn't changes then mark it as fixed delta
-        if (i > 0 && isFixedDelta) {
-          if (literals[i] - literals[i - 1] != currDelta) {
-            isFixedDelta = false;
-          }
+    // it's faster to exit under the delta overflow condition without checking the
+    // PATCHED_BASE condition, as encoding using DIRECT is faster and has less
+    // overhead than PATCHED_BASE
+    if (!utils.isSafeSubtract(max, min)) {
+      encoding = EncodingType.DIRECT;
+      return;
+    }
 
-          fixedDelta = currDelta;
-        }
+    // invariant - subtracting any number from any other in the literals after
+    // this point won't overflow
 
-        // populate zigzag encoded literals
-        long zzEncVal = 0;
-        if (signed) {
-          zzEncVal = utils.zigzagEncode(literals[i]);
-        } else {
-          zzEncVal = literals[i];
-        }
-        zigzagLiterals[idx] = zzEncVal;
-        idx++;
+    // if initialDelta is 0 then we cannot delta encode as we cannot identify
+    // the sign of deltas (increasing or decreasing)
+    if (initialDelta != 0) {
+
+      // if min is equal to max then the delta is 0, this condition happens for
+      // fixed values run >10 which cannot be encoded with SHORT_REPEAT
+      if (min == max) {
+        assert isFixedDelta : min + "==" + max +
+            ", isFixedDelta cannot be false";
+        assert currDelta == 0 : min + "==" + max + ", currDelta should be zero";
+        fixedDelta = 0;
+        encoding = EncodingType.DELTA;
+        return;
+      }
 
-        // max delta value is required for computing the fixed bits
-        // required for delta blob in delta encoding
-        if (i > 0) {
-          if (i == 1) {
-            // first value preserve the sign
-            adjDeltas[i - 1] = literals[i] - literals[i - 1];
-          } else {
-            adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]);
-            if (adjDeltas[i - 1] > deltaMax) {
-              deltaMax = adjDeltas[i - 1];
-            }
-          }
-        }
+      if (isFixedDelta) {
+        assert currDelta == initialDelta
+            : "currDelta should be equal to initialDelta for fixed delta encoding";
+        encoding = EncodingType.DELTA;
+        fixedDelta = currDelta;
+        return;
       }
 
       // stores the number of bits required for packing delta blob in
       // delta encoding
       bitsDeltaMax = utils.findClosestNumBits(deltaMax);
 
-      // if decreasing count equals total number of literals then the
-      // sequence is monotonically decreasing
-      if (increasingCount == 1 && decreasingCount == numLiterals) {
-        isDecreasing = true;
-      }
-
-      // if increasing count equals total number of literals then the
-      // sequence is monotonically increasing
-      if (decreasingCount == 1 && increasingCount == numLiterals) {
-        isIncreasing = true;
+      // monotonic condition
+      if (isIncreasing || isDecreasing) {
+        encoding = EncodingType.DELTA;
+        return;
       }
     }
 
-    // if the sequence is both increasing and decreasing then it is not
-    // monotonic
-    if (isDecreasing && isIncreasing) {
-      isDecreasing = false;
-      isIncreasing = false;
-    }
-
-    // fixed delta condition
-    if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) {
-      encoding = EncodingType.DELTA;
-      return;
-    }
-
-    // monotonic condition
-    if (isIncreasing || isDecreasing) {
-      encoding = EncodingType.DELTA;
-      return;
-    }
+    // PATCHED_BASE encoding check
 
     // percentile values are computed for the zigzag encoded values. if the
     // bit requirement between the 90th and 100th percentile varies beyond a
     // threshold then we need to patch the values. if the variation
-    // is not significant then we can use direct or delta encoding
-
-    double p = 0.9;
-    zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p);
-
-    p = 1.0;
-    zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p);
+    // is not significant then we can use direct encoding
 
+    zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9);
     int diffBitsLH = zzBits100p - zzBits90p;
 
     // if the difference between the 90th percentile and 100th percentile fixed
     // bits is > 1 then we need to patch the values
-    if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1
-        && isFixedDelta == false) {
+    if (diffBitsLH > 1) {
+
       // patching is done only on base reduced values.
       // remove base from literals
-      for(int i = 0; i < numLiterals; i++) {
+      for (int i = 0; i < numLiterals; i++) {
         baseRedLiterals[i] = literals[i] - min;
       }
 
       // 95th percentile width is used to determine max allowed value
       // after which patching will be done
-      p = 0.95;
-      brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p);
+      brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
 
       // 100th percentile is used to compute the max patch width
-      p = 1.0;
-      brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p);
+      brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0);
 
       // after base reducing the values, if the difference in bits between
       // 95th percentile and 100th percentile value is zero then there
@@ -565,19 +547,24 @@ class RunLengthIntegerWriterV2 implement
         encoding = EncodingType.DIRECT;
         return;
       }
-    }
-
-    // if difference in bits between 95th percentile and 100th percentile is
-    // 0, then patch length will become 0. Hence we will fallback to direct
-    if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1
-        && isFixedDelta == false) {
+    } else {
+      // if the difference in bits between the 95th percentile and 100th
+      // percentile is 0, then the patch length will become 0. Hence we will
+      // fall back to direct
       encoding = EncodingType.DIRECT;
       return;
     }
+  }
 
-    // this should not happen
-    if (encoding == null) {
-      throw new RuntimeException("Integer encoding cannot be determined.");
+  private void computeZigZagLiterals() {
+    // populate zigzag encoded literals
+    long zzEncVal = 0;
+    for (int i = 0; i < numLiterals; i++) {
+      if (signed) {
+        zzEncVal = utils.zigzagEncode(literals[i]);
+      } else {
+        zzEncVal = literals[i];
+      }
+      zigzagLiterals[i] = zzEncVal;
     }
   }
 
@@ -700,7 +687,7 @@ class RunLengthIntegerWriterV2 implement
     patchWidth = 0;
     gapVsPatchList = null;
     min = 0;
-    isFixedDelta = false;
+    isFixedDelta = true;
   }
 
   @Override

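For readers following the new determineEncoding() flow above: it first checks for fixed-delta and monotonic runs (DELTA), then zigzag-encodes the literals and compares the 90th- and 100th-percentile bit widths to choose between DIRECT and PATCHED_BASE. The sketch below is a minimal, self-contained illustration of those two ingredients only; ZigZagSketch and its simplified percentileBits are illustrative stand-ins, not the writer's or SerializationUtils' actual code.

public class ZigZagSketch {

  // Standard zigzag encoding, as applied by computeZigZagLiterals to signed
  // values: small magnitudes (positive or negative) map to small codes.
  // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...
  static long zigzagEncode(long val) {
    return (val << 1) ^ (val >> 63);
  }

  // Simplified percentile bit width (illustrative only): sort the per-value
  // bit widths and return the width at the requested fraction of the data.
  static int percentileBits(long[] data, double p) {
    int[] widths = new int[data.length];
    for (int i = 0; i < data.length; i++) {
      widths[i] = 64 - Long.numberOfLeadingZeros(data[i]);
    }
    java.util.Arrays.sort(widths);
    int idx = (int) Math.ceil(p * widths.length) - 1;
    idx = Math.min(Math.max(idx, 0), widths.length - 1);
    return widths[idx];
  }

  public static void main(String[] args) {
    long[] literals = {2, -3, 4, -5, 6, -7, 8, -9, 10, 1_000_000L};
    long[] zz = new long[literals.length];
    for (int i = 0; i < literals.length; i++) {
      zz[i] = zigzagEncode(literals[i]);
    }
    int bits90p = percentileBits(zz, 0.90);
    int bits100p = percentileBits(zz, 1.0);
    // One outlier (1,000,000) inflates the 100th-percentile width well past
    // the 90th-percentile width, which is the situation where the writer
    // considers PATCHED_BASE instead of plain DIRECT.
    System.out.println("90p=" + bits90p + " 100p=" + bits100p
        + " diff=" + (bits100p - bits90p));
  }
}

Running this prints a 90th-percentile width of 5 bits and a 100th-percentile width of 21 bits; a gap larger than 1 bit is exactly the case where the logic above considers PATCHED_BASE rather than settling for plain DIRECT.
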
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Sun Oct  5 22:26:43 2014
@@ -1283,4 +1283,9 @@ final class SerializationUtils {
         + ((readBuffer[rbOffset + 7] & 255) << 0));
   }
 
+  // We do not want to use Guava LongMath.checkedSubtract() here, as it throws
+  // ArithmeticException in case of overflow
+  public boolean isSafeSubtract(long left, long right) {
+    return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
+  }
 }

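The isSafeSubtract() helper added above avoids Guava's LongMath.checkedSubtract(), which signals overflow with an ArithmeticException, by checking sign bits directly: if the operands share a sign the subtraction cannot overflow, and if they differ the result is safe only when it keeps the sign of the left operand. A small standalone sketch of the same expression (the class and main() below are illustrative; only the boolean expression comes from the patch):

public class SafeSubtractSketch {

  // Same expression as the new SerializationUtils.isSafeSubtract():
  // true when (left - right) does not overflow a 64-bit long.
  static boolean isSafeSubtract(long left, long right) {
    // same sign  -> subtraction cannot overflow
    // mixed sign -> safe only if the result keeps the sign of 'left'
    return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
  }

  public static void main(String[] args) {
    System.out.println(isSafeSubtract(10L, 3L));             // true
    System.out.println(isSafeSubtract(Long.MAX_VALUE, -1L)); // false (would wrap to Long.MIN_VALUE)
    System.out.println(isSafeSubtract(Long.MIN_VALUE, 1L));  // false (would wrap to Long.MAX_VALUE)
    System.out.println(isSafeSubtract(Long.MIN_VALUE, -1L)); // true  (result is MIN_VALUE + 1)
  }
}
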
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java Sun Oct  5 22:26:43 2014
@@ -13,9 +13,6 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.convert;
 
-import java.util.List;
-
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -33,7 +30,7 @@ public class ArrayWritableGroupConverter
   private Writable[] mapPairContainer;
 
   public ArrayWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent,
-      final int index, List<TypeInfo> hiveSchemaTypeInfos) {
+      final int index) {
     this.parent = parent;
     this.index = index;
     int count = groupType.getFieldCount();
@@ -43,8 +40,7 @@ public class ArrayWritableGroupConverter
     isMap = count == 2;
     converters = new Converter[count];
     for (int i = 0; i < count; i++) {
-      converters[i] = getConverterFromDescription(groupType.getType(i), i, this,
-          hiveSchemaTypeInfos);
+      converters[i] = getConverterFromDescription(groupType.getType(i), i, this);
     }
   }
 

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java Sun Oct  5 22:26:43 2014
@@ -16,7 +16,6 @@ package org.apache.hadoop.hive.ql.io.par
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -37,21 +36,19 @@ public class DataWritableGroupConverter 
   private final Object[] currentArr;
   private Writable[] rootMap;
 
-  public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema,
-      final List<TypeInfo> hiveSchemaTypeInfos) {
-    this(requestedSchema, null, 0, tableSchema, hiveSchemaTypeInfos);
+  public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema) {
+    this(requestedSchema, null, 0, tableSchema);
     final int fieldCount = tableSchema.getFieldCount();
     this.rootMap = new Writable[fieldCount];
   }
 
   public DataWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent,
-      final int index, final List<TypeInfo> hiveSchemaTypeInfos) {
-    this(groupType, parent, index, groupType, hiveSchemaTypeInfos);
+      final int index) {
+    this(groupType, parent, index, groupType);
   }
 
   public DataWritableGroupConverter(final GroupType selectedGroupType,
-      final HiveGroupConverter parent, final int index, final GroupType containingGroupType,
-      final List<TypeInfo> hiveSchemaTypeInfos) {
+      final HiveGroupConverter parent, final int index, final GroupType containingGroupType) {
     this.parent = parent;
     this.index = index;
     final int totalFieldCount = containingGroupType.getFieldCount();
@@ -65,8 +62,7 @@ public class DataWritableGroupConverter 
       Type subtype = selectedFields.get(i);
       if (containingGroupType.getFields().contains(subtype)) {
         converters[i] = getConverterFromDescription(subtype,
-            containingGroupType.getFieldIndex(subtype.getName()), this,
-            hiveSchemaTypeInfos);
+            containingGroupType.getFieldIndex(subtype.getName()), this);
       } else {
         throw new IllegalStateException("Group type [" + containingGroupType +
             "] does not contain requested field: " + subtype);

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java Sun Oct  5 22:26:43 2014
@@ -31,10 +31,8 @@ public class DataWritableRecordConverter
 
   private final DataWritableGroupConverter root;
 
-  public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema,
-      final List<TypeInfo> hiveColumnTypeInfos) {
-    this.root = new DataWritableGroupConverter(requestedSchema, tableSchema,
-        hiveColumnTypeInfos);
+  public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema) {
+    this.root = new DataWritableGroupConverter(requestedSchema, tableSchema);
   }
 
   @Override

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java Sun Oct  5 22:26:43 2014
@@ -16,19 +16,12 @@ package org.apache.hadoop.hive.ql.io.par
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -152,32 +145,6 @@ public enum ETypeConverter {
         }
       };
     }
-  },
-  ECHAR_CONVERTER(HiveCharWritable.class) {
-    @Override
-    Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
-      return new BinaryConverter<HiveCharWritable>(type, parent, index) {
-        @Override
-        protected HiveCharWritable convert(Binary binary) {
-          HiveChar hiveChar = new HiveChar();
-          hiveChar.setValue(binary.toStringUsingUTF8());
-          return new HiveCharWritable(hiveChar);
-        }
-      };
-    }
-  },
-  EVARCHAR_CONVERTER(HiveVarcharWritable.class) {
-    @Override
-    Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
-      return new BinaryConverter<HiveVarcharWritable>(type, parent, index) {
-        @Override
-        protected HiveVarcharWritable convert(Binary binary) {
-          HiveVarchar hiveVarchar = new HiveVarchar();
-          hiveVarchar.setValue(binary.toStringUsingUTF8());
-          return new HiveVarcharWritable(hiveVarchar);
-        }
-      };
-    }
   };
 
   final Class<?> _type;
@@ -193,7 +160,7 @@ public enum ETypeConverter {
   abstract Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent);
 
   public static Converter getNewConverter(final PrimitiveType type, final int index,
-      final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
+      final HiveGroupConverter parent) {
     if (type.isPrimitive() && (type.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96))) {
       //TODO- cleanup once parquet support Timestamp type annotation.
       return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent);
@@ -201,15 +168,7 @@ public enum ETypeConverter {
     if (OriginalType.DECIMAL == type.getOriginalType()) {
       return EDECIMAL_CONVERTER.getConverter(type, index, parent);
     } else if (OriginalType.UTF8 == type.getOriginalType()) {
-      if (hiveSchemaTypeInfos.get(index).getTypeName()
-          .startsWith(serdeConstants.CHAR_TYPE_NAME)) {
-        return ECHAR_CONVERTER.getConverter(type, index, parent);
-      } else if (hiveSchemaTypeInfos.get(index).getTypeName()
-          .startsWith(serdeConstants.VARCHAR_TYPE_NAME)) {
-        return EVARCHAR_CONVERTER.getConverter(type, index, parent);
-      } else if (type.isPrimitive()) {
-        return ESTRING_CONVERTER.getConverter(type, index, parent);
-      }
+      return ESTRING_CONVERTER.getConverter(type, index, parent);
     }
 
     Class<?> javaType = type.getPrimitiveTypeName().javaType;

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java Sun Oct  5 22:26:43 2014
@@ -13,9 +13,6 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.convert;
 
-import java.util.List;
-
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.Writable;
 
 import parquet.io.api.Converter;
@@ -26,20 +23,17 @@ import parquet.schema.Type.Repetition;
 public abstract class HiveGroupConverter extends GroupConverter {
 
   protected static Converter getConverterFromDescription(final Type type, final int index,
-      final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
+      final HiveGroupConverter parent) {
     if (type == null) {
       return null;
     }
     if (type.isPrimitive()) {
-      return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent,
-          hiveSchemaTypeInfos);
+      return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent);
     } else {
       if (type.asGroupType().getRepetition() == Repetition.REPEATED) {
-        return new ArrayWritableGroupConverter(type.asGroupType(), parent, index,
-            hiveSchemaTypeInfos);
+        return new ArrayWritableGroupConverter(type.asGroupType(), parent, index);
       } else {
-        return new DataWritableGroupConverter(type.asGroupType(), parent, index,
-            hiveSchemaTypeInfos);
+        return new DataWritableGroupConverter(type.asGroupType(), parent, index);
       }
     }
   }

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Sun Oct  5 22:26:43 2014
@@ -14,7 +14,6 @@
 package org.apache.hadoop.hive.ql.io.parquet.read;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -24,8 +23,6 @@ import org.apache.hadoop.hive.ql.io.IOCo
 import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -56,7 +53,7 @@ public class DataWritableReadSupport ext
    * From a comma-separated string of column names (which may include Hive
    * virtual columns), return a list of column names
    *
-   * @param comma separated list of columns
+   * @param columns comma separated list of columns
    * @return list with virtual columns removed
    */
   private static List<String> getColumns(final String columns) {
@@ -64,27 +61,6 @@ public class DataWritableReadSupport ext
         removeVirtualColumns(StringUtils.getStringCollection(columns));
   }
 
-  private static List<TypeInfo> getColumnTypes(Configuration configuration) {
-
-    List<String> columnNames;
-    String columnNamesProperty = configuration.get(IOConstants.COLUMNS);
-    if (columnNamesProperty.length() == 0) {
-      columnNames = new ArrayList<String>();
-    } else {
-      columnNames = Arrays.asList(columnNamesProperty.split(","));
-    }
-    List<TypeInfo> columnTypes;
-    String columnTypesProperty = configuration.get(IOConstants.COLUMNS_TYPES);
-    if (columnTypesProperty.length() == 0) {
-      columnTypes = new ArrayList<TypeInfo>();
-    } else {
-      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypesProperty);
-    }
-
-    columnTypes = VirtualColumn.removeVirtualColumnTypes(columnNames, columnTypes);
-    return columnTypes;
-  }
-
   /**
    *
    * It creates the readContext for Parquet side with the requested schema during the init phase.
@@ -173,8 +149,7 @@ public class DataWritableReadSupport ext
     }
     final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
         parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
-    return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema,
-        getColumnTypes(configuration));
+    return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
   }
 
   /**
@@ -194,4 +169,4 @@ public class DataWritableReadSupport ext
     }
     return requestedSchema;
   }
-}
+}
\ No newline at end of file

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java Sun Oct  5 22:26:43 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.lockmg
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.thrift.TException;
@@ -42,10 +43,10 @@ public class DbLockManager implements Hi
   private static final long MAX_SLEEP = 15000;
   private HiveLockManagerCtx context;
   private Set<DbHiveLock> locks;
-  private HiveMetaStoreClient client;
+  private IMetaStoreClient client;
   private long nextSleep = 50;
 
-  DbLockManager(HiveMetaStoreClient client) {
+  DbLockManager(IMetaStoreClient client) {
     locks = new HashSet<DbHiveLock>();
     this.client = client;
   }
@@ -210,8 +211,8 @@ public class DbLockManager implements Hi
   /**
    * Clear this object's in-memory record of locks.  This won't clear the locks from the database.
    * It is for use with
-   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).commitTxn} and
-   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).rollbackTxn}.
+   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).commitTxn} and
+   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).rollbackTxn}.
    */
   void clearLocalLockRecords() {
     locks.clear();

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Sun Oct  5 22:26:43 2014
@@ -31,6 +31,8 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.thrift.TException;
 
@@ -46,7 +48,7 @@ public class DbTxnManager extends HiveTx
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
   private DbLockManager lockMgr = null;
-  private HiveMetaStoreClient client = null;
+  private IMetaStoreClient client = null;
   private long txnId = 0;
 
   DbTxnManager() {
@@ -284,7 +286,7 @@ public class DbTxnManager extends HiveTx
   public ValidTxnList getValidTxns() throws LockException {
     init();
     try {
-      return client.getValidTxns();
+      return client.getValidTxns(txnId);
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
@@ -311,7 +313,6 @@ public class DbTxnManager extends HiveTx
     try {
       if (txnId > 0) rollbackTxn();
       if (lockMgr != null) lockMgr.close();
-      if (client != null) client.close();
     } catch (Exception e) {
       LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage()
       + ">, swallowing as there is nothing we can do with it.");
@@ -326,10 +327,12 @@ public class DbTxnManager extends HiveTx
             "methods.");
       }
       try {
-        client = new HiveMetaStoreClient(conf);
+        Hive db = Hive.get(conf);
+        client = db.getMSC();
       } catch (MetaException e) {
-        throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(),
-            e);
+        throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
+      } catch (HiveException e) {
+        throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
       }
     }
   }