Posted to commits@hive.apache.org by se...@apache.org on 2016/08/31 23:06:35 UTC

[1/9] hive git commit: HIVE-14658 : UDF abs throws NPE when input arg type is string (Niklaus Xiao via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/hive-14535 67f1b92ba -> 87dcab470


HIVE-14658 : UDF abs throws NPE when input arg type is string (Niklaus Xiao via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec4673bb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec4673bb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec4673bb

Branch: refs/heads/hive-14535
Commit: ec4673bbc61b2555ef2d992266055a331443b4d6
Parents: 20824f2
Author: niklaus xiao <st...@live.cn>
Authored: Tue Aug 30 17:29:37 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Aug 30 17:29:37 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java   | 3 +++
 .../apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java   | 6 ++++++
 2 files changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ec4673bb/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java
index 1fdd41c..a8e2786 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java
@@ -130,6 +130,9 @@ public class GenericUDFAbs extends GenericUDF {
     case STRING:
     case DOUBLE:
       valObject = inputConverter.convert(valObject);
+      if (valObject == null) {
+        return null;
+      }
       resultDouble.set(Math.abs(((DoubleWritable) valObject).get()));
       return resultDouble;
     case DECIMAL:

http://git-wip-us.apache.org/repos/asf/hive/blob/ec4673bb/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java
index 8c531ea..6dbb33f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java
@@ -133,6 +133,12 @@ public class TestGenericUDFAbs extends TestCase {
     output = (DoubleWritable) udf.evaluate(args);
 
     assertEquals("abs() test for String failed ", "123.45", output.toString());
+
+    valueObj = new DeferredJavaObject(new Text("foo"));
+    args[0] = valueObj;
+    output = (DoubleWritable) udf.evaluate(args);
+
+    assertEquals("abs() test for String failed ", null, output);
   }
 
   public void testHiveDecimal() throws HiveException {
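
For readers following the fix: a minimal standalone sketch (not part of the patch; the class name and setup below are illustrative) of why the added null check matters. Hive's ObjectInspector converter returns null when a string cannot be parsed as a double, and the old code then dereferenced that null via ((DoubleWritable) valObject).get().

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class AbsNullCheckSketch {
  public static void main(String[] args) {
    ObjectInspector stringOI = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    ObjectInspector doubleOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
    ObjectInspectorConverters.Converter toDouble =
        ObjectInspectorConverters.getConverter(stringOI, doubleOI);
    // "123.45" converts to a DoubleWritable; "foo" is not parseable and converts to null.
    System.out.println(toDouble.convert(new Text("123.45")));
    System.out.println(toDouble.convert(new Text("foo")));
    // With this patch, GenericUDFAbs checks for that null and returns NULL
    // instead of throwing a NullPointerException.
  }
}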


[3/9] hive git commit: HIVE-14233 - Improve vectorization for ACID by eliminating row-by-row stitching (Saket Saurabh via Eugene Koifman)

Posted by se...@apache.org.
HIVE-14233 - Improve vectorization for ACID by eliminating row-by-row stitching (Saket Saurabh via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1f6949f9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1f6949f9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1f6949f9

Branch: refs/heads/hive-14535
Commit: 1f6949f958d63f5b1552a2417d7c042a416cdbe0
Parents: ab60591
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Aug 30 20:38:28 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Aug 30 20:38:28 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 134 +--
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |   7 +-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 822 +++++++++++++++++++
 ...ommands2WithSplitUpdateAndVectorization.java |  52 ++
 .../TestVectorizedOrcAcidRowBatchReader.java    | 263 ++++++
 .../queries/clientpositive/acid_vectorization.q |  12 +
 .../clientpositive/acid_vectorization.q.out     |  62 ++
 8 files changed, 1302 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cb0d96f..aa03b63 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1187,6 +1187,12 @@ public class HiveConf extends Configuration {
     HIVE_TRANSACTIONAL_TABLE_SCAN("hive.transactional.table.scan", false,
         "internal usage only -- do transaction (ACID) table scan.", true),
 
+    HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY("hive.transactional.events.mem", 10000000,
+        "Vectorized ACID readers can often load all the delete events from all the delete deltas\n"
+        + "into memory to optimize for performance. To prevent out-of-memory errors, this is a rough heuristic\n"
+        + "that limits the total number of delete events that can be loaded into memory at once.\n"
+        + "Roughly it has been set to 10 million delete events per bucket (~160 MB).\n"),
+
     HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0,
         "A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."),
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index d5172ed..70003ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
-import org.apache.orc.impl.InStream;
-import org.apache.orc.impl.SchemaEvolution;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -41,26 +38,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
-import org.apache.orc.ColumnStatistics;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.StripeStatistics;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.OrcTail;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -73,6 +50,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Metastore;
 import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -89,22 +67,33 @@ import org.apache.hadoop.hive.ql.io.BatchToRowReader;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
+import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -117,7 +106,17 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.orc.ColumnStatistics;
 import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcTail;
+import org.apache.orc.impl.SchemaEvolution;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -318,7 +317,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     List<OrcProto.Type> types = file.getTypes();
     options.include(genIncludedColumns(types, conf, isOriginal));
     setSearchArgument(options, types, conf, isOriginal);
-    return (RecordReader) file.rowsOptions(options);
+    return file.rowsOptions(options);
   }
 
   public static boolean isOriginal(Reader file) {
@@ -1793,17 +1792,28 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     reporter.setStatus(inputSplit.toString());
 
+    boolean isFastVectorizedReaderAvailable =
+        VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit);
+
+    if (vectorMode && isFastVectorizedReaderAvailable) {
+      // Faster vectorized ACID row batch reader is available that avoids row-by-row stitching.
+      return (org.apache.hadoop.mapred.RecordReader)
+          new VectorizedOrcAcidRowBatchReader(inputSplit, conf, reporter);
+    }
+
     Options options = new Options(conf).reporter(reporter);
     final RowReader<OrcStruct> inner = getReader(inputSplit, options);
-
-    if (vectorMode) {
+    if (vectorMode && !isFastVectorizedReaderAvailable) {
+      // Vectorized regular ACID reader that does row-by-row stitching.
       return (org.apache.hadoop.mapred.RecordReader)
           new VectorizedOrcAcidRowReader(inner, conf,
               Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
     } else {
+      // Non-vectorized regular ACID reader.
       return new NullKeyRecordReader(inner, conf);
     }
   }
+
   /**
    * Return a RecordReader that is compatible with the Hive 0.12 reader
    * with NullWritable for the key instead of RecordIdentifier.
@@ -1890,35 +1900,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             : AcidUtils.deserializeDeltas(root, split.getDeltas());
     final Configuration conf = options.getConfiguration();
 
-
-    /**
-     * Do we have schema on read in the configuration variables?
-     */
-    TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
-
-    final Reader reader;
-    final int bucket;
-    Reader.Options readOptions = new Reader.Options().schema(schema);
+    final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
+    final int bucket = OrcInputFormat.getBucketForSplit(conf, split);
+    final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
     readOptions.range(split.getStart(), split.getLength());
 
-    // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
-    final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
-    readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
-    setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
-
-    if (split.hasBase()) {
-      bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf)
-          .getBucket();
-      OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf)
-          .maxLength(split.getFileLength());
-      if (split.hasFooter()) {
-        readerOptions.orcTail(split.getOrcTail());
-      }
-      reader = OrcFile.createReader(path, readerOptions);
-    } else {
-      bucket = (int) split.getStart();
-      reader = null;
-    }
     String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
     ValidTxnList validTxnList = txnString == null ? new ValidReadTxnList() :
       new ValidReadTxnList(txnString);
@@ -1930,7 +1916,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
       @Override
       public ObjectInspector getObjectInspector() {
-        return OrcStruct.createObjectInspector(0, schemaTypes);
+        return OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(readOptions.getSchema()));
       }
 
       @Override
@@ -1992,6 +1978,44 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         directory);
   }
 
+  static Reader.Options createOptionsForReader(Configuration conf) {
+    /**
+     * Do we have schema on read in the configuration variables?
+     */
+    TypeDescription schema =
+        OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+    Reader.Options readerOptions = new Reader.Options().schema(schema);
+    // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
+    final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
+    readerOptions.include(OrcInputFormat.genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
+    OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
+    return readerOptions;
+  }
+
+  static Reader createOrcReaderForSplit(Configuration conf, OrcSplit orcSplit) throws IOException {
+    Path path = orcSplit.getPath();
+    Reader reader;
+    if (orcSplit.hasBase()) {
+      OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+      readerOptions.maxLength(orcSplit.getFileLength());
+      if (orcSplit.hasFooter()) {
+        readerOptions.orcTail(orcSplit.getOrcTail());
+      }
+      reader = OrcFile.createReader(path, readerOptions);
+    } else {
+      reader = null;
+    }
+    return reader;
+  }
+
+  static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) {
+    if (orcSplit.hasBase()) {
+      return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+    } else {
+      return (int) orcSplit.getStart();
+    }
+  }
+
   public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg,
       OrcFile.WriterVersion writerVersion, List<OrcProto.Type> types,
       List<StripeStatistics> stripeStats, int stripeCount) {
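
To summarize how getRecordReader() chooses a reader after this change, here is a simplified, illustrative model in plain Java (these are not the actual Hive classes):

enum AcidReaderKind {
  VECTORIZED_ROW_BATCH, // VectorizedOrcAcidRowBatchReader: no row-by-row stitching
  VECTORIZED_STITCHING, // VectorizedOrcAcidRowReader: stitches rows, then re-vectorizes
  ROW_BY_ROW            // NullKeyRecordReader: non-vectorized ACID reader
}

public class AcidReaderDispatchSketch {
  static AcidReaderKind choose(boolean vectorMode, boolean fastReaderAvailable) {
    if (vectorMode && fastReaderAvailable) {
      return AcidReaderKind.VECTORIZED_ROW_BATCH;
    }
    if (vectorMode) {
      return AcidReaderKind.VECTORIZED_STITCHING;
    }
    return AcidReaderKind.ROW_BY_ROW;
  }

  public static void main(String[] args) {
    System.out.println(choose(true, true));   // VECTORIZED_ROW_BATCH
    System.out.println(choose(true, false));  // VECTORIZED_STITCHING
    System.out.println(choose(false, false)); // ROW_BY_ROW
  }
}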

http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index e46ca51..f4e06ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -49,7 +50,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.orc.TypeDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
                               implements RecordReader {
@@ -79,6 +79,10 @@ public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
     return true;
   }
 
+  public VectorizedRowBatch createRowBatch() {
+    return this.schema.createRowBatch();
+  }
+
   @Override
   public long getRowNumber() {
     return baseRow + rowInBatch;
@@ -129,6 +133,7 @@ public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
     return previous;
   }
 
+  @Override
   public boolean nextBatch(VectorizedRowBatch theirBatch) throws IOException {
     // If the user hasn't been reading by row, use the fast path.
     if (rowInBatch >= batch.size) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
new file mode 100644
index 0000000..75c7680
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -0,0 +1,822 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+/**
+ * A fast vectorized batch reader class for ACID when split-update behavior is enabled.
+ * When split-update is turned on, row-by-row stitching could be avoided to create the final
+ * version of a row. Essentially, there are only insert and delete events. Insert events can be
+ * directly read from the base files/insert_only deltas in vectorized row batches. The deleted
+ * rows can then be easily indicated via the 'selected' field of the vectorized row batch.
+ * Refer HIVE-14233 for more details.
+ */
+public class VectorizedOrcAcidRowBatchReader
+    implements org.apache.hadoop.mapred.RecordReader<NullWritable,VectorizedRowBatch> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedOrcAcidRowBatchReader.class);
+
+  private org.apache.hadoop.hive.ql.io.orc.RecordReader baseReader;
+  private VectorizedRowBatchCtx rbCtx;
+  private VectorizedRowBatch vectorizedRowBatchBase;
+  private long offset;
+  private long length;
+  private float progress = 0.0f;
+  private Object[] partitionValues;
+  private boolean addPartitionCols = true;
+  private ValidTxnList validTxnList;
+  private DeleteEventRegistry deleteEventRegistry;
+
+  public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
+        Reporter reporter) throws IOException {
+
+    final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+    final AcidUtils.AcidOperationalProperties acidOperationalProperties
+            = AcidUtils.getAcidOperationalProperties(conf);
+
+    // This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is
+    // enabled for an ACID case and the file format is ORC.
+    boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate()
+                                   || !(inputSplit instanceof OrcSplit);
+    if (isReadNotAllowed) {
+      OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
+    }
+    final OrcSplit orcSplit = (OrcSplit) inputSplit;
+
+    rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+
+    reporter.setStatus(orcSplit.toString());
+    Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, orcSplit);
+    Reader.Options readerOptions = OrcInputFormat.createOptionsForReader(conf);
+    readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions);
+
+    this.offset = orcSplit.getStart();
+    this.length = orcSplit.getLength();
+
+    // Careful with the range here now, we do not want to read the whole base file like deltas.
+    this.baseReader = reader.rowsOptions(readerOptions.range(offset, length));
+
+    // VectorizedRowBatchBase schema is picked up from the baseReader because the SchemaEvolution
+    // stuff happens at the ORC layer that understands how to map user schema to acid schema.
+    if (this.baseReader instanceof RecordReaderImpl) {
+      this.vectorizedRowBatchBase = ((RecordReaderImpl) this.baseReader).createRowBatch();
+    } else {
+      throw new IOException("Failed to create vectorized row batch for the reader of type "
+          + this.baseReader.getClass().getName());
+    }
+
+    int partitionColumnCount = (rbCtx != null) ? rbCtx.getPartitionColumnCount() : 0;
+    if (partitionColumnCount > 0) {
+      partitionValues = new Object[partitionColumnCount];
+      VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, orcSplit, partitionValues);
+    } else {
+      partitionValues = null;
+    }
+
+    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+
+    // Clone readerOptions for deleteEvents.
+    Reader.Options deleteEventReaderOptions = readerOptions.clone();
+    // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because
+    // we always want to read all the delete delta files.
+    deleteEventReaderOptions.range(0, Long.MAX_VALUE);
+    //  Disable SARGs for deleteEventReaders, as SARGs have no meaning.
+    deleteEventReaderOptions.searchArgument(null, null);
+    try {
+      // See if we can load all the delete events from all the delete deltas in memory...
+      this.deleteEventRegistry = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+    } catch (DeleteEventsOverflowMemoryException e) {
+      // If not, then create a set of hanging readers that do sort-merge to find the next smallest
+      // delete event on-demand. Caps the memory consumption to (some_const * no. of readers).
+      this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+    }
+  }
+
+  /**
+   * Returns whether it is possible to create a valid instance of this class for a given split.
+   * @param conf is the job configuration
+   * @param inputSplit
+   * @return true if it is possible, else false.
+   */
+  public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) {
+    if (!(inputSplit instanceof OrcSplit)) {
+      return false; // must be an instance of OrcSplit.
+    }
+    // First check if we are reading any original files in the split.
+    // To simplify the vectorization logic, the vectorized acid row batch reader does not handle
+    // original files for now as they have a different schema than a regular ACID file.
+    final OrcSplit split = (OrcSplit) inputSplit;
+    if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) {
+      // When split-update is turned on for ACID, a more optimized vectorized batch reader
+      // can be created. But still only possible when we are *NOT* reading any originals.
+      return true;
+    }
+    return false; // no split-update or possibly reading originals!
+  }
+
+  private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException {
+    Path path = orcSplit.getPath();
+    Path root;
+    if (orcSplit.hasBase()) {
+      if (orcSplit.isOriginal()) {
+        root = path.getParent();
+      } else {
+        root = path.getParent().getParent();
+      }
+    } else {
+      root = path;
+    }
+    return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas());
+  }
+
+  @Override
+  public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+    try {
+      // Check and update partition cols if necessary. Ideally, this should be done
+      // in CreateValue as the partition is constant per split. But since Hive uses
+      // CombineHiveRecordReader and
+      // as this does not call CreateValue for each new RecordReader it creates, this check is
+      // required in next()
+      if (addPartitionCols) {
+        if (partitionValues != null) {
+          rbCtx.addPartitionColsToBatch(value, partitionValues);
+        }
+        addPartitionCols = false;
+      }
+      if (!baseReader.nextBatch(vectorizedRowBatchBase)) {
+        return false;
+      }
+    } catch (Exception e) {
+      throw new IOException("error iterating", e);
+    }
+
+    // Once we have read the VectorizedRowBatchBase from the file, there are two kinds of cases
+    // for which we might have to discard rows from the batch:
+    // Case 1- when the row is created by a transaction that is not valid, or
+    // Case 2- when the row has been deleted.
+    // We will go through the batch to discover rows which match any of the cases and specifically
+    // remove them from the selected vector. Of course, selectedInUse should also be set.
+
+    BitSet selectedBitSet = new BitSet(vectorizedRowBatchBase.size);
+    if (vectorizedRowBatchBase.selectedInUse) {
+      // When selectedInUse is true, start with every bit set to false and selectively set
+      // certain bits to true based on the selected[] vector.
+      selectedBitSet.set(0, vectorizedRowBatchBase.size, false);
+      for (int j = 0; j < vectorizedRowBatchBase.size; ++j) {
+        int i = vectorizedRowBatchBase.selected[j];
+        selectedBitSet.set(i);
+      }
+    } else {
+      // When selectedInUse is set to false, everything in the batch is selected.
+      selectedBitSet.set(0, vectorizedRowBatchBase.size, true);
+    }
+
+    // Case 1- find rows which belong to transactions that are not valid.
+    findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+
+    // Case 2- find rows which have been deleted.
+    this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet);
+
+    if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) {
+      // None of the cases above matched and everything is selected. Hence, we will use the
+      // same values for the selected and selectedInUse.
+      value.size = vectorizedRowBatchBase.size;
+      value.selected = vectorizedRowBatchBase.selected;
+      value.selectedInUse = vectorizedRowBatchBase.selectedInUse;
+    } else {
+      value.size = selectedBitSet.cardinality();
+      value.selectedInUse = true;
+      value.selected = new int[selectedBitSet.cardinality()];
+      // This loop fills up the selected[] vector with all the index positions that are selected.
+      for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0;
+           setBitIndex >= 0;
+           setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) {
+        value.selected[selectedItr] = setBitIndex;
+      }
+    }
+
+    // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
+    // NOTE: We only link up the user columns and not the ACID metadata columns because this
+    // vectorized code path is not being used in cases of update/delete, when the metadata columns
+    // would be expected to be passed up the operator pipeline. This is because
+    // currently the update/delete specifically disable vectorized code paths.
+    // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
+    StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
+    // Transfer columnVector objects from base batch to outgoing batch.
+    System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
+    progress = baseReader.getProgress();
+    return true;
+  }
+
+  private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) {
+    if (batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) {
+      // When we have repeating values, we can unset the whole bitset at once
+      // if the repeating value is not a valid transaction.
+      long currentTransactionIdForBatch = ((LongColumnVector)
+          batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0];
+      if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) {
+        selectedBitSet.clear(0, batch.size);
+      }
+      return;
+    }
+    long[] currentTransactionVector =
+        ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector;
+    // Loop through the bits that are set to true and mark those rows as false, if their
+    // current transactions are not valid.
+    for (int setBitIndex = selectedBitSet.nextSetBit(0);
+        setBitIndex >= 0;
+        setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
+      if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) {
+        selectedBitSet.clear(setBitIndex);
+      }
+   }
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public VectorizedRowBatch createValue() {
+    return rbCtx.createVectorizedRowBatch();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return offset + (long) (progress * length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      this.baseReader.close();
+    } finally {
+      this.deleteEventRegistry.close();
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return progress;
+  }
+
+  @VisibleForTesting
+  DeleteEventRegistry getDeleteEventRegistry() {
+    return deleteEventRegistry;
+  }
+
+  /**
+   * An interface that can determine which rows have been deleted
+   * from a given vectorized row batch. Implementations of this interface
+   * will read the delete delta files and will create their own internal
+   * data structures to maintain record ids of the records that got deleted.
+   */
+  static interface DeleteEventRegistry {
+    /**
+     * Modifies the passed bitset to indicate which of the rows in the batch
+     * have been deleted. Assumes that the batch.size is equal to bitset size.
+     * @param batch
+     * @param selectedBitSet
+     * @throws IOException
+     */
+    public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) throws IOException;
+
+    /**
+     * The close() method can be called externally to signal the implementing classes
+     * to free up resources.
+     * @throws IOException
+     */
+    public void close() throws IOException;
+  }
+
+  /**
+   * An implementation for DeleteEventRegistry that opens the delete delta files all
+   * at once, and then uses the sort-merge algorithm to maintain a sorted list of
+   * delete events. This internally uses the OrcRawRecordMerger and maintains a constant
+   * amount of memory usage, given the number of delete delta files. Therefore, this
+   * implementation will be picked up when the memory pressure is high.
+   */
+  static class SortMergedDeleteEventRegistry implements DeleteEventRegistry {
+    private OrcRawRecordMerger deleteRecords;
+    private OrcRawRecordMerger.ReaderKey deleteRecordKey;
+    private OrcStruct deleteRecordValue;
+    private boolean isDeleteRecordAvailable = true;
+    private ValidTxnList validTxnList;
+
+    public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions)
+      throws IOException {
+        final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
+        if (deleteDeltas.length > 0) {
+          int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+          String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+          this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+          this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
+                                                      validTxnList, readerOptions, deleteDeltas);
+          this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
+          this.deleteRecordValue = this.deleteRecords.createValue();
+          // Initialize the first value in the delete reader.
+          this.isDeleteRecordAvailable = this.deleteRecords.next(deleteRecordKey, deleteRecordValue);
+        } else {
+          this.isDeleteRecordAvailable = false;
+          this.deleteRecordKey = null;
+          this.deleteRecordValue = null;
+          this.deleteRecords = null;
+        }
+    }
+
+    @Override
+    public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet)
+        throws IOException {
+      if (!isDeleteRecordAvailable) {
+        return;
+      }
+
+      long[] originalTransaction =
+          batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
+              : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+      long[] bucket =
+          batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null
+              : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector;
+      long[] rowId =
+          batch.cols[OrcRecordUpdater.ROW_ID].isRepeating ? null
+              : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector;
+
+      // The following repeatedX values will be set, if any of the columns are repeating.
+      long repeatedOriginalTransaction = (originalTransaction != null) ? -1
+          : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+      long repeatedBucket = (bucket != null) ? -1
+          : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+      long repeatedRowId = (rowId != null) ? -1
+          : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[0];
+
+
+      // Get the first valid row in the batch still available.
+      int firstValidIndex = selectedBitSet.nextSetBit(0);
+      if (firstValidIndex == -1) {
+        return; // Everything in the batch has already been filtered out.
+      }
+      RecordIdentifier firstRecordIdInBatch =
+          new RecordIdentifier(
+              originalTransaction != null ? originalTransaction[firstValidIndex] : repeatedOriginalTransaction,
+              bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket,
+              rowId != null ? (int)  rowId[firstValidIndex] : repeatedRowId);
+
+      // Get the last valid row in the batch still available.
+      int lastValidIndex = selectedBitSet.previousSetBit(batch.size - 1);
+      RecordIdentifier lastRecordIdInBatch =
+          new RecordIdentifier(
+              originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction,
+              bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket,
+              rowId != null ? (int)  rowId[lastValidIndex] : repeatedRowId);
+
+      // We must iterate over all the delete records, until we find one record with
+      // deleteRecord >= firstRecordInBatch or until we exhaust all the delete records.
+      while (deleteRecordKey.compareRow(firstRecordIdInBatch) == -1) {
+        isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue);
+        if (!isDeleteRecordAvailable) return; // exhausted all delete records, return.
+      }
+
+      // If we are here, then we have established that firstRecordInBatch <= deleteRecord.
+      // Now continue marking records which have been deleted until we reach the end of the batch
+      // or we exhaust all the delete records.
+
+      int currIndex = firstValidIndex;
+      RecordIdentifier currRecordIdInBatch = new RecordIdentifier();
+      while (isDeleteRecordAvailable && currIndex != -1 && currIndex <= lastValidIndex) {
+        currRecordIdInBatch.setValues(
+            (originalTransaction != null) ? originalTransaction[currIndex] : repeatedOriginalTransaction,
+            (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket,
+            (rowId != null) ? rowId[currIndex] : repeatedRowId);
+
+        if (deleteRecordKey.compareRow(currRecordIdInBatch) == 0) {
+          // When deleteRecordId == currRecordIdInBatch, this record in the batch has been deleted.
+          selectedBitSet.clear(currIndex);
+          currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index.
+        } else if (deleteRecordKey.compareRow(currRecordIdInBatch) == 1) {
+          // When deleteRecordId > currRecordIdInBatch, we have to move on to look at the
+          // next record in the batch.
+          // But before that, can we short-circuit and skip the entire batch itself
+          // by checking if the deleteRecordId > lastRecordInBatch?
+          if (deleteRecordKey.compareRow(lastRecordIdInBatch) == 1) {
+            return; // Yay! We short-circuited, skip everything remaining in the batch and return.
+          }
+          currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index.
+        } else {
+          // We have deleteRecordId < currRecordIdInBatch, we must now move on to find
+          // next the larger deleteRecordId that can possibly match anything in the batch.
+          isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue);
+        }
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.deleteRecords != null) {
+        this.deleteRecords.close();
+      }
+    }
+  }
+
+  /**
+   * An implementation for DeleteEventRegistry that optimizes for performance by loading
+   * all the delete events into memory at once from all the delete delta files.
+   * It starts by reading all the delete events through a regular sort merge logic
+   * into two vectors- one for original transaction id (otid), and the other for row id.
+   * (In the current version, since the bucket id should be same for all the delete deltas,
+   * it is not stored). The otids are likely to be repeated very often, as a single transaction
+   * often deletes thousands of rows. Hence, the otid vector is compressed to only store the
+   * toIndex and fromIndex ranges in the larger row id vector. Now, querying whether a
+   * record id is deleted or not, is done by performing a binary search on the
+   * compressed otid range. If a match is found, then a binary search is then performed on
+   * the larger rowId vector between the given toIndex and fromIndex. Of course, there is rough
+   * heuristic that prevents creation of an instance of this class if the memory pressure is high.
+   * The SortMergedDeleteEventRegistry is then the fallback method for such scenarios.
+   */
+   static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry {
+    /**
+     * A simple wrapper class to hold the (otid, rowId) pair.
+     */
+    static class DeleteRecordKey implements Comparable<DeleteRecordKey> {
+      private long originalTransactionId;
+      private long rowId;
+      public DeleteRecordKey() {
+        this.originalTransactionId = -1;
+        this.rowId = -1;
+      }
+      public DeleteRecordKey(long otid, long rowId) {
+        this.originalTransactionId = otid;
+        this.rowId = rowId;
+      }
+      public void set(long otid, long rowId) {
+        this.originalTransactionId = otid;
+        this.rowId = rowId;
+      }
+
+      @Override
+      public int compareTo(DeleteRecordKey other) {
+        if (other == null) {
+          return -1;
+        }
+        if (originalTransactionId != other.originalTransactionId) {
+          return originalTransactionId < other.originalTransactionId ? -1 : 1;
+        }
+        if (rowId != other.rowId) {
+          return rowId < other.rowId ? -1 : 1;
+        }
+        return 0;
+      }
+    }
+
+    /**
+     * This class actually reads the delete delta files in vectorized row batches.
+     * For every call to next(), it returns the next smallest record id in the file if available.
+     * Internally, the next() buffers a row batch and maintains an index pointer, reading the
+     * next batch when the previous batch is exhausted.
+     */
+    static class DeleteReaderValue {
+      private VectorizedRowBatch batch;
+      private final RecordReader recordReader;
+      private int indexPtrInBatch;
+      private final int bucketForSplit; // The bucket value should be same for all the records.
+      private final ValidTxnList validTxnList;
+
+      public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
+          ValidTxnList validTxnList) throws IOException {
+        this.recordReader  = deleteDeltaReader.rowsOptions(readerOptions);
+        this.bucketForSplit = bucket;
+        this.batch = deleteDeltaReader.getSchema().createRowBatch();
+        if (!recordReader.nextBatch(batch)) { // Read the first batch.
+          this.batch = null; // Oh! the first batch itself was null. Close the reader.
+        }
+        this.indexPtrInBatch = 0;
+        this.validTxnList = validTxnList;
+      }
+
+      public boolean next(DeleteRecordKey deleteRecordKey) throws IOException {
+        if (batch == null) {
+          return false;
+        }
+        boolean isValidNext = false;
+        while (!isValidNext) {
+          if (indexPtrInBatch >= batch.size) {
+            // We have exhausted our current batch, read the next batch.
+            if (recordReader.nextBatch(batch)) {
+              // Whenever we are reading a batch, we must ensure that all the records in the batch
+              // have the same bucket id as the bucket id of the split. If not, throw exception.
+              // NOTE: this assertion might not hold, once virtual bucketing is in place. However,
+              // it should be simple to fix that case. Just replace check for bucket equality with
+              // a check for valid bucket mapping. Until virtual bucketing is added, it means
+              // either the split computation got messed up or we found some corrupted records.
+              long bucketForRecord = ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+              if ((batch.size > 1 && !batch.cols[OrcRecordUpdater.BUCKET].isRepeating)
+                  || (bucketForRecord != bucketForSplit)){
+                throw new IOException("Corrupted records with different bucket ids "
+                    + "from the containing bucket file found! Expected bucket id "
+                    + bucketForSplit + ", however found the bucket id " + bucketForRecord);
+              }
+              indexPtrInBatch = 0; // After reading the batch, reset the pointer to beginning.
+            } else {
+              return false; // no more batches to read, exhausted the reader.
+            }
+          }
+          int originalTransactionIndex =
+              batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
+          long originalTransaction =
+              ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex];
+          long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch];
+          int currentTransactionIndex =
+              batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
+          long currentTransaction =
+              ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex];
+          ++indexPtrInBatch;
+          if (validTxnList.isTxnValid(currentTransaction)) {
+            isValidNext = true;
+            deleteRecordKey.set(originalTransaction, rowId);
+          }
+        }
+        return true;
+      }
+
+      public void close() throws IOException {
+        this.recordReader.close();
+      }
+    }
+
+    /**
+     * A CompressedOtid class stores a compressed representation of the original
+     * transaction ids (otids) read from the delete delta files. Since the record ids
+     * are sorted by (otid, rowId) and otids are highly likely to be repetitive, it is
+     * efficient to compress them as a CompressedOtid that stores the fromIndex and
+     * the toIndex. These fromIndex and toIndex reference the larger vector formed by
+     * concatenating the correspondingly ordered rowIds.
+     */
+    private class CompressedOtid implements Comparable<CompressedOtid> {
+      long originalTransactionId;
+      int fromIndex; // inclusive
+      int toIndex; // exclusive
+
+      public CompressedOtid(long otid, int fromIndex, int toIndex) {
+        this.originalTransactionId = otid;
+        this.fromIndex = fromIndex;
+        this.toIndex = toIndex;
+      }
+
+      @Override
+      public int compareTo(CompressedOtid other) {
+        // When comparing the CompressedOtid, the one with the lesser value is smaller.
+        if (originalTransactionId != other.originalTransactionId) {
+          return originalTransactionId < other.originalTransactionId ? -1 : 1;
+        }
+        return 0;
+      }
+    }
+
+    private TreeMap<DeleteRecordKey, DeleteReaderValue> sortMerger;
+    private long rowIds[];
+    private CompressedOtid compressedOtids[];
+    private ValidTxnList validTxnList;
+
+    public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
+        Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException {
+      int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+      String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+      this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+      this.sortMerger = new TreeMap<DeleteRecordKey, DeleteReaderValue>();
+      this.rowIds = null;
+      this.compressedOtids = null;
+      int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY);
+
+      try {
+        final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
+        if (deleteDeltaDirs.length > 0) {
+          int totalDeleteEventCount = 0;
+          for (Path deleteDeltaDir : deleteDeltaDirs) {
+            Path deleteDeltaFile = AcidUtils.createBucketFile(deleteDeltaDir, bucket);
+            FileSystem fs = deleteDeltaFile.getFileSystem(conf);
+            // NOTE: Calling last flush length below is more for future-proofing when we have
+            // streaming deletes. But currently we don't support streaming deletes, and this can
+            // be removed if this becomes a performance issue.
+            long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
+            // NOTE: A check for existence of deleteDeltaFile is required because we may not have
+            // deletes for the bucket being taken into consideration for this split processing.
+            if (length != -1 && fs.exists(deleteDeltaFile)) {
+              Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
+                  OrcFile.readerOptions(conf).maxLength(length));
+              AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
+              if (acidStats.deletes == 0) {
+                continue; // just a safe check to ensure that we are not reading empty delete files.
+              }
+              totalDeleteEventCount += acidStats.deletes;
+              if (totalDeleteEventCount > maxEventsInMemory) {
+                // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
+                // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
+                // prevents creation of an object of this class if the total number of delete events
+                // exceed this value. By default, it has been set to 10 million delete events per bucket.
+                LOG.info("Total number of delete events exceeds the maximum number of delete events "
+                    + "that can be loaded into memory for the delete deltas in the directory at : "
+                    + deleteDeltaDirs.toString() +". The max limit is currently set at "
+                    + maxEventsInMemory + " and can be changed by setting the Hive config variable "
+                    + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
+                throw new DeleteEventsOverflowMemoryException();
+              }
+              DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
+                  readerOptions, bucket, validTxnList);
+              DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+              if (deleteReaderValue.next(deleteRecordKey)) {
+                sortMerger.put(deleteRecordKey, deleteReaderValue);
+              } else {
+                deleteReaderValue.close();
+              }
+            }
+          }
+          if (totalDeleteEventCount > 0) {
+            // Initialize the rowId array when we have some delete events.
+            rowIds = new long[totalDeleteEventCount];
+            readAllDeleteEventsFromDeleteDeltas();
+          }
+        }
+      } catch(IOException|DeleteEventsOverflowMemoryException e) {
+        close(); // close any open readers, if there was some exception during initialization.
+        throw e; // rethrow the exception so that the caller can handle.
+      }
+    }
+
+    private void readAllDeleteEventsFromDeleteDeltas() throws IOException {
+      if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read.
+      int distinctOtids = 0;
+      long lastSeenOtid = -1;
+      long otids[] = new long[rowIds.length];
+      int index = 0;
+      while (!sortMerger.isEmpty()) {
+        // The sortMerger is a heap data structure that stores a pair of
+        // (deleteRecordKey, deleteReaderValue) at each node and is ordered by deleteRecordKey.
+        // The deleteReaderValue is the actual wrapper class that has the reference to the
+        // underlying delta file that is being read, and its corresponding deleteRecordKey
+        // is the smallest record id for that file. In each iteration of this loop, we extract(poll)
+        // the minimum deleteRecordKey pair. Once we have processed that deleteRecordKey, we
+        // advance the pointer for the corresponding deleteReaderValue. If the underlying file
+        // itself has no more records, then we remove that pair from the heap, or else we
+        // add the updated pair back to the heap.
+        Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
+        DeleteRecordKey deleteRecordKey = entry.getKey();
+        DeleteReaderValue deleteReaderValue = entry.getValue();
+        otids[index] = deleteRecordKey.originalTransactionId;
+        rowIds[index] = deleteRecordKey.rowId;
+        ++index;
+        if (lastSeenOtid != deleteRecordKey.originalTransactionId) {
+          ++distinctOtids;
+          lastSeenOtid = deleteRecordKey.originalTransactionId;
+        }
+        if (deleteReaderValue.next(deleteRecordKey)) {
+          sortMerger.put(deleteRecordKey, deleteReaderValue);
+        } else {
+          deleteReaderValue.close(); // Exhausted reading all records, close the reader.
+        }
+      }
+
+      // Once we have processed all the delete events and seen all the distinct otids,
+      // we compress the otids into CompressedOtid data structure that records
+      // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid.
+      this.compressedOtids = new CompressedOtid[distinctOtids];
+      lastSeenOtid = otids[0];
+      int fromIndex = 0, pos = 0;
+      for (int i = 1; i < otids.length; ++i) {
+        if (otids[i] != lastSeenOtid) {
+          compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, i);
+          lastSeenOtid = otids[i];
+          fromIndex = i;
+          ++pos;
+        }
+      }
+      // account for the last distinct otid
+      compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, otids.length);
+    }
+
+    private boolean isDeleted(long otid, long rowId) {
+      if (compressedOtids == null || rowIds == null) {
+        return false;
+      }
+      // To find whether a given (otid, rowId) pair is deleted, we perform at most
+      // two binary searches. The first binary search is on the compressed otids.
+      // Only if a match is found do we do the second binary search in the larger
+      // rowId vector, between that otid's fromIndex and toIndex.
+
+      // Check if otid is outside the range of all otids present.
+      if (otid < compressedOtids[0].originalTransactionId
+          || otid > compressedOtids[compressedOtids.length - 1].originalTransactionId) {
+        return false;
+      }
+      // Create a dummy key for searching the otid in the compressed otid ranges.
+      CompressedOtid key = new CompressedOtid(otid, -1, -1);
+      int pos = Arrays.binarySearch(compressedOtids, key);
+      if (pos >= 0) {
+        // Otid with the given value found! Searching now for rowId...
+        key = compressedOtids[pos]; // Retrieve the actual CompressedOtid that matched.
+        // Check if rowId is outside the range of all rowIds present for this otid.
+        if (rowId < rowIds[key.fromIndex]
+            || rowId > rowIds[key.toIndex - 1]) {
+          return false;
+        }
+        if (Arrays.binarySearch(rowIds, key.fromIndex, key.toIndex, rowId) >= 0) {
+          return true; // rowId also found!
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet)
+        throws IOException {
+      if (rowIds == null || compressedOtids == null) {
+        return;
+      }
+      // Iterate through the batch and for each (otid, rowid) in the batch
+      // check if it is deleted or not.
+
+      long[] originalTransactionVector =
+          batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
+              : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+      long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1
+          : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+
+      long[] rowIdVector =
+          ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector;
+
+      for (int setBitIndex = selectedBitSet.nextSetBit(0);
+          setBitIndex >= 0;
+          setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
+        long otid = originalTransactionVector != null ? originalTransactionVector[setBitIndex]
+                                                    : repeatedOriginalTransaction ;
+        long rowId = rowIdVector[setBitIndex];
+        if (isDeleted(otid, rowId)) {
+          selectedBitSet.clear(setBitIndex);
+        }
+     }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // ColumnizedDeleteEventRegistry reads all the delete events into memory during
+      // initialization and closes the delete event readers once that is done. If an exception
+      // is thrown during initialization, we may still have to close any readers left open.
+      while (!sortMerger.isEmpty()) {
+        Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
+        entry.getValue().close(); // close the reader for this entry
+      }
+    }
+  }
+
+  static class DeleteEventsOverflowMemoryException extends Exception {
+    private static final long serialVersionUID = 1L;
+  }
+}
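
To illustrate the core idea of ColumnizedDeleteEventRegistry above, here is a minimal, self-contained sketch of the compressed-otid lookup: delete events arrive sorted by (otid, rowId), the distinct otids are collapsed into index ranges over the rowId array, and membership is answered with at most two binary searches. The class and member names below are illustrative only and are not part of this patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompressedDeleteIndexSketch {
  // One entry per distinct otid: [from, to) indexes into the rowIds array.
  static final class OtidRange implements Comparable<OtidRange> {
    final long otid; final int from; final int to;
    OtidRange(long otid, int from, int to) { this.otid = otid; this.from = from; this.to = to; }
    @Override public int compareTo(OtidRange o) { return Long.compare(otid, o.otid); }
  }

  private final OtidRange[] ranges;
  private final long[] rowIds;

  // otids/rowIds must already be sorted by (otid, rowId), as produced by
  // sort-merging the delete delta files.
  CompressedDeleteIndexSketch(long[] otids, long[] rowIds) {
    this.rowIds = rowIds;
    List<OtidRange> out = new ArrayList<>();
    int from = 0;
    for (int i = 1; i <= otids.length; i++) {
      if (i == otids.length || otids[i] != otids[from]) {
        out.add(new OtidRange(otids[from], from, i));
        from = i;
      }
    }
    this.ranges = out.toArray(new OtidRange[0]);
  }

  boolean isDeleted(long otid, long rowId) {
    int pos = Arrays.binarySearch(ranges, new OtidRange(otid, -1, -1));
    if (pos < 0) {
      return false; // no delete events at all for this otid
    }
    OtidRange r = ranges[pos];
    // Second binary search, restricted to this otid's slice of the rowId array.
    return Arrays.binarySearch(rowIds, r.from, r.to, rowId) >= 0;
  }

  public static void main(String[] args) {
    long[] otids  = {1, 1, 1, 2, 2, 7};
    long[] rowIds = {0, 4, 9, 2, 3, 5};
    CompressedDeleteIndexSketch idx = new CompressedDeleteIndexSketch(otids, rowIds);
    System.out.println(idx.isDeleted(1, 4)); // true
    System.out.println(idx.isDeleted(2, 4)); // false
    System.out.println(idx.isDeleted(3, 0)); // false, otid not present
  }
}

The real registry builds these arrays by sort-merging all delete delta files through the TreeMap shown above, and the reader falls back to the SortMergedDeleteEventRegistry when the number of delete events exceeds the configured in-memory limit (the HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY setting exercised in the test below).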

http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
new file mode 100644
index 0000000..44a9412
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Same as TestTxnCommands2WithSplitUpdate, but tests ACID tables with vectorization turned on
+ * by default and with 'transactional_properties' set to 'default'. This specifically exercises
+ * the fast VectorizedOrcAcidRowBatchReader for ACID tables with split-update turned on.
+ */
+public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2WithSplitUpdate {
+
+  public TestTxnCommands2WithSplitUpdateAndVectorization() {
+    super();
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    setUpWithTableProperties("'transactional'='true','transactional_properties'='default'");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+  }
+
+  @Override
+  @Test
+  public void testFailureOnAlteringTransactionalProperties() throws Exception {
+    // Overridden to do nothing, as this test is not related to vectorization.
+    // The parent class creates a temporary table in this test and alters its properties.
+    // Not overriding it would require renaming that temporary table, which serves no
+    // purpose here because, as mentioned, the test does not exercise vectorization.
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
new file mode 100644
index 0000000..4656ab2
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+/**
+ * This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set
+ * of delete delta files. The split is on an insert delta, and there are multiple delete deltas
+ * whose lists of deleted record ids interleave. Correctness is tested by validating that the
+ * correct set of record ids is returned, in sorted order, for valid transactions only.
+ */
+public class TestVectorizedOrcAcidRowBatchReader {
+
+  private static final long NUM_ROWID_PER_OTID = 15000L;
+  private static final long NUM_OTID = 10L;
+  private JobConf conf;
+  private FileSystem fs;
+  private Path root;
+
+  static class DummyRow {
+    LongWritable field;
+    RecordIdentifier ROW__ID;
+
+    DummyRow(long val) {
+      field = new LongWritable(val);
+      ROW__ID = null;
+    }
+
+    DummyRow(long val, long rowId, long origTxn, int bucket) {
+      field = new LongWritable(val);
+      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+    }
+
+    static String getColumnNamesProperty() {
+      return "x";
+    }
+    static String getColumnTypesProperty() {
+      return "bigint";
+    }
+
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new JobConf();
+    conf.set("bucket_count", "1");
+    conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    conf.setBoolean(HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, true);
+    conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, DummyRow.getColumnNamesProperty());
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, DummyRow.getColumnTypesProperty());
+    conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+    conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+
+    fs = FileSystem.getLocal(conf);
+    Path workDir = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp"));
+    root = new Path(workDir, "TestVectorizedOrcAcidRowBatch.testDump");
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+    int bucket = 0;
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .filesystem(fs)
+        .bucket(bucket)
+        .writingBase(false)
+        .minimumTransactionId(1)
+        .maximumTransactionId(NUM_OTID)
+        .inspector(inspector)
+        .reporter(Reporter.NULL)
+        .recordIdColumn(1)
+        .finalDestination(root);
+    RecordUpdater updater = new OrcRecordUpdater(root, options);
+    // Create a single insert delta with 150,000 rows, with 15000 rowIds per original transaction id.
+    for (long i = 1; i <= NUM_OTID; ++i) {
+      for (long j = 0; j < NUM_ROWID_PER_OTID; ++j) {
+        long payload = (i-1) * NUM_ROWID_PER_OTID + j;
+        updater.insert(i, new DummyRow(payload, j, i, bucket));
+      }
+    }
+    updater.close(false);
+
+    // Now create three types of delete deltas- first has rowIds divisible by 2 but not by 3,
+    // second has rowIds divisible by 3 but not by 2, and the third has rowIds divisible by
+    // both 2 and 3. This should produce delete deltas that will thoroughly test the sort-merge
+    // logic when the delete events in the delete delta files interleave in the sort order.
+
+    // Create a delete delta that has rowIds divisible by 2 but not by 3. This will produce
+    // a delete delta file with 50,000 delete events.
+    long currTxnId = NUM_OTID + 1;
+    options.minimumTransactionId(currTxnId).maximumTransactionId(currTxnId);
+    updater = new OrcRecordUpdater(root, options);
+    for (long i = 1; i <= NUM_OTID; ++i) {
+      for (long j = 0; j < NUM_ROWID_PER_OTID; j += 1) {
+        if (j % 2 == 0 && j % 3 != 0) {
+          updater.delete(currTxnId, new DummyRow(-1, j, i, bucket));
+        }
+      }
+    }
+    updater.close(false);
+    // Now, create a delete delta that has rowIds divisible by 3 but not by 2. This will produce
+    // a delete delta file with 25,000 delete events.
+    currTxnId = NUM_OTID + 2;
+    options.minimumTransactionId(currTxnId).maximumTransactionId(currTxnId);
+    updater = new OrcRecordUpdater(root, options);
+    for (long i = 1; i <= NUM_OTID; ++i) {
+      for (long j = 0; j < NUM_ROWID_PER_OTID; j += 1) {
+        if (j % 2 != 0 && j % 3 == 0) {
+          updater.delete(currTxnId, new DummyRow(-1, j, i, bucket));
+        }
+      }
+    }
+    updater.close(false);
+    // Now, create a delete delta that has rowIds divisible by both 3 and 2. This will produce
+    // a delete delta file with 25,000 delete events.
+    currTxnId = NUM_OTID + 3;
+    options.minimumTransactionId(currTxnId).maximumTransactionId(currTxnId);
+    updater = new OrcRecordUpdater(root, options);
+    for (long i = 1; i <= NUM_OTID; ++i) {
+      for (long j = 0; j < NUM_ROWID_PER_OTID; j += 1) {
+        if (j % 2 == 0 && j % 3 == 0) {
+          updater.delete(currTxnId, new DummyRow(-1, j, i, bucket));
+        }
+      }
+    }
+    updater.close(false);
+  }
+
+  private List<OrcSplit> getSplits() throws Exception {
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null);
+    OrcInputFormat.AcidDirInfo adi = gen.call();
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies(
+        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        null, null, true);
+    assertEquals(1, splitStrategies.size());
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(1, splits.size());
+    assertEquals("file://" + root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000",
+        splits.get(0).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+    return splits;
+  }
+
+  @Test
+  public void testVectorizedOrcAcidRowBatchReader() throws Exception {
+    testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName());
+
+    // To test the SortMergedDeleteEventRegistry, we need to explicitly set the
+    // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY constant to a smaller value.
+    int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000);
+    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000);
+    testVectorizedOrcAcidRowBatchReader(SortMergedDeleteEventRegistry.class.getName());
+
+    // Restore the old value.
+    conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue);
+  }
+
+
+  private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) throws Exception {
+    List<OrcSplit> splits = getSplits();
+    // Mark one of the transactions as an exception to test that invalid transactions
+    // are being handled properly.
+    conf.set(ValidTxnList.VALID_TXNS_KEY, "14:1:1:5"); // Exclude transaction 5
+
+    VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL);
+    if (deleteEventRegistry.equals(ColumnizedDeleteEventRegistry.class.getName())) {
+      assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof ColumnizedDeleteEventRegistry);
+    }
+    if (deleteEventRegistry.equals(SortMergedDeleteEventRegistry.class.getName())) {
+      assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof SortMergedDeleteEventRegistry);
+    }
+    TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+    VectorizedRowBatch vectorizedRowBatch = schema.createRowBatch();
+    vectorizedRowBatch.setPartitionInfo(1, 0); // set data column count as 1.
+    long previousPayload = Long.MIN_VALUE;
+    while (vectorizedReader.next(null, vectorizedRowBatch)) {
+      assertTrue(vectorizedRowBatch.selectedInUse);
+      LongColumnVector col = (LongColumnVector) vectorizedRowBatch.cols[0];
+      for (int i = 0; i < vectorizedRowBatch.size; ++i) {
+        int idx = vectorizedRowBatch.selected[i];
+        long payload = col.vector[idx];
+        long otid = (payload / NUM_ROWID_PER_OTID) + 1;
+        long rowId = payload % NUM_ROWID_PER_OTID;
+        assertFalse(rowId % 2 == 0 || rowId % 3 == 0);
+        assertTrue(otid != 5); // Check that txn#5 has been excluded.
+        assertTrue(payload > previousPayload); // Check that the data is in sorted order.
+        previousPayload = payload;
+      }
+    }
+  }
+
+  @Test
+  public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception {
+    OrcSplit mockSplit = Mockito.mock(OrcSplit.class);
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getLegacy().toInt());
+    // Test false when trying to create a vectorized ACID row batch reader for a legacy table.
+    assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
+
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    Mockito.when(mockSplit.isOriginal()).thenReturn(true);
+    // Test false when trying to create a vectorized ACID row batch reader when reading originals.
+    assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
+
+    // A positive test case.
+    Mockito.when(mockSplit.isOriginal()).thenReturn(false);
+    assertTrue(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
+  }
+
+}
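
As a quick arithmetic check on the assertions above: per original transaction id there are 15,000 rowIds, the three delete deltas together remove every rowId divisible by 2 or by 3, and transaction 5 is excluded by the ValidTxnList, so 9 of the 10 otids remain readable. A standalone sketch of that arithmetic (the class name is illustrative, not part of this patch):

public class ExpectedRowCountSketch {
  public static void main(String[] args) {
    final long numRowIdsPerOtid = 15000L; // mirrors NUM_ROWID_PER_OTID above
    final long numOtid = 10L;             // mirrors NUM_OTID above
    long survivorsPerOtid = 0;
    for (long j = 0; j < numRowIdsPerOtid; j++) {
      if (j % 2 != 0 && j % 3 != 0) {     // not hit by any of the three delete deltas
        survivorsPerOtid++;
      }
    }
    long validOtids = numOtid - 1;        // transaction 5 is excluded by the ValidTxnList
    System.out.println(survivorsPerOtid);              // 5000 surviving rows per otid
    System.out.println(survivorsPerOtid * validOtids); // 45000 rows expected from the reader
  }
}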

http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/queries/clientpositive/acid_vectorization.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_vectorization.q b/ql/src/test/queries/clientpositive/acid_vectorization.q
index 832909b..4c37563 100644
--- a/ql/src/test/queries/clientpositive/acid_vectorization.q
+++ b/ql/src/test/queries/clientpositive/acid_vectorization.q
@@ -15,3 +15,15 @@ set hive.vectorized.execution.enabled=true;
 delete from acid_vectorized where b = 'foo';
 set hive.vectorized.execution.enabled=true;
 select a, b from acid_vectorized order by a, b;
+
+
+CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10;
+set hive.vectorized.execution.enabled=true;
+insert into table acid_fast_vectorized values (1, 'bar');
+set hive.vectorized.execution.enabled=true;
+update acid_fast_vectorized set b = 'foo' where b = 'bar';
+set hive.vectorized.execution.enabled=true;
+delete from acid_fast_vectorized where b = 'foo';
+set hive.vectorized.execution.enabled=true;
+select a, b from acid_fast_vectorized order by a, b;

http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/results/clientpositive/acid_vectorization.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/acid_vectorization.q.out b/ql/src/test/results/clientpositive/acid_vectorization.q.out
index 1792979..24330bd 100644
--- a/ql/src/test/results/clientpositive/acid_vectorization.q.out
+++ b/ql/src/test/results/clientpositive/acid_vectorization.q.out
@@ -60,3 +60,65 @@ POSTHOOK: Input: default@acid_vectorized
 -1070883071	0ruyd6Y50JpdGRf6HqD
 -1070551679	iUR3Q
 -1069736047	k17Am8uPHWk02cEf1jet
+PREHOOK: query: CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@acid_fast_vectorized
+PREHOOK: query: insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: Lineage: acid_fast_vectorized.a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: acid_fast_vectorized.b SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+PREHOOK: query: insert into table acid_fast_vectorized values (1, 'bar')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: insert into table acid_fast_vectorized values (1, 'bar')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: Lineage: acid_fast_vectorized.a EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: acid_fast_vectorized.b SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: update acid_fast_vectorized set b = 'foo' where b = 'bar'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_fast_vectorized
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: update acid_fast_vectorized set b = 'foo' where b = 'bar'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_fast_vectorized
+POSTHOOK: Output: default@acid_fast_vectorized
+PREHOOK: query: delete from acid_fast_vectorized where b = 'foo'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_fast_vectorized
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: delete from acid_fast_vectorized where b = 'foo'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_fast_vectorized
+POSTHOOK: Output: default@acid_fast_vectorized
+PREHOOK: query: select a, b from acid_fast_vectorized order by a, b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_fast_vectorized
+#### A masked pattern was here ####
+POSTHOOK: query: select a, b from acid_fast_vectorized order by a, b
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_fast_vectorized
+#### A masked pattern was here ####
+-1073279343	oj1YrV5Wa
+-1073051226	A34p7oRr2WvUJNf
+-1072910839	0iqrc5
+-1072081801	dPkN74F7
+-1072076362	2uLyD28144vklju213J1mr
+-1071480828	aw724t8c5558x2xneC624
+-1071363017	Anj0oF
+-1070883071	0ruyd6Y50JpdGRf6HqD
+-1070551679	iUR3Q
+-1069736047	k17Am8uPHWk02cEf1jet


[8/9] hive git commit: HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)

Posted by se...@apache.org.
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2cef25db
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2cef25db
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2cef25db

Branch: refs/heads/hive-14535
Commit: 2cef25db66161fa83921913031cd8f408f8b83e2
Parents: 67f1b92 3187fb3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 31 16:04:07 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 31 16:04:07 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 +
 .../apache/hadoop/hive/ql/TestMTQueries.java    |   3 +-
 .../test/resources/testconfiguration.properties |   3 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  11 +-
 .../hadoop/hive/metastore/txn/TestTxnUtils.java |   6 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 134 +--
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |   7 +-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java | 822 ++++++++++++++++++
 .../hive/ql/metadata/HiveMetaStoreChecker.java  |  16 +-
 .../ql/optimizer/pcr/PcrExprProcFactory.java    | 423 +++++----
 .../hadoop/hive/ql/parse/ParseContext.java      |  25 +
 .../hive/ql/udf/generic/GenericUDFAbs.java      |   3 +
 ...ommands2WithSplitUpdateAndVectorization.java |  52 ++
 .../TestVectorizedOrcAcidRowBatchReader.java    | 263 ++++++
 .../hive/ql/udf/generic/TestGenericUDFAbs.java  |   6 +
 .../queries/clientpositive/acid_vectorization.q |  12 +
 ql/src/test/queries/clientpositive/orc_llap.q   |  76 +-
 .../partition_condition_remover.q               |  13 +
 .../clientpositive/acid_vectorization.q.out     |  62 ++
 .../results/clientpositive/llap/orc_llap.q.out  | 866 ++++++++++---------
 .../clientpositive/orc_merge_diff_fs.q.out      |  18 +-
 .../partition_condition_remover.q.out           |  79 ++
 ql/src/test/results/clientpositive/pcs.q.out    |   4 +-
 23 files changed, 2163 insertions(+), 747 deletions(-)
----------------------------------------------------------------------



[2/9] hive git commit: HIVE-14290: Refactor HIVE-14054 to use Collections#newSetFromMap (Peter Slawski via Ashutosh Chauhan)

Posted by se...@apache.org.
HIVE-14290: Refactor HIVE-14054 to use Collections#newSetFromMap (Peter Slawski via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ab605910
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ab605910
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ab605910

Branch: refs/heads/hive-14535
Commit: ab605910e45cfa65b9895399f09194801f2cc091
Parents: ec4673b
Author: Peter Slawski <pe...@amazon.com>
Authored: Mon Jun 20 10:18:25 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Aug 30 17:52:43 2016 -0700

----------------------------------------------------------------------
 .../hive/ql/metadata/HiveMetaStoreChecker.java      | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ab605910/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 34b76b8..13d9651 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -379,9 +379,7 @@ public class HiveMetaStoreChecker {
   private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException {
     ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
     basePaths.add(basePath);
-    // we only use the keySet of ConcurrentHashMap
-    // Neither the key nor the value can be null.
-    Map<Path, Object> dirSet = new ConcurrentHashMap<>();
+    Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
     // Here we just reuse the THREAD_COUNT configuration for
     // HIVE_MOVE_FILES_THREAD_COUNT
     final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors
@@ -396,12 +394,12 @@ public class HiveMetaStoreChecker {
     }
     checkPartitionDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf), maxDepth, maxDepth);
     pool.shutdown();
-    allDirs.addAll(dirSet.keySet());
+    allDirs.addAll(dirSet);
   }
 
   // process the basePaths in parallel and then the next level of basePaths
   private void checkPartitionDirs(final ExecutorService pool,
-      final ConcurrentLinkedQueue<Path> basePaths, final Map<Path, Object> allDirs,
+      final ConcurrentLinkedQueue<Path> basePaths, final Set<Path> allDirs,
       final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException {
     final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
     if (null == pool) {
@@ -437,9 +435,7 @@ public class HiveMetaStoreChecker {
             }
           }
         } else {
-          // true is just a boolean object place holder because neither the
-          // key nor the value can be null.
-          allDirs.put(path, true);
+          allDirs.add(path);
         }
       }
     } else {
@@ -483,9 +479,7 @@ public class HiveMetaStoreChecker {
                 }
               }
             } else {
-              // true is just a boolean object place holder because neither the
-              // key nor the value can be null.
-              allDirs.put(path, true);
+              allDirs.add(path);
             }
             return null;
           }
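
For reference, Collections#newSetFromMap is the standard way to obtain a Set view backed by an arbitrary Map, so backing it with a ConcurrentHashMap yields a thread-safe Set without the keySet-plus-dummy-value workaround removed above. A minimal standalone illustration (not Hive code):

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class NewSetFromMapExample {
  public static void main(String[] args) {
    // The backing map supplies the concurrency semantics; the returned view supplies the Set API.
    Set<String> dirs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    dirs.add("/warehouse/tbl/ds=2016-08-31");
    dirs.add("/warehouse/tbl/ds=2016-08-31"); // duplicate adds are no-ops
    System.out.println(dirs.size()); // prints 1
  }
}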


[7/9] hive git commit: HIVE-14652 : incorrect results for not in on partition columns (Sergey Shelukhin, reviewed by Jesus Camacho Rodriguez)

Posted by se...@apache.org.
HIVE-14652 : incorrect results for not in on partition columns (Sergey Shelukhin, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3187fb36
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3187fb36
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3187fb36

Branch: refs/heads/hive-14535
Commit: 3187fb369fb6e27a542edbc110df528a444987fa
Parents: 8f5dee8
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 31 15:36:47 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 31 15:42:40 2016 -0700

----------------------------------------------------------------------
 .../ql/optimizer/pcr/PcrExprProcFactory.java    | 423 +++++++++----------
 .../hadoop/hive/ql/parse/ParseContext.java      |  25 ++
 .../partition_condition_remover.q               |  13 +
 .../partition_condition_remover.q.out           |  79 ++++
 ql/src/test/results/clientpositive/pcs.q.out    |   4 +-
 5 files changed, 330 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
index f9388e2..461dbe5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.optimizer.pcr;
 
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -185,7 +187,7 @@ public final class PcrExprProcFactory {
   }
 
   public enum WalkState {
-    PART_COL, TRUE, FALSE, CONSTANT, UNKNOWN, DIVIDED
+    PART_COL, TRUE, FALSE, CONSTANT, UNKNOWN, DIVIDED, PART_COL_STRUCT
   }
 
   public static class NodeInfoWrapper {
@@ -253,242 +255,239 @@ public final class PcrExprProcFactory {
         Object... nodeOutputs) throws SemanticException {
       PcrExprProcCtx ctx = (PcrExprProcCtx) procCtx;
       ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) nd;
+      if (LOG.isDebugEnabled()) {
+        String err = "Processing " + fd.getExprString() + " "
+            + fd.getGenericUDF().getUdfName() + " outputs ";
+        for (Object child : nodeOutputs) {
+          NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
+          err += "{" + wrapper.state + ", " + wrapper.outExpr + "}, ";
+        }
+        LOG.debug(err);
+      }
 
       if (FunctionRegistry.isOpNot(fd)) {
-        assert (nodeOutputs.length == 1);
-        NodeInfoWrapper wrapper = (NodeInfoWrapper) nodeOutputs[0];
-        if (wrapper.state == WalkState.TRUE) {
-          ExprNodeConstantDesc falseDesc = new ExprNodeConstantDesc(
-              wrapper.outExpr.getTypeInfo(), Boolean.FALSE);
-          return new NodeInfoWrapper(WalkState.FALSE, null, falseDesc);
-        } else if (wrapper.state == WalkState.FALSE) {
-          ExprNodeConstantDesc trueDesc = new ExprNodeConstantDesc(
-              wrapper.outExpr.getTypeInfo(), Boolean.TRUE);
-          return new NodeInfoWrapper(WalkState.TRUE, null, trueDesc);
-        } else if (wrapper.state == WalkState.DIVIDED) {
+        return handleUdfNot(ctx, fd, nodeOutputs);
+      } else if (FunctionRegistry.isOpAnd(fd)) {
+        return handleUdfAnd(ctx, fd, nodeOutputs);
+      } else if (FunctionRegistry.isOpOr(fd)) {
+        return handleUdfOr(ctx, fd, nodeOutputs);
+      } else if (FunctionRegistry.isIn(fd)) {
+        List<ExprNodeDesc> children = fd.getChildren();
+        // We should not remove the dynamic partition pruner generated synthetic predicates.
+        for (int i = 1; i < children.size(); i++) {
+          if (children.get(i) instanceof ExprNodeDynamicListDesc) {
+            return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
+          }
+        }
+        // Otherwise, handle like a normal generic UDF.
+        return handleDeterministicUdf(ctx, fd, nodeOutputs);
+      } else if (fd.getGenericUDF() instanceof GenericUDFStruct) {
+        // Handle structs composed of partition columns,
+        for (Object child : nodeOutputs) {
+          NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
+          if (wrapper.state != WalkState.PART_COL) {
+            return handleDeterministicUdf(ctx, fd, nodeOutputs); // Giving up.
+          }
+        }
+        return new NodeInfoWrapper(WalkState.PART_COL_STRUCT, null, getOutExpr(fd, nodeOutputs));
+      } else if (!FunctionRegistry.isDeterministic(fd.getGenericUDF())) {
+        // If it's a non-deterministic UDF, set unknown to true
+        return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
+      } else {
+        return handleDeterministicUdf(ctx, fd, nodeOutputs);
+      }
+    }
+
+    private Object handleDeterministicUdf(PcrExprProcCtx ctx,
+        ExprNodeGenericFuncDesc fd, Object... nodeOutputs)
+        throws SemanticException {
+      Boolean has_part_col = checkForPartColsAndUnknown(fd, nodeOutputs);
+      if (has_part_col == null) {
+        return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
+      }
+
+      if (has_part_col && fd.getTypeInfo().getCategory() == Category.PRIMITIVE) {
+        //  we need to evaluate result for every pruned partition
+        if (fd.getTypeInfo().equals(TypeInfoFactory.booleanTypeInfo)) {
+          // if the return type of the GenericUDF is boolean and all partitions agree on
+          // a result, we update the state of the node to be TRUE or FALSE
           Boolean[] results = new Boolean[ctx.getPartList().size()];
           for (int i = 0; i < ctx.getPartList().size(); i++) {
-            results[i] = opNot(wrapper.ResultVector[i]);
+            results[i] = (Boolean) evalExprWithPart(fd, ctx.getPartList().get(i),
+                ctx.getVirtualColumns());
           }
-          return new NodeInfoWrapper(WalkState.DIVIDED, results,
-              getOutExpr(fd, nodeOutputs));
+          return getResultWrapFromResults(results, fd, nodeOutputs);
+        }
+
+        // In the case that the return type of the GenericUDF is not boolean: if not all
+        // partitions agree on the result, we make the node UNKNOWN. If they all agree, we
+        // replace the node with a CONSTANT node whose value is the agreed result.
+        Object[] results = new Object[ctx.getPartList().size()];
+        for (int i = 0; i < ctx.getPartList().size(); i++) {
+          results[i] = evalExprWithPart(fd, ctx.getPartList().get(i), ctx.getVirtualColumns());
+        }
+        Object result = ifResultsAgree(results);
+        if (result == null) {
+          // if the result is not boolean and not all partitions agree on the
+          // result, we don't remove the condition. Potentially, it can miss
+          // the case like "where ds % 3 == 1 or ds % 3 == 2".
+          // TODO: handle this case by making the result vector handle all
+          // constant values.
+          return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
+        }
+        return new NodeInfoWrapper(WalkState.CONSTANT, null,
+            new ExprNodeConstantDesc(fd.getTypeInfo(), result));
+      }
+
+      // Try to fold, otherwise return the expression itself
+      final ExprNodeGenericFuncDesc desc = getOutExpr(fd, nodeOutputs);
+      final ExprNodeDesc foldedDesc = ConstantPropagateProcFactory.foldExpr(desc);
+      if (foldedDesc != null && foldedDesc instanceof ExprNodeConstantDesc) {
+        ExprNodeConstantDesc constant = (ExprNodeConstantDesc) foldedDesc;
+        if (Boolean.TRUE.equals(constant.getValue())) {
+          return new NodeInfoWrapper(WalkState.TRUE, null, constant);
+        } else if (Boolean.FALSE.equals(constant.getValue())) {
+          return new NodeInfoWrapper(WalkState.FALSE, null, constant);
         } else {
-          return new NodeInfoWrapper(wrapper.state, null,
-              getOutExpr(fd, nodeOutputs));
+          return new NodeInfoWrapper(WalkState.CONSTANT, null, constant);
         }
-      } else if (FunctionRegistry.isOpAnd(fd)) {
-        boolean anyUnknown = false; // Whether any of the node outputs is unknown
-        boolean allDivided = true; // Whether all of the node outputs are divided
-        List<NodeInfoWrapper> newNodeOutputsList =
-                new ArrayList<NodeInfoWrapper>(nodeOutputs.length);
-        for (int i = 0; i < nodeOutputs.length; i++) {
-          NodeInfoWrapper c = (NodeInfoWrapper)nodeOutputs[i];
-          if (c.state == WalkState.FALSE) {
-            return c;
-          }
-          if (c.state == WalkState.UNKNOWN) {
-            anyUnknown = true;
-          }
-          if (c.state != WalkState.DIVIDED) {
-            allDivided = false;
-          }
-          if (c.state != WalkState.TRUE) {
-            newNodeOutputsList.add(c);
-          }
+      }
+      return new NodeInfoWrapper(WalkState.CONSTANT, null, desc);
+    }
+
+    private Boolean checkForPartColsAndUnknown(ExprNodeGenericFuncDesc fd,
+        Object... nodeOutputs) {
+      boolean has_part_col = false;
+      for (Object child : nodeOutputs) {
+        NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
+        if (wrapper.state == WalkState.UNKNOWN) {
+          return null;
+        } else if (wrapper.state == WalkState.PART_COL
+            || wrapper.state == WalkState.PART_COL_STRUCT) {
+          has_part_col = true;
         }
-        // If all of them were true, return true
-        if (newNodeOutputsList.size() == 0) {
-          return new NodeInfoWrapper(WalkState.TRUE, null,
-                  new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.TRUE));
+      }
+      return has_part_col;
+    }
+
+    private Object handleUdfOr(PcrExprProcCtx ctx, ExprNodeGenericFuncDesc fd,
+        Object... nodeOutputs) {
+      boolean anyUnknown = false; // Whether any of the node outputs is unknown
+      boolean allDivided = true; // Whether all of the node outputs are divided
+      List<NodeInfoWrapper> newNodeOutputsList =
+              new ArrayList<NodeInfoWrapper>(nodeOutputs.length);
+      for (int i = 0; i< nodeOutputs.length; i++) {
+        NodeInfoWrapper c = (NodeInfoWrapper)nodeOutputs[i];
+        if (c.state == WalkState.TRUE) {
+          return c;
         }
-        // If we are left with a single child, return the child
-        if (newNodeOutputsList.size() == 1) {
-          return newNodeOutputsList.get(0);
+        if (c.state == WalkState.UNKNOWN) {
+          anyUnknown = true;
         }
-        Object[] newNodeOutputs = newNodeOutputsList.toArray();
-        if (anyUnknown) {
-          return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
+        if (c.state != WalkState.DIVIDED) {
+          allDivided = false;
         }
-        if (allDivided) {
-          Boolean[] results = new Boolean[ctx.getPartList().size()];
-          for (int i = 0; i < ctx.getPartList().size(); i++) {
-            Boolean[] andArray = new Boolean[newNodeOutputs.length];
-            for (int j = 0; j < newNodeOutputs.length; j++) {
-              andArray[j] = ((NodeInfoWrapper) newNodeOutputs[j]).ResultVector[i];
-            }
-            results[i] = opAnd(andArray);
-          }
-          return getResultWrapFromResults(results, fd, newNodeOutputs);
+        if (c.state != WalkState.FALSE) {
+          newNodeOutputsList.add(c);
         }
+      }
+      // If all of them were false, return false
+      if (newNodeOutputsList.size() == 0) {
+        return new NodeInfoWrapper(WalkState.FALSE, null,
+                new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.FALSE));
+      }
+      // If we are left with a single child, return the child
+      if (newNodeOutputsList.size() == 1) {
+        return newNodeOutputsList.get(0);
+      }
+      Object[] newNodeOutputs = newNodeOutputsList.toArray();
+      if (anyUnknown) {
         return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
-      } else if (FunctionRegistry.isOpOr(fd)) {
-        boolean anyUnknown = false; // Whether any of the node outputs is unknown
-        boolean allDivided = true; // Whether all of the node outputs are divided
-        List<NodeInfoWrapper> newNodeOutputsList =
-                new ArrayList<NodeInfoWrapper>(nodeOutputs.length);
-        for (int i = 0; i< nodeOutputs.length; i++) {
-          NodeInfoWrapper c = (NodeInfoWrapper)nodeOutputs[i];
-          if (c.state == WalkState.TRUE) {
-            return c;
-          }
-          if (c.state == WalkState.UNKNOWN) {
-            anyUnknown = true;
-          }
-          if (c.state != WalkState.DIVIDED) {
-            allDivided = false;
-          }
-          if (c.state != WalkState.FALSE) {
-            newNodeOutputsList.add(c);
+      }
+      if (allDivided) {
+        Boolean[] results = new Boolean[ctx.getPartList().size()];
+        for (int i = 0; i < ctx.getPartList().size(); i++) {
+          Boolean[] orArray = new Boolean[newNodeOutputs.length];
+          for (int j = 0; j < newNodeOutputs.length; j++) {
+            orArray[j] = ((NodeInfoWrapper) newNodeOutputs[j]).ResultVector[i];
           }
+          results[i] = opOr(orArray);
         }
-        // If all of them were false, return false
-        if (newNodeOutputsList.size() == 0) {
-          return new NodeInfoWrapper(WalkState.FALSE, null,
-                  new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.FALSE));
+        return getResultWrapFromResults(results, fd, newNodeOutputs);
+      }
+      return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
+    }
+
+    private Object handleUdfAnd(PcrExprProcCtx ctx, ExprNodeGenericFuncDesc fd,
+        Object... nodeOutputs) {
+      boolean anyUnknown = false; // Whether any of the node outputs is unknown
+      boolean allDivided = true; // Whether all of the node outputs are divided
+      List<NodeInfoWrapper> newNodeOutputsList =
+              new ArrayList<NodeInfoWrapper>(nodeOutputs.length);
+      for (int i = 0; i < nodeOutputs.length; i++) {
+        NodeInfoWrapper c = (NodeInfoWrapper)nodeOutputs[i];
+        if (c.state == WalkState.FALSE) {
+          return c;
         }
-        // If we are left with a single child, return the child
-        if (newNodeOutputsList.size() == 1) {
-          return newNodeOutputsList.get(0);
+        if (c.state == WalkState.UNKNOWN) {
+          anyUnknown = true;
         }
-        Object[] newNodeOutputs = newNodeOutputsList.toArray();
-        if (anyUnknown) {
-          return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
+        if (c.state != WalkState.DIVIDED) {
+          allDivided = false;
         }
-        if (allDivided) {
-          Boolean[] results = new Boolean[ctx.getPartList().size()];
-          for (int i = 0; i < ctx.getPartList().size(); i++) {
-            Boolean[] orArray = new Boolean[newNodeOutputs.length];
-            for (int j = 0; j < newNodeOutputs.length; j++) {
-              orArray[j] = ((NodeInfoWrapper) newNodeOutputs[j]).ResultVector[i];
-            }
-            results[i] = opOr(orArray);
-          }
-          return getResultWrapFromResults(results, fd, newNodeOutputs);
+        if (c.state != WalkState.TRUE) {
+          newNodeOutputsList.add(c);
         }
+      }
+      // If all of them were true, return true
+      if (newNodeOutputsList.size() == 0) {
+        return new NodeInfoWrapper(WalkState.TRUE, null,
+                new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.TRUE));
+      }
+      // If we are left with a single child, return the child
+      if (newNodeOutputsList.size() == 1) {
+        return newNodeOutputsList.get(0);
+      }
+      Object[] newNodeOutputs = newNodeOutputsList.toArray();
+      if (anyUnknown) {
         return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
-      } else if (FunctionRegistry.isIn(fd)) {
-        List<ExprNodeDesc> children = fd.getChildren();
-        boolean removePredElem = false;
-        ExprNodeDesc lhs = children.get(0);
-
-        if (lhs instanceof ExprNodeColumnDesc) {
-          // It is an IN clause on a column
-          if (((ExprNodeColumnDesc)lhs).getIsPartitionColOrVirtualCol()) {
-            // It is a partition column, we can proceed
-            removePredElem = true;
-          }
-          if (removePredElem) {
-            // We should not remove the dynamic partition pruner generated synthetic predicates.
-            for (int i = 1; i < children.size(); i++) {
-              if (children.get(i) instanceof ExprNodeDynamicListDesc) {
-                removePredElem = false;
-                break;
-              }
-            }
-          }
-        } else if (lhs instanceof ExprNodeGenericFuncDesc) {
-          // It is an IN clause on a struct
-          // Make sure that the generic udf is deterministic
-          if (FunctionRegistry.isDeterministic(((ExprNodeGenericFuncDesc) lhs)
-              .getGenericUDF())) {
-            boolean hasOnlyPartCols = true;
-            boolean hasDynamicListDesc = false;
-
-            for (ExprNodeDesc ed : ((ExprNodeGenericFuncDesc) lhs).getChildren()) {
-              // Check if the current field expression contains only
-              // partition column or a virtual column or constants.
-              // If yes, this filter predicate is a candidate for this optimization.
-              if (!(ed instanceof ExprNodeColumnDesc &&
-                   ((ExprNodeColumnDesc)ed).getIsPartitionColOrVirtualCol())) {
-                hasOnlyPartCols = false;
-                break;
-              }
-            }
-
-            // If we have non-partition columns, we cannot remove the predicate.
-            if (hasOnlyPartCols) {
-              // We should not remove the dynamic partition pruner generated synthetic predicates.
-              for (int i = 1; i < children.size(); i++) {
-                if (children.get(i) instanceof ExprNodeDynamicListDesc) {
-                  hasDynamicListDesc = true;
-                  break;
-                }
-              }
-            }
-
-            removePredElem = hasOnlyPartCols && !hasDynamicListDesc;
+      }
+      if (allDivided) {
+        Boolean[] results = new Boolean[ctx.getPartList().size()];
+        for (int i = 0; i < ctx.getPartList().size(); i++) {
+          Boolean[] andArray = new Boolean[newNodeOutputs.length];
+          for (int j = 0; j < newNodeOutputs.length; j++) {
+            andArray[j] = ((NodeInfoWrapper) newNodeOutputs[j]).ResultVector[i];
           }
+          results[i] = opAnd(andArray);
         }
+        return getResultWrapFromResults(results, fd, newNodeOutputs);
+      }
+      return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
+    }
 
-        // If removePredElem is set to true, return true as this is a potential candidate
-        // for partition condition remover. Else, set the WalkState for this node to unknown.
-        return removePredElem ?
-          new NodeInfoWrapper(WalkState.TRUE, null,
-          new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.TRUE)) :
-          new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs)) ;
-      } else if (!FunctionRegistry.isDeterministic(fd.getGenericUDF())) {
-        // If it's a non-deterministic UDF, set unknown to true
-        return new NodeInfoWrapper(WalkState.UNKNOWN, null,
+    private Object handleUdfNot(PcrExprProcCtx ctx, ExprNodeGenericFuncDesc fd,
+        Object... nodeOutputs) {
+      assert (nodeOutputs.length == 1);
+      NodeInfoWrapper wrapper = (NodeInfoWrapper) nodeOutputs[0];
+      if (wrapper.state == WalkState.TRUE) {
+        ExprNodeConstantDesc falseDesc = new ExprNodeConstantDesc(
+            wrapper.outExpr.getTypeInfo(), Boolean.FALSE);
+        return new NodeInfoWrapper(WalkState.FALSE, null, falseDesc);
+      } else if (wrapper.state == WalkState.FALSE) {
+        ExprNodeConstantDesc trueDesc = new ExprNodeConstantDesc(
+            wrapper.outExpr.getTypeInfo(), Boolean.TRUE);
+        return new NodeInfoWrapper(WalkState.TRUE, null, trueDesc);
+      } else if (wrapper.state == WalkState.DIVIDED) {
+        Boolean[] results = new Boolean[ctx.getPartList().size()];
+        for (int i = 0; i < ctx.getPartList().size(); i++) {
+          results[i] = opNot(wrapper.ResultVector[i]);
+        }
+        return new NodeInfoWrapper(WalkState.DIVIDED, results,
             getOutExpr(fd, nodeOutputs));
       } else {
-        // If any child is unknown, set unknown to true
-        boolean has_part_col = false;
-        for (Object child : nodeOutputs) {
-          NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
-          if (wrapper.state == WalkState.UNKNOWN) {
-            return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
-          } else if (wrapper.state == WalkState.PART_COL) {
-            has_part_col = true;
-          }
-        }
-
-        if (has_part_col && fd.getTypeInfo().getCategory() == Category.PRIMITIVE) {
-          //  we need to evaluate result for every pruned partition
-          if (fd.getTypeInfo().equals(TypeInfoFactory.booleanTypeInfo)) {
-            // if the return type of the GenericUDF is boolean and all partitions agree on
-            // a result, we update the state of the node to be TRUE of FALSE
-            Boolean[] results = new Boolean[ctx.getPartList().size()];
-            for (int i = 0; i < ctx.getPartList().size(); i++) {
-              results[i] = (Boolean) evalExprWithPart(fd, ctx.getPartList().get(i),
-                  ctx.getVirtualColumns());
-            }
-            return getResultWrapFromResults(results, fd, nodeOutputs);
-          }
-
-          // the case that return type of the GenericUDF is not boolean, and if not all partition
-          // agree on result, we make the node UNKNOWN. If they all agree, we replace the node
-          // to be a CONSTANT node with value to be the agreed result.
-          Object[] results = new Object[ctx.getPartList().size()];
-          for (int i = 0; i < ctx.getPartList().size(); i++) {
-            results[i] = evalExprWithPart(fd, ctx.getPartList().get(i), ctx.getVirtualColumns());
-          }
-          Object result = ifResultsAgree(results);
-          if (result == null) {
-            // if the result is not boolean and not all partition agree on the
-            // result, we don't remove the condition. Potentially, it can miss
-            // the case like "where ds % 3 == 1 or ds % 3 == 2"
-            // TODO: handle this case by making result vector to handle all
-            // constant values.
-            return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
-          }
-          return new NodeInfoWrapper(WalkState.CONSTANT, null,
-              new ExprNodeConstantDesc(fd.getTypeInfo(), result));
-        }
-
-        // Try to fold, otherwise return the expression itself
-        final ExprNodeGenericFuncDesc desc = getOutExpr(fd, nodeOutputs);
-        final ExprNodeDesc foldedDesc = ConstantPropagateProcFactory.foldExpr(desc);
-        if (foldedDesc != null && foldedDesc instanceof ExprNodeConstantDesc) {
-          ExprNodeConstantDesc constant = (ExprNodeConstantDesc) foldedDesc;
-          if (Boolean.TRUE.equals(constant.getValue())) {
-            return new NodeInfoWrapper(WalkState.TRUE, null, constant);
-          } else if (Boolean.FALSE.equals(constant.getValue())) {
-            return new NodeInfoWrapper(WalkState.FALSE, null, constant);
-          } else {
-            return new NodeInfoWrapper(WalkState.CONSTANT, null, constant);
-          }
-        }
-        return new NodeInfoWrapper(WalkState.CONSTANT, null, desc);
+        return new NodeInfoWrapper(wrapper.state, null,
+            getOutExpr(fd, nodeOutputs));
       }
     }
   };
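
To make the effect of this refactoring concrete: an IN (and hence NOT IN) over partition columns, including a struct of partition columns, is now evaluated per pruned partition like any other deterministic UDF, instead of being unconditionally marked as removable. A rough standalone sketch of the per-partition three-valued folding (names are illustrative; the real code uses NodeInfoWrapper and getResultWrapFromResults):

public class PcrFoldingSketch {
  enum State { TRUE, FALSE, DIVIDED }

  // One Boolean per pruned partition: the predicate evaluated with that partition's values.
  static State fold(Boolean[] perPartition) {
    boolean allTrue = true, allFalse = true;
    for (Boolean b : perPartition) {
      if (!Boolean.TRUE.equals(b)) allTrue = false;
      if (!Boolean.FALSE.equals(b)) allFalse = false;
    }
    return allTrue ? State.TRUE : (allFalse ? State.FALSE : State.DIVIDED);
  }

  public static void main(String[] args) {
    // "s not in ('bar')" against partitions s='foo' and s='bar': true for foo, false for bar.
    System.out.println(fold(new Boolean[] { true, false })); // DIVIDED
    // If every remaining partition agrees, the predicate folds to a constant and can be removed.
    System.out.println(fold(new Boolean[] { true, true }));  // TRUE
  }
}

The partition_condition_remover.q test added below exercises exactly this pattern with "s not in ('bar')".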

http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index b2125ca..4353d3a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -620,4 +623,26 @@ public class ParseContext {
       List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts) {
     this.columnStatsAutoGatherContexts = columnStatsAutoGatherContexts;
   }
+
+  public Collection<Operator> getAllOps() {
+    List<Operator> ops = new ArrayList<>();
+    Set<Operator> visited = new HashSet<Operator>();
+    for (Operator<?> op : getTopOps().values()) {
+      getAllOps(ops, visited, op);
+    }
+    return ops;
+  }
+
+  private static void getAllOps(List<Operator> builder, Set<Operator> visited, Operator<?> op) {
+    boolean added = visited.add(op);
+    builder.add(op);
+    if (!added) return;
+    if (op.getNumChild() > 0) {
+      List<Operator<?>> children = op.getChildOperators();
+      for (int i = 0; i < children.size(); i++) {
+        getAllOps(builder, visited, children.get(i));
+      }
+    }
+  }
+
 }
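
getAllOps() above walks the operator DAG pre-order from the top operators, using a visited set so a shared subtree is expanded only once. A standalone sketch of the same pattern, with a hypothetical Node type standing in for Hive's Operator:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class DagWalk {
      static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
      }

      static void collect(Node node, List<Node> out, Set<Node> visited) {
        boolean firstVisit = visited.add(node);
        out.add(node);            // added before the visited check, like getAllOps()
        if (!firstVisit) {
          return;                 // a shared node may be listed again, but is expanded only once
        }
        for (Node child : node.children) {
          collect(child, out, visited);
        }
      }

      public static void main(String[] args) {
        Node ts = new Node("TS"), fil = new Node("FIL"), fs = new Node("FS");
        ts.children.add(fil);
        fil.children.add(fs);
        List<Node> all = new ArrayList<>();
        collect(ts, all, new HashSet<Node>());
        for (Node n : all) {
          System.out.println(n.name);   // TS, FIL, FS
        }
      }
    }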

http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/test/queries/clientpositive/partition_condition_remover.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/partition_condition_remover.q b/ql/src/test/queries/clientpositive/partition_condition_remover.q
new file mode 100644
index 0000000..39e58b8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/partition_condition_remover.q
@@ -0,0 +1,13 @@
+
+drop table foo;
+
+create table foo (i int) partitioned by (s string);
+
+insert overwrite table foo partition(s='foo') select cint from alltypesorc limit 10;
+insert overwrite table foo partition(s='bar') select cint from alltypesorc limit 10;
+
+explain select * from foo where s not in ('bar');
+select * from foo where s not in ('bar');
+
+
+drop table foo;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/test/results/clientpositive/partition_condition_remover.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/partition_condition_remover.q.out b/ql/src/test/results/clientpositive/partition_condition_remover.q.out
new file mode 100644
index 0000000..2f8f998
--- /dev/null
+++ b/ql/src/test/results/clientpositive/partition_condition_remover.q.out
@@ -0,0 +1,79 @@
+PREHOOK: query: drop table foo
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table foo
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table foo (i int) partitioned by (s string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@foo
+POSTHOOK: query: create table foo (i int) partitioned by (s string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@foo
+PREHOOK: query: insert overwrite table foo partition(s='foo') select cint from alltypesorc limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@foo@s=foo
+POSTHOOK: query: insert overwrite table foo partition(s='foo') select cint from alltypesorc limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@foo@s=foo
+POSTHOOK: Lineage: foo PARTITION(s=foo).i SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+PREHOOK: query: insert overwrite table foo partition(s='bar') select cint from alltypesorc limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@foo@s=bar
+POSTHOOK: query: insert overwrite table foo partition(s='bar') select cint from alltypesorc limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@foo@s=bar
+POSTHOOK: Lineage: foo PARTITION(s=bar).i SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+PREHOOK: query: explain select * from foo where s not in ('bar')
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from foo where s not in ('bar')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: foo
+          Statistics: Num rows: 10 Data size: 90 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: i (type: int), s (type: string)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 10 Data size: 90 Basic stats: COMPLETE Column stats: NONE
+            ListSink
+
+PREHOOK: query: select * from foo where s not in ('bar')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@foo
+PREHOOK: Input: default@foo@s=foo
+#### A masked pattern was here ####
+POSTHOOK: query: select * from foo where s not in ('bar')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@foo
+POSTHOOK: Input: default@foo@s=foo
+#### A masked pattern was here ####
+528534767	foo
+528534767	foo
+528534767	foo
+528534767	foo
+528534767	foo
+528534767	foo
+528534767	foo
+528534767	foo
+528534767	foo
+528534767	foo
+PREHOOK: query: drop table foo
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@foo
+PREHOOK: Output: default@foo
+POSTHOOK: query: drop table foo
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@foo
+POSTHOOK: Output: default@foo

http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/test/results/clientpositive/pcs.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/pcs.q.out b/ql/src/test/results/clientpositive/pcs.q.out
index 0045c1c..d409eb5 100644
--- a/ql/src/test/results/clientpositive/pcs.q.out
+++ b/ql/src/test/results/clientpositive/pcs.q.out
@@ -1047,7 +1047,7 @@ STAGE PLANS:
             GatherStats: false
             Filter Operator
               isSamplingPred: false
-              predicate: ((struct(ds,key)) IN (const struct('2000-04-08',1), const struct('2000-04-09',2)) and (ds = '2008-04-08')) (type: boolean)
+              predicate: ((struct(ds,key)) IN (const struct('2000-04-08',1), const struct('2000-04-09',2)) and (struct(ds)) IN (const struct('2000-04-08'), const struct('2000-04-09')) and (ds = '2008-04-08')) (type: boolean)
               Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
               Select Operator
                 expressions: key (type: int), value (type: string)
@@ -1072,7 +1072,7 @@ STAGE PLANS:
             GatherStats: false
             Filter Operator
               isSamplingPred: false
-              predicate: ((struct(ds,key)) IN (const struct('2000-04-08',1), const struct('2000-04-09',2)) and (ds = '2008-04-08')) (type: boolean)
+              predicate: ((struct(ds,key)) IN (const struct('2000-04-08',1), const struct('2000-04-09',2)) and (struct(ds)) IN (const struct('2000-04-08'), const struct('2000-04-09')) and (ds = '2008-04-08')) (type: boolean)
               Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
               Select Operator
                 expressions: key (type: int), value (type: string)


[9/9] hive git commit: HIVE-14636 : pass information from FSOP/TezTask to commit to take care of speculative execution and failed tasks (Sergey Shelukhin)

Posted by se...@apache.org.
HIVE-14636 : pass information from FSOP/TezTask to commit to take care of speculative execution and failed tasks (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/87dcab47
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/87dcab47
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/87dcab47

Branch: refs/heads/hive-14535
Commit: 87dcab470f33ace818c775da6b0a9f18b10f66ac
Parents: 2cef25d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 31 16:06:03 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 31 16:06:03 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |   6 +-
 .../hadoop/hive/common/HiveStatsUtils.java      |  14 +-
 .../java/org/apache/hadoop/hive/ql/Context.java |   9 +-
 .../hive/ql/exec/AbstractFileMergeOperator.java |   4 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 194 +++++++++---
 .../apache/hadoop/hive/ql/exec/MoveTask.java    | 315 ++++++++++---------
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  10 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   5 +-
 .../hadoop/hive/ql/parse/TypeCheckCtx.java      |   2 +-
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |   9 +
 .../hadoop/hive/ql/plan/LoadFileDesc.java       |   2 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      |  19 +-
 ql/src/test/queries/clientpositive/mm_current.q |  18 +-
 .../clientpositive/llap/mm_current.q.out        | 133 +++++++-
 15 files changed, 517 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 3ed2d08..ad43610 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -329,9 +329,13 @@ public final class FileUtils {
    */
   public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
       List<FileStatus> results) throws IOException {
+    listStatusRecursively(fs, fileStatus, HIDDEN_FILES_PATH_FILTER, results);
+  }
 
+  public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
+      PathFilter filter, List<FileStatus> results) throws IOException {
     if (fileStatus.isDir()) {
-      for (FileStatus stat : fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_PATH_FILTER)) {
+      for (FileStatus stat : fs.listStatus(fileStatus.getPath(), filter)) {
         listStatusRecursively(fs, stat, results);
       }
     } else {
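
Note that the new overload above applies the caller-supplied filter only at the first level: the unchanged recursive call still uses the three-argument form, which falls back to HIDDEN_FILES_PATH_FILTER for nested directories. If the intent is to filter at every level (an assumption, not something the commit states), a variant that threads the filter through would look like this:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.PathFilter;

    public class FilteredListing {
      /** Recursive listing that applies the same filter at every directory level. */
      public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
          PathFilter filter, List<FileStatus> results) throws IOException {
        if (fileStatus.isDirectory()) {
          for (FileStatus stat : fs.listStatus(fileStatus.getPath(), filter)) {
            // Pass the filter down instead of falling back to the default hidden-files filter.
            listStatusRecursively(fs, stat, filter, results);
          }
        } else {
          results.add(fileStatus);
        }
      }
    }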

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
index 7c9d72f..111d99c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,15 +51,20 @@ public class HiveStatsUtils {
    * @return array of FileStatus
    * @throws IOException
    */
-  public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
+  public static FileStatus[] getFileStatusRecurse(Path path, int level,  FileSystem fs)
       throws IOException {
+    return getFileStatusRecurse(path, level, fs, FileUtils.HIDDEN_FILES_PATH_FILTER);
+  }
+
+  public static FileStatus[] getFileStatusRecurse(
+      Path path, int level, FileSystem fs, PathFilter filter) throws IOException {
 
     // if level is <0, the return all files/directories under the specified path
-    if ( level < 0) {
+    if (level < 0) {
       List<FileStatus> result = new ArrayList<FileStatus>();
       try {
         FileStatus fileStatus = fs.getFileStatus(path);
-        FileUtils.listStatusRecursively(fs, fileStatus, result);
+        FileUtils.listStatusRecursively(fs, fileStatus, filter, result);
       } catch (IOException e) {
         // globStatus() API returns empty FileStatus[] when the specified path
         // does not exist. But getFileStatus() throw IOException. To mimic the
@@ -75,7 +81,7 @@ public class HiveStatsUtils {
       sb.append(Path.SEPARATOR).append("*");
     }
     Path pathPattern = new Path(path, sb.toString());
-    return fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER);
+    return fs.globStatus(pathPattern, filter);
   }
 
   public static int getNumBitVectorsForNDVEstimation(Configuration conf) throws Exception {
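
getFileStatusRecurse() above lists entries at a fixed partition depth by appending one wildcard per level and globbing with the supplied filter. A standalone sketch of that pattern; the local FileSystem, the path, and the no-hidden-files filter are illustrative only, and this assumes hadoop-common on the classpath:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class LevelGlob {
      /** Lists entries exactly "level" directories below base, filtered. Assumes level >= 1. */
      public static FileStatus[] listAtLevel(Path base, int level, FileSystem fs,
          PathFilter filter) throws IOException {
        StringBuilder sb = new StringBuilder(base.toString());
        for (int i = 0; i < level; i++) {
          sb.append(Path.SEPARATOR).append("*");   // one wildcard per partition level
        }
        return fs.globStatus(new Path(sb.toString()), filter);
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        PathFilter noHidden = new PathFilter() {
          @Override
          public boolean accept(Path p) {
            String name = p.getName();
            return !name.startsWith("_") && !name.startsWith(".");
          }
        };
        FileStatus[] statuses = listAtLevel(new Path("/tmp/warehouse/foo"), 1, fs, noHidden);
        System.out.println("found " + (statuses == null ? 0 : statuses.length) + " entries");
      }
    }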

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index ceb257c..1013f7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -233,7 +233,8 @@ public class Context {
       // Append task specific info to stagingPathName, instead of creating a sub-directory.
       // This way we don't have to worry about deleting the stagingPathName separately at
       // end of query execution.
-      dir = fs.makeQualified(new Path(stagingPathName + "_" + this.executionId + "-" + TaskRunner.getTaskRunnerID()));
+      // TODO# HERE
+      dir = fs.makeQualified(new Path(stagingPathName + "_" + getExecutionPrefix()));
 
       LOG.debug("Created staging dir = " + dir + " for path = " + inputPath);
 
@@ -819,6 +820,10 @@ public class Context {
     this.skipTableMasking = skipTableMasking;
   }
 
+  public String getExecutionPrefix() {
+    return this.executionId + "-" + TaskRunner.getTaskRunnerID();
+  }
+
   public ExplainConfiguration getExplainConfig() {
     return explainConfig;
   }
@@ -827,7 +832,7 @@ public class Context {
     this.explainConfig = explainConfig;
   }
 
-  public void resetOpContext(){
+  public void resetOpContext() {
     opContext = new CompilationOpContext();
     sequencer = new AtomicInteger();
   }
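
The execution prefix introduced here (execution id plus task runner id) is what ties the rest of this commit together: it is embedded in the staging directory name, in per-task output file names, and in the manifest names that FileSinkOperator writes below. A toy illustration of the naming convention; the ids are invented:

    public class ExecPrefix {
      public static void main(String[] args) {
        String executionId = "hive_2016-08-31_16-06-03_123";     // invented id
        long taskRunnerId = 42L;                                 // invented id
        String prefix = executionId + "-" + taskRunnerId;        // as in getExecutionPrefix()

        String stagingDir = "/tmp/hive-staging" + "_" + prefix;  // staging dir name
        String taskOutput = prefix + "_" + "000000_0";           // as in getPrefixedTaskId()
        String manifest = "_tmp." + taskOutput + ".manifest";    // per-task manifest name

        System.out.println(stagingDir);
        System.out.println(taskOutput);
        System.out.println(manifest);
      }
    }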

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index dfad6c1..40c784b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -254,8 +254,8 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
       Path outputDir = conf.getOutputPath();
       FileSystem fs = outputDir.getFileSystem(hconf);
       Path backupPath = backupOutputPath(fs, outputDir);
-      Utilities
-          .mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(),
+      // TODO# merge-related move
+      Utilities.mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(),
               null, reporter);
       if (success) {
         LOG.info("jobCloseOp moved merged files to output dir: " + outputDir);

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 1f5dfea..b8a2c5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -27,16 +27,22 @@ import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -79,6 +85,7 @@ import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.slf4j.Logger;
@@ -92,6 +99,7 @@ import com.google.common.collect.Lists;
 public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     Serializable {
 
+  private static final String MANIFEST_EXTENSION = ".manifest";
   public static final Logger LOG = LoggerFactory.getLogger(FileSinkOperator.class);
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
@@ -165,7 +173,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       } 
       Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts
           + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath
-          + " (spec path " + specPath + ")", new Exception());
+          + " (spec path " + specPath + ")"/*, new Exception()*/);
 
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
@@ -187,7 +195,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     /**
      * Update the final paths according to tmpPath.
      */
-    public Path getFinalPath(String taskId, Path tmpPath, String extension) {
+    private Path getFinalPath(String taskId, Path tmpPath, String extension) {
       if (extension != null) {
         return new Path(tmpPath, taskId + extension);
       } else {
@@ -218,41 +226,64 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
 
     private void commit(FileSystem fs) throws HiveException {
-      if (isMmTable) return;  // TODO#: need to propagate to MoveTask instead
+      List<Path> commitPaths = null;
+      if (isMmTable) {
+        commitPaths = new ArrayList<>();
+      }
       for (int idx = 0; idx < outPaths.length; ++idx) {
         try {
-          if ((bDynParts || isSkewedStoredAsSubDirectories)
-              && !fs.exists(finalPaths[idx].getParent())) {
-            Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent());
-            fs.mkdirs(finalPaths[idx].getParent());
-          }
-          boolean needToRename = true;
-          if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
-              conf.getWriteType() == AcidUtils.Operation.DELETE) {
-            // If we're updating or deleting there may be no file to close.  This can happen
-            // because the where clause strained out all of the records for a given bucket.  So
-            // before attempting the rename below, check if our file exists.  If it doesn't,
-            // then skip the rename.  If it does try it.  We could just blindly try the rename
-            // and avoid the extra stat, but that would mask other errors.
-            try {
-              if (outPaths[idx] != null) {
-                FileStatus stat = fs.getFileStatus(outPaths[idx]);
-              }
-            } catch (FileNotFoundException fnfe) {
-              needToRename = false;
-            }
-          }
-          Utilities.LOG14535.info("commit potentially moving " + outPaths[idx] + " to " + finalPaths[idx]);
-          if (needToRename && outPaths[idx] != null && !fs.rename(outPaths[idx], finalPaths[idx])) {
-            throw new HiveException("Unable to rename output from: " +
-                outPaths[idx] + " to: " + finalPaths[idx]);
-          }
-          updateProgress();
+          commitOneOutPath(idx, fs, commitPaths);
         } catch (IOException e) {
           throw new HiveException("Unable to rename output from: " +
               outPaths[idx] + " to: " + finalPaths[idx], e);
         }
       }
+      if (isMmTable) {
+        Path manifestPath = new Path(specPath, "_tmp." + getPrefixedTaskId() + MANIFEST_EXTENSION);
+        Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
+        try {
+          try (FSDataOutputStream out = fs.create(manifestPath)) {
+            out.writeInt(commitPaths.size());
+            for (Path path : commitPaths) {
+              out.writeUTF(path.toString());
+            }
+          }
+        } catch (IOException e) {
+          throw new HiveException(e);
+        }
+      }
+    }
+
+    private String getPrefixedTaskId() {
+      return conf.getExecutionPrefix() + "_" + taskId;
+    }
+
+    private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths)
+        throws IOException, HiveException {
+      if ((bDynParts || isSkewedStoredAsSubDirectories)
+          && !fs.exists(finalPaths[idx].getParent())) {
+        Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent());
+        fs.mkdirs(finalPaths[idx].getParent());
+      }
+      // If we're updating or deleting there may be no file to close.  This can happen
+      // because the where clause strained out all of the records for a given bucket.  So
+      // before attempting the rename below, check if our file exists.  If it doesn't,
+      // then skip the rename.  If it does try it.  We could just blindly try the rename
+      // and avoid the extra stat, but that would mask other errors.
+      boolean needToRename = (conf.getWriteType() != AcidUtils.Operation.UPDATE &&
+          conf.getWriteType() != AcidUtils.Operation.DELETE) || fs.exists(outPaths[idx]);
+      if (needToRename && outPaths[idx] != null) {
+        Utilities.LOG14535.info("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (" + isMmTable + ")");
+        if (isMmTable) {
+          assert outPaths[idx].equals(finalPaths[idx]);
+          commitPaths.add(outPaths[idx]);
+        } else if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+          throw new HiveException("Unable to rename output from: "
+              + outPaths[idx] + " to: " + finalPaths[idx]);
+        }
+      }
+
+      updateProgress();
     }
 
     public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
@@ -297,10 +328,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           outPaths[filesIdx] = getTaskOutPath(taskId);
         } else {
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
-            finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension);
+            finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, extension);
           } else {
             // TODO# wrong!
-            finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension);
+            finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, extension);
           }
           outPaths[filesIdx] = finalPaths[filesIdx];
         }
@@ -638,7 +669,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable, isSkewedStoredAsSubDirectories);
       Utilities.LOG14535.info("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
           + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path "
-          + fsp.getTmpPath() + ", task " + taskId + ")", new Exception());
+          + fsp.getTmpPath() + ", task " + taskId + ")"/*, new Exception()*/);
 
       if (isInfoEnabled) {
         LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
@@ -1150,9 +1181,13 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
         if (conf.isLinkedFileSink() && (dpCtx != null)) {
           specPath = conf.getParentDir();
+          Utilities.LOG14535.info("Setting specPath to " + specPath + " for dynparts");
+        }
+        if (!conf.isMmTable()) {
+          Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter); // TODO# other callers
+        } else {
+          handleMmTable(specPath, hconf, success, dpCtx, conf, reporter);
         }
-        Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf,
-          reporter);
       }
     } catch (IOException e) {
       throw new HiveException(e);
@@ -1160,6 +1195,95 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     super.jobCloseOp(hconf, success);
   }
 
+  private static class ExecPrefixPathFilter implements PathFilter {
+    private final String prefix, tmpPrefix;
+    public ExecPrefixPathFilter(String prefix) {
+      this.prefix = prefix;
+      this.tmpPrefix = "_tmp." + prefix;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return name.startsWith(prefix) || name.startsWith(tmpPrefix);
+    }
+  }
+
+
+  private void handleMmTable(Path specPath, Configuration hconf, boolean success,
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter)
+          throws IOException, HiveException {
+    FileSystem fs = specPath.getFileSystem(hconf);
+    int targetLevel = (dpCtx == null) ? 1 : dpCtx.getNumDPCols();
+    if (!success) {
+      FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+          specPath, targetLevel, fs, new ExecPrefixPathFilter(conf.getExecutionPrefix()));
+      for (FileStatus status : statuses) {
+        Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
+        tryDelete(fs, status.getPath());
+      }
+      return;
+    }
+    FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+        specPath, targetLevel, fs, new ExecPrefixPathFilter(conf.getExecutionPrefix()));
+    if (statuses == null) return;
+    LinkedList<FileStatus> results = new LinkedList<>();
+    List<Path> manifests = new ArrayList<>(statuses.length);
+    for (FileStatus status : statuses) {
+      if (status.getPath().getName().endsWith(MANIFEST_EXTENSION)) {
+        manifests.add(status.getPath());
+      } else {
+        results.add(status);
+      }
+    }
+    HashSet<String> committed = new HashSet<>();
+    for (Path mfp : manifests) {
+      try (FSDataInputStream mdis = fs.open(mfp)) {
+        int fileCount = mdis.readInt();
+        for (int i = 0; i < fileCount; ++i) {
+          String nextFile = mdis.readUTF();
+          if (!committed.add(nextFile)) {
+            throw new HiveException(nextFile + " was specified in multiple manifests");
+          }
+        }
+      }
+    }
+    Iterator<FileStatus> iter = results.iterator();
+    while (iter.hasNext()) {
+      FileStatus rfs = iter.next();
+      if (!committed.remove(rfs.getPath().toString())) {
+        iter.remove();
+        Utilities.LOG14535.info("Deleting " + rfs.getPath() + " that was not committed");
+        tryDelete(fs, rfs.getPath());
+      }
+    }
+    if (!committed.isEmpty()) {
+      throw new HiveException("The following files were committed but not found: " + committed);
+    }
+    for (Path mfp : manifests) {
+      Utilities.LOG14535.info("Deleting manifest " + mfp);
+      tryDelete(fs, mfp);
+    }
+
+    if (results.isEmpty()) return;
+    FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]);
+
+    List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
+        fs, finalResults, dpCtx, conf, hconf);
+    // create empty buckets if necessary
+    if (emptyBuckets.size() > 0) {
+      Utilities.createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
+    }
+  }
+
+  private void tryDelete(FileSystem fs, Path path) {
+    try {
+      fs.delete(path, false);
+    } catch (IOException ex) {
+      LOG.error("Failed to delete " + path, ex);
+    }
+  }
+
   @Override
   public OperatorType getType() {
     return OperatorType.FILESINK;
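
For MM tables the commit path above no longer renames files; each task writes a manifest (an int count followed by UTF-encoded paths) and jobCloseOp later reconciles the union of the manifests with what is actually on disk. A standalone sketch of that write/read round trip against a local FileSystem; the paths and prefix are illustrative and this is not the FileSinkOperator code itself:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ManifestRoundTrip {
      /** Writes the manifest format used above: an int count, then one UTF path per file. */
      static void writeManifest(FileSystem fs, Path manifest, List<Path> committed)
          throws IOException {
        try (FSDataOutputStream out = fs.create(manifest)) {
          out.writeInt(committed.size());
          for (Path p : committed) {
            out.writeUTF(p.toString());
          }
        }
      }

      /** Reads a manifest back, rejecting files that appear in more than one manifest. */
      static void readManifest(FileSystem fs, Path manifest, Set<String> committed)
          throws IOException {
        try (FSDataInputStream in = fs.open(manifest)) {
          int count = in.readInt();
          for (int i = 0; i < count; i++) {
            String file = in.readUTF();
            if (!committed.add(file)) {
              throw new IOException(file + " was specified in multiple manifests");
            }
          }
        }
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path dir = new Path("/tmp/mm_demo");                          // illustrative location
        fs.mkdirs(dir);
        Path data = new Path(dir, "prefix_000000_0");                 // pretend task output
        fs.create(data).close();
        Path manifest = new Path(dir, "_tmp.prefix_000000_0.manifest");
        writeManifest(fs, manifest, Arrays.asList(data));

        Set<String> committed = new HashSet<>();
        readManifest(fs, manifest, committed);
        // Reconciliation as in handleMmTable(): files on disk but not in this set get
        // deleted; files in the set but missing from disk are an error.
        System.out.println("committed: " + committed);
      }
    }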

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 2ab97f7..e3646da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -241,6 +241,18 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
     return false;
   }
 
+  private final static class TaskInformation {
+    public List<BucketCol> bucketCols = null;
+    public List<SortCol> sortCols = null;
+    public int numBuckets = -1;
+    public Task task;
+    public String path;
+    public TaskInformation(Task task, String path) {
+      this.task = task;
+      this.path = path;
+    }
+  }
+
   @Override
   public int execute(DriverContext driverContext) {
 
@@ -318,155 +330,15 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
 
           // Check if the bucketing and/or sorting columns were inferred
-          List<BucketCol> bucketCols = null;
-          List<SortCol> sortCols = null;
-          int numBuckets = -1;
-          Task task = this;
-          String path = tbd.getSourcePath().toUri().toString();
-          // Find the first ancestor of this MoveTask which is some form of map reduce task
-          // (Either standard, local, or a merge)
-          while (task.getParentTasks() != null && task.getParentTasks().size() == 1) {
-            task = (Task)task.getParentTasks().get(0);
-            // If it was a merge task or a local map reduce task, nothing can be inferred
-            if (task instanceof MergeFileTask || task instanceof MapredLocalTask) {
-              break;
-            }
-
-            // If it's a standard map reduce task, check what, if anything, it inferred about
-            // the directory this move task is moving
-            if (task instanceof MapRedTask) {
-              MapredWork work = (MapredWork)task.getWork();
-              MapWork mapWork = work.getMapWork();
-              bucketCols = mapWork.getBucketedColsByDirectory().get(path);
-              sortCols = mapWork.getSortedColsByDirectory().get(path);
-              if (work.getReduceWork() != null) {
-                numBuckets = work.getReduceWork().getNumReduceTasks();
-              }
-
-              if (bucketCols != null || sortCols != null) {
-                // This must be a final map reduce task (the task containing the file sink
-                // operator that writes the final output)
-                assert work.isFinalMapRed();
-              }
-              break;
-            }
-
-            // If it's a move task, get the path the files were moved from, this is what any
-            // preceding map reduce task inferred information about, and moving does not invalidate
-            // those assumptions
-            // This can happen when a conditional merge is added before the final MoveTask, but the
-            // condition for merging is not met, see GenMRFileSink1.
-            if (task instanceof MoveTask) {
-              if (((MoveTask)task).getWork().getLoadFileWork() != null) {
-                path = ((MoveTask)task).getWork().getLoadFileWork().getSourcePath().toUri().toString();
-              }
-            }
-          }
+          TaskInformation ti = new TaskInformation(this, tbd.getSourcePath().toUri().toString());
+          inferTaskInformation(ti);
           // deal with dynamic partitions
           DynamicPartitionCtx dpCtx = tbd.getDPCtx();
           if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
-
-            List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
-
-            // publish DP columns to its subscribers
-            if (dps != null && dps.size() > 0) {
-              pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
-            }
-            console.printInfo(System.getProperty("line.separator"));
-            long startTime = System.currentTimeMillis();
-            // load the list of DP partitions and return the list of partition specs
-            // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
-            // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
-            // After that check the number of DPs created to not exceed the limit and
-            // iterate over it and call loadPartition() here.
-            // The reason we don't do inside HIVE-1361 is the latter is large and we
-            // want to isolate any potential issue it may introduce.
-            Map<Map<String, String>, Partition> dp =
-              db.loadDynamicPartitions(
-                tbd.getSourcePath(),
-                tbd.getTable().getTableName(),
-                tbd.getPartitionSpec(),
-                tbd.getReplace(),
-                dpCtx.getNumDPCols(),
-                isSkewedStoredAsDirs(tbd),
-                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
-                SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(),
-                work.getLoadTableWork().getWriteType());
-
-            console.printInfo("\t Time taken to load dynamic partitions: "  +
-                (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
-
-            if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
-              throw new HiveException("This query creates no partitions." +
-                  " To turn off this error, set hive.error.on.empty.partition=false.");
-            }
-
-            startTime = System.currentTimeMillis();
-            // for each partition spec, get the partition
-            // and put it to WriteEntity for post-exec hook
-            for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
-              Partition partn = entry.getValue();
-
-              if (bucketCols != null || sortCols != null) {
-                updatePartitionBucketSortColumns(
-                    db, table, partn, bucketCols, numBuckets, sortCols);
-              }
-
-              WriteEntity enty = new WriteEntity(partn,
-                  (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-                      WriteEntity.WriteType.INSERT));
-              if (work.getOutputs() != null) {
-                work.getOutputs().add(enty);
-              }
-              // Need to update the queryPlan's output as well so that post-exec hook get executed.
-              // This is only needed for dynamic partitioning since for SP the the WriteEntity is
-              // constructed at compile time and the queryPlan already contains that.
-              // For DP, WriteEntity creation is deferred at this stage so we need to update
-              // queryPlan here.
-              if (queryPlan.getOutputs() == null) {
-                queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
-              }
-              queryPlan.getOutputs().add(enty);
-
-              // update columnar lineage for each partition
-              dc = new DataContainer(table.getTTable(), partn.getTPartition());
-
-              // Don't set lineage on delete as we don't have all the columns
-              if (SessionState.get() != null &&
-                  work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
-                  work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
-                SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
-                    table.getCols());
-              }
-              LOG.info("\tLoading partition " + entry.getKey());
-            }
-            console.printInfo("\t Time taken for adding to write entity : " +
-                (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
-            dc = null; // reset data container to prevent it being added again.
+            dc = handleDynParts(db, table, tbd, ti, dpCtx);
           } else { // static partitions
-            List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
-                tbd.getPartitionSpec());
-            db.validatePartitionNameCharacters(partVals);
-            Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable());
-            db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
-                tbd.getPartitionSpec(), tbd.getReplace(),
-                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
-                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable());
-            Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
-
-            if (bucketCols != null || sortCols != null) {
-              updatePartitionBucketSortColumns(db, table, partn, bucketCols,
-                  numBuckets, sortCols);
-            }
-
-            dc = new DataContainer(table.getTTable(), partn.getTPartition());
-            // add this partition to post-execution hook
-            if (work.getOutputs() != null) {
-              work.getOutputs().add(new WriteEntity(partn,
-                  (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
-                      : WriteEntity.WriteType.INSERT)));
-            }
-         }
+            dc = handleStaticParts(db, table, tbd, ti);
+          }
         }
         if (SessionState.get() != null && dc != null) {
           // If we are doing an update or a delete the number of columns in the table will not
@@ -500,6 +372,159 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
     }
   }
 
+  private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
+      TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
+    List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),  tbd.getPartitionSpec());
+    db.validatePartitionNameCharacters(partVals);
+    Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable());
+    db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
+        tbd.getPartitionSpec(), tbd.getReplace(),
+        tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
+        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable());
+    Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+
+    if (ti.bucketCols != null || ti.sortCols != null) {
+      updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
+          ti.numBuckets, ti.sortCols);
+    }
+
+    DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition());
+    // add this partition to post-execution hook
+    if (work.getOutputs() != null) {
+      work.getOutputs().add(new WriteEntity(partn,
+          (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
+              : WriteEntity.WriteType.INSERT)));
+    }
+    return dc;
+  }
+
+  private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
+      TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
+      IOException, InvalidOperationException {
+    DataContainer dc;
+    List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
+
+    // publish DP columns to its subscribers
+    if (dps != null && dps.size() > 0) {
+      pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
+    }
+    console.printInfo(System.getProperty("line.separator"));
+    long startTime = System.currentTimeMillis();
+    // load the list of DP partitions and return the list of partition specs
+    // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
+    // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
+    // After that check the number of DPs created to not exceed the limit and
+    // iterate over it and call loadPartition() here.
+    // The reason we don't do this inside HIVE-1361 is that the latter is large and we
+    // want to isolate any potential issue it may introduce.
+    Map<Map<String, String>, Partition> dp =
+      db.loadDynamicPartitions(
+        tbd.getSourcePath(),
+        tbd.getTable().getTableName(),
+        tbd.getPartitionSpec(),
+        tbd.getReplace(),
+        dpCtx.getNumDPCols(),
+        isSkewedStoredAsDirs(tbd),
+        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
+        SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(),
+        work.getLoadTableWork().getWriteType());
+
+    console.printInfo("\t Time taken to load dynamic partitions: "  +
+        (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+
+    if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
+      throw new HiveException("This query creates no partitions." +
+          " To turn off this error, set hive.error.on.empty.partition=false.");
+    }
+
+    startTime = System.currentTimeMillis();
+    // for each partition spec, get the partition
+    // and put it to WriteEntity for post-exec hook
+    for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
+      Partition partn = entry.getValue();
+
+      if (ti.bucketCols != null || ti.sortCols != null) {
+        updatePartitionBucketSortColumns(
+            db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
+      }
+
+      WriteEntity enty = new WriteEntity(partn,
+          (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
+              WriteEntity.WriteType.INSERT));
+      if (work.getOutputs() != null) {
+        work.getOutputs().add(enty);
+      }
+      // Need to update the queryPlan's output as well so that post-exec hook get executed.
+      // This is only needed for dynamic partitioning since for SP the WriteEntity is
+      // constructed at compile time and the queryPlan already contains that.
+      // For DP, WriteEntity creation is deferred at this stage so we need to update
+      // queryPlan here.
+      if (queryPlan.getOutputs() == null) {
+        queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
+      }
+      queryPlan.getOutputs().add(enty);
+
+      // update columnar lineage for each partition
+      dc = new DataContainer(table.getTTable(), partn.getTPartition());
+
+      // Don't set lineage on delete as we don't have all the columns
+      if (SessionState.get() != null &&
+          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
+          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
+        SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
+            table.getCols());
+      }
+      LOG.info("\tLoading partition " + entry.getKey());
+    }
+    console.printInfo("\t Time taken for adding to write entity : " +
+        (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+    dc = null; // reset data container to prevent it being added again.
+    return dc;
+  }
+
+  private void inferTaskInformation(TaskInformation ti) {
+    // Find the first ancestor of this MoveTask which is some form of map reduce task
+    // (Either standard, local, or a merge)
+    while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) {
+      ti.task = (Task)ti.task.getParentTasks().get(0);
+      // If it was a merge task or a local map reduce task, nothing can be inferred
+      if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) {
+        break;
+      }
+
+      // If it's a standard map reduce task, check what, if anything, it inferred about
+      // the directory this move task is moving
+      if (ti.task instanceof MapRedTask) {
+        MapredWork work = (MapredWork)ti.task.getWork();
+        MapWork mapWork = work.getMapWork();
+        ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
+        ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
+        if (work.getReduceWork() != null) {
+          ti.numBuckets = work.getReduceWork().getNumReduceTasks();
+        }
+
+        if (ti.bucketCols != null || ti.sortCols != null) {
+          // This must be a final map reduce task (the task containing the file sink
+          // operator that writes the final output)
+          assert work.isFinalMapRed();
+        }
+        break;
+      }
+
+      // If it's a move task, get the path the files were moved from, this is what any
+      // preceding map reduce task inferred information about, and moving does not invalidate
+      // those assumptions
+      // This can happen when a conditional merge is added before the final MoveTask, but the
+      // condition for merging is not met, see GenMRFileSink1.
+      if (ti.task instanceof MoveTask) {
+        MoveTask mt = (MoveTask)ti.task;
+        if (mt.getWork().getLoadFileWork() != null) {
+          ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
+        }
+      }
+    }
+  }
+
   private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
       throws HiveException {
     if (work.getCheckFileFormat()) {
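
inferTaskInformation() above is a straight extraction of the old inline loop: walk up single-parent task chains until something useful can be inferred, stopping early where nothing can be. A simplified standalone sketch of that walk; the task classes are hypothetical stand-ins for Hive's and the real method also handles MoveTask path rewriting:

    import java.util.Collections;
    import java.util.List;

    public class AncestorWalk {
      static class Task {
        List<Task> parents = Collections.emptyList();
      }
      static class MapRedTask extends Task {}   // the ancestor whose metadata we want
      static class MergeTask extends Task {}    // nothing can be inferred past this

      /** Returns the first map-reduce ancestor reachable through single-parent links, or null. */
      static Task findAncestor(Task start) {
        Task task = start;
        while (task.parents != null && task.parents.size() == 1) {
          task = task.parents.get(0);
          if (task instanceof MergeTask) {
            return null;
          }
          if (task instanceof MapRedTask) {
            return task;
          }
          // other task types: keep walking up the single-parent chain
        }
        return null;
      }

      public static void main(String[] args) {
        Task mr = new MapRedTask();
        Task move = new Task();
        move.parents = Collections.singletonList(mr);
        System.out.println(findAncestor(move) == mr);   // true
      }
    }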

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a7f7b9f..427f067 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1409,7 +1409,6 @@ public final class Utilities {
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
-      // TODO# specPath instead of tmpPath
       FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
           tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
       if(statuses != null && statuses.length > 0) {
@@ -1423,8 +1422,6 @@ public final class Utilities {
         Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
         Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
       }
-      List<Path> paths = new ArrayList<>();
-      // TODO#: HERE listFilesToCommit(specPath, fs, paths);
     } else {
       Utilities.LOG14535.info("deleting tmpPath " + tmpPath);
       fs.delete(tmpPath, true);
@@ -1445,7 +1442,7 @@ public final class Utilities {
    * @throws HiveException
    * @throws IOException
    */
-  private static void createEmptyBuckets(Configuration hconf, List<Path> paths,
+  static void createEmptyBuckets(Configuration hconf, List<Path> paths,
       FileSinkDesc conf, Reporter reporter)
       throws HiveException, IOException {
 
@@ -1586,19 +1583,18 @@ public final class Utilities {
 
     for (FileStatus one : items) {
       if (isTempPath(one)) {
-        Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + one.getPath(), new Exception());
+        Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + one.getPath()/*, new Exception()*/);
         if (!fs.delete(one.getPath(), true)) {
           throw new IOException("Unable to delete tmp file: " + one.getPath());
         }
       } else {
         String taskId = getPrefixedTaskIdFromFilename(one.getPath().getName());
-        Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + one.getPath() + ", taskId " + taskId, new Exception());
+        Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + one.getPath() + ", taskId " + taskId/*, new Exception()*/);
 
         FileStatus otherFile = taskIdToFile.get(taskId);
         if (otherFile == null) {
           taskIdToFile.put(taskId, one);
         } else {
-          // TODO# file choice!
           // Compare the file sizes of all the attempt files for the same task, the largest win
           // any attempt files could contain partial results (due to task failures or
           // speculative runs), but the largest should be the correct one since the result
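
The comment above describes the de-duplication rule for attempt files: when several attempts of the same task left output behind, the largest file wins. A small sketch of that rule; the task-id extraction is a simplified stand-in for getPrefixedTaskIdFromFilename(), and no deletion is performed here:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.fs.FileStatus;

    public class LargestAttemptWins {
      /** e.g. "000000_0" and "000000_1" (a retry) both map to task id "000000". */
      static String taskIdOf(String fileName) {
        int idx = fileName.lastIndexOf('_');
        return idx < 0 ? fileName : fileName.substring(0, idx);
      }

      /** Keeps only the largest attempt file per task id. */
      static Map<String, FileStatus> pickLargestPerTask(FileStatus[] files) {
        Map<String, FileStatus> taskIdToFile = new HashMap<>();
        for (FileStatus one : files) {
          String taskId = taskIdOf(one.getPath().getName());
          FileStatus other = taskIdToFile.get(taskId);
          if (other == null || one.getLen() > other.getLen()) {
            taskIdToFile.put(taskId, one);   // larger attempt replaces the smaller one
          }
        }
        return taskIdToFile;
      }
    }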

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 7d8c961..e43c600 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1878,7 +1878,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               Utilities.LOG14535.info("loadPartition called for DPP from " + partPath + " to " + tbl.getTableName());
               Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
                   replace, true, listBucketingEnabled,
-                  false, isAcid, hasFollowingStatsTask, false); // TODO# here
+                  false, isAcid, hasFollowingStatsTask, false); // TODO# special case #N
               partitionsMap.put(fullPartSpec, newPartition);
 
               if (inPlaceEligible) {

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 6ed379a..499530e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6658,7 +6658,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         acidOp = getAcidType(table_desc.getOutputFileFormatClass());
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
-      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
+      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, isMmTable);
       ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
           dest_tab.getTableName()));
       ltd.setLbCtx(lbCtx);
@@ -6860,6 +6860,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
         canBeMerged, isMmTable);
+    if (isMmTable) {
+      fileSinkDesc.setExecutionPrefix(ctx.getExecutionPrefix());
+    }
 
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
         fileSinkDesc, fsRS, input), inputRR);

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
index 02896ff..26f1d70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
@@ -160,7 +160,7 @@ public class TypeCheckCtx implements NodeProcessorCtx {
     if (LOG.isDebugEnabled()) {
       // Logger the callstack from which the error has been set.
       LOG.debug("Setting error: [" + error + "] from "
-          + ((errorSrcNode == null) ? "null" : errorSrcNode.toStringTree()), new Exception());
+          + ((errorSrcNode == null) ? "null" : errorSrcNode.toStringTree())/*, new Exception()*/);
     }
     this.error = error;
     this.errorSrcNode = errorSrcNode;

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 0a4848b..f51999d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   private Path destPath;
   private boolean isHiveServerQuery;
   private boolean isMmTable;
+  private String executionPrefix;
 
   public FileSinkDesc() {
   }
@@ -158,6 +159,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     ret.setWriteType(writeType);
     ret.setTransactionId(txnId);
     ret.setStatsTmpDir(statsTmpDir);
+    ret.setExecutionPrefix(executionPrefix);
     return ret;
   }
 
@@ -481,4 +483,11 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     this.statsTmpDir = statsCollectionTempDir;
   }
 
+  public String getExecutionPrefix() {
+    return this.executionPrefix;
+  }
+
+  public void setExecutionPrefix(String value) {
+    this.executionPrefix = value;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index 5e4e1fe..5cad65c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@ -56,7 +56,7 @@ public class LoadFileDesc extends LoadDesc implements Serializable {
       final boolean isDfsDir, final String columns, final String columnTypes) {
 
     super(sourcePath);
-    Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir, new Exception());
+    Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir/*, new Exception()*/);
     this.targetDir = targetDir;
     this.isDfsDir = isDfsDir;
     this.columns = columns;

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 1ac831d..3b49197 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -52,10 +52,10 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
       final boolean replace,
-      final AcidUtils.Operation writeType) {
+      final AcidUtils.Operation writeType, boolean isMmTable) {
     super(sourcePath);
-    Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName(), new Exception());
-    init(table, partitionSpec, replace, writeType, false);
+    Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/);
+    init(table, partitionSpec, replace, writeType, isMmTable);
   }
 
   /**
@@ -69,14 +69,16 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
                        final TableDesc table,
                        final Map<String, String> partitionSpec,
                        final boolean replace) {
-    this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID);
+    // TODO# we assume mm=false here
+    this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, false);
   }
 
   public LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
-      final AcidUtils.Operation writeType) {
-    this(sourcePath, table, partitionSpec, true, writeType);
+      final AcidUtils.Operation writeType, boolean isMmTable) {
+    // TODO# we assume mm=false here
+    this(sourcePath, table, partitionSpec, true, writeType, isMmTable);
   }
 
   /**
@@ -88,7 +90,8 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
   public LoadTableDesc(final Path sourcePath,
                        final org.apache.hadoop.hive.ql.plan.TableDesc table,
                        final Map<String, String> partitionSpec) {
-    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID);
+    // TODO# we assume mm=false here
+    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, false);
   }
 
   public LoadTableDesc(final Path sourcePath,
@@ -98,7 +101,7 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
       boolean isReplace,
       boolean isMmTable) {
     super(sourcePath);
-    Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName(), new Exception());
+    Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/);
     this.dpCtx = dpCtx;
     if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
       init(table, dpCtx.getPartSpec(), isReplace, writeType, isMmTable);

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q
index 882096b..11259cb 100644
--- a/ql/src/test/queries/clientpositive/mm_current.q
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -2,10 +2,20 @@ set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
 set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.fetch.task.conversion=none;
+set tez.grouping.min-size=1;
+set tez.grouping.max-size=2;
+set  hive.tez.auto.reducer.parallelism=false;
 
-drop table simple_mm;
-
+create table intermediate(key int) partitioned by (p int) stored as orc;
+insert into table intermediate partition(p='455') select key from src limit 3;
+insert into table intermediate partition(p='456') select key from src limit 3;
+insert into table intermediate partition(p='457') select key from src limit 3;
   
-create table simple_mm(key int) partitioned by (key_mm int) tblproperties ('hivecommit'='true');
-insert into table simple_mm partition(key_mm='455') select key from src limit 3;
+create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true');
+
+explain insert into table simple_mm partition(key_mm='455') select key from intermediate;
+insert into table simple_mm partition(key_mm='455') select key from intermediate;
+
+drop table simple_mm;
+drop table intermediate;
 

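In short, the reworked test first builds a small partitioned ORC source table and then inserts from it into the micromanaged table, whose only distinguishing feature at this point is the 'hivecommit'='true' table property (the corresponding plan in mm_current.q.out below reports "micromanaged table: true" in the Move Operator). A minimal sketch of that flow, using only statements taken from the test itself:

    create table intermediate(key int) partitioned by (p int) stored as orc;
    insert into table intermediate partition(p='455') select key from src limit 3;

    -- 'hivecommit'='true' is what marks the table as micromanaged on this branch
    create table simple_mm(key int) partitioned by (key_mm int) stored as orc
      tblproperties ('hivecommit'='true');

    explain insert into table simple_mm partition(key_mm='455') select key from intermediate;
    insert into table simple_mm partition(key_mm='455') select key from intermediate;
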
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out b/ql/src/test/results/clientpositive/llap/mm_current.q.out
index 129bb13..8f1af4c 100644
--- a/ql/src/test/results/clientpositive/llap/mm_current.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -1,21 +1,128 @@
-PREHOOK: query: drop table simple123
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table simple123
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true')
+PREHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@simple123
-POSTHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true')
+PREHOOK: Output: default@intermediate
+POSTHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@simple123
-PREHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3
+POSTHOOK: Output: default@intermediate
+PREHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@intermediate@p=455
+POSTHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@intermediate@p=455
+POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
-PREHOOK: Output: default@simple123@key123=455
-POSTHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3
+PREHOOK: Output: default@intermediate@p=456
+POSTHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
-POSTHOOK: Output: default@simple123@key123=455
-POSTHOOK: Lineage: simple123 PARTITION(key123=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Output: default@intermediate@p=456
+POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@intermediate@p=457
+POSTHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@intermediate@p=457
+POSTHOOK: Lineage: intermediate PARTITION(p=457).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+POSTHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: intermediate
+                  Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: int)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                          serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                          name: default.simple_mm
+            Execution mode: llap
+            LLAP IO: all inputs
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            key_mm 455
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.simple_mm
+          micromanaged table: true
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_mm@key_mm=455
+POSTHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_mm@key_mm=455
+POSTHOOK: Lineage: simple_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: drop table simple_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@simple_mm
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: drop table simple_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@simple_mm
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: drop table intermediate
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@intermediate
+PREHOOK: Output: default@intermediate
+POSTHOOK: query: drop table intermediate
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Output: default@intermediate


[5/9] hive git commit: HIVE-14673: Orc orc_merge_diff_fs.q and orc_llap.q test improvement (Prasanth Jayachandran reviewed by Hari Subramaniyan)

Posted by se...@apache.org.
HIVE-14673: Orc orc_merge_diff_fs.q and orc_llap.q test improvement (Prasanth Jayachandran reviewed by Hari Subramaniyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fdc4bc85
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fdc4bc85
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fdc4bc85

Branch: refs/heads/hive-14535
Commit: fdc4bc8536606e8f1dc245fe559514b35fda8708
Parents: fa3a8b9
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Wed Aug 31 12:47:02 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Wed Aug 31 12:47:02 2016 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   3 +-
 ql/src/test/queries/clientpositive/orc_llap.q   |  76 +-
 .../results/clientpositive/llap/orc_llap.q.out  | 866 ++++++++++---------
 .../clientpositive/orc_merge_diff_fs.q.out      |  18 +-
 4 files changed, 501 insertions(+), 462 deletions(-)
----------------------------------------------------------------------
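
The improvement rewrites each check in orc_llap.q so that, instead of materializing a temp table and hashing it in a separate step, the check is a single sum(hash(*)) over a subquery; since hash() cannot be vectorized, this also exercises the row-by-row (non-vectorized) LLAP reader. A minimal before/after sketch, using the first check from the file:

    -- before: hash computed as a separate step over a temp table
    create table llap_temp_table as
      select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null;
    select sum(hash(*)) from llap_temp_table;
    drop table llap_temp_table;

    -- after: a single query that runs in llap (the map side is not vectorized because of hash())
    select sum(hash(*)) from
      (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t;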


http://git-wip-us.apache.org/repos/asf/hive/blob/fdc4bc85/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 764c8f6..a920ca9 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -33,7 +33,6 @@ minimr.query.files=auto_sortmerge_join_16.q,\
   load_fs2.q,\
   load_hdfs_file_with_space_in_the_name.q,\
   non_native_window_udf.q, \
-  orc_merge_diff_fs.q,\
   parallel_orderby.q,\
   quotedid_smb.q,\
   reduce_deduplicate.q,\
@@ -227,6 +226,7 @@ minillap.shared.query.files=acid_globallimit.q,\
   orc_merge7.q,\
   orc_merge8.q,\
   orc_merge9.q,\
+  orc_merge_diff_fs.q,\
   orc_merge_incompat1.q,\
   orc_merge_incompat2.q,\
   orc_merge_incompat3.q,\
@@ -462,7 +462,6 @@ minillap.query.files=acid_bucket_pruning.q,\
   orc_llap_counters.q,\
   orc_llap_counters1.q,\
   orc_llap_nonvector.q,\
-  orc_merge_diff_fs.q,\
   orc_ppd_basic.q,\
   schema_evol_orc_acid_part.q,\
   schema_evol_orc_acid_part_update.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/fdc4bc85/ql/src/test/queries/clientpositive/orc_llap.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_llap.q b/ql/src/test/queries/clientpositive/orc_llap.q
index d2bd086..7b7f240 100644
--- a/ql/src/test/queries/clientpositive/orc_llap.q
+++ b/ql/src/test/queries/clientpositive/orc_llap.q
@@ -62,42 +62,26 @@ select count(*) from orc_llap_small;
 -- All row groups pruned
 select count(*) from orc_llap_small where cint < 60000000;
 
--- Hash cannot be vectorized, so run hash as the last step on a temp table
-drop table llap_temp_table;
+-- Hash cannot be vectorized, but now we have a row-by-row reader, so the subquery still runs in llap, just not vectorized
 explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null;
-create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t;
 
-drop table llap_temp_table;
 explain
-select * from orc_llap where cint > 10 and cbigint is not null;
-create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t;
 
-drop table llap_temp_table;
 explain
-select cstring2 from orc_llap where cint > 5 and cint < 10;
-create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t;
 
-
-drop table llap_temp_table;
 explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2;
-create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t;
 
-drop table llap_temp_table;
 explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null;
-create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t;
 
 -- multi-stripe test
 insert into table orc_llap
@@ -106,43 +90,25 @@ from alltypesorc cross join cross_numbers;
 
 alter table orc_llap concatenate;
 
-drop table llap_temp_table;
 explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null;
-create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t;
 
-drop table llap_temp_table;
 explain
-select * from orc_llap where cint > 10 and cbigint is not null;
-create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t;
 
-drop table llap_temp_table;
 explain
-select cstring2 from orc_llap where cint > 5 and cint < 10;
-create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t;
 
-drop table llap_temp_table;
 explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2;
-create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t;
 
-drop table llap_temp_table;
 explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null;
-create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
-
-drop table llap_temp_table;
-
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t;
 
 DROP TABLE cross_numbers;
 DROP TABLE orc_llap;

http://git-wip-us.apache.org/repos/asf/hive/blob/fdc4bc85/ql/src/test/results/clientpositive/llap/orc_llap.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
index 72dd623..74a6b29 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
@@ -236,192 +236,232 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_llap_small
 #### A masked pattern was here ####
 0
-PREHOOK: query: -- Hash cannot be vectorized, so run hash as the last step on a temp table
-drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: -- Hash cannot be vectorized, so run hash as the last step on a temp table
-drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
+PREHOOK: query: -- Hash cannot be vectorized, but now we have a row-by-row reader, so the subquery still runs in llap, just not vectorized
+explain
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
 PREHOOK: type: QUERY
-POSTHOOK: query: explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
+POSTHOOK: query: -- Hash cannot be vectorized, but now we have a row-by-row reader, so the subquery still runs in llap, just not vectorized
+explain
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-0 is a root stage
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: orc_llap
+                  filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
+                  Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((cint > 10) and cbigint is not null) (type: boolean)
+                    Statistics: Num rows: 40960 Data size: 9693313 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: hash(cint,csmallint,cbigint) (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 40960 Data size: 9693313 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: sum(_col0)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
   Stage: Stage-0
     Fetch Operator
       limit: -1
       Processor Tree:
-        TableScan
-          alias: orc_llap
-          filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
-          Filter Operator
-            predicate: ((cint > 10) and cbigint is not null) (type: boolean)
-            Select Operator
-              expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint)
-              outputColumnNames: _col0, _col1, _col2
-              ListSink
+        ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 -558222259686
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: explain
-select * from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select * from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-0 is a root stage
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: orc_llap
+                  filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
+                  Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((cint > 10) and cbigint is not null) (type: boolean)
+                    Statistics: Num rows: 40960 Data size: 9693313 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: hash(ctinyint,csmallint,cint,cbigint,cfloat,cdouble,cstring1,cstring2,ctimestamp1,ctimestamp2,cboolean1,cboolean2) (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 40960 Data size: 9693313 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: sum(_col0)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
   Stage: Stage-0
     Fetch Operator
       limit: -1
       Processor Tree:
-        TableScan
-          alias: orc_llap
-          filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
-          Filter Operator
-            predicate: ((cint > 10) and cbigint is not null) (type: boolean)
-            Select Operator
-              expressions: ctinyint (type: tinyint), csmallint (type: smallint), cint (type: int), cbigint (type: bigint), cfloat (type: float), cdouble (type: double), cstring1 (type: string), cstring2 (type: string), ctimestamp1 (type: timestamp), ctimestamp2 (type: timestamp), cboolean1 (type: boolean), cboolean2 (type: boolean)
-              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
-              ListSink
+        ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cboolean1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cboolean2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cdouble SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cdouble, type:double, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cfloat SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cfloat, type:float, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctimestamp1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctimestamp2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctinyint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 -197609091139
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: explain
-select cstring2 from orc_llap where cint > 5 and cint < 10
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select cstring2 from orc_llap where cint > 5 and cint < 10
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-0 is a root stage
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: orc_llap
+                  filterExpr: ((cint > 5) and (cint < 10)) (type: boolean)
+                  Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((cint > 5) and (cint < 10)) (type: boolean)
+                    Statistics: Num rows: 13653 Data size: 3231025 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: hash(cstring2) (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 13653 Data size: 3231025 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: sum(_col0)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
   Stage: Stage-0
     Fetch Operator
       limit: -1
       Processor Tree:
-        TableScan
-          alias: orc_llap
-          filterExpr: ((cint > 5) and (cint < 10)) (type: boolean)
-          Filter Operator
-            predicate: ((cint > 5) and (cint < 10)) (type: boolean)
-            Select Operator
-              expressions: cstring2 (type: string)
-              outputColumnNames: _col0
-              ListSink
+        ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 NULL
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -433,6 +473,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -459,7 +500,7 @@ STAGE PLANS:
             Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
-            Execution mode: vectorized, llap
+            Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
@@ -467,9 +508,30 @@ STAGE PLANS:
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 61440 Data size: 14539970 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: hash(_col0,_col1,_col2) (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 61440 Data size: 14539970 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    aggregations: sum(_col0)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      sort order: 
+                      Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
-                  Statistics: Num rows: 61440 Data size: 14539970 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                   table:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -481,43 +543,20 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.c2 EXPRESSION [(orc_llap)orc_llap.null, ]
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 -201218541193
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -528,7 +567,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -552,7 +592,7 @@ STAGE PLANS:
                         value expressions: _col2 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Map 3 
+        Map 4 
             Map Operator Tree:
                 TableScan
                   alias: o2
@@ -585,16 +625,33 @@ STAGE PLANS:
                 outputColumnNames: _col2, _col5
                 Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
-                  expressions: _col2 (type: string), _col5 (type: string)
-                  outputColumnNames: _col0, _col1
+                  expressions: hash(_col2,_col5) (type: int)
+                  outputColumnNames: _col0
                   Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: false
-                    Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE
-                    table:
-                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  Group By Operator
+                    aggregations: sum(_col0)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      sort order: 
+                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -602,27 +659,13 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)o2.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 -735462183586256
 Warning: Map Join MAPJOIN[10][bigTable=?] in task 'Map 1' is a cross product
@@ -662,194 +705,230 @@ POSTHOOK: query: alter table orc_llap concatenate
 POSTHOOK: type: ALTER_TABLE_MERGE
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: default@orc_llap
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-0 is a root stage
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: orc_llap
+                  filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
+                  Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((cint > 10) and cbigint is not null) (type: boolean)
+                    Statistics: Num rows: 81920 Data size: 19386626 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: hash(cint,csmallint,cbigint) (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 81920 Data size: 19386626 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: sum(_col0)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
   Stage: Stage-0
     Fetch Operator
       limit: -1
       Processor Tree:
-        TableScan
-          alias: orc_llap
-          filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
-          Filter Operator
-            predicate: ((cint > 10) and cbigint is not null) (type: boolean)
-            Select Operator
-              expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint)
-              outputColumnNames: _col0, _col1, _col2
-              ListSink
+        ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 -1116444519372
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: explain
-select * from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select * from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-0 is a root stage
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: orc_llap
+                  filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
+                  Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((cint > 10) and cbigint is not null) (type: boolean)
+                    Statistics: Num rows: 81920 Data size: 19386626 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: hash(ctinyint,csmallint,cint,cbigint,cfloat,cdouble,cstring1,cstring2,ctimestamp1,ctimestamp2,cboolean1,cboolean2) (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 81920 Data size: 19386626 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: sum(_col0)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
   Stage: Stage-0
     Fetch Operator
       limit: -1
       Processor Tree:
-        TableScan
-          alias: orc_llap
-          filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
-          Filter Operator
-            predicate: ((cint > 10) and cbigint is not null) (type: boolean)
-            Select Operator
-              expressions: ctinyint (type: tinyint), csmallint (type: smallint), cint (type: int), cbigint (type: bigint), cfloat (type: float), cdouble (type: double), cstring1 (type: string), cstring2 (type: string), ctimestamp1 (type: timestamp), ctimestamp2 (type: timestamp), cboolean1 (type: boolean), cboolean2 (type: boolean)
-              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
-              ListSink
+        ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cboolean1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cboolean2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cdouble SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cdouble, type:double, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cfloat SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cfloat, type:float, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctimestamp1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctimestamp2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctinyint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 -395218182278
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: explain
-select cstring2 from orc_llap where cint > 5 and cint < 10
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select cstring2 from orc_llap where cint > 5 and cint < 10
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-0 is a root stage
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: orc_llap
+                  filterExpr: ((cint > 5) and (cint < 10)) (type: boolean)
+                  Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((cint > 5) and (cint < 10)) (type: boolean)
+                    Statistics: Num rows: 27306 Data size: 6462051 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: hash(cstring2) (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 27306 Data size: 6462051 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        aggregations: sum(_col0)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        Reduce Output Operator
+                          sort order: 
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
   Stage: Stage-0
     Fetch Operator
       limit: -1
       Processor Tree:
-        TableScan
-          alias: orc_llap
-          filterExpr: ((cint > 5) and (cint < 10)) (type: boolean)
-          Filter Operator
-            predicate: ((cint > 5) and (cint < 10)) (type: boolean)
-            Select Operator
-              expressions: cstring2 (type: string)
-              outputColumnNames: _col0
-              ListSink
+        ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 NULL
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -861,6 +940,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -887,7 +967,7 @@ STAGE PLANS:
             Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
-            Execution mode: vectorized, llap
+            Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
@@ -895,9 +975,30 @@ STAGE PLANS:
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: hash(_col0,_col1,_col2) (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    aggregations: sum(_col0)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      sort order: 
+                      Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
-                  Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
                   table:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -909,43 +1010,20 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.c2 EXPRESSION [(orc_llap)orc_llap.null, ]
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 -201218418313
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -956,7 +1034,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -980,7 +1059,7 @@ STAGE PLANS:
                         value expressions: _col2 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Map 3 
+        Map 4 
             Map Operator Tree:
                 TableScan
                   alias: o2
@@ -1013,16 +1092,33 @@ STAGE PLANS:
                 outputColumnNames: _col2, _col5
                 Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
-                  expressions: _col2 (type: string), _col5 (type: string)
-                  outputColumnNames: _col0, _col1
+                  expressions: hash(_col2,_col5) (type: int)
+                  outputColumnNames: _col0
                   Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: false
-                    Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE
-                    table:
-                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  Group By Operator
+                    aggregations: sum(_col0)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      sort order: 
+                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -1030,37 +1126,15 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)o2.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
 PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
 #### A masked pattern was here ####
 -2941848734345024
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
 PREHOOK: query: DROP TABLE cross_numbers
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@cross_numbers

http://git-wip-us.apache.org/repos/asf/hive/blob/fdc4bc85/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out b/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out
index 6ac3d35..3f047da 100644
--- a/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out
+++ b/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out
@@ -67,14 +67,14 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: src
-            Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
             Select Operator
               expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
               outputColumnNames: _col0, _col1, _col2
-              Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
               File Output Operator
                 compressed: false
-                Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                 table:
                     input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
                     output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
@@ -144,14 +144,14 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: src
-            Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
             Select Operator
               expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
               outputColumnNames: _col0, _col1, _col2
-              Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
               File Output Operator
                 compressed: false
-                Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                 table:
                     input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
                     output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
@@ -260,14 +260,14 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: src
-            Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
             Select Operator
               expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
               outputColumnNames: _col0, _col1, _col2
-              Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
               File Output Operator
                 compressed: false
-                Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                 table:
                     input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
                     output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat


[6/9] hive git commit: HIVE-14674 Incorrect syntax near the keyword 'with' using MS SQL Server (Eugene Koifman, reviewed by Sergey Shelukhin)

Posted by se...@apache.org.
HIVE-14674 Incorrect syntax near the keyword 'with' using MS SQL Server (Eugene Koifman, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f5dee8c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f5dee8c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f5dee8c

Branch: refs/heads/hive-14535
Commit: 8f5dee8c45ccf249b56609edb4a66d0211e2a00c
Parents: fdc4bc8
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Wed Aug 31 15:34:07 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Wed Aug 31 15:34:07 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/metastore/txn/TxnHandler.java | 11 +++++++++--
 .../apache/hadoop/hive/metastore/txn/TestTxnUtils.java   |  6 ++++++
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8f5dee8c/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e8c5fac..a7a1cf9 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3308,6 +3308,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   /**
    * Helper class that generates SQL queries with syntax specific to target DB
    */
+  @VisibleForTesting
   static final class SQLGenerator {
     private final DatabaseProduct dbProduct;
     private final HiveConf conf;
@@ -3374,7 +3375,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
      * Given a {@code selectStatement}, decorated it with FOR UPDATE or semantically equivalent
      * construct.  If the DB doesn't support, return original select.
      */
-    private String addForUpdateClause(String selectStatement) throws MetaException {
+    String addForUpdateClause(String selectStatement) throws MetaException {
       switch (dbProduct) {
         case DERBY:
           //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
@@ -3390,7 +3391,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         case SQLSERVER:
           //https://msdn.microsoft.com/en-us/library/ms189499.aspx
           //https://msdn.microsoft.com/en-us/library/ms187373.aspx
-          return selectStatement + " with(updlock)";
+          String modifier = " with (updlock)";
+          int wherePos = selectStatement.toUpperCase().indexOf(" WHERE ");
+          if(wherePos < 0) {
+            return selectStatement + modifier;
+          }
+          return selectStatement.substring(0, wherePos) + modifier +
+            selectStatement.substring(wherePos, selectStatement.length());
         default:
           String msg = "Unrecognized database product name <" + dbProduct + ">";
           LOG.error(msg);

http://git-wip-us.apache.org/repos/asf/hive/blob/8f5dee8c/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
index cf0a7d4..ebcbaff 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
@@ -158,6 +158,12 @@ public class TestTxnUtils {
     Assert.assertEquals("Number of stmts", 2, sql.size());
     Assert.assertEquals("Wrong stmt", "insert into colors(name, category) values('yellow', 1),('red', 2),('orange', 3),('G',0),('G',1),('G',2),('G',3),('G',4),('G',5),('G',6),('G',7),('G',8),('G',9),('G',10),('G',11),('G',12),('G',13),('G',14),('G',15),('G',16),('G',17),('G',18),('G',19),('G',20),('G',21),('G',22),('G',23),('G',24),('G',25),('G',26),('G',27),('G',28),('G',29),('G',30),('G',31),('G',32),('G',33),('G',34),('G',35),('G',36),('G',37),('G',38),('G',39),('G',40),('G',41),('G',42),('G',43),('G',44),('G',45),('G',46),('G',47),('G',48),('G',49),('G',50),('G',51),('G',52),('G',53),('G',54),('G',55),('G',56),('G',57),('G',58),('G',59),('G',60),('G',61),('G',62),('G',63),('G',64),('G',65),('G',66),('G',67),('G',68),('G',69),('G',70),('G',71),('G',72),('G',73),('G',74),('G',75),('G',76),('G',77),('G',78),('G',79),('G',80),('G',81),('G',82),('G',83),('G',84),('G',85),('G',86),('G',87),('G',88),('G',89),('G',90),('G',91),('G',92),('G',93),('G',94),('G',95),('G',96),('G',97),('G',9
 8),('G',99),('G',100),('G',101),('G',102),('G',103),('G',104),('G',105),('G',106),('G',107),('G',108),('G',109),('G',110),('G',111),('G',112),('G',113),('G',114),('G',115),('G',116),('G',117),('G',118),('G',119),('G',120),('G',121),('G',122),('G',123),('G',124),('G',125),('G',126),('G',127),('G',128),('G',129),('G',130),('G',131),('G',132),('G',133),('G',134),('G',135),('G',136),('G',137),('G',138),('G',139),('G',140),('G',141),('G',142),('G',143),('G',144),('G',145),('G',146),('G',147),('G',148),('G',149),('G',150),('G',151),('G',152),('G',153),('G',154),('G',155),('G',156),('G',157),('G',158),('G',159),('G',160),('G',161),('G',162),('G',163),('G',164),('G',165),('G',166),('G',167),('G',168),('G',169),('G',170),('G',171),('G',172),('G',173),('G',174),('G',175),('G',176),('G',177),('G',178),('G',179),('G',180),('G',181),('G',182),('G',183),('G',184),('G',185),('G',186),('G',187),('G',188),('G',189),('G',190),('G',191),('G',192),('G',193),('G',194),('G',195),('G',196),('G',197),('G',
 198),('G',199),('G',200),('G',201),('G',202),('G',203),('G',204),('G',205),('G',206),('G',207),('G',208),('G',209),('G',210),('G',211),('G',212),('G',213),('G',214),('G',215),('G',216),('G',217),('G',218),('G',219),('G',220),('G',221),('G',222),('G',223),('G',224),('G',225),('G',226),('G',227),('G',228),('G',229),('G',230),('G',231),('G',232),('G',233),('G',234),('G',235),('G',236),('G',237),('G',238),('G',239),('G',240),('G',241),('G',242),('G',243),('G',244),('G',245),('G',246),('G',247),('G',248),('G',249),('G',250),('G',251),('G',252),('G',253),('G',254),('G',255),('G',256),('G',257),('G',258),('G',259),('G',260),('G',261),('G',262),('G',263),('G',264),('G',265),('G',266),('G',267),('G',268),('G',269),('G',270),('G',271),('G',272),('G',273),('G',274),('G',275),('G',276),('G',277),('G',278),('G',279),('G',280),('G',281),('G',282),('G',283),('G',284),('G',285),('G',286),('G',287),('G',288),('G',289),('G',290),('G',291),('G',292),('G',293),('G',294),('G',295),('G',296),('G',297),('
 G',298),('G',299),('G',300),('G',301),('G',302),('G',303),('G',304),('G',305),('G',306),('G',307),('G',308),('G',309),('G',310),('G',311),('G',312),('G',313),('G',314),('G',315),('G',316),('G',317),('G',318),('G',319),('G',320),('G',321),('G',322),('G',323),('G',324),('G',325),('G',326),('G',327),('G',328),('G',329),('G',330),('G',331),('G',332),('G',333),('G',334),('G',335),('G',336),('G',337),('G',338),('G',339),('G',340),('G',341),('G',342),('G',343),('G',344),('G',345),('G',346),('G',347),('G',348),('G',349),('G',350),('G',351),('G',352),('G',353),('G',354),('G',355),('G',356),('G',357),('G',358),('G',359),('G',360),('G',361),('G',362),('G',363),('G',364),('G',365),('G',366),('G',367),('G',368),('G',369),('G',370),('G',371),('G',372),('G',373),('G',374),('G',375),('G',376),('G',377),('G',378),('G',379),('G',380),('G',381),('G',382),('G',383),('G',384),('G',385),('G',386),('G',387),('G',388),('G',389),('G',390),('G',391),('G',392),('G',393),('G',394),('G',395),('G',396),('G',397)
 ,('G',398),('G',399),('G',400),('G',401),('G',402),('G',403),('G',404),('G',405),('G',406),('G',407),('G',408),('G',409),('G',410),('G',411),('G',412),('G',413),('G',414),('G',415),('G',416),('G',417),('G',418),('G',419),('G',420),('G',421),('G',422),('G',423),('G',424),('G',425),('G',426),('G',427),('G',428),('G',429),('G',430),('G',431),('G',432),('G',433),('G',434),('G',435),('G',436),('G',437),('G',438),('G',439),('G',440),('G',441),('G',442),('G',443),('G',444),('G',445),('G',446),('G',447),('G',448),('G',449),('G',450),('G',451),('G',452),('G',453),('G',454),('G',455),('G',456),('G',457),('G',458),('G',459),('G',460),('G',461),('G',462),('G',463),('G',464),('G',465),('G',466),('G',467),('G',468),('G',469),('G',470),('G',471),('G',472),('G',473),('G',474),('G',475),('G',476),('G',477),('G',478),('G',479),('G',480),('G',481),('G',482),('G',483),('G',484),('G',485),('G',486),('G',487),('G',488),('G',489),('G',490),('G',491),('G',492),('G',493),('G',494),('G',495),('G',496),('G',4
 97),('G',498),('G',499),('G',500),('G',501),('G',502),('G',503),('G',504),('G',505),('G',506),('G',507),('G',508),('G',509),('G',510),('G',511),('G',512),('G',513),('G',514),('G',515),('G',516),('G',517),('G',518),('G',519),('G',520),('G',521),('G',522),('G',523),('G',524),('G',525),('G',526),('G',527),('G',528),('G',529),('G',530),('G',531),('G',532),('G',533),('G',534),('G',535),('G',536),('G',537),('G',538),('G',539),('G',540),('G',541),('G',542),('G',543),('G',544),('G',545),('G',546),('G',547),('G',548),('G',549),('G',550),('G',551),('G',552),('G',553),('G',554),('G',555),('G',556),('G',557),('G',558),('G',559),('G',560),('G',561),('G',562),('G',563),('G',564),('G',565),('G',566),('G',567),('G',568),('G',569),('G',570),('G',571),('G',572),('G',573),('G',574),('G',575),('G',576),('G',577),('G',578),('G',579),('G',580),('G',581),('G',582),('G',583),('G',584),('G',585),('G',586),('G',587),('G',588),('G',589),('G',590),('G',591),('G',592),('G',593),('G',594),('G',595),('G',596),('G
 ',597),('G',598),('G',599),('G',600),('G',601),('G',602),('G',603),('G',604),('G',605),('G',606),('G',607),('G',608),('G',609),('G',610),('G',611),('G',612),('G',613),('G',614),('G',615),('G',616),('G',617),('G',618),('G',619),('G',620),('G',621),('G',622),('G',623),('G',624),('G',625),('G',626),('G',627),('G',628),('G',629),('G',630),('G',631),('G',632),('G',633),('G',634),('G',635),('G',636),('G',637),('G',638),('G',639),('G',640),('G',641),('G',642),('G',643),('G',644),('G',645),('G',646),('G',647),('G',648),('G',649),('G',650),('G',651),('G',652),('G',653),('G',654),('G',655),('G',656),('G',657),('G',658),('G',659),('G',660),('G',661),('G',662),('G',663),('G',664),('G',665),('G',666),('G',667),('G',668),('G',669),('G',670),('G',671),('G',672),('G',673),('G',674),('G',675),('G',676),('G',677),('G',678),('G',679),('G',680),('G',681),('G',682),('G',683),('G',684),('G',685),('G',686),('G',687),('G',688),('G',689),('G',690),('G',691),('G',692),('G',693),('G',694),('G',695),('G',696),
 ('G',697),('G',698),('G',699),('G',700),('G',701),('G',702),('G',703),('G',704),('G',705),('G',706),('G',707),('G',708),('G',709),('G',710),('G',711),('G',712),('G',713),('G',714),('G',715),('G',716),('G',717),('G',718),('G',719),('G',720),('G',721),('G',722),('G',723),('G',724),('G',725),('G',726),('G',727),('G',728),('G',729),('G',730),('G',731),('G',732),('G',733),('G',734),('G',735),('G',736),('G',737),('G',738),('G',739),('G',740),('G',741),('G',742),('G',743),('G',744),('G',745),('G',746),('G',747),('G',748),('G',749),('G',750),('G',751),('G',752),('G',753),('G',754),('G',755),('G',756),('G',757),('G',758),('G',759),('G',760),('G',761),('G',762),('G',763),('G',764),('G',765),('G',766),('G',767),('G',768),('G',769),('G',770),('G',771),('G',772),('G',773),('G',774),('G',775),('G',776),('G',777),('G',778),('G',779),('G',780),('G',781),('G',782),('G',783),('G',784),('G',785),('G',786),('G',787),('G',788),('G',789),('G',790),('G',791),('G',792),('G',793),('G',794),('G',795),('G',79
 6),('G',797),('G',798),('G',799),('G',800),('G',801),('G',802),('G',803),('G',804),('G',805),('G',806),('G',807),('G',808),('G',809),('G',810),('G',811),('G',812),('G',813),('G',814),('G',815),('G',816),('G',817),('G',818),('G',819),('G',820),('G',821),('G',822),('G',823),('G',824),('G',825),('G',826),('G',827),('G',828),('G',829),('G',830),('G',831),('G',832),('G',833),('G',834),('G',835),('G',836),('G',837),('G',838),('G',839),('G',840),('G',841),('G',842),('G',843),('G',844),('G',845),('G',846),('G',847),('G',848),('G',849),('G',850),('G',851),('G',852),('G',853),('G',854),('G',855),('G',856),('G',857),('G',858),('G',859),('G',860),('G',861),('G',862),('G',863),('G',864),('G',865),('G',866),('G',867),('G',868),('G',869),('G',870),('G',871),('G',872),('G',873),('G',874),('G',875),('G',876),('G',877),('G',878),('G',879),('G',880),('G',881),('G',882),('G',883),('G',884),('G',885),('G',886),('G',887),('G',888),('G',889),('G',890),('G',891),('G',892),('G',893),('G',894),('G',895),('G'
 ,896),('G',897),('G',898),('G',899),('G',900),('G',901),('G',902),('G',903),('G',904),('G',905),('G',906),('G',907),('G',908),('G',909),('G',910),('G',911),('G',912),('G',913),('G',914),('G',915),('G',916),('G',917),('G',918),('G',919),('G',920),('G',921),('G',922),('G',923),('G',924),('G',925),('G',926),('G',927),('G',928),('G',929),('G',930),('G',931),('G',932),('G',933),('G',934),('G',935),('G',936),('G',937),('G',938),('G',939),('G',940),('G',941),('G',942),('G',943),('G',944),('G',945),('G',946),('G',947),('G',948),('G',949),('G',950),('G',951),('G',952),('G',953),('G',954),('G',955),('G',956),('G',957),('G',958),('G',959),('G',960),('G',961),('G',962),('G',963),('G',964),('G',965),('G',966),('G',967),('G',968),('G',969),('G',970),('G',971),('G',972),('G',973),('G',974),('G',975),('G',976),('G',977),('G',978),('G',979),('G',980),('G',981),('G',982),('G',983),('G',984),('G',985),('G',986),('G',987),('G',988),('G',989),('G',990),('G',991),('G',992),('G',993),('G',994),('G',995),(
 'G',996)", sql.get(0));
     Assert.assertEquals("Wrong stmt", "insert into colors(name, category) values('G',997),('G',998),('G',999)", sql.get(1));
+
+    sqlGenerator = new TxnHandler.SQLGenerator(TxnHandler.DatabaseProduct.SQLSERVER, conf);
+    String modSql = sqlGenerator.addForUpdateClause("select nl_next from NEXT_LOCK_ID");
+    Assert.assertEquals("select nl_next from NEXT_LOCK_ID with (updlock)", modSql);
+    modSql = sqlGenerator.addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1='CheckLock' and MT_KEY2=0");
+    Assert.assertEquals("select MT_COMMENT from AUX_TABLE with (updlock) where MT_KEY1='CheckLock' and MT_KEY2=0", modSql);
   }
 
   @Before
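
For reference, a minimal standalone sketch of the same idea as the patch above (the class and method names here are hypothetical and are not Hive's actual TxnHandler.SQLGenerator): SQL Server's table hint must sit between the table reference and the WHERE clause, so the statement is split at the first " WHERE " and the " with (updlock)" modifier is spliced in before it; when no WHERE clause is present the modifier is simply appended, as the new TestTxnUtils assertions exercise.

// Minimal standalone sketch of the HIVE-14674 idea; hypothetical names,
// not part of Hive's TxnHandler.SQLGenerator.
public final class UpdLockHintSketch {

  // Splices " with (updlock)" in front of the WHERE clause when one is present,
  // otherwise appends it to the end of the statement.
  static String addUpdLockHint(String selectStatement) {
    final String modifier = " with (updlock)";
    int wherePos = selectStatement.toUpperCase().indexOf(" WHERE ");
    if (wherePos < 0) {
      return selectStatement + modifier;
    }
    return selectStatement.substring(0, wherePos) + modifier
        + selectStatement.substring(wherePos);
  }

  public static void main(String[] args) {
    // The same two statement shapes covered by the assertions in TestTxnUtils above.
    System.out.println(addUpdLockHint("select nl_next from NEXT_LOCK_ID"));
    System.out.println(addUpdLockHint(
        "select MT_COMMENT from AUX_TABLE where MT_KEY1='CheckLock' and MT_KEY2=0"));
  }
}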


[4/9] hive git commit: HIVE-14670: org.apache.hadoop.hive.ql.TestMTQueries failure (Hari Subramaniyan, reviewed by Prasanth Jayachandran)

Posted by se...@apache.org.
HIVE-14670: org.apache.hadoop.hive.ql.TestMTQueries failure (Hari Subramaniyan, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fa3a8b9b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fa3a8b9b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fa3a8b9b

Branch: refs/heads/hive-14535
Commit: fa3a8b9bd16aad66b869ee30f6ddfa12f2fd5ad1
Parents: 1f6949f
Author: Hari Subramaniyan <ha...@apache.org>
Authored: Wed Aug 31 11:34:23 2016 -0700
Committer: Hari Subramaniyan <ha...@apache.org>
Committed: Wed Aug 31 11:34:23 2016 -0700

----------------------------------------------------------------------
 .../src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java    | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fa3a8b9b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java
index c5337ff..198fe48 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java
@@ -33,8 +33,7 @@ public class TestMTQueries extends BaseTestQueries {
   }
 
   public void testMTQueries1() throws Exception {
-    String[] testNames = new String[] {"join1.q", "join2.q", "groupby1.q",
-        "groupby2.q", "join3.q", "input1.q", "input19.q"};
+    String[] testNames = new String[] {"join2.q", "groupby1.q", "input1.q", "input19.q"};
 
     File[] qfiles = setupQFiles(testNames);
     QTestUtil[] qts = QTestUtil.queryListRunnerSetup(qfiles, resDir, logDir);