Posted to commits@hive.apache.org by se...@apache.org on 2016/08/31 23:06:35 UTC
[1/9] hive git commit: HIVE-14658 : UDF abs throws NPE when input arg type is string (Niklaus Xiao via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/hive-14535 67f1b92ba -> 87dcab470
HIVE-14658 : UDF abs throws NPE when input arg type is string (Niklaus Xiao via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec4673bb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec4673bb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec4673bb
Branch: refs/heads/hive-14535
Commit: ec4673bbc61b2555ef2d992266055a331443b4d6
Parents: 20824f2
Author: niklaus xiao <st...@live.cn>
Authored: Tue Aug 30 17:29:37 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Aug 30 17:29:37 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java | 3 +++
.../apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java | 6 ++++++
2 files changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4673bb/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java
index 1fdd41c..a8e2786 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java
@@ -130,6 +130,9 @@ public class GenericUDFAbs extends GenericUDF {
case STRING:
case DOUBLE:
valObject = inputConverter.convert(valObject);
+ if (valObject == null) {
+ return null;
+ }
resultDouble.set(Math.abs(((DoubleWritable) valObject).get()));
return resultDouble;
case DECIMAL:
http://git-wip-us.apache.org/repos/asf/hive/blob/ec4673bb/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java
index 8c531ea..6dbb33f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFAbs.java
@@ -133,6 +133,12 @@ public class TestGenericUDFAbs extends TestCase {
output = (DoubleWritable) udf.evaluate(args);
assertEquals("abs() test for String failed ", "123.45", output.toString());
+
+ valueObj = new DeferredJavaObject(new Text("foo"));
+ args[0] = valueObj;
+ output = (DoubleWritable) udf.evaluate(args);
+
+ assertEquals("abs() test for String failed ", null, output);
}
public void testHiveDecimal() throws HiveException {
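
For context: the converter used in the STRING/DOUBLE branch returns null when the input cannot be parsed as a double, so without the added null check the subsequent Math.abs(((DoubleWritable) valObject).get()) dereferences null. A minimal standalone sketch of the fixed behavior, mirroring the test above (illustrative only, not part of the patch):

    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.Text;

    public class AbsNullSketch {
      public static void main(String[] args) throws HiveException {
        GenericUDFAbs udf = new GenericUDFAbs();
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
        udf.initialize(new ObjectInspector[] { stringOI });
        // "foo" cannot be converted to a double; the converter yields null.
        DeferredObject[] input = { new DeferredJavaObject(new Text("foo")) };
        // Before HIVE-14658 this threw a NullPointerException; with the null
        // check above it returns null instead.
        System.out.println(udf.evaluate(input)); // prints: null
      }
    }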
[3/9] hive git commit: HIVE-14233 - Improve vectorization for ACID by eliminating row-by-row stitching (Saket Saurabh via Eugene Koifman)
Posted by se...@apache.org.
HIVE-14233 - Improve vectorization for ACID by eliminating row-by-row stitching (Saket Saurabh via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1f6949f9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1f6949f9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1f6949f9
Branch: refs/heads/hive-14535
Commit: 1f6949f958d63f5b1552a2417d7c042a416cdbe0
Parents: ab60591
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Aug 30 20:38:28 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Aug 30 20:38:28 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 134 +--
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 7 +-
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 822 +++++++++++++++++++
...ommands2WithSplitUpdateAndVectorization.java | 52 ++
.../TestVectorizedOrcAcidRowBatchReader.java | 263 ++++++
.../queries/clientpositive/acid_vectorization.q | 12 +
.../clientpositive/acid_vectorization.q.out | 62 ++
8 files changed, 1302 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cb0d96f..aa03b63 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1187,6 +1187,12 @@ public class HiveConf extends Configuration {
HIVE_TRANSACTIONAL_TABLE_SCAN("hive.transactional.table.scan", false,
"internal usage only -- do transaction (ACID) table scan.", true),
+ HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY("hive.transactional.events.mem", 10000000,
+ "Vectorized ACID readers can often load all the delete events from all the delete deltas\n"
+ + "into memory to optimize for performance. To prevent out-of-memory errors, this is a rough heuristic\n"
+ + "that limits the total number of delete events that can be loaded into memory at once.\n"
+ + "Roughly it has been set to 10 million delete events per bucket (~160 MB).\n"),
+
HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0,
"A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."),
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index d5172ed..70003ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.orc.impl.InStream;
-import org.apache.orc.impl.SchemaEvolution;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@@ -41,26 +38,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
-import org.apache.orc.ColumnStatistics;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.StripeStatistics;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.OrcTail;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -73,6 +50,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Metastore;
import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -89,22 +67,33 @@ import org.apache.hadoop.hive.ql.io.BatchToRowReader;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
+import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -117,7 +106,17 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.orc.ColumnStatistics;
import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcTail;
+import org.apache.orc.impl.SchemaEvolution;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -318,7 +317,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<OrcProto.Type> types = file.getTypes();
options.include(genIncludedColumns(types, conf, isOriginal));
setSearchArgument(options, types, conf, isOriginal);
- return (RecordReader) file.rowsOptions(options);
+ return file.rowsOptions(options);
}
public static boolean isOriginal(Reader file) {
@@ -1793,17 +1792,28 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
reporter.setStatus(inputSplit.toString());
+ boolean isFastVectorizedReaderAvailable =
+ VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit);
+
+ if (vectorMode && isFastVectorizedReaderAvailable) {
+ // Faster vectorized ACID row batch reader is available that avoids row-by-row stitching.
+ return (org.apache.hadoop.mapred.RecordReader)
+ new VectorizedOrcAcidRowBatchReader(inputSplit, conf, reporter);
+ }
+
Options options = new Options(conf).reporter(reporter);
final RowReader<OrcStruct> inner = getReader(inputSplit, options);
-
- if (vectorMode) {
+ if (vectorMode && !isFastVectorizedReaderAvailable) {
+ // Vectorized regular ACID reader that does row-by-row stitching.
return (org.apache.hadoop.mapred.RecordReader)
new VectorizedOrcAcidRowReader(inner, conf,
Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
} else {
+ // Non-vectorized regular ACID reader.
return new NullKeyRecordReader(inner, conf);
}
}
+
/**
* Return a RecordReader that is compatible with the Hive 0.12 reader
* with NullWritable for the key instead of RecordIdentifier.
@@ -1890,35 +1900,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
: AcidUtils.deserializeDeltas(root, split.getDeltas());
final Configuration conf = options.getConfiguration();
-
- /**
- * Do we have schema on read in the configuration variables?
- */
- TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
-
- final Reader reader;
- final int bucket;
- Reader.Options readOptions = new Reader.Options().schema(schema);
+ final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
+ final int bucket = OrcInputFormat.getBucketForSplit(conf, split);
+ final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
readOptions.range(split.getStart(), split.getLength());
- // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
- final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
- readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
- setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
-
- if (split.hasBase()) {
- bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf)
- .getBucket();
- OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf)
- .maxLength(split.getFileLength());
- if (split.hasFooter()) {
- readerOptions.orcTail(split.getOrcTail());
- }
- reader = OrcFile.createReader(path, readerOptions);
- } else {
- bucket = (int) split.getStart();
- reader = null;
- }
String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
ValidTxnList validTxnList = txnString == null ? new ValidReadTxnList() :
new ValidReadTxnList(txnString);
@@ -1930,7 +1916,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@Override
public ObjectInspector getObjectInspector() {
- return OrcStruct.createObjectInspector(0, schemaTypes);
+ return OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(readOptions.getSchema()));
}
@Override
@@ -1992,6 +1978,44 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
directory);
}
+ static Reader.Options createOptionsForReader(Configuration conf) {
+ /**
+ * Do we have schema on read in the configuration variables?
+ */
+ TypeDescription schema =
+ OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+ Reader.Options readerOptions = new Reader.Options().schema(schema);
+ // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
+ final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
+ readerOptions.include(OrcInputFormat.genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
+ OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
+ return readerOptions;
+ }
+
+ static Reader createOrcReaderForSplit(Configuration conf, OrcSplit orcSplit) throws IOException {
+ Path path = orcSplit.getPath();
+ Reader reader;
+ if (orcSplit.hasBase()) {
+ OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+ readerOptions.maxLength(orcSplit.getFileLength());
+ if (orcSplit.hasFooter()) {
+ readerOptions.orcTail(orcSplit.getOrcTail());
+ }
+ reader = OrcFile.createReader(path, readerOptions);
+ } else {
+ reader = null;
+ }
+ return reader;
+ }
+
+ static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) {
+ if (orcSplit.hasBase()) {
+ return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+ } else {
+ return (int) orcSplit.getStart();
+ }
+ }
+
public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg,
OrcFile.WriterVersion writerVersion, List<OrcProto.Type> types,
List<StripeStatistics> stripeStats, int stripeCount) {
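
The getRecordReader hunk above now distinguishes three reader paths. A condensed restatement (the enum and helper are illustrative stand-ins, not part of the patch):

    public class AcidReaderChoiceSketch {
      enum Choice { FAST_VECTORIZED, STITCHING_VECTORIZED, NON_VECTORIZED }

      // vectorMode and fastReaderAvailable correspond to the two booleans
      // computed in OrcInputFormat.getRecordReader above.
      static Choice choose(boolean vectorMode, boolean fastReaderAvailable) {
        if (vectorMode && fastReaderAvailable) {
          // VectorizedOrcAcidRowBatchReader: reads ORC batches directly and applies
          // deletes through the 'selected' vector; no row-by-row stitching.
          return Choice.FAST_VECTORIZED;
        } else if (vectorMode) {
          // VectorizedOrcAcidRowReader: stitches rows one by one, then re-vectorizes.
          return Choice.STITCHING_VECTORIZED;
        } else {
          // NullKeyRecordReader: plain row-at-a-time ACID reader.
          return Choice.NON_VECTORIZED;
        }
      }
    }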
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index e46ca51..f4e06ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -49,7 +50,6 @@ import org.apache.hadoop.io.Text;
import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
implements RecordReader {
@@ -79,6 +79,10 @@ public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
return true;
}
+ public VectorizedRowBatch createRowBatch() {
+ return this.schema.createRowBatch();
+ }
+
@Override
public long getRowNumber() {
return baseRow + rowInBatch;
@@ -129,6 +133,7 @@ public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
return previous;
}
+ @Override
public boolean nextBatch(VectorizedRowBatch theirBatch) throws IOException {
// If the user hasn't been reading by row, use the fast path.
if (rowInBatch >= batch.size) {
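
The createRowBatch() hook added above is what lets the new reader (next file) allocate a batch shaped to the reader's schema. A minimal consumer sketch using only the two methods shown in this hunk (helper name illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;

    public class BatchDrainSketch {
      // Drains a RecordReaderImpl batch-by-batch via createRowBatch()/nextBatch().
      static long countRows(RecordReaderImpl rows) throws IOException {
        VectorizedRowBatch batch = rows.createRowBatch();
        long n = 0;
        while (rows.nextBatch(batch)) {
          n += batch.size; // up to 1024 rows per batch in columnar form
        }
        return n;
      }
    }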
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
new file mode 100644
index 0000000..75c7680
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -0,0 +1,822 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+/**
+ * A fast vectorized batch reader class for ACID when split-update behavior is enabled.
+ * When split-update is turned on, row-by-row stitching can be avoided when creating the final
+ * version of a row. Essentially, there are only insert and delete events. Insert events can be
+ * read directly from the base files/insert_only deltas in vectorized row batches. The deleted
+ * rows can then be easily filtered out via the 'selected' field of the vectorized row batch.
+ * Refer to HIVE-14233 for more details.
+ */
+public class VectorizedOrcAcidRowBatchReader
+ implements org.apache.hadoop.mapred.RecordReader<NullWritable,VectorizedRowBatch> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VectorizedOrcAcidRowBatchReader.class);
+
+ private org.apache.hadoop.hive.ql.io.orc.RecordReader baseReader;
+ private VectorizedRowBatchCtx rbCtx;
+ private VectorizedRowBatch vectorizedRowBatchBase;
+ private long offset;
+ private long length;
+ private float progress = 0.0f;
+ private Object[] partitionValues;
+ private boolean addPartitionCols = true;
+ private ValidTxnList validTxnList;
+ private DeleteEventRegistry deleteEventRegistry;
+
+ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
+ Reporter reporter) throws IOException {
+
+ final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+ final AcidUtils.AcidOperationalProperties acidOperationalProperties
+ = AcidUtils.getAcidOperationalProperties(conf);
+
+ // This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is
+ // enabled for an ACID case and the file format is ORC.
+ boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate()
+ || !(inputSplit instanceof OrcSplit);
+ if (isReadNotAllowed) {
+ OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
+ }
+ final OrcSplit orcSplit = (OrcSplit) inputSplit;
+
+ rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+
+ reporter.setStatus(orcSplit.toString());
+ Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, orcSplit);
+ Reader.Options readerOptions = OrcInputFormat.createOptionsForReader(conf);
+ readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions);
+
+ this.offset = orcSplit.getStart();
+ this.length = orcSplit.getLength();
+
+ // Careful with the range here: unlike the delete deltas, we do not want to read the whole base file.
+ this.baseReader = reader.rowsOptions(readerOptions.range(offset, length));
+
+ // The schema for vectorizedRowBatchBase is picked up from the baseReader because the
+ // SchemaEvolution logic happens at the ORC layer, which understands how to map the user
+ // schema to the acid schema.
+ if (this.baseReader instanceof RecordReaderImpl) {
+ this.vectorizedRowBatchBase = ((RecordReaderImpl) this.baseReader).createRowBatch();
+ } else {
+ throw new IOException("Failed to create vectorized row batch for the reader of type "
+ + this.baseReader.getClass().getName());
+ }
+
+ int partitionColumnCount = (rbCtx != null) ? rbCtx.getPartitionColumnCount() : 0;
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, orcSplit, partitionValues);
+ } else {
+ partitionValues = null;
+ }
+
+ String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+
+ // Clone readerOptions for deleteEvents.
+ Reader.Options deleteEventReaderOptions = readerOptions.clone();
+ // Set the range on the deleteEventReaderOptions to (0, Long.MAX_VALUE) because
+ // we always want to read all the delete delta files.
+ deleteEventReaderOptions.range(0, Long.MAX_VALUE);
+ // Disable SARGs for deleteEventReaders, as SARGs have no meaning.
+ deleteEventReaderOptions.searchArgument(null, null);
+ try {
+ // See if we can load all the delete events from all the delete deltas in memory...
+ this.deleteEventRegistry = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+ } catch (DeleteEventsOverflowMemoryException e) {
+ // If not, then create a set of hanging readers that do sort-merge to find the next smallest
+ // delete event on-demand. Caps the memory consumption to (some_const * no. of readers).
+ this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+ }
+ }
+
+ /**
+ * Returns whether it is possible to create a valid instance of this class for a given split.
+ * @param conf is the job configuration
+ * @param inputSplit is the split for which a reader would be created
+ * @return true if it is possible, else false.
+ */
+ public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) {
+ if (!(inputSplit instanceof OrcSplit)) {
+ return false; // must be an instance of OrcSplit.
+ }
+ // First check if we are reading any original files in the split.
+ // To simplify the vectorization logic, the vectorized acid row batch reader does not handle
+ // original files for now as they have a different schema than a regular ACID file.
+ final OrcSplit split = (OrcSplit) inputSplit;
+ if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) {
+ // When split-update is turned on for ACID, a more optimized vectorized batch reader
+ // can be created. But still only possible when we are *NOT* reading any originals.
+ return true;
+ }
+ return false; // no split-update or possibly reading originals!
+ }
+
+ private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException {
+ Path path = orcSplit.getPath();
+ Path root;
+ if (orcSplit.hasBase()) {
+ if (orcSplit.isOriginal()) {
+ root = path.getParent();
+ } else {
+ root = path.getParent().getParent();
+ }
+ } else {
+ root = path;
+ }
+ return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas());
+ }
+
+ @Override
+ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+ try {
+ // Check and update partition cols if necessary. Ideally, this should be done
+ // in createValue(), as the partition is constant per split. But since Hive uses
+ // CombineHiveRecordReader, which does not call createValue() for each new
+ // RecordReader it creates, this check is required in next().
+ if (addPartitionCols) {
+ if (partitionValues != null) {
+ rbCtx.addPartitionColsToBatch(value, partitionValues);
+ }
+ addPartitionCols = false;
+ }
+ if (!baseReader.nextBatch(vectorizedRowBatchBase)) {
+ return false;
+ }
+ } catch (Exception e) {
+ throw new IOException("error iterating", e);
+ }
+
+ // Once we have read the VectorizedRowBatchBase from the file, there are two kinds of cases
+ // for which we might have to discard rows from the batch:
+ // Case 1- when the row is created by a transaction that is not valid, or
+ // Case 2- when the row has been deleted.
+ // We will go through the batch to discover rows which match any of the cases and specifically
+ // remove them from the selected vector. Of course, selectedInUse should also be set.
+
+ BitSet selectedBitSet = new BitSet(vectorizedRowBatchBase.size);
+ if (vectorizedRowBatchBase.selectedInUse) {
+ // When selectedInUse is true, start with every bit set to false and selectively set
+ // certain bits to true based on the selected[] vector.
+ selectedBitSet.set(0, vectorizedRowBatchBase.size, false);
+ for (int j = 0; j < vectorizedRowBatchBase.size; ++j) {
+ int i = vectorizedRowBatchBase.selected[j];
+ selectedBitSet.set(i);
+ }
+ } else {
+ // When selectedInUse is set to false, everything in the batch is selected.
+ selectedBitSet.set(0, vectorizedRowBatchBase.size, true);
+ }
+
+ // Case 1- find rows which belong to transactions that are not valid.
+ findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+
+ // Case 2- find rows which have been deleted.
+ this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet);
+
+ if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) {
+ // None of the cases above matched and everything is selected. Hence, we will use the
+ // same values for the selected and selectedInUse.
+ value.size = vectorizedRowBatchBase.size;
+ value.selected = vectorizedRowBatchBase.selected;
+ value.selectedInUse = vectorizedRowBatchBase.selectedInUse;
+ } else {
+ value.size = selectedBitSet.cardinality();
+ value.selectedInUse = true;
+ value.selected = new int[selectedBitSet.cardinality()];
+ // This loop fills up the selected[] vector with all the index positions that are selected.
+ for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0;
+ setBitIndex >= 0;
+ setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) {
+ value.selected[selectedItr] = setBitIndex;
+ }
+ }
+
+ // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
+ // NOTE: We only link up the user columns and not the ACID metadata columns because this
+ // vectorized code path is not being used in cases of update/delete, when the metadata columns
+ // would be expected to be passed up the operator pipeline. This is because
+ // currently the update/delete specifically disable vectorized code paths.
+ // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
+ StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
+ // Transfer columnVector objects from base batch to outgoing batch.
+ System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
+ progress = baseReader.getProgress();
+ return true;
+ }
+
+ private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) {
+ if (batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) {
+ // When we have repeating values, we can unset the whole bitset at once
+ // if the repeating value is not a valid transaction.
+ long currentTransactionIdForBatch = ((LongColumnVector)
+ batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0];
+ if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) {
+ selectedBitSet.clear(0, batch.size);
+ }
+ return;
+ }
+ long[] currentTransactionVector =
+ ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector;
+ // Loop through the bits that are set to true and mark those rows as false, if their
+ // current transactions are not valid.
+ for (int setBitIndex = selectedBitSet.nextSetBit(0);
+ setBitIndex >= 0;
+ setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
+ if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) {
+ selectedBitSet.clear(setBitIndex);
+ }
+ }
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public VectorizedRowBatch createValue() {
+ return rbCtx.createVectorizedRowBatch();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return offset + (long) (progress * length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ this.baseReader.close();
+ } finally {
+ this.deleteEventRegistry.close();
+ }
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return progress;
+ }
+
+ @VisibleForTesting
+ DeleteEventRegistry getDeleteEventRegistry() {
+ return deleteEventRegistry;
+ }
+
+ /**
+ * An interface that can determine which rows have been deleted
+ * from a given vectorized row batch. Implementations of this interface
+ * will read the delete delta files and will create their own internal
+ * data structures to maintain record ids of the records that got deleted.
+ */
+ static interface DeleteEventRegistry {
+ /**
+ * Modifies the passed bitset to indicate which of the rows in the batch
+ * have been deleted. Assumes that the batch.size is equal to bitset size.
+ * @param batch
+ * @param selectedBitSet
+ * @throws IOException
+ */
+ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) throws IOException;
+
+ /**
+ * The close() method can be called externally to signal the implementing classes
+ * to free up resources.
+ * @throws IOException
+ */
+ public void close() throws IOException;
+ }
+
+ /**
+ * An implementation for DeleteEventRegistry that opens the delete delta files all
+ * at once, and then uses the sort-merge algorithm to maintain a sorted list of
+ * delete events. This internally uses the OrcRawRecordMerger and maintains a constant
+ * amount of memory usage, given the number of delete delta files. Therefore, this
+ * implementation will be picked up when the memory pressure is high.
+ */
+ static class SortMergedDeleteEventRegistry implements DeleteEventRegistry {
+ private OrcRawRecordMerger deleteRecords;
+ private OrcRawRecordMerger.ReaderKey deleteRecordKey;
+ private OrcStruct deleteRecordValue;
+ private boolean isDeleteRecordAvailable = true;
+ private ValidTxnList validTxnList;
+
+ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions)
+ throws IOException {
+ final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
+ if (deleteDeltas.length > 0) {
+ int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+ String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+ this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
+ validTxnList, readerOptions, deleteDeltas);
+ this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
+ this.deleteRecordValue = this.deleteRecords.createValue();
+ // Initialize the first value in the delete reader.
+ this.isDeleteRecordAvailable = this.deleteRecords.next(deleteRecordKey, deleteRecordValue);
+ } else {
+ this.isDeleteRecordAvailable = false;
+ this.deleteRecordKey = null;
+ this.deleteRecordValue = null;
+ this.deleteRecords = null;
+ }
+ }
+
+ @Override
+ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet)
+ throws IOException {
+ if (!isDeleteRecordAvailable) {
+ return;
+ }
+
+ long[] originalTransaction =
+ batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+ long[] bucket =
+ batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector;
+ long[] rowId =
+ batch.cols[OrcRecordUpdater.ROW_ID].isRepeating ? null
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector;
+
+ // The following repeatedX values will be set, if any of the columns are repeating.
+ long repeatedOriginalTransaction = (originalTransaction != null) ? -1
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+ long repeatedBucket = (bucket != null) ? -1
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+ long repeatedRowId = (rowId != null) ? -1
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[0];
+
+
+ // Get the first valid row in the batch still available.
+ int firstValidIndex = selectedBitSet.nextSetBit(0);
+ if (firstValidIndex == -1) {
+ return; // Everything in the batch has already been filtered out.
+ }
+ RecordIdentifier firstRecordIdInBatch =
+ new RecordIdentifier(
+ originalTransaction != null ? originalTransaction[firstValidIndex] : repeatedOriginalTransaction,
+ bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket,
+ rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId);
+
+ // Get the last valid row in the batch still available.
+ int lastValidIndex = selectedBitSet.previousSetBit(batch.size - 1);
+ RecordIdentifier lastRecordIdInBatch =
+ new RecordIdentifier(
+ originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction,
+ bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket,
+ rowId != null ? (int) rowId[lastValidIndex] : repeatedRowId);
+
+ // We must iterate over all the delete records, until we find one record with
+ // deleteRecord >= firstRecordInBatch or until we exhaust all the delete records.
+ while (deleteRecordKey.compareRow(firstRecordIdInBatch) == -1) {
+ isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue);
+ if (!isDeleteRecordAvailable) return; // exhausted all delete records, return.
+ }
+
+ // If we are here, then we have established that firstRecordInBatch <= deleteRecord.
+ // Now continue marking records which have been deleted until we reach the end of the batch
+ // or we exhaust all the delete records.
+
+ int currIndex = firstValidIndex;
+ RecordIdentifier currRecordIdInBatch = new RecordIdentifier();
+ while (isDeleteRecordAvailable && currIndex != -1 && currIndex <= lastValidIndex) {
+ currRecordIdInBatch.setValues(
+ (originalTransaction != null) ? originalTransaction[currIndex] : repeatedOriginalTransaction,
+ (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket,
+ (rowId != null) ? rowId[currIndex] : repeatedRowId);
+
+ if (deleteRecordKey.compareRow(currRecordIdInBatch) == 0) {
+ // When deleteRecordId == currRecordIdInBatch, this record in the batch has been deleted.
+ selectedBitSet.clear(currIndex);
+ currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index.
+ } else if (deleteRecordKey.compareRow(currRecordIdInBatch) == 1) {
+ // When deleteRecordId > currRecordIdInBatch, we have to move on to look at the
+ // next record in the batch.
+ // But before that, can we short-circuit and skip the entire batch itself
+ // by checking if the deleteRecordId > lastRecordInBatch?
+ if (deleteRecordKey.compareRow(lastRecordIdInBatch) == 1) {
+ return; // Yay! We short-circuited, skip everything remaining in the batch and return.
+ }
+ currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index.
+ } else {
+ // We have deleteRecordId < currRecordIdInBatch, so we must now move on to find
+ // the next larger deleteRecordId that can possibly match anything in the batch.
+ isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.deleteRecords != null) {
+ this.deleteRecords.close();
+ }
+ }
+ }
+
+ /**
+ * An implementation for DeleteEventRegistry that optimizes for performance by loading
+ * all the delete events into memory at once from all the delete delta files.
+ * It starts by reading all the delete events through a regular sort-merge logic
+ * into two vectors: one for the original transaction id (otid), and the other for the row id.
+ * (In the current version, since the bucket id should be the same for all the delete deltas,
+ * it is not stored.) The otids are likely to be repeated very often, as a single transaction
+ * often deletes thousands of rows. Hence, the otid vector is compressed to only store the
+ * fromIndex and toIndex ranges into the larger row id vector. Querying whether a
+ * record id is deleted or not is done by performing a binary search on the
+ * compressed otid ranges. If a match is found, a binary search is then performed on
+ * the larger rowId vector between the given fromIndex and toIndex. Of course, there is a rough
+ * heuristic that prevents creation of an instance of this class if the memory pressure is high.
+ * The SortMergedDeleteEventRegistry is then the fallback method for such scenarios.
+ */
+ static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry {
+ /**
+ * A simple wrapper class to hold the (otid, rowId) pair.
+ */
+ static class DeleteRecordKey implements Comparable<DeleteRecordKey> {
+ private long originalTransactionId;
+ private long rowId;
+ public DeleteRecordKey() {
+ this.originalTransactionId = -1;
+ this.rowId = -1;
+ }
+ public DeleteRecordKey(long otid, long rowId) {
+ this.originalTransactionId = otid;
+ this.rowId = rowId;
+ }
+ public void set(long otid, long rowId) {
+ this.originalTransactionId = otid;
+ this.rowId = rowId;
+ }
+
+ @Override
+ public int compareTo(DeleteRecordKey other) {
+ if (other == null) {
+ return -1;
+ }
+ if (originalTransactionId != other.originalTransactionId) {
+ return originalTransactionId < other.originalTransactionId ? -1 : 1;
+ }
+ if (rowId != other.rowId) {
+ return rowId < other.rowId ? -1 : 1;
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * This class actually reads the delete delta files in vectorized row batches.
+ * For every call to next(), it returns the next smallest record id in the file if available.
+ * Internally, the next() buffers a row batch and maintains an index pointer, reading the
+ * next batch when the previous batch is exhausted.
+ */
+ static class DeleteReaderValue {
+ private VectorizedRowBatch batch;
+ private final RecordReader recordReader;
+ private int indexPtrInBatch;
+ private final int bucketForSplit; // The bucket value should be same for all the records.
+ private final ValidTxnList validTxnList;
+
+ public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
+ ValidTxnList validTxnList) throws IOException {
+ this.recordReader = deleteDeltaReader.rowsOptions(readerOptions);
+ this.bucketForSplit = bucket;
+ this.batch = deleteDeltaReader.getSchema().createRowBatch();
+ if (!recordReader.nextBatch(batch)) { // Read the first batch.
+ this.batch = null; // The first batch itself was empty; mark this reader as exhausted.
+ }
+ this.indexPtrInBatch = 0;
+ this.validTxnList = validTxnList;
+ }
+
+ public boolean next(DeleteRecordKey deleteRecordKey) throws IOException {
+ if (batch == null) {
+ return false;
+ }
+ boolean isValidNext = false;
+ while (!isValidNext) {
+ if (indexPtrInBatch >= batch.size) {
+ // We have exhausted our current batch, read the next batch.
+ if (recordReader.nextBatch(batch)) {
+ // Whenever we are reading a batch, we must ensure that all the records in the batch
+ // have the same bucket id as the bucket id of the split. If not, throw exception.
+ // NOTE: this assertion might not hold, once virtual bucketing is in place. However,
+ // it should be simple to fix that case. Just replace check for bucket equality with
+ // a check for valid bucket mapping. Until virtual bucketing is added, it means
+ // either the split computation got messed up or we found some corrupted records.
+ long bucketForRecord = ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+ if ((batch.size > 1 && !batch.cols[OrcRecordUpdater.BUCKET].isRepeating)
+ || (bucketForRecord != bucketForSplit)){
+ throw new IOException("Corrupted records with different bucket ids "
+ + "from the containing bucket file found! Expected bucket id "
+ + bucketForSplit + ", however found the bucket id " + bucketForRecord);
+ }
+ indexPtrInBatch = 0; // After reading the batch, reset the pointer to beginning.
+ } else {
+ return false; // no more batches to read, exhausted the reader.
+ }
+ }
+ int originalTransactionIndex =
+ batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
+ long originalTransaction =
+ ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex];
+ long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch];
+ int currentTransactionIndex =
+ batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
+ long currentTransaction =
+ ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex];
+ ++indexPtrInBatch;
+ if (validTxnList.isTxnValid(currentTransaction)) {
+ isValidNext = true;
+ deleteRecordKey.set(originalTransaction, rowId);
+ }
+ }
+ return true;
+ }
+
+ public void close() throws IOException {
+ this.recordReader.close();
+ }
+ }
+
+ /**
+ * A CompressedOtid class stores a compressed representation of the original
+ * transaction ids (otids) read from the delete delta files. Since the record ids
+ * are sorted by (otid, rowId) and otids are highly likely to be repetitive, it is
+ * efficient to compress them as a CompressedOtid that stores the fromIndex and
+ * the toIndex. These fromIndex and toIndex reference the larger vector formed by
+ * concatenating the correspondingly ordered rowIds.
+ */
+ private class CompressedOtid implements Comparable<CompressedOtid> {
+ long originalTransactionId;
+ int fromIndex; // inclusive
+ int toIndex; // exclusive
+
+ public CompressedOtid(long otid, int fromIndex, int toIndex) {
+ this.originalTransactionId = otid;
+ this.fromIndex = fromIndex;
+ this.toIndex = toIndex;
+ }
+
+ @Override
+ public int compareTo(CompressedOtid other) {
+ // When comparing the CompressedOtid, the one with the lesser value is smaller.
+ if (originalTransactionId != other.originalTransactionId) {
+ return originalTransactionId < other.originalTransactionId ? -1 : 1;
+ }
+ return 0;
+ }
+ }
+
+ private TreeMap<DeleteRecordKey, DeleteReaderValue> sortMerger;
+ private long rowIds[];
+ private CompressedOtid compressedOtids[];
+ private ValidTxnList validTxnList;
+
+ public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
+ Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException {
+ int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+ String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+ this.sortMerger = new TreeMap<DeleteRecordKey, DeleteReaderValue>();
+ this.rowIds = null;
+ this.compressedOtids = null;
+ int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY);
+
+ try {
+ final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
+ if (deleteDeltaDirs.length > 0) {
+ int totalDeleteEventCount = 0;
+ for (Path deleteDeltaDir : deleteDeltaDirs) {
+ Path deleteDeltaFile = AcidUtils.createBucketFile(deleteDeltaDir, bucket);
+ FileSystem fs = deleteDeltaFile.getFileSystem(conf);
+ // NOTE: Calling last flush length below is more for future-proofing when we have
+ // streaming deletes. But currently we don't support streaming deletes, and this can
+ // be removed if this becomes a performance issue.
+ long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
+ // NOTE: A check for existence of deleteDeltaFile is required because we may not have
+ // deletes for the bucket being taken into consideration for this split processing.
+ if (length != -1 && fs.exists(deleteDeltaFile)) {
+ Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
+ OrcFile.readerOptions(conf).maxLength(length));
+ AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
+ if (acidStats.deletes == 0) {
+ continue; // just a safe check to ensure that we are not reading empty delete files.
+ }
+ totalDeleteEventCount += acidStats.deletes;
+ if (totalDeleteEventCount > maxEventsInMemory) {
+ // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
+ // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
+ // prevents creation of an object of this class if the total number of delete events
+ // exceeds this value. By default, it has been set to 10 million delete events per bucket.
+ LOG.info("Total number of delete events exceeds the maximum number of delete events "
+ + "that can be loaded into memory for the delete deltas in the directories "
+ + Arrays.toString(deleteDeltaDirs) + ". The max limit is currently set at "
+ + maxEventsInMemory + " and can be changed by setting the Hive config variable "
+ + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
+ throw new DeleteEventsOverflowMemoryException();
+ }
+ DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
+ readerOptions, bucket, validTxnList);
+ DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+ if (deleteReaderValue.next(deleteRecordKey)) {
+ sortMerger.put(deleteRecordKey, deleteReaderValue);
+ } else {
+ deleteReaderValue.close();
+ }
+ }
+ }
+ if (totalDeleteEventCount > 0) {
+ // Initialize the rowId array when we have some delete events.
+ rowIds = new long[totalDeleteEventCount];
+ readAllDeleteEventsFromDeleteDeltas();
+ }
+ }
+ } catch(IOException|DeleteEventsOverflowMemoryException e) {
+ close(); // close any open readers, if there was some exception during initialization.
+ throw e; // rethrow the exception so that the caller can handle.
+ }
+ }
+
+ private void readAllDeleteEventsFromDeleteDeltas() throws IOException {
+ if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read.
+ int distinctOtids = 0;
+ long lastSeenOtid = -1;
+ long otids[] = new long[rowIds.length];
+ int index = 0;
+ while (!sortMerger.isEmpty()) {
+ // The sortMerger is an ordered map (a TreeMap) used as a priority queue: it stores a pair of
+ // (deleteRecordKey, deleteReaderValue) at each entry and is ordered by deleteRecordKey.
+ // The deleteReaderValue is the actual wrapper class that has the reference to the
+ // underlying delta file that is being read, and its corresponding deleteRecordKey
+ // is the smallest record id for that file. In each iteration of this loop, we extract(poll)
+ // the minimum deleteRecordKey pair. Once we have processed that deleteRecordKey, we
+ // advance the pointer for the corresponding deleteReaderValue. If the underlying file
+ // itself has no more records, then we remove that pair from the heap, or else we
+ // add the updated pair back to the heap.
+ Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
+ DeleteRecordKey deleteRecordKey = entry.getKey();
+ DeleteReaderValue deleteReaderValue = entry.getValue();
+ otids[index] = deleteRecordKey.originalTransactionId;
+ rowIds[index] = deleteRecordKey.rowId;
+ ++index;
+ if (lastSeenOtid != deleteRecordKey.originalTransactionId) {
+ ++distinctOtids;
+ lastSeenOtid = deleteRecordKey.originalTransactionId;
+ }
+ if (deleteReaderValue.next(deleteRecordKey)) {
+ sortMerger.put(deleteRecordKey, deleteReaderValue);
+ } else {
+ deleteReaderValue.close(); // Exhausted reading all records, close the reader.
+ }
+ }
+
+ // Once we have processed all the delete events and seen all the distinct otids,
+ // we compress the otids into CompressedOtid data structure that records
+ // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid.
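+ // Illustrative example: otids [5,5,5,7,9,9] with rowIds [1,4,8,2,3,6] compress
+ // to {5 -> [0,3)}, {7 -> [3,4)}, {9 -> [4,6)}; isDeleted(7, 2) binary-searches
+ // the otid ranges first, then the rowIds slice [3,4).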
+ this.compressedOtids = new CompressedOtid[distinctOtids];
+ lastSeenOtid = otids[0];
+ int fromIndex = 0, pos = 0;
+ for (int i = 1; i < otids.length; ++i) {
+ if (otids[i] != lastSeenOtid) {
+ compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, i);
+ lastSeenOtid = otids[i];
+ fromIndex = i;
+ ++pos;
+ }
+ }
+ // account for the last distinct otid
+ compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, otids.length);
+ }
+
+ private boolean isDeleted(long otid, long rowId) {
+ if (compressedOtids == null || rowIds == null) {
+ return false;
+ }
+ // To find if a given (otid, rowId) pair is deleted or not, we perform
+ // two binary searches at most. The first binary search is on the
+ // compressed otids. If a match is found, only then we do the next
+ // binary search in the larger rowId vector between the given toIndex & fromIndex.
+
+ // Check if otid is outside the range of all otids present.
+ if (otid < compressedOtids[0].originalTransactionId
+ || otid > compressedOtids[compressedOtids.length - 1].originalTransactionId) {
+ return false;
+ }
+ // Create a dummy key for searching the otid in the compressed otid ranges.
+ CompressedOtid key = new CompressedOtid(otid, -1, -1);
+ int pos = Arrays.binarySearch(compressedOtids, key);
+ if (pos >= 0) {
+ // Otid with the given value found! Searching now for rowId...
+ key = compressedOtids[pos]; // Retrieve the actual CompressedOtid that matched.
+ // Check if rowId is outside the range of all rowIds present for this otid.
+ if (rowId < rowIds[key.fromIndex]
+ || rowId > rowIds[key.toIndex - 1]) {
+ return false;
+ }
+ if (Arrays.binarySearch(rowIds, key.fromIndex, key.toIndex, rowId) >= 0) {
+ return true; // rowId also found!
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet)
+ throws IOException {
+ if (rowIds == null || compressedOtids == null) {
+ return;
+ }
+ // Iterate through the batch and for each (otid, rowid) in the batch
+ // check if it is deleted or not.
+
+ long[] originalTransactionVector =
+ batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+ long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+
+ long[] rowIdVector =
+ ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector;
+
+ for (int setBitIndex = selectedBitSet.nextSetBit(0);
+ setBitIndex >= 0;
+ setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
+ long otid = originalTransactionVector != null ? originalTransactionVector[setBitIndex]
+ : repeatedOriginalTransaction ;
+ long rowId = rowIdVector[setBitIndex];
+ if (isDeleted(otid, rowId)) {
+ selectedBitSet.clear(setBitIndex);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // ColumnizedDeleteEventRegistry reads all the delete events into memory during initialization
+ // and closes the delete event readers once that is done. If an exception gets thrown during
+ // initialization, we may have to close any readers that are still left open.
+ while (!sortMerger.isEmpty()) {
+ Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
+ entry.getValue().close(); // close the reader for this entry
+ }
+ }
+ }
+
+ static class DeleteEventsOverflowMemoryException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+}
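
The next() method above filters a batch by clearing bits in a BitSet and then repacks the surviving indices into the selected[] array. A self-contained sketch of that repacking step with made-up data (not part of the patch):

    import java.util.Arrays;
    import java.util.BitSet;

    public class SelectedRepackSketch {
      public static void main(String[] args) {
        int batchSize = 8;
        BitSet selectedBitSet = new BitSet(batchSize);
        selectedBitSet.set(0, batchSize);  // everything selected initially
        selectedBitSet.clear(2);           // pretend rows 2 and 5 were deleted or
        selectedBitSet.clear(5);           // written by an invalid transaction
        // Same loop shape as VectorizedOrcAcidRowBatchReader.next():
        int[] selected = new int[selectedBitSet.cardinality()];
        for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0;
            setBitIndex >= 0;
            setBitIndex = selectedBitSet.nextSetBit(setBitIndex + 1), ++selectedItr) {
          selected[selectedItr] = setBitIndex;
        }
        System.out.println(Arrays.toString(selected)); // [0, 1, 3, 4, 6, 7]
      }
    }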
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
new file mode 100644
index 0000000..44a9412
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Same as TestTxnCommands2WithSplitUpdate, but tests ACID tables with vectorization turned on
+ * by default and with 'transactional_properties' set to 'default'. This specifically tests the
+ * fast VectorizedOrcAcidRowBatchReader for ACID tables with split-update turned on.
+ */
+public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2WithSplitUpdate {
+
+ public TestTxnCommands2WithSplitUpdateAndVectorization() {
+ super();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ setUpWithTableProperties("'transactional'='true','transactional_properties'='default'");
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ }
+
+ @Override
+ @Test
+ public void testFailureOnAlteringTransactionalProperties() throws Exception {
+ // Overridden to do nothing, as this test is not related to vectorization.
+ // The parent class's version creates a temporary table and alters its properties.
+ // Letting it run here would require renaming that temporary table, which would
+ // serve no purpose since the test does not exercise vectorization.
+ }
+
+}
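
Subclassing a suite and overriding its @Before method like this is a common JUnit pattern for re-running every inherited test under a different configuration. A minimal sketch of the idea; BaseTest and its flag are hypothetical, not Hive classes:

    import static org.junit.Assert.assertTrue;
    import org.junit.Before;
    import org.junit.Test;

    class BaseTest {
      protected boolean featureEnabled;
      @Before
      public void setUp() { featureEnabled = false; }
      @Test
      public void testSomething() { assertTrue(true); } // runs in both classes
    }

    public class BaseTestWithFeature extends BaseTest {
      @Override
      @Before
      public void setUp() { featureEnabled = true; } // same tests, new configuration
    }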
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
new file mode 100644
index 0000000..4656ab2
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+/**
+ * This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set
+ * of delete delta files. The split is on an insert delta, and there are multiple delete deltas
+ * whose deleted record ids interleave. Correctness is tested by validating that the
+ * correct set of record ids is returned, in sorted order, for valid transactions only.
+ */
+public class TestVectorizedOrcAcidRowBatchReader {
+
+ private static final long NUM_ROWID_PER_OTID = 15000L;
+ private static final long NUM_OTID = 10L;
+ private JobConf conf;
+ private FileSystem fs;
+ private Path root;
+
+ static class DummyRow {
+ LongWritable field;
+ RecordIdentifier ROW__ID;
+
+ DummyRow(long val) {
+ field = new LongWritable(val);
+ ROW__ID = null;
+ }
+
+ DummyRow(long val, long rowId, long origTxn, int bucket) {
+ field = new LongWritable(val);
+ ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+ }
+
+ static String getColumnNamesProperty() {
+ return "x";
+ }
+ static String getColumnTypesProperty() {
+ return "bigint";
+ }
+
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new JobConf();
+ conf.set("bucket_count", "1");
+ conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ conf.setBoolean(HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, true);
+ conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, DummyRow.getColumnNamesProperty());
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, DummyRow.getColumnTypesProperty());
+ conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+ conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+
+ fs = FileSystem.getLocal(conf);
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ root = new Path(workDir, "TestVectorizedOrcAcidRowBatch.testDump");
+ fs.delete(root, true);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+ int bucket = 0;
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .filesystem(fs)
+ .bucket(bucket)
+ .writingBase(false)
+ .minimumTransactionId(1)
+ .maximumTransactionId(NUM_OTID)
+ .inspector(inspector)
+ .reporter(Reporter.NULL)
+ .recordIdColumn(1)
+ .finalDestination(root);
+ RecordUpdater updater = new OrcRecordUpdater(root, options);
+ // Create a single insert delta with 150,000 rows: 15,000 rowIds per original transaction id.
+ for (long i = 1; i <= NUM_OTID; ++i) {
+ for (long j = 0; j < NUM_ROWID_PER_OTID; ++j) {
+ long payload = (i-1) * NUM_ROWID_PER_OTID + j;
+ updater.insert(i, new DummyRow(payload, j, i, bucket));
+ }
+ }
+ updater.close(false);
+
+ // Now create three types of delete deltas: the first has rowIds divisible by 2 but not by 3,
+ // the second has rowIds divisible by 3 but not by 2, and the third has rowIds divisible by
+ // both 2 and 3. This produces delete delta files whose delete events interleave in the
+ // sort order, thoroughly exercising the sort-merge logic.
+
+ // Create a delete delta that has rowIds divisible by 2 but not by 3. This will produce
+ // a delete delta file with 50,000 delete events.
+ long currTxnId = NUM_OTID + 1;
+ options.minimumTransactionId(currTxnId).maximumTransactionId(currTxnId);
+ updater = new OrcRecordUpdater(root, options);
+ for (long i = 1; i <= NUM_OTID; ++i) {
+ for (long j = 0; j < NUM_ROWID_PER_OTID; j += 1) {
+ if (j % 2 == 0 && j % 3 != 0) {
+ updater.delete(currTxnId, new DummyRow(-1, j, i, bucket));
+ }
+ }
+ }
+ updater.close(false);
+ // Now, create a delete delta that has rowIds divisible by 3 but not by 2. This will produce
+ // a delete delta file with 25,000 delete events.
+ currTxnId = NUM_OTID + 2;
+ options.minimumTransactionId(currTxnId).maximumTransactionId(currTxnId);
+ updater = new OrcRecordUpdater(root, options);
+ for (long i = 1; i <= NUM_OTID; ++i) {
+ for (long j = 0; j < NUM_ROWID_PER_OTID; j += 1) {
+ if (j % 2 != 0 && j % 3 == 0) {
+ updater.delete(currTxnId, new DummyRow(-1, j, i, bucket));
+ }
+ }
+ }
+ updater.close(false);
+ // Now, create a delete delta that has rowIds divisible by both 3 and 2. This will produce
+ // a delete delta file with 25,000 delete events.
+ currTxnId = NUM_OTID + 3;
+ options.minimumTransactionId(currTxnId).maximumTransactionId(currTxnId);
+ updater = new OrcRecordUpdater(root, options);
+ for (long i = 1; i <= NUM_OTID; ++i) {
+ for (long j = 0; j < NUM_ROWID_PER_OTID; j += 1) {
+ if (j % 2 == 0 && j % 3 == 0) {
+ updater.delete(currTxnId, new DummyRow(-1, j, i, bucket));
+ }
+ }
+ }
+ updater.close(false);
+ }
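
The event counts claimed in the comments check out: of the 15,000 rowIds per otid, 5,000 are divisible by 2 but not 3, 2,500 by 3 but not 2, and 2,500 by both, giving 50,000, 25,000 and 25,000 events across the 10 otids. A quick sketch that verifies the arithmetic:

    public class DeleteCountCheck {
      public static void main(String[] args) {
        long div2not3 = 0, div3not2 = 0, div6 = 0;
        for (long j = 0; j < 15000L; ++j) {
          if (j % 2 == 0 && j % 3 != 0) div2not3++;
          if (j % 2 != 0 && j % 3 == 0) div3not2++;
          if (j % 6 == 0) div6++;
        }
        // Multiplied by the 10 original transaction ids:
        System.out.println(div2not3 * 10); // 50000
        System.out.println(div3not2 * 10); // 25000
        System.out.println(div6 * 10);     // 25000
      }
    }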
+
+ private List<OrcSplit> getSplits() throws Exception {
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+ OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null);
+ OrcInputFormat.AcidDirInfo adi = gen.call();
+ List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies(
+ null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+ null, null, true);
+ assertEquals(1, splitStrategies.size());
+ List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+ assertEquals(1, splits.size());
+ assertEquals("file://" + root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000",
+ splits.get(0).getPath().toUri().toString());
+ assertFalse(splits.get(0).isOriginal());
+ return splits;
+ }
+
+ @Test
+ public void testVectorizedOrcAcidRowBatchReader() throws Exception {
+ testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName());
+
+ // To test the SortMergedDeleteEventRegistry, we need to explicitly lower the
+ // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY configuration value so that the reader
+ // falls back to the sort-merged registry.
+ int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000);
+ conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000);
+ testVectorizedOrcAcidRowBatchReader(SortMergedDeleteEventRegistry.class.getName());
+
+ // Restore the old value.
+ conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue);
+ }
+
+
+ private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) throws Exception {
+ List<OrcSplit> splits = getSplits();
+ // Mark one of the transactions as an exception to test that invalid transactions
+ // are being handled properly.
+ conf.set(ValidTxnList.VALID_TXNS_KEY, "14:1:1:5"); // Exclude transaction 5
+
+ VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL);
+ if (deleteEventRegistry.equals(ColumnizedDeleteEventRegistry.class.getName())) {
+ assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof ColumnizedDeleteEventRegistry);
+ }
+ if (deleteEventRegistry.equals(SortMergedDeleteEventRegistry.class.getName())) {
+ assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof SortMergedDeleteEventRegistry);
+ }
+ TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+ VectorizedRowBatch vectorizedRowBatch = schema.createRowBatch();
+ vectorizedRowBatch.setPartitionInfo(1, 0); // set data column count as 1.
+ long previousPayload = Long.MIN_VALUE;
+ while (vectorizedReader.next(null, vectorizedRowBatch)) {
+ assertTrue(vectorizedRowBatch.selectedInUse);
+ LongColumnVector col = (LongColumnVector) vectorizedRowBatch.cols[0];
+ for (int i = 0; i < vectorizedRowBatch.size; ++i) {
+ int idx = vectorizedRowBatch.selected[i];
+ long payload = col.vector[idx];
+ long otid = (payload / NUM_ROWID_PER_OTID) + 1;
+ long rowId = payload % NUM_ROWID_PER_OTID;
+ assertFalse(rowId % 2 == 0 || rowId % 3 == 0);
+ assertTrue(otid != 5); // Check that txn#5 has been excluded.
+ assertTrue(payload > previousPayload); // Check that the data is in sorted order.
+ previousPayload = payload;
+ }
+ }
+ }
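
As a sanity check on what the loop above should observe: the surviving rowIds per otid are those divisible by neither 2 nor 3, and with transaction 5 excluded only 9 of the 10 otids contribute. A short sketch of the expected totals:

    public class SurvivorCountCheck {
      public static void main(String[] args) {
        long perOtid = 0;
        for (long j = 0; j < 15000L; ++j) {
          if (j % 2 != 0 && j % 3 != 0) perOtid++;
        }
        System.out.println(perOtid);     // 5000 surviving rowIds per otid
        System.out.println(perOtid * 9); // 45000 rows once txn 5 is excluded
      }
    }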
+
+ @Test
+ public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception {
+ OrcSplit mockSplit = Mockito.mock(OrcSplit.class);
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getLegacy().toInt());
+ // Test false when trying to create a vectorized ACID row batch reader for a legacy table.
+ assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
+
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ Mockito.when(mockSplit.isOriginal()).thenReturn(true);
+ // Test false when trying to create a vectorized ACID row batch reader when reading originals.
+ assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
+
+ // A positive test case.
+ Mockito.when(mockSplit.isOriginal()).thenReturn(false);
+ assertTrue(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/queries/clientpositive/acid_vectorization.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_vectorization.q b/ql/src/test/queries/clientpositive/acid_vectorization.q
index 832909b..4c37563 100644
--- a/ql/src/test/queries/clientpositive/acid_vectorization.q
+++ b/ql/src/test/queries/clientpositive/acid_vectorization.q
@@ -15,3 +15,15 @@ set hive.vectorized.execution.enabled=true;
delete from acid_vectorized where b = 'foo';
set hive.vectorized.execution.enabled=true;
select a, b from acid_vectorized order by a, b;
+
+
+CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10;
+set hive.vectorized.execution.enabled=true;
+insert into table acid_fast_vectorized values (1, 'bar');
+set hive.vectorized.execution.enabled=true;
+update acid_fast_vectorized set b = 'foo' where b = 'bar';
+set hive.vectorized.execution.enabled=true;
+delete from acid_fast_vectorized where b = 'foo';
+set hive.vectorized.execution.enabled=true;
+select a, b from acid_fast_vectorized order by a, b;
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/results/clientpositive/acid_vectorization.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/acid_vectorization.q.out b/ql/src/test/results/clientpositive/acid_vectorization.q.out
index 1792979..24330bd 100644
--- a/ql/src/test/results/clientpositive/acid_vectorization.q.out
+++ b/ql/src/test/results/clientpositive/acid_vectorization.q.out
@@ -60,3 +60,65 @@ POSTHOOK: Input: default@acid_vectorized
-1070883071 0ruyd6Y50JpdGRf6HqD
-1070551679 iUR3Q
-1069736047 k17Am8uPHWk02cEf1jet
+PREHOOK: query: CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@acid_fast_vectorized
+PREHOOK: query: insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: Lineage: acid_fast_vectorized.a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: acid_fast_vectorized.b SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+PREHOOK: query: insert into table acid_fast_vectorized values (1, 'bar')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: insert into table acid_fast_vectorized values (1, 'bar')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: Lineage: acid_fast_vectorized.a EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: acid_fast_vectorized.b SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: update acid_fast_vectorized set b = 'foo' where b = 'bar'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_fast_vectorized
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: update acid_fast_vectorized set b = 'foo' where b = 'bar'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_fast_vectorized
+POSTHOOK: Output: default@acid_fast_vectorized
+PREHOOK: query: delete from acid_fast_vectorized where b = 'foo'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_fast_vectorized
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: delete from acid_fast_vectorized where b = 'foo'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_fast_vectorized
+POSTHOOK: Output: default@acid_fast_vectorized
+PREHOOK: query: select a, b from acid_fast_vectorized order by a, b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_fast_vectorized
+#### A masked pattern was here ####
+POSTHOOK: query: select a, b from acid_fast_vectorized order by a, b
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_fast_vectorized
+#### A masked pattern was here ####
+-1073279343 oj1YrV5Wa
+-1073051226 A34p7oRr2WvUJNf
+-1072910839 0iqrc5
+-1072081801 dPkN74F7
+-1072076362 2uLyD28144vklju213J1mr
+-1071480828 aw724t8c5558x2xneC624
+-1071363017 Anj0oF
+-1070883071 0ruyd6Y50JpdGRf6HqD
+-1070551679 iUR3Q
+-1069736047 k17Am8uPHWk02cEf1jet
[8/9] hive git commit: HIVE-14671 : merge master into hive-14535
(Sergey Shelukhin)
Posted by se...@apache.org.
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2cef25db
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2cef25db
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2cef25db
Branch: refs/heads/hive-14535
Commit: 2cef25db66161fa83921913031cd8f408f8b83e2
Parents: 67f1b92 3187fb3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 31 16:04:07 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 31 16:04:07 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +
.../apache/hadoop/hive/ql/TestMTQueries.java | 3 +-
.../test/resources/testconfiguration.properties | 3 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 11 +-
.../hadoop/hive/metastore/txn/TestTxnUtils.java | 6 +
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 134 +--
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 7 +-
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 822 ++++++++++++++++++
.../hive/ql/metadata/HiveMetaStoreChecker.java | 16 +-
.../ql/optimizer/pcr/PcrExprProcFactory.java | 423 +++++----
.../hadoop/hive/ql/parse/ParseContext.java | 25 +
.../hive/ql/udf/generic/GenericUDFAbs.java | 3 +
...ommands2WithSplitUpdateAndVectorization.java | 52 ++
.../TestVectorizedOrcAcidRowBatchReader.java | 263 ++++++
.../hive/ql/udf/generic/TestGenericUDFAbs.java | 6 +
.../queries/clientpositive/acid_vectorization.q | 12 +
ql/src/test/queries/clientpositive/orc_llap.q | 76 +-
.../partition_condition_remover.q | 13 +
.../clientpositive/acid_vectorization.q.out | 62 ++
.../results/clientpositive/llap/orc_llap.q.out | 866 ++++++++++---------
.../clientpositive/orc_merge_diff_fs.q.out | 18 +-
.../partition_condition_remover.q.out | 79 ++
ql/src/test/results/clientpositive/pcs.q.out | 4 +-
23 files changed, 2163 insertions(+), 747 deletions(-)
----------------------------------------------------------------------
[2/9] hive git commit: HIVE-14290: Refactor HIVE-14054 to use
Collections#newSetFromMap (Peter Slawski via Ashutosh Chauhan)
Posted by se...@apache.org.
HIVE-14290: Refactor HIVE-14054 to use Collections#newSetFromMap (Peter Slawski via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ab605910
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ab605910
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ab605910
Branch: refs/heads/hive-14535
Commit: ab605910e45cfa65b9895399f09194801f2cc091
Parents: ec4673b
Author: Peter Slawski <pe...@amazon.com>
Authored: Mon Jun 20 10:18:25 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Aug 30 17:52:43 2016 -0700
----------------------------------------------------------------------
.../hive/ql/metadata/HiveMetaStoreChecker.java | 16 +++++-----------
1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ab605910/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 34b76b8..13d9651 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -379,9 +379,7 @@ public class HiveMetaStoreChecker {
private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException {
ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
basePaths.add(basePath);
- // we only use the keySet of ConcurrentHashMap
- // Neither the key nor the value can be null.
- Map<Path, Object> dirSet = new ConcurrentHashMap<>();
+ Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
// Here we just reuse the THREAD_COUNT configuration for
// HIVE_MOVE_FILES_THREAD_COUNT
final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors
@@ -396,12 +394,12 @@ public class HiveMetaStoreChecker {
}
checkPartitionDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf), maxDepth, maxDepth);
pool.shutdown();
- allDirs.addAll(dirSet.keySet());
+ allDirs.addAll(dirSet);
}
// process the basePaths in parallel and then the next level of basePaths
private void checkPartitionDirs(final ExecutorService pool,
- final ConcurrentLinkedQueue<Path> basePaths, final Map<Path, Object> allDirs,
+ final ConcurrentLinkedQueue<Path> basePaths, final Set<Path> allDirs,
final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException {
final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
if (null == pool) {
@@ -437,9 +435,7 @@ public class HiveMetaStoreChecker {
}
}
} else {
- // true is just a boolean object place holder because neither the
- // key nor the value can be null.
- allDirs.put(path, true);
+ allDirs.add(path);
}
}
} else {
@@ -483,9 +479,7 @@ public class HiveMetaStoreChecker {
}
}
} else {
- // true is just a boolean object place holder because neither the
- // key nor the value can be null.
- allDirs.put(path, true);
+ allDirs.add(path);
}
return null;
}
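
Collections.newSetFromMap is the standard JDK way to obtain a concurrent Set, since there is no ConcurrentHashSet class. A minimal example of the API this commit switches to:

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class ConcurrentSetExample {
      public static void main(String[] args) {
        // The returned Set delegates to the map's keySet; each added element is
        // stored with Boolean.TRUE as its value, so neither keys nor values are null.
        Set<String> dirs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        dirs.add("/warehouse/t/part=1");
        System.out.println(dirs.contains("/warehouse/t/part=1")); // true
      }
    }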
[7/9] hive git commit: HIVE-14652 : incorrect results for not in on
partition columns (Sergey Shelukhin, reviewed by Jesus Camacho Rodriguez)
Posted by se...@apache.org.
HIVE-14652 : incorrect results for not in on partition columns (Sergey Shelukhin, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3187fb36
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3187fb36
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3187fb36
Branch: refs/heads/hive-14535
Commit: 3187fb369fb6e27a542edbc110df528a444987fa
Parents: 8f5dee8
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 31 15:36:47 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 31 15:42:40 2016 -0700
----------------------------------------------------------------------
.../ql/optimizer/pcr/PcrExprProcFactory.java | 423 +++++++++----------
.../hadoop/hive/ql/parse/ParseContext.java | 25 ++
.../partition_condition_remover.q | 13 +
.../partition_condition_remover.q.out | 79 ++++
ql/src/test/results/clientpositive/pcs.q.out | 4 +-
5 files changed, 330 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
index f9388e2..461dbe5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.optimizer.pcr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -185,7 +187,7 @@ public final class PcrExprProcFactory {
}
public enum WalkState {
- PART_COL, TRUE, FALSE, CONSTANT, UNKNOWN, DIVIDED
+ PART_COL, TRUE, FALSE, CONSTANT, UNKNOWN, DIVIDED, PART_COL_STRUCT
}
public static class NodeInfoWrapper {
@@ -253,242 +255,239 @@ public final class PcrExprProcFactory {
Object... nodeOutputs) throws SemanticException {
PcrExprProcCtx ctx = (PcrExprProcCtx) procCtx;
ExprNodeGenericFuncDesc fd = (ExprNodeGenericFuncDesc) nd;
+ if (LOG.isDebugEnabled()) {
+ String err = "Processing " + fd.getExprString() + " "
+ + fd.getGenericUDF().getUdfName() + " outputs ";
+ for (Object child : nodeOutputs) {
+ NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
+ err += "{" + wrapper.state + ", " + wrapper.outExpr + "}, ";
+ }
+ LOG.debug(err);
+ }
if (FunctionRegistry.isOpNot(fd)) {
- assert (nodeOutputs.length == 1);
- NodeInfoWrapper wrapper = (NodeInfoWrapper) nodeOutputs[0];
- if (wrapper.state == WalkState.TRUE) {
- ExprNodeConstantDesc falseDesc = new ExprNodeConstantDesc(
- wrapper.outExpr.getTypeInfo(), Boolean.FALSE);
- return new NodeInfoWrapper(WalkState.FALSE, null, falseDesc);
- } else if (wrapper.state == WalkState.FALSE) {
- ExprNodeConstantDesc trueDesc = new ExprNodeConstantDesc(
- wrapper.outExpr.getTypeInfo(), Boolean.TRUE);
- return new NodeInfoWrapper(WalkState.TRUE, null, trueDesc);
- } else if (wrapper.state == WalkState.DIVIDED) {
+ return handleUdfNot(ctx, fd, nodeOutputs);
+ } else if (FunctionRegistry.isOpAnd(fd)) {
+ return handleUdfAnd(ctx, fd, nodeOutputs);
+ } else if (FunctionRegistry.isOpOr(fd)) {
+ return handleUdfOr(ctx, fd, nodeOutputs);
+ } else if (FunctionRegistry.isIn(fd)) {
+ List<ExprNodeDesc> children = fd.getChildren();
+ // We should not remove the dynamic partition pruner generated synthetic predicates.
+ for (int i = 1; i < children.size(); i++) {
+ if (children.get(i) instanceof ExprNodeDynamicListDesc) {
+ return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
+ }
+ }
+ // Otherwise, handle like a normal generic UDF.
+ return handleDeterministicUdf(ctx, fd, nodeOutputs);
+ } else if (fd.getGenericUDF() instanceof GenericUDFStruct) {
+ // Handle structs composed of partition columns,
+ for (Object child : nodeOutputs) {
+ NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
+ if (wrapper.state != WalkState.PART_COL) {
+ return handleDeterministicUdf(ctx, fd, nodeOutputs); // Giving up.
+ }
+ }
+ return new NodeInfoWrapper(WalkState.PART_COL_STRUCT, null, getOutExpr(fd, nodeOutputs));
+ } else if (!FunctionRegistry.isDeterministic(fd.getGenericUDF())) {
+ // If it's a non-deterministic UDF, set unknown to true
+ return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
+ } else {
+ return handleDeterministicUdf(ctx, fd, nodeOutputs);
+ }
+ }
+
+ private Object handleDeterministicUdf(PcrExprProcCtx ctx,
+ ExprNodeGenericFuncDesc fd, Object... nodeOutputs)
+ throws SemanticException {
+ Boolean has_part_col = checkForPartColsAndUnknown(fd, nodeOutputs);
+ if (has_part_col == null) {
+ return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
+ }
+
+ if (has_part_col && fd.getTypeInfo().getCategory() == Category.PRIMITIVE) {
+ // we need to evaluate the result for every pruned partition
+ if (fd.getTypeInfo().equals(TypeInfoFactory.booleanTypeInfo)) {
+ // if the return type of the GenericUDF is boolean and all partitions agree on
+ // a result, we update the state of the node to be TRUE or FALSE
Boolean[] results = new Boolean[ctx.getPartList().size()];
for (int i = 0; i < ctx.getPartList().size(); i++) {
- results[i] = opNot(wrapper.ResultVector[i]);
+ results[i] = (Boolean) evalExprWithPart(fd, ctx.getPartList().get(i),
+ ctx.getVirtualColumns());
}
- return new NodeInfoWrapper(WalkState.DIVIDED, results,
- getOutExpr(fd, nodeOutputs));
+ return getResultWrapFromResults(results, fd, nodeOutputs);
+ }
+
+ // In the case that the return type of the GenericUDF is not boolean: if not all partitions
+ // agree on the result, we make the node UNKNOWN. If they all agree, we replace the node
+ // with a CONSTANT node whose value is the agreed result.
+ Object[] results = new Object[ctx.getPartList().size()];
+ for (int i = 0; i < ctx.getPartList().size(); i++) {
+ results[i] = evalExprWithPart(fd, ctx.getPartList().get(i), ctx.getVirtualColumns());
+ }
+ Object result = ifResultsAgree(results);
+ if (result == null) {
+ // if the result is not boolean and not all partitions agree on the
+ // result, we don't remove the condition. Potentially, this can miss
+ // cases like "where ds % 3 == 1 or ds % 3 == 2".
+ // TODO: handle this case by extending the result vector to hold
+ // arbitrary constant values.
+ return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
+ }
+ return new NodeInfoWrapper(WalkState.CONSTANT, null,
+ new ExprNodeConstantDesc(fd.getTypeInfo(), result));
+ }
+
+ // Try to fold, otherwise return the expression itself
+ final ExprNodeGenericFuncDesc desc = getOutExpr(fd, nodeOutputs);
+ final ExprNodeDesc foldedDesc = ConstantPropagateProcFactory.foldExpr(desc);
+ if (foldedDesc != null && foldedDesc instanceof ExprNodeConstantDesc) {
+ ExprNodeConstantDesc constant = (ExprNodeConstantDesc) foldedDesc;
+ if (Boolean.TRUE.equals(constant.getValue())) {
+ return new NodeInfoWrapper(WalkState.TRUE, null, constant);
+ } else if (Boolean.FALSE.equals(constant.getValue())) {
+ return new NodeInfoWrapper(WalkState.FALSE, null, constant);
} else {
- return new NodeInfoWrapper(wrapper.state, null,
- getOutExpr(fd, nodeOutputs));
+ return new NodeInfoWrapper(WalkState.CONSTANT, null, constant);
}
- } else if (FunctionRegistry.isOpAnd(fd)) {
- boolean anyUnknown = false; // Whether any of the node outputs is unknown
- boolean allDivided = true; // Whether all of the node outputs are divided
- List<NodeInfoWrapper> newNodeOutputsList =
- new ArrayList<NodeInfoWrapper>(nodeOutputs.length);
- for (int i = 0; i < nodeOutputs.length; i++) {
- NodeInfoWrapper c = (NodeInfoWrapper)nodeOutputs[i];
- if (c.state == WalkState.FALSE) {
- return c;
- }
- if (c.state == WalkState.UNKNOWN) {
- anyUnknown = true;
- }
- if (c.state != WalkState.DIVIDED) {
- allDivided = false;
- }
- if (c.state != WalkState.TRUE) {
- newNodeOutputsList.add(c);
- }
+ }
+ return new NodeInfoWrapper(WalkState.CONSTANT, null, desc);
+ }
+
+ private Boolean checkForPartColsAndUnknown(ExprNodeGenericFuncDesc fd,
+ Object... nodeOutputs) {
+ boolean has_part_col = false;
+ for (Object child : nodeOutputs) {
+ NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
+ if (wrapper.state == WalkState.UNKNOWN) {
+ return null;
+ } else if (wrapper.state == WalkState.PART_COL
+ || wrapper.state == WalkState.PART_COL_STRUCT) {
+ has_part_col = true;
}
- // If all of them were true, return true
- if (newNodeOutputsList.size() == 0) {
- return new NodeInfoWrapper(WalkState.TRUE, null,
- new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.TRUE));
+ }
+ return has_part_col;
+ }
+
+ private Object handleUdfOr(PcrExprProcCtx ctx, ExprNodeGenericFuncDesc fd,
+ Object... nodeOutputs) {
+ boolean anyUnknown = false; // Whether any of the node outputs is unknown
+ boolean allDivided = true; // Whether all of the node outputs are divided
+ List<NodeInfoWrapper> newNodeOutputsList =
+ new ArrayList<NodeInfoWrapper>(nodeOutputs.length);
+ for (int i = 0; i< nodeOutputs.length; i++) {
+ NodeInfoWrapper c = (NodeInfoWrapper)nodeOutputs[i];
+ if (c.state == WalkState.TRUE) {
+ return c;
}
- // If we are left with a single child, return the child
- if (newNodeOutputsList.size() == 1) {
- return newNodeOutputsList.get(0);
+ if (c.state == WalkState.UNKNOWN) {
+ anyUnknown = true;
}
- Object[] newNodeOutputs = newNodeOutputsList.toArray();
- if (anyUnknown) {
- return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
+ if (c.state != WalkState.DIVIDED) {
+ allDivided = false;
}
- if (allDivided) {
- Boolean[] results = new Boolean[ctx.getPartList().size()];
- for (int i = 0; i < ctx.getPartList().size(); i++) {
- Boolean[] andArray = new Boolean[newNodeOutputs.length];
- for (int j = 0; j < newNodeOutputs.length; j++) {
- andArray[j] = ((NodeInfoWrapper) newNodeOutputs[j]).ResultVector[i];
- }
- results[i] = opAnd(andArray);
- }
- return getResultWrapFromResults(results, fd, newNodeOutputs);
+ if (c.state != WalkState.FALSE) {
+ newNodeOutputsList.add(c);
}
+ }
+ // If all of them were false, return false
+ if (newNodeOutputsList.size() == 0) {
+ return new NodeInfoWrapper(WalkState.FALSE, null,
+ new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.FALSE));
+ }
+ // If we are left with a single child, return the child
+ if (newNodeOutputsList.size() == 1) {
+ return newNodeOutputsList.get(0);
+ }
+ Object[] newNodeOutputs = newNodeOutputsList.toArray();
+ if (anyUnknown) {
return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
- } else if (FunctionRegistry.isOpOr(fd)) {
- boolean anyUnknown = false; // Whether any of the node outputs is unknown
- boolean allDivided = true; // Whether all of the node outputs are divided
- List<NodeInfoWrapper> newNodeOutputsList =
- new ArrayList<NodeInfoWrapper>(nodeOutputs.length);
- for (int i = 0; i< nodeOutputs.length; i++) {
- NodeInfoWrapper c = (NodeInfoWrapper)nodeOutputs[i];
- if (c.state == WalkState.TRUE) {
- return c;
- }
- if (c.state == WalkState.UNKNOWN) {
- anyUnknown = true;
- }
- if (c.state != WalkState.DIVIDED) {
- allDivided = false;
- }
- if (c.state != WalkState.FALSE) {
- newNodeOutputsList.add(c);
+ }
+ if (allDivided) {
+ Boolean[] results = new Boolean[ctx.getPartList().size()];
+ for (int i = 0; i < ctx.getPartList().size(); i++) {
+ Boolean[] orArray = new Boolean[newNodeOutputs.length];
+ for (int j = 0; j < newNodeOutputs.length; j++) {
+ orArray[j] = ((NodeInfoWrapper) newNodeOutputs[j]).ResultVector[i];
}
+ results[i] = opOr(orArray);
}
- // If all of them were false, return false
- if (newNodeOutputsList.size() == 0) {
- return new NodeInfoWrapper(WalkState.FALSE, null,
- new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.FALSE));
+ return getResultWrapFromResults(results, fd, newNodeOutputs);
+ }
+ return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
+ }
+
+ private Object handleUdfAnd(PcrExprProcCtx ctx, ExprNodeGenericFuncDesc fd,
+ Object... nodeOutputs) {
+ boolean anyUnknown = false; // Whether any of the node outputs is unknown
+ boolean allDivided = true; // Whether all of the node outputs are divided
+ List<NodeInfoWrapper> newNodeOutputsList =
+ new ArrayList<NodeInfoWrapper>(nodeOutputs.length);
+ for (int i = 0; i < nodeOutputs.length; i++) {
+ NodeInfoWrapper c = (NodeInfoWrapper)nodeOutputs[i];
+ if (c.state == WalkState.FALSE) {
+ return c;
}
- // If we are left with a single child, return the child
- if (newNodeOutputsList.size() == 1) {
- return newNodeOutputsList.get(0);
+ if (c.state == WalkState.UNKNOWN) {
+ anyUnknown = true;
}
- Object[] newNodeOutputs = newNodeOutputsList.toArray();
- if (anyUnknown) {
- return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
+ if (c.state != WalkState.DIVIDED) {
+ allDivided = false;
}
- if (allDivided) {
- Boolean[] results = new Boolean[ctx.getPartList().size()];
- for (int i = 0; i < ctx.getPartList().size(); i++) {
- Boolean[] orArray = new Boolean[newNodeOutputs.length];
- for (int j = 0; j < newNodeOutputs.length; j++) {
- orArray[j] = ((NodeInfoWrapper) newNodeOutputs[j]).ResultVector[i];
- }
- results[i] = opOr(orArray);
- }
- return getResultWrapFromResults(results, fd, newNodeOutputs);
+ if (c.state != WalkState.TRUE) {
+ newNodeOutputsList.add(c);
}
+ }
+ // If all of them were true, return true
+ if (newNodeOutputsList.size() == 0) {
+ return new NodeInfoWrapper(WalkState.TRUE, null,
+ new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.TRUE));
+ }
+ // If we are left with a single child, return the child
+ if (newNodeOutputsList.size() == 1) {
+ return newNodeOutputsList.get(0);
+ }
+ Object[] newNodeOutputs = newNodeOutputsList.toArray();
+ if (anyUnknown) {
return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
- } else if (FunctionRegistry.isIn(fd)) {
- List<ExprNodeDesc> children = fd.getChildren();
- boolean removePredElem = false;
- ExprNodeDesc lhs = children.get(0);
-
- if (lhs instanceof ExprNodeColumnDesc) {
- // It is an IN clause on a column
- if (((ExprNodeColumnDesc)lhs).getIsPartitionColOrVirtualCol()) {
- // It is a partition column, we can proceed
- removePredElem = true;
- }
- if (removePredElem) {
- // We should not remove the dynamic partition pruner generated synthetic predicates.
- for (int i = 1; i < children.size(); i++) {
- if (children.get(i) instanceof ExprNodeDynamicListDesc) {
- removePredElem = false;
- break;
- }
- }
- }
- } else if (lhs instanceof ExprNodeGenericFuncDesc) {
- // It is an IN clause on a struct
- // Make sure that the generic udf is deterministic
- if (FunctionRegistry.isDeterministic(((ExprNodeGenericFuncDesc) lhs)
- .getGenericUDF())) {
- boolean hasOnlyPartCols = true;
- boolean hasDynamicListDesc = false;
-
- for (ExprNodeDesc ed : ((ExprNodeGenericFuncDesc) lhs).getChildren()) {
- // Check if the current field expression contains only
- // partition column or a virtual column or constants.
- // If yes, this filter predicate is a candidate for this optimization.
- if (!(ed instanceof ExprNodeColumnDesc &&
- ((ExprNodeColumnDesc)ed).getIsPartitionColOrVirtualCol())) {
- hasOnlyPartCols = false;
- break;
- }
- }
-
- // If we have non-partition columns, we cannot remove the predicate.
- if (hasOnlyPartCols) {
- // We should not remove the dynamic partition pruner generated synthetic predicates.
- for (int i = 1; i < children.size(); i++) {
- if (children.get(i) instanceof ExprNodeDynamicListDesc) {
- hasDynamicListDesc = true;
- break;
- }
- }
- }
-
- removePredElem = hasOnlyPartCols && !hasDynamicListDesc;
+ }
+ if (allDivided) {
+ Boolean[] results = new Boolean[ctx.getPartList().size()];
+ for (int i = 0; i < ctx.getPartList().size(); i++) {
+ Boolean[] andArray = new Boolean[newNodeOutputs.length];
+ for (int j = 0; j < newNodeOutputs.length; j++) {
+ andArray[j] = ((NodeInfoWrapper) newNodeOutputs[j]).ResultVector[i];
}
+ results[i] = opAnd(andArray);
}
+ return getResultWrapFromResults(results, fd, newNodeOutputs);
+ }
+ return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, newNodeOutputs));
+ }
- // If removePredElem is set to true, return true as this is a potential candidate
- // for partition condition remover. Else, set the WalkState for this node to unknown.
- return removePredElem ?
- new NodeInfoWrapper(WalkState.TRUE, null,
- new ExprNodeConstantDesc(fd.getTypeInfo(), Boolean.TRUE)) :
- new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs)) ;
- } else if (!FunctionRegistry.isDeterministic(fd.getGenericUDF())) {
- // If it's a non-deterministic UDF, set unknown to true
- return new NodeInfoWrapper(WalkState.UNKNOWN, null,
+ private Object handleUdfNot(PcrExprProcCtx ctx, ExprNodeGenericFuncDesc fd,
+ Object... nodeOutputs) {
+ assert (nodeOutputs.length == 1);
+ NodeInfoWrapper wrapper = (NodeInfoWrapper) nodeOutputs[0];
+ if (wrapper.state == WalkState.TRUE) {
+ ExprNodeConstantDesc falseDesc = new ExprNodeConstantDesc(
+ wrapper.outExpr.getTypeInfo(), Boolean.FALSE);
+ return new NodeInfoWrapper(WalkState.FALSE, null, falseDesc);
+ } else if (wrapper.state == WalkState.FALSE) {
+ ExprNodeConstantDesc trueDesc = new ExprNodeConstantDesc(
+ wrapper.outExpr.getTypeInfo(), Boolean.TRUE);
+ return new NodeInfoWrapper(WalkState.TRUE, null, trueDesc);
+ } else if (wrapper.state == WalkState.DIVIDED) {
+ Boolean[] results = new Boolean[ctx.getPartList().size()];
+ for (int i = 0; i < ctx.getPartList().size(); i++) {
+ results[i] = opNot(wrapper.ResultVector[i]);
+ }
+ return new NodeInfoWrapper(WalkState.DIVIDED, results,
getOutExpr(fd, nodeOutputs));
} else {
- // If any child is unknown, set unknown to true
- boolean has_part_col = false;
- for (Object child : nodeOutputs) {
- NodeInfoWrapper wrapper = (NodeInfoWrapper) child;
- if (wrapper.state == WalkState.UNKNOWN) {
- return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
- } else if (wrapper.state == WalkState.PART_COL) {
- has_part_col = true;
- }
- }
-
- if (has_part_col && fd.getTypeInfo().getCategory() == Category.PRIMITIVE) {
- // we need to evaluate result for every pruned partition
- if (fd.getTypeInfo().equals(TypeInfoFactory.booleanTypeInfo)) {
- // if the return type of the GenericUDF is boolean and all partitions agree on
- // a result, we update the state of the node to be TRUE of FALSE
- Boolean[] results = new Boolean[ctx.getPartList().size()];
- for (int i = 0; i < ctx.getPartList().size(); i++) {
- results[i] = (Boolean) evalExprWithPart(fd, ctx.getPartList().get(i),
- ctx.getVirtualColumns());
- }
- return getResultWrapFromResults(results, fd, nodeOutputs);
- }
-
- // the case that return type of the GenericUDF is not boolean, and if not all partition
- // agree on result, we make the node UNKNOWN. If they all agree, we replace the node
- // to be a CONSTANT node with value to be the agreed result.
- Object[] results = new Object[ctx.getPartList().size()];
- for (int i = 0; i < ctx.getPartList().size(); i++) {
- results[i] = evalExprWithPart(fd, ctx.getPartList().get(i), ctx.getVirtualColumns());
- }
- Object result = ifResultsAgree(results);
- if (result == null) {
- // if the result is not boolean and not all partition agree on the
- // result, we don't remove the condition. Potentially, it can miss
- // the case like "where ds % 3 == 1 or ds % 3 == 2"
- // TODO: handle this case by making result vector to handle all
- // constant values.
- return new NodeInfoWrapper(WalkState.UNKNOWN, null, getOutExpr(fd, nodeOutputs));
- }
- return new NodeInfoWrapper(WalkState.CONSTANT, null,
- new ExprNodeConstantDesc(fd.getTypeInfo(), result));
- }
-
- // Try to fold, otherwise return the expression itself
- final ExprNodeGenericFuncDesc desc = getOutExpr(fd, nodeOutputs);
- final ExprNodeDesc foldedDesc = ConstantPropagateProcFactory.foldExpr(desc);
- if (foldedDesc != null && foldedDesc instanceof ExprNodeConstantDesc) {
- ExprNodeConstantDesc constant = (ExprNodeConstantDesc) foldedDesc;
- if (Boolean.TRUE.equals(constant.getValue())) {
- return new NodeInfoWrapper(WalkState.TRUE, null, constant);
- } else if (Boolean.FALSE.equals(constant.getValue())) {
- return new NodeInfoWrapper(WalkState.FALSE, null, constant);
- } else {
- return new NodeInfoWrapper(WalkState.CONSTANT, null, constant);
- }
- }
- return new NodeInfoWrapper(WalkState.CONSTANT, null, desc);
+ return new NodeInfoWrapper(wrapper.state, null,
+ getOutExpr(fd, nodeOutputs));
}
}
};
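
The extracted AND/OR handlers implement short-circuit logic lifted to per-partition results: FALSE dominates AND, TRUE dominates OR, neutral children are dropped, and UNKNOWN taints whatever remains. A sketch of the per-partition AND fold with null standing for an unknown result; names are illustrative, not Hive's exact helpers:

    public class PartitionAndFold {
      // Fold one partition's child results under AND: FALSE wins outright,
      // then unknown (null), then TRUE.
      static Boolean opAnd(Boolean... children) {
        boolean sawUnknown = false;
        for (Boolean c : children) {
          if (Boolean.FALSE.equals(c)) return Boolean.FALSE;
          if (c == null) sawUnknown = true;
        }
        return sawUnknown ? null : Boolean.TRUE;
      }

      public static void main(String[] args) {
        System.out.println(opAnd(true, true));  // true
        System.out.println(opAnd(true, false)); // false
        System.out.println(opAnd(true, null));  // null (unknown)
        System.out.println(opAnd(null, false)); // false: FALSE dominates unknown
      }
    }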
http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index b2125ca..4353d3a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.QueryState;
@@ -620,4 +623,26 @@ public class ParseContext {
List<ColumnStatsAutoGatherContext> columnStatsAutoGatherContexts) {
this.columnStatsAutoGatherContexts = columnStatsAutoGatherContexts;
}
+
+ public Collection<Operator> getAllOps() {
+ List<Operator> ops = new ArrayList<>();
+ Set<Operator> visited = new HashSet<Operator>();
+ for (Operator<?> op : getTopOps().values()) {
+ getAllOps(ops, visited, op);
+ }
+ return ops;
+ }
+
+ private static void getAllOps(List<Operator> builder, Set<Operator> visited, Operator<?> op) {
+ boolean added = visited.add(op);
+ builder.add(op);
+ if (!added) return;
+ if (op.getNumChild() > 0) {
+ List<Operator<?>> children = op.getChildOperators();
+ for (int i = 0; i < children.size(); i++) {
+ getAllOps(builder, visited, children.get(i));
+ }
+ }
+ }
+
}
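
getAllOps is a depth-first walk over the operator DAG. Note that builder.add(op) runs before the visited check returns, so an operator reachable along several paths appears once per path; if exactly-once collection were wanted, the visited check would come first, as in this generic sketch (Node stands in for Operator):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class DagWalk {
      static class Node {
        final List<Node> children = new ArrayList<>();
      }

      // Collect each node exactly once, even when the DAG has diamond shapes.
      static void collect(Node op, Set<Node> visited, List<Node> out) {
        if (!visited.add(op)) {
          return; // already collected along another path
        }
        out.add(op);
        for (Node child : op.children) {
          collect(child, visited, out);
        }
      }

      public static void main(String[] args) {
        Node sink = new Node();
        Node left = new Node(), right = new Node(), root = new Node();
        root.children.add(left);
        root.children.add(right);
        left.children.add(sink);
        right.children.add(sink); // diamond: sink is reachable twice
        List<Node> out = new ArrayList<>();
        collect(root, new HashSet<Node>(), out);
        System.out.println(out.size()); // 4, not 5
      }
    }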
http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/test/queries/clientpositive/partition_condition_remover.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/partition_condition_remover.q b/ql/src/test/queries/clientpositive/partition_condition_remover.q
new file mode 100644
index 0000000..39e58b8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/partition_condition_remover.q
@@ -0,0 +1,13 @@
+
+drop table foo;
+
+create table foo (i int) partitioned by (s string);
+
+insert overwrite table foo partition(s='foo') select cint from alltypesorc limit 10;
+insert overwrite table foo partition(s='bar') select cint from alltypesorc limit 10;
+
+explain select * from foo where s not in ('bar');
+select * from foo where s not in ('bar');
+
+
+drop table foo;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/test/results/clientpositive/partition_condition_remover.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/partition_condition_remover.q.out b/ql/src/test/results/clientpositive/partition_condition_remover.q.out
new file mode 100644
index 0000000..2f8f998
--- /dev/null
+++ b/ql/src/test/results/clientpositive/partition_condition_remover.q.out
@@ -0,0 +1,79 @@
+PREHOOK: query: drop table foo
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table foo
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table foo (i int) partitioned by (s string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@foo
+POSTHOOK: query: create table foo (i int) partitioned by (s string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@foo
+PREHOOK: query: insert overwrite table foo partition(s='foo') select cint from alltypesorc limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@foo@s=foo
+POSTHOOK: query: insert overwrite table foo partition(s='foo') select cint from alltypesorc limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@foo@s=foo
+POSTHOOK: Lineage: foo PARTITION(s=foo).i SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+PREHOOK: query: insert overwrite table foo partition(s='bar') select cint from alltypesorc limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@foo@s=bar
+POSTHOOK: query: insert overwrite table foo partition(s='bar') select cint from alltypesorc limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@foo@s=bar
+POSTHOOK: Lineage: foo PARTITION(s=bar).i SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+PREHOOK: query: explain select * from foo where s not in ('bar')
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from foo where s not in ('bar')
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ TableScan
+ alias: foo
+ Statistics: Num rows: 10 Data size: 90 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: i (type: int), s (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 10 Data size: 90 Basic stats: COMPLETE Column stats: NONE
+ ListSink
+
+PREHOOK: query: select * from foo where s not in ('bar')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@foo
+PREHOOK: Input: default@foo@s=foo
+#### A masked pattern was here ####
+POSTHOOK: query: select * from foo where s not in ('bar')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@foo
+POSTHOOK: Input: default@foo@s=foo
+#### A masked pattern was here ####
+528534767 foo
+528534767 foo
+528534767 foo
+528534767 foo
+528534767 foo
+528534767 foo
+528534767 foo
+528534767 foo
+528534767 foo
+528534767 foo
+PREHOOK: query: drop table foo
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@foo
+PREHOOK: Output: default@foo
+POSTHOOK: query: drop table foo
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@foo
+POSTHOOK: Output: default@foo
http://git-wip-us.apache.org/repos/asf/hive/blob/3187fb36/ql/src/test/results/clientpositive/pcs.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/pcs.q.out b/ql/src/test/results/clientpositive/pcs.q.out
index 0045c1c..d409eb5 100644
--- a/ql/src/test/results/clientpositive/pcs.q.out
+++ b/ql/src/test/results/clientpositive/pcs.q.out
@@ -1047,7 +1047,7 @@ STAGE PLANS:
GatherStats: false
Filter Operator
isSamplingPred: false
- predicate: ((struct(ds,key)) IN (const struct('2000-04-08',1), const struct('2000-04-09',2)) and (ds = '2008-04-08')) (type: boolean)
+ predicate: ((struct(ds,key)) IN (const struct('2000-04-08',1), const struct('2000-04-09',2)) and (struct(ds)) IN (const struct('2000-04-08'), const struct('2000-04-09')) and (ds = '2008-04-08')) (type: boolean)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: key (type: int), value (type: string)
@@ -1072,7 +1072,7 @@ STAGE PLANS:
GatherStats: false
Filter Operator
isSamplingPred: false
- predicate: ((struct(ds,key)) IN (const struct('2000-04-08',1), const struct('2000-04-09',2)) and (ds = '2008-04-08')) (type: boolean)
+ predicate: ((struct(ds,key)) IN (const struct('2000-04-08',1), const struct('2000-04-09',2)) and (struct(ds)) IN (const struct('2000-04-08'), const struct('2000-04-09')) and (ds = '2008-04-08')) (type: boolean)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: key (type: int), value (type: string)
[9/9] hive git commit: HIVE-14636 : pass information from
FSOP/TezTask to commit to take care of speculative execution and failed tasks
(Sergey Shelukhin)
Posted by se...@apache.org.
HIVE-14636 : pass information from FSOP/TezTask to commit to take care of speculative execution and failed tasks (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/87dcab47
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/87dcab47
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/87dcab47
Branch: refs/heads/hive-14535
Commit: 87dcab470f33ace818c775da6b0a9f18b10f66ac
Parents: 2cef25d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 31 16:06:03 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 31 16:06:03 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/common/FileUtils.java | 6 +-
.../hadoop/hive/common/HiveStatsUtils.java | 14 +-
.../java/org/apache/hadoop/hive/ql/Context.java | 9 +-
.../hive/ql/exec/AbstractFileMergeOperator.java | 4 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 194 +++++++++---
.../apache/hadoop/hive/ql/exec/MoveTask.java | 315 ++++++++++---------
.../apache/hadoop/hive/ql/exec/Utilities.java | 10 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 5 +-
.../hadoop/hive/ql/parse/TypeCheckCtx.java | 2 +-
.../hadoop/hive/ql/plan/FileSinkDesc.java | 9 +
.../hadoop/hive/ql/plan/LoadFileDesc.java | 2 +-
.../hadoop/hive/ql/plan/LoadTableDesc.java | 19 +-
ql/src/test/queries/clientpositive/mm_current.q | 18 +-
.../clientpositive/llap/mm_current.q.out | 133 +++++++-
15 files changed, 517 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 3ed2d08..ad43610 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -329,9 +329,13 @@ public final class FileUtils {
*/
public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
List<FileStatus> results) throws IOException {
+ listStatusRecursively(fs, fileStatus, HIDDEN_FILES_PATH_FILTER, results);
+ }
+ public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
+ PathFilter filter, List<FileStatus> results) throws IOException {
if (fileStatus.isDir()) {
- for (FileStatus stat : fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_PATH_FILTER)) {
+ for (FileStatus stat : fs.listStatus(fileStatus.getPath(), filter)) {
listStatusRecursively(fs, stat, filter, results);
}
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
index 7c9d72f..111d99c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,15 +51,20 @@ public class HiveStatsUtils {
* @return array of FileStatus
* @throws IOException
*/
- public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
+ public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
throws IOException {
+ return getFileStatusRecurse(path, level, fs, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ }
+
+ public static FileStatus[] getFileStatusRecurse(
+ Path path, int level, FileSystem fs, PathFilter filter) throws IOException {
// if level is < 0, then return all files/directories under the specified path
- if ( level < 0) {
+ if (level < 0) {
List<FileStatus> result = new ArrayList<FileStatus>();
try {
FileStatus fileStatus = fs.getFileStatus(path);
- FileUtils.listStatusRecursively(fs, fileStatus, result);
+ FileUtils.listStatusRecursively(fs, fileStatus, filter, result);
} catch (IOException e) {
// globStatus() API returns empty FileStatus[] when the specified path
// does not exist. But getFileStatus() throws IOException. To mimic the
@@ -75,7 +81,7 @@ public class HiveStatsUtils {
sb.append(Path.SEPARATOR).append("*");
}
Path pathPattern = new Path(path, sb.toString());
- return fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ return fs.globStatus(pathPattern, filter);
}
public static int getNumBitVectorsForNDVEstimation(Configuration conf) throws Exception {
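
For a non-negative level, getFileStatusRecurse avoids recursion entirely: it builds a glob with one wildcard per directory level and lets globStatus expand it in a single call, now with the caller's filter. A sketch of just the pattern construction, assuming a simple string-based path:

public class LevelGlobSketch {
  // One wildcard per directory level; Hive hands the resulting pattern to
  // FileSystem.globStatus together with the path filter, avoiding recursion.
  static String levelGlob(String base, int level) {
    StringBuilder sb = new StringBuilder(base);
    for (int i = 0; i < level; i++) {
      sb.append("/*");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // e.g. two dynamic-partition columns -> two directory levels
    System.out.println(levelGlob("/warehouse/t", 2)); // prints /warehouse/t/*/*
  }
}
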
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index ceb257c..1013f7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -233,7 +233,8 @@ public class Context {
// Append task specific info to stagingPathName, instead of creating a sub-directory.
// This way we don't have to worry about deleting the stagingPathName separately at
// end of query execution.
- dir = fs.makeQualified(new Path(stagingPathName + "_" + this.executionId + "-" + TaskRunner.getTaskRunnerID()));
+ // TODO# HERE
+ dir = fs.makeQualified(new Path(stagingPathName + "_" + getExecutionPrefix()));
LOG.debug("Created staging dir = " + dir + " for path = " + inputPath);
@@ -819,6 +820,10 @@ public class Context {
this.skipTableMasking = skipTableMasking;
}
+ public String getExecutionPrefix() {
+ return this.executionId + "-" + TaskRunner.getTaskRunnerID();
+ }
+
public ExplainConfiguration getExplainConfig() {
return explainConfig;
}
@@ -827,7 +832,7 @@ public class Context {
this.explainConfig = explainConfig;
}
- public void resetOpContext(){
+ public void resetOpContext() {
opContext = new CompilationOpContext();
sequencer = new AtomicInteger();
}
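
The execution prefix added here is what the rest of the patch keys on: FileSinkOperator prepends it to task IDs, and the manifest names and path filter further down are derived from it. A sketch of how the pieces compose, with made-up values standing in for Hive's executionId and task-runner ID:

public class ExecutionPrefixSketch {
  public static void main(String[] args) {
    // Made-up values; in Hive they come from Context.executionId and
    // TaskRunner.getTaskRunnerID().
    String executionId = "hive_2016-08-31_16-06-03_123";
    int taskRunnerId = 7;
    String taskId = "000000_0";

    // Context.getExecutionPrefix()
    String executionPrefix = executionId + "-" + taskRunnerId;
    // FileSinkOperator.getPrefixedTaskId()
    String prefixedTaskId = executionPrefix + "_" + taskId;
    // Manifest name written by commit()
    String manifestName = "_tmp." + prefixedTaskId + ".manifest";

    // ExecPrefixPathFilter accepts exactly this execution's data files and
    // in-flight manifests, and nothing else.
    System.out.println(prefixedTaskId.startsWith(executionPrefix));         // true
    System.out.println(manifestName.startsWith("_tmp." + executionPrefix)); // true
  }
}
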
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index dfad6c1..40c784b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -254,8 +254,8 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
Path outputDir = conf.getOutputPath();
FileSystem fs = outputDir.getFileSystem(hconf);
Path backupPath = backupOutputPath(fs, outputDir);
- Utilities
- .mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(),
+ // TODO# merge-related move
+ Utilities.mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(),
null, reporter);
if (success) {
LOG.info("jobCloseOp moved merged files to output dir: " + outputDir);
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 1f5dfea..b8a2c5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -27,16 +27,22 @@ import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -79,6 +85,7 @@ import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hive.common.util.HiveStringUtils;
import org.slf4j.Logger;
@@ -92,6 +99,7 @@ import com.google.common.collect.Lists;
public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
Serializable {
+ private static final String MANIFEST_EXTENSION = ".manifest";
public static final Logger LOG = LoggerFactory.getLogger(FileSinkOperator.class);
private static final boolean isInfoEnabled = LOG.isInfoEnabled();
private static final boolean isDebugEnabled = LOG.isDebugEnabled();
@@ -165,7 +173,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts
+ ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath
- + " (spec path " + specPath + ")", new Exception());
+ + " (spec path " + specPath + ")"/*, new Exception()*/);
outPaths = new Path[numFiles];
finalPaths = new Path[numFiles];
@@ -187,7 +195,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
/**
* Update the final paths according to tmpPath.
*/
- public Path getFinalPath(String taskId, Path tmpPath, String extension) {
+ private Path getFinalPath(String taskId, Path tmpPath, String extension) {
if (extension != null) {
return new Path(tmpPath, taskId + extension);
} else {
@@ -218,41 +226,64 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
private void commit(FileSystem fs) throws HiveException {
- if (isMmTable) return; // TODO#: need to propagate to MoveTask instead
+ List<Path> commitPaths = null;
+ if (isMmTable) {
+ commitPaths = new ArrayList<>();
+ }
for (int idx = 0; idx < outPaths.length; ++idx) {
try {
- if ((bDynParts || isSkewedStoredAsSubDirectories)
- && !fs.exists(finalPaths[idx].getParent())) {
- Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent());
- fs.mkdirs(finalPaths[idx].getParent());
- }
- boolean needToRename = true;
- if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
- conf.getWriteType() == AcidUtils.Operation.DELETE) {
- // If we're updating or deleting there may be no file to close. This can happen
- // because the where clause strained out all of the records for a given bucket. So
- // before attempting the rename below, check if our file exists. If it doesn't,
- // then skip the rename. If it does try it. We could just blindly try the rename
- // and avoid the extra stat, but that would mask other errors.
- try {
- if (outPaths[idx] != null) {
- FileStatus stat = fs.getFileStatus(outPaths[idx]);
- }
- } catch (FileNotFoundException fnfe) {
- needToRename = false;
- }
- }
- Utilities.LOG14535.info("commit potentially moving " + outPaths[idx] + " to " + finalPaths[idx]);
- if (needToRename && outPaths[idx] != null && !fs.rename(outPaths[idx], finalPaths[idx])) {
- throw new HiveException("Unable to rename output from: " +
- outPaths[idx] + " to: " + finalPaths[idx]);
- }
- updateProgress();
+ commitOneOutPath(idx, fs, commitPaths);
} catch (IOException e) {
throw new HiveException("Unable to rename output from: " +
outPaths[idx] + " to: " + finalPaths[idx], e);
}
}
+ if (isMmTable) {
+ Path manifestPath = new Path(specPath, "_tmp." + getPrefixedTaskId() + MANIFEST_EXTENSION);
+ Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
+ try {
+ try (FSDataOutputStream out = fs.create(manifestPath)) {
+ out.writeInt(commitPaths.size());
+ for (Path path : commitPaths) {
+ out.writeUTF(path.toString());
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
+ }
+
+ private String getPrefixedTaskId() {
+ return conf.getExecutionPrefix() + "_" + taskId;
+ }
+
+ private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths)
+ throws IOException, HiveException {
+ if ((bDynParts || isSkewedStoredAsSubDirectories)
+ && !fs.exists(finalPaths[idx].getParent())) {
+ Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent());
+ fs.mkdirs(finalPaths[idx].getParent());
+ }
+ // If we're updating or deleting there may be no file to close. This can happen
+ // because the where clause strained out all of the records for a given bucket. So
+ // before attempting the rename below, check if our file exists. If it doesn't,
+ // then skip the rename. If it does, try it. We could just blindly try the rename
+ // and avoid the extra stat, but that would mask other errors.
+ boolean needToRename = (conf.getWriteType() != AcidUtils.Operation.UPDATE &&
+ conf.getWriteType() != AcidUtils.Operation.DELETE) || fs.exists(outPaths[idx]);
+ if (needToRename && outPaths[idx] != null) {
+ Utilities.LOG14535.info("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (" + isMmTable + ")");
+ if (isMmTable) {
+ assert outPaths[idx].equals(finalPaths[idx]);
+ commitPaths.add(outPaths[idx]);
+ } else if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+ throw new HiveException("Unable to rename output from: "
+ + outPaths[idx] + " to: " + finalPaths[idx]);
+ }
+ }
+
+ updateProgress();
}
public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
@@ -297,10 +328,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
outPaths[filesIdx] = getTaskOutPath(taskId);
} else {
if (!bDynParts && !isSkewedStoredAsSubDirectories) {
- finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension);
+ finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, extension);
} else {
// TODO# wrong!
- finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension);
+ finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, extension);
}
outPaths[filesIdx] = finalPaths[filesIdx];
}
@@ -638,7 +669,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable, isSkewedStoredAsSubDirectories);
Utilities.LOG14535.info("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
+ "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path "
- + fsp.getTmpPath() + ", task " + taskId + ")", new Exception());
+ + fsp.getTmpPath() + ", task " + taskId + ")"/*, new Exception()*/);
if (isInfoEnabled) {
LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
@@ -1150,9 +1181,13 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
if (conf.isLinkedFileSink() && (dpCtx != null)) {
specPath = conf.getParentDir();
+ Utilities.LOG14535.info("Setting specPath to " + specPath + " for dynparts");
+ }
+ if (!conf.isMmTable()) {
+ Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter); // TODO# other callers
+ } else {
+ handleMmTable(specPath, hconf, success, dpCtx, conf, reporter);
}
- Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf,
- reporter);
}
} catch (IOException e) {
throw new HiveException(e);
@@ -1160,6 +1195,95 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
super.jobCloseOp(hconf, success);
}
+ private static class ExecPrefixPathFilter implements PathFilter {
+ private final String prefix, tmpPrefix;
+ public ExecPrefixPathFilter(String prefix) {
+ this.prefix = prefix;
+ this.tmpPrefix = "_tmp." + prefix;
+ }
+
+ @Override
+ public boolean accept(Path path) {
+ String name = path.getName();
+ return name.startsWith(prefix) || name.startsWith(tmpPrefix);
+ }
+ }
+
+
+ private void handleMmTable(Path specPath, Configuration hconf, boolean success,
+ DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter)
+ throws IOException, HiveException {
+ FileSystem fs = specPath.getFileSystem(hconf);
+ int targetLevel = (dpCtx == null) ? 1 : dpCtx.getNumDPCols();
+ if (!success) {
+ FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+ specPath, targetLevel, fs, new ExecPrefixPathFilter(conf.getExecutionPrefix()));
+ for (FileStatus status : statuses) {
+ Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
+ tryDelete(fs, status.getPath());
+ }
+ return;
+ }
+ FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+ specPath, targetLevel, fs, new ExecPrefixPathFilter(conf.getExecutionPrefix()));
+ if (statuses == null) return;
+ LinkedList<FileStatus> results = new LinkedList<>();
+ List<Path> manifests = new ArrayList<>(statuses.length);
+ for (FileStatus status : statuses) {
+ if (status.getPath().getName().endsWith(MANIFEST_EXTENSION)) {
+ manifests.add(status.getPath());
+ } else {
+ results.add(status);
+ }
+ }
+ HashSet<String> committed = new HashSet<>();
+ for (Path mfp : manifests) {
+ try (FSDataInputStream mdis = fs.open(mfp)) {
+ int fileCount = mdis.readInt();
+ for (int i = 0; i < fileCount; ++i) {
+ String nextFile = mdis.readUTF();
+ if (!committed.add(nextFile)) {
+ throw new HiveException(nextFile + " was specified in multiple manifests");
+ }
+ }
+ }
+ }
+ Iterator<FileStatus> iter = results.iterator();
+ while (iter.hasNext()) {
+ FileStatus rfs = iter.next();
+ if (!committed.remove(rfs.getPath().toString())) {
+ iter.remove();
+ Utilities.LOG14535.info("Deleting " + rfs.getPath() + " that was not committed");
+ tryDelete(fs, rfs.getPath());
+ }
+ }
+ if (!committed.isEmpty()) {
+ throw new HiveException("The following files were committed but not found: " + committed);
+ }
+ for (Path mfp : manifests) {
+ Utilities.LOG14535.info("Deleting manifest " + mfp);
+ tryDelete(fs, mfp);
+ }
+
+ if (results.isEmpty()) return;
+ FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]);
+
+ List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
+ fs, finalResults, dpCtx, conf, hconf);
+ // create empty buckets if necessary
+ if (emptyBuckets.size() > 0) {
+ Utilities.createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
+ }
+ }
+
+ private void tryDelete(FileSystem fs, Path path) {
+ try {
+ fs.delete(path, false);
+ } catch (IOException ex) {
+ LOG.error("Failed to delete " + path, ex);
+ }
+ }
+
@Override
public OperatorType getType() {
return OperatorType.FILESINK;
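
Taken together, commit() and handleMmTable() form a small two-phase commit: each task writes a manifest listing exactly the files it produced, and at job close only the union of manifested files survives; anything else (leftovers from failed or speculative attempts) is deleted, and a manifested-but-missing file fails the job. A self-contained sketch of that reconciliation using plain DataOutputStream/DataInputStream, which expose the same writeInt/writeUTF/readInt/readUTF methods used on FSDataOutputStream/FSDataInputStream above; deletion is simulated:

import java.io.*;
import java.util.*;

public class ManifestSketch {
  // Mirrors FileSinkOperator.commit(): a count followed by UTF path strings.
  static byte[] writeManifest(List<String> committedPaths) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bos)) {
      out.writeInt(committedPaths.size());
      for (String p : committedPaths) {
        out.writeUTF(p);
      }
    }
    return bos.toByteArray();
  }

  // Mirrors handleMmTable(): collect committed paths from all manifests,
  // then reconcile against what was actually found on disk.
  public static void main(String[] args) throws IOException {
    List<byte[]> manifests = Arrays.asList(
        writeManifest(Arrays.asList("/out/exec1_000000_0")),
        writeManifest(Arrays.asList("/out/exec1_000001_0")));
    // Files found under specPath: one is an uncommitted straggler left
    // behind by a failed or speculative attempt.
    Set<String> found = new HashSet<>(Arrays.asList(
        "/out/exec1_000000_0", "/out/exec1_000001_0", "/out/exec1_000001_1"));

    Set<String> committed = new HashSet<>();
    for (byte[] m : manifests) {
      try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(m))) {
        int fileCount = in.readInt();
        for (int i = 0; i < fileCount; ++i) {
          if (!committed.add(in.readUTF())) {
            throw new IOException("path appears in multiple manifests");
          }
        }
      }
    }
    for (Iterator<String> it = found.iterator(); it.hasNext();) {
      String path = it.next();
      if (!committed.remove(path)) {
        it.remove(); // would be deleted on the real FileSystem
        System.out.println("deleting uncommitted " + path);
      }
    }
    if (!committed.isEmpty()) { // manifested but missing: fail the job close
      throw new IOException("committed but not found: " + committed);
    }
  }
}
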
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 2ab97f7..e3646da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -241,6 +241,18 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
return false;
}
+ private final static class TaskInformation {
+ public List<BucketCol> bucketCols = null;
+ public List<SortCol> sortCols = null;
+ public int numBuckets = -1;
+ public Task task;
+ public String path;
+ public TaskInformation(Task task, String path) {
+ this.task = task;
+ this.path = path;
+ }
+ }
+
@Override
public int execute(DriverContext driverContext) {
@@ -318,155 +330,15 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
// Check if the bucketing and/or sorting columns were inferred
- List<BucketCol> bucketCols = null;
- List<SortCol> sortCols = null;
- int numBuckets = -1;
- Task task = this;
- String path = tbd.getSourcePath().toUri().toString();
- // Find the first ancestor of this MoveTask which is some form of map reduce task
- // (Either standard, local, or a merge)
- while (task.getParentTasks() != null && task.getParentTasks().size() == 1) {
- task = (Task)task.getParentTasks().get(0);
- // If it was a merge task or a local map reduce task, nothing can be inferred
- if (task instanceof MergeFileTask || task instanceof MapredLocalTask) {
- break;
- }
-
- // If it's a standard map reduce task, check what, if anything, it inferred about
- // the directory this move task is moving
- if (task instanceof MapRedTask) {
- MapredWork work = (MapredWork)task.getWork();
- MapWork mapWork = work.getMapWork();
- bucketCols = mapWork.getBucketedColsByDirectory().get(path);
- sortCols = mapWork.getSortedColsByDirectory().get(path);
- if (work.getReduceWork() != null) {
- numBuckets = work.getReduceWork().getNumReduceTasks();
- }
-
- if (bucketCols != null || sortCols != null) {
- // This must be a final map reduce task (the task containing the file sink
- // operator that writes the final output)
- assert work.isFinalMapRed();
- }
- break;
- }
-
- // If it's a move task, get the path the files were moved from, this is what any
- // preceding map reduce task inferred information about, and moving does not invalidate
- // those assumptions
- // This can happen when a conditional merge is added before the final MoveTask, but the
- // condition for merging is not met, see GenMRFileSink1.
- if (task instanceof MoveTask) {
- if (((MoveTask)task).getWork().getLoadFileWork() != null) {
- path = ((MoveTask)task).getWork().getLoadFileWork().getSourcePath().toUri().toString();
- }
- }
- }
+ TaskInformation ti = new TaskInformation(this, tbd.getSourcePath().toUri().toString());
+ inferTaskInformation(ti);
// deal with dynamic partitions
DynamicPartitionCtx dpCtx = tbd.getDPCtx();
if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
-
- List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
-
- // publish DP columns to its subscribers
- if (dps != null && dps.size() > 0) {
- pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
- }
- console.printInfo(System.getProperty("line.separator"));
- long startTime = System.currentTimeMillis();
- // load the list of DP partitions and return the list of partition specs
- // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
- // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
- // After that check the number of DPs created to not exceed the limit and
- // iterate over it and call loadPartition() here.
- // The reason we don't do inside HIVE-1361 is the latter is large and we
- // want to isolate any potential issue it may introduce.
- Map<Map<String, String>, Partition> dp =
- db.loadDynamicPartitions(
- tbd.getSourcePath(),
- tbd.getTable().getTableName(),
- tbd.getPartitionSpec(),
- tbd.getReplace(),
- dpCtx.getNumDPCols(),
- isSkewedStoredAsDirs(tbd),
- work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
- SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(),
- work.getLoadTableWork().getWriteType());
-
- console.printInfo("\t Time taken to load dynamic partitions: " +
- (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
-
- if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
- throw new HiveException("This query creates no partitions." +
- " To turn off this error, set hive.error.on.empty.partition=false.");
- }
-
- startTime = System.currentTimeMillis();
- // for each partition spec, get the partition
- // and put it to WriteEntity for post-exec hook
- for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
- Partition partn = entry.getValue();
-
- if (bucketCols != null || sortCols != null) {
- updatePartitionBucketSortColumns(
- db, table, partn, bucketCols, numBuckets, sortCols);
- }
-
- WriteEntity enty = new WriteEntity(partn,
- (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
- WriteEntity.WriteType.INSERT));
- if (work.getOutputs() != null) {
- work.getOutputs().add(enty);
- }
- // Need to update the queryPlan's output as well so that post-exec hook get executed.
- // This is only needed for dynamic partitioning since for SP the the WriteEntity is
- // constructed at compile time and the queryPlan already contains that.
- // For DP, WriteEntity creation is deferred at this stage so we need to update
- // queryPlan here.
- if (queryPlan.getOutputs() == null) {
- queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
- }
- queryPlan.getOutputs().add(enty);
-
- // update columnar lineage for each partition
- dc = new DataContainer(table.getTTable(), partn.getTPartition());
-
- // Don't set lineage on delete as we don't have all the columns
- if (SessionState.get() != null &&
- work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
- work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
- SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
- table.getCols());
- }
- LOG.info("\tLoading partition " + entry.getKey());
- }
- console.printInfo("\t Time taken for adding to write entity : " +
- (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
- dc = null; // reset data container to prevent it being added again.
+ dc = handleDynParts(db, table, tbd, ti, dpCtx);
} else { // static partitions
- List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
- tbd.getPartitionSpec());
- db.validatePartitionNameCharacters(partVals);
- Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable());
- db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
- tbd.getPartitionSpec(), tbd.getReplace(),
- tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
- work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable());
- Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
-
- if (bucketCols != null || sortCols != null) {
- updatePartitionBucketSortColumns(db, table, partn, bucketCols,
- numBuckets, sortCols);
- }
-
- dc = new DataContainer(table.getTTable(), partn.getTPartition());
- // add this partition to post-execution hook
- if (work.getOutputs() != null) {
- work.getOutputs().add(new WriteEntity(partn,
- (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
- : WriteEntity.WriteType.INSERT)));
- }
- }
+ dc = handleStaticParts(db, table, tbd, ti);
+ }
}
if (SessionState.get() != null && dc != null) {
// If we are doing an update or a delete the number of columns in the table will not
@@ -500,6 +372,159 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
}
}
+ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
+ TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
+ List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec());
+ db.validatePartitionNameCharacters(partVals);
+ Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable());
+ db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
+ tbd.getPartitionSpec(), tbd.getReplace(),
+ tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable());
+ Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+
+ if (ti.bucketCols != null || ti.sortCols != null) {
+ updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
+ ti.numBuckets, ti.sortCols);
+ }
+
+ DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition());
+ // add this partition to post-execution hook
+ if (work.getOutputs() != null) {
+ work.getOutputs().add(new WriteEntity(partn,
+ (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
+ : WriteEntity.WriteType.INSERT)));
+ }
+ return dc;
+ }
+
+ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
+ TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
+ IOException, InvalidOperationException {
+ DataContainer dc;
+ List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
+
+ // publish DP columns to its subscribers
+ if (dps != null && dps.size() > 0) {
+ pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
+ }
+ console.printInfo(System.getProperty("line.separator"));
+ long startTime = System.currentTimeMillis();
+ // load the list of DP partitions and return the list of partition specs
+ // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
+ // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
+ // After that check the number of DPs created to not exceed the limit and
+ // iterate over it and call loadPartition() here.
+ // The reason we don't do this inside HIVE-1361 is that the latter is large and we
+ // want to isolate any potential issue it may introduce.
+ Map<Map<String, String>, Partition> dp =
+ db.loadDynamicPartitions(
+ tbd.getSourcePath(),
+ tbd.getTable().getTableName(),
+ tbd.getPartitionSpec(),
+ tbd.getReplace(),
+ dpCtx.getNumDPCols(),
+ isSkewedStoredAsDirs(tbd),
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
+ SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(),
+ work.getLoadTableWork().getWriteType());
+
+ console.printInfo("\t Time taken to load dynamic partitions: " +
+ (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+
+ if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
+ throw new HiveException("This query creates no partitions." +
+ " To turn off this error, set hive.error.on.empty.partition=false.");
+ }
+
+ startTime = System.currentTimeMillis();
+ // for each partition spec, get the partition
+ // and put it to WriteEntity for post-exec hook
+ for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
+ Partition partn = entry.getValue();
+
+ if (ti.bucketCols != null || ti.sortCols != null) {
+ updatePartitionBucketSortColumns(
+ db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
+ }
+
+ WriteEntity enty = new WriteEntity(partn,
+ (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
+ WriteEntity.WriteType.INSERT));
+ if (work.getOutputs() != null) {
+ work.getOutputs().add(enty);
+ }
+ // Need to update the queryPlan's output as well so that post-exec hooks get executed.
+ // This is only needed for dynamic partitioning since for SP the WriteEntity is
+ // constructed at compile time and the queryPlan already contains that.
+ // For DP, WriteEntity creation is deferred at this stage so we need to update
+ // queryPlan here.
+ if (queryPlan.getOutputs() == null) {
+ queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
+ }
+ queryPlan.getOutputs().add(enty);
+
+ // update columnar lineage for each partition
+ dc = new DataContainer(table.getTTable(), partn.getTPartition());
+
+ // Don't set lineage on delete as we don't have all the columns
+ if (SessionState.get() != null &&
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
+ SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
+ table.getCols());
+ }
+ LOG.info("\tLoading partition " + entry.getKey());
+ }
+ console.printInfo("\t Time taken for adding to write entity : " +
+ (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+ dc = null; // reset data container to prevent it being added again.
+ return dc;
+ }
+
+ private void inferTaskInformation(TaskInformation ti) {
+ // Find the first ancestor of this MoveTask which is some form of map reduce task
+ // (Either standard, local, or a merge)
+ while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) {
+ ti.task = (Task)ti.task.getParentTasks().get(0);
+ // If it was a merge task or a local map reduce task, nothing can be inferred
+ if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) {
+ break;
+ }
+
+ // If it's a standard map reduce task, check what, if anything, it inferred about
+ // the directory this move task is moving
+ if (ti.task instanceof MapRedTask) {
+ MapredWork work = (MapredWork)ti.task.getWork();
+ MapWork mapWork = work.getMapWork();
+ ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
+ ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
+ if (work.getReduceWork() != null) {
+ ti.numBuckets = work.getReduceWork().getNumReduceTasks();
+ }
+
+ if (ti.bucketCols != null || ti.sortCols != null) {
+ // This must be a final map reduce task (the task containing the file sink
+ // operator that writes the final output)
+ assert work.isFinalMapRed();
+ }
+ break;
+ }
+
+ // If it's a move task, get the path the files were moved from; this is what any
+ // preceding map reduce task inferred information about, and moving does not invalidate
+ // those assumptions.
+ // This can happen when a conditional merge is added before the final MoveTask, but the
+ // condition for merging is not met, see GenMRFileSink1.
+ if (ti.task instanceof MoveTask) {
+ MoveTask mt = (MoveTask)ti.task;
+ if (mt.getWork().getLoadFileWork() != null) {
+ ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
+ }
+ }
+ }
+ }
+
private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
throws HiveException {
if (work.getCheckFileFormat()) {
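
inferTaskInformation() keeps the original traversal: walk up the task chain while there is exactly one parent, stopping at the first ancestor that either supplies bucket/sort information or rules inference out. A generic sketch of that walk shape; the Node type and kind strings are stand-ins, not Hive's Task hierarchy:

import java.util.List;

public class AncestorWalkSketch {
  static class Node {
    final String kind;          // e.g. "mapred", "merge", "move", "other"
    final List<Node> parents;
    Node(String kind, List<Node> parents) { this.kind = kind; this.parents = parents; }
  }

  // Walk up while there is exactly one parent; stop at the first node that
  // either yields information ("mapred") or makes inference impossible ("merge").
  static Node findInferenceSource(Node start) {
    Node cur = start;
    while (cur.parents != null && cur.parents.size() == 1) {
      cur = cur.parents.get(0);
      if (cur.kind.equals("merge")) break;   // nothing can be inferred
      if (cur.kind.equals("mapred")) break;  // harvest bucket/sort info here
      // "move" nodes would redirect the path being asked about, then continue
    }
    return cur;
  }

  public static void main(String[] args) {
    Node mapred = new Node("mapred", null);
    Node move = new Node("move", List.of(mapred));
    Node self = new Node("move", List.of(move));
    System.out.println(findInferenceSource(self).kind); // prints mapred
  }
}
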
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a7f7b9f..427f067 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1409,7 +1409,6 @@ public final class Utilities {
Path tmpPath = Utilities.toTempPath(specPath);
Path taskTmpPath = Utilities.toTaskTempPath(specPath);
if (success) {
- // TODO# specPath instead of tmpPath
FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
if(statuses != null && statuses.length > 0) {
@@ -1423,8 +1422,6 @@ public final class Utilities {
Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
}
- List<Path> paths = new ArrayList<>();
- // TODO#: HERE listFilesToCommit(specPath, fs, paths);
} else {
Utilities.LOG14535.info("deleting tmpPath " + tmpPath);
fs.delete(tmpPath, true);
@@ -1445,7 +1442,7 @@ public final class Utilities {
* @throws HiveException
* @throws IOException
*/
- private static void createEmptyBuckets(Configuration hconf, List<Path> paths,
+ static void createEmptyBuckets(Configuration hconf, List<Path> paths,
FileSinkDesc conf, Reporter reporter)
throws HiveException, IOException {
@@ -1586,19 +1583,18 @@ public final class Utilities {
for (FileStatus one : items) {
if (isTempPath(one)) {
- Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + one.getPath(), new Exception());
+ Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + one.getPath()/*, new Exception()*/);
if (!fs.delete(one.getPath(), true)) {
throw new IOException("Unable to delete tmp file: " + one.getPath());
}
} else {
String taskId = getPrefixedTaskIdFromFilename(one.getPath().getName());
- Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + one.getPath() + ", taskId " + taskId, new Exception());
+ Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + one.getPath() + ", taskId " + taskId/*, new Exception()*/);
FileStatus otherFile = taskIdToFile.get(taskId);
if (otherFile == null) {
taskIdToFile.put(taskId, one);
} else {
- // TODO# file choice!
// Compare the file sizes of all the attempt files for the same task; the largest wins:
// any attempt files could contain partial results (due to task failures or
// speculative runs), but the largest should be the correct one since the result
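
The rule referenced in the comment above is simple: among attempt files sharing a task ID, keep only the largest, since a partial output from a failed or speculative attempt can only be smaller than the complete one. A sketch with hypothetical file names and sizes:

import java.util.HashMap;
import java.util.Map;

public class LargestAttemptWinsSketch {
  public static void main(String[] args) {
    // attempt file name -> size in bytes (illustrative values)
    Map<String, Long> attempts = new HashMap<>();
    attempts.put("000000_0", 1024L); // first attempt, complete
    attempts.put("000000_1", 128L);  // speculative rerun, partial
    attempts.put("000001_0", 2048L);

    // Keep the largest file per task ID; the rest would be deleted.
    Map<String, Long> survivors = new HashMap<>();
    for (Map.Entry<String, Long> e : attempts.entrySet()) {
      String taskId = e.getKey().substring(0, e.getKey().indexOf('_'));
      survivors.merge(taskId, e.getValue(), Math::max);
    }
    System.out.println(survivors); // e.g. {000000=1024, 000001=2048}
  }
}
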
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 7d8c961..e43c600 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1878,7 +1878,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
Utilities.LOG14535.info("loadPartition called for DPP from " + partPath + " to " + tbl.getTableName());
Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
replace, true, listBucketingEnabled,
- false, isAcid, hasFollowingStatsTask, false); // TODO# here
+ false, isAcid, hasFollowingStatsTask, false); // TODO# special case #N
partitionsMap.put(fullPartSpec, newPartition);
if (inPlaceEligible) {
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 6ed379a..499530e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6658,7 +6658,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
acidOp = getAcidType(table_desc.getOutputFileFormatClass());
checkAcidConstraints(qb, table_desc, dest_tab);
}
- ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
+ ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, isMmTable);
ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
dest_tab.getTableName()));
ltd.setLbCtx(lbCtx);
@@ -6860,6 +6860,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
canBeMerged, isMmTable);
+ if (isMmTable) {
+ fileSinkDesc.setExecutionPrefix(ctx.getExecutionPrefix());
+ }
Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
fileSinkDesc, fsRS, input), inputRR);
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
index 02896ff..26f1d70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
@@ -160,7 +160,7 @@ public class TypeCheckCtx implements NodeProcessorCtx {
if (LOG.isDebugEnabled()) {
// Logger the callstack from which the error has been set.
LOG.debug("Setting error: [" + error + "] from "
- + ((errorSrcNode == null) ? "null" : errorSrcNode.toStringTree()), new Exception());
+ + ((errorSrcNode == null) ? "null" : errorSrcNode.toStringTree())/*, new Exception()*/);
}
this.error = error;
this.errorSrcNode = errorSrcNode;
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 0a4848b..f51999d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
private Path destPath;
private boolean isHiveServerQuery;
private boolean isMmTable;
+ private String executionPrefix;
public FileSinkDesc() {
}
@@ -158,6 +159,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
ret.setWriteType(writeType);
ret.setTransactionId(txnId);
ret.setStatsTmpDir(statsTmpDir);
+ ret.setExecutionPrefix(executionPrefix);
return ret;
}
@@ -481,4 +483,11 @@ public class FileSinkDesc extends AbstractOperatorDesc {
this.statsTmpDir = statsCollectionTempDir;
}
+ public String getExecutionPrefix() {
+ return this.executionPrefix;
+ }
+
+ public void setExecutionPrefix(String value) {
+ this.executionPrefix = value;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index 5e4e1fe..5cad65c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@ -56,7 +56,7 @@ public class LoadFileDesc extends LoadDesc implements Serializable {
final boolean isDfsDir, final String columns, final String columnTypes) {
super(sourcePath);
- Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir, new Exception());
+ Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir/*, new Exception()*/);
this.targetDir = targetDir;
this.isDfsDir = isDfsDir;
this.columns = columns;
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 1ac831d..3b49197 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -52,10 +52,10 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
final org.apache.hadoop.hive.ql.plan.TableDesc table,
final Map<String, String> partitionSpec,
final boolean replace,
- final AcidUtils.Operation writeType) {
+ final AcidUtils.Operation writeType, boolean isMmTable) {
super(sourcePath);
- Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName(), new Exception());
- init(table, partitionSpec, replace, writeType, false);
+ Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/);
+ init(table, partitionSpec, replace, writeType, isMmTable);
}
/**
@@ -69,14 +69,16 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
final TableDesc table,
final Map<String, String> partitionSpec,
final boolean replace) {
- this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID);
+ // TODO# we assume mm=false here
+ this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, false);
}
public LoadTableDesc(final Path sourcePath,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
final Map<String, String> partitionSpec,
- final AcidUtils.Operation writeType) {
- this(sourcePath, table, partitionSpec, true, writeType);
+ final AcidUtils.Operation writeType, boolean isMmTable) {
+ // TODO# we assume mm=false here
+ this(sourcePath, table, partitionSpec, true, writeType, isMmTable);
}
/**
@@ -88,7 +90,8 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
public LoadTableDesc(final Path sourcePath,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
final Map<String, String> partitionSpec) {
- this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID);
+ // TODO# we assume mm=false here
+ this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, false);
}
public LoadTableDesc(final Path sourcePath,
@@ -98,7 +101,7 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
boolean isReplace,
boolean isMmTable) {
super(sourcePath);
- Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName(), new Exception());
+ Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/);
this.dpCtx = dpCtx;
if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
init(table, dpCtx.getPartSpec(), isReplace, writeType, isMmTable);
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q
index 882096b..11259cb 100644
--- a/ql/src/test/queries/clientpositive/mm_current.q
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -2,10 +2,20 @@ set hive.mapred.mode=nonstrict;
set hive.explain.user=false;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.fetch.task.conversion=none;
+set tez.grouping.min-size=1;
+set tez.grouping.max-size=2;
+set hive.tez.auto.reducer.parallelism=false;
-drop table simple_mm;
-
+create table intermediate(key int) partitioned by (p int) stored as orc;
+insert into table intermediate partition(p='455') select key from src limit 3;
+insert into table intermediate partition(p='456') select key from src limit 3;
+insert into table intermediate partition(p='457') select key from src limit 3;
-create table simple_mm(key int) partitioned by (key_mm int) tblproperties ('hivecommit'='true');
-insert into table simple_mm partition(key_mm='455') select key from src limit 3;
+create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true');
+
+explain insert into table simple_mm partition(key_mm='455') select key from intermediate;
+insert into table simple_mm partition(key_mm='455') select key from intermediate;
+
+drop table simple_mm;
+drop table intermediate;
http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out b/ql/src/test/results/clientpositive/llap/mm_current.q.out
index 129bb13..8f1af4c 100644
--- a/ql/src/test/results/clientpositive/llap/mm_current.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -1,21 +1,128 @@
-PREHOOK: query: drop table simple123
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table simple123
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true')
+PREHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
-PREHOOK: Output: default@simple123
-POSTHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true')
+PREHOOK: Output: default@intermediate
+POSTHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
-POSTHOOK: Output: default@simple123
-PREHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3
+POSTHOOK: Output: default@intermediate
+PREHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@intermediate@p=455
+POSTHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@intermediate@p=455
+POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3
PREHOOK: type: QUERY
PREHOOK: Input: default@src
-PREHOOK: Output: default@simple123@key123=455
-POSTHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3
+PREHOOK: Output: default@intermediate@p=456
+POSTHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
-POSTHOOK: Output: default@simple123@key123=455
-POSTHOOK: Lineage: simple123 PARTITION(key123=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Output: default@intermediate@p=456
+POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@intermediate@p=457
+POSTHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@intermediate@p=457
+POSTHOOK: Lineage: intermediate PARTITION(p=457).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+POSTHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: intermediate
+ Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.simple_mm
+ Execution mode: llap
+ LLAP IO: all inputs
+
+ Stage: Stage-2
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ key_mm 455
+ replace: false
+ table:
+ input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+ output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+ serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+ name: default.simple_mm
+ micromanaged table: true
+
+ Stage: Stage-3
+ Stats-Aggr Operator
+
+PREHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_mm@key_mm=455
+POSTHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_mm@key_mm=455
+POSTHOOK: Lineage: simple_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: drop table simple_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@simple_mm
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: drop table simple_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@simple_mm
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: drop table intermediate
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@intermediate
+PREHOOK: Output: default@intermediate
+POSTHOOK: query: drop table intermediate
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Output: default@intermediate
[5/9] hive git commit: HIVE-14673: Orc orc_merge_diff_fs.q and
orc_llap.q test improvement (Prasanth Jayachandran reviewed by Hari
Subramaniyan)
Posted by se...@apache.org.
HIVE-14673: Orc orc_merge_diff_fs.q and orc_llap.q test improvement (Prasanth Jayachandran reviewed by Hari Subramaniyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fdc4bc85
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fdc4bc85
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fdc4bc85
Branch: refs/heads/hive-14535
Commit: fdc4bc8536606e8f1dc245fe559514b35fda8708
Parents: fa3a8b9
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Wed Aug 31 12:47:02 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Wed Aug 31 12:47:02 2016 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 3 +-
ql/src/test/queries/clientpositive/orc_llap.q | 76 +-
.../results/clientpositive/llap/orc_llap.q.out | 866 ++++++++++---------
.../clientpositive/orc_merge_diff_fs.q.out | 18 +-
4 files changed, 501 insertions(+), 462 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/fdc4bc85/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 764c8f6..a920ca9 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -33,7 +33,6 @@ minimr.query.files=auto_sortmerge_join_16.q,\
load_fs2.q,\
load_hdfs_file_with_space_in_the_name.q,\
non_native_window_udf.q, \
- orc_merge_diff_fs.q,\
parallel_orderby.q,\
quotedid_smb.q,\
reduce_deduplicate.q,\
@@ -227,6 +226,7 @@ minillap.shared.query.files=acid_globallimit.q,\
orc_merge7.q,\
orc_merge8.q,\
orc_merge9.q,\
+ orc_merge_diff_fs.q,\
orc_merge_incompat1.q,\
orc_merge_incompat2.q,\
orc_merge_incompat3.q,\
@@ -462,7 +462,6 @@ minillap.query.files=acid_bucket_pruning.q,\
orc_llap_counters.q,\
orc_llap_counters1.q,\
orc_llap_nonvector.q,\
- orc_merge_diff_fs.q,\
orc_ppd_basic.q,\
schema_evol_orc_acid_part.q,\
schema_evol_orc_acid_part_update.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/fdc4bc85/ql/src/test/queries/clientpositive/orc_llap.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_llap.q b/ql/src/test/queries/clientpositive/orc_llap.q
index d2bd086..7b7f240 100644
--- a/ql/src/test/queries/clientpositive/orc_llap.q
+++ b/ql/src/test/queries/clientpositive/orc_llap.q
@@ -62,42 +62,26 @@ select count(*) from orc_llap_small;
-- All row groups pruned
select count(*) from orc_llap_small where cint < 60000000;
--- Hash cannot be vectorized, so run hash as the last step on a temp table
-drop table llap_temp_table;
+-- Hash cannot be vectorized, but now we have a row-by-row reader, so the subquery runs in llap with that reader
explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null;
-create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t;
-drop table llap_temp_table;
explain
-select * from orc_llap where cint > 10 and cbigint is not null;
-create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t;
-drop table llap_temp_table;
explain
-select cstring2 from orc_llap where cint > 5 and cint < 10;
-create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t;
-
-drop table llap_temp_table;
explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2;
-create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t;
-drop table llap_temp_table;
explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null;
-create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t;
-- multi-stripe test
insert into table orc_llap
@@ -106,43 +90,25 @@ from alltypesorc cross join cross_numbers;
alter table orc_llap concatenate;
-drop table llap_temp_table;
explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null;
-create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t;
-drop table llap_temp_table;
explain
-select * from orc_llap where cint > 10 and cbigint is not null;
-create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t;
-drop table llap_temp_table;
explain
-select cstring2 from orc_llap where cint > 5 and cint < 10;
-create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t;
-drop table llap_temp_table;
explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2;
-create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2;
-select sum(hash(*)) from llap_temp_table;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t;
-drop table llap_temp_table;
explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null;
-create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null;
-select sum(hash(*)) from llap_temp_table;
-
-drop table llap_temp_table;
-
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t;
DROP TABLE cross_numbers;
DROP TABLE orc_llap;
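
For reference, the rewrite applied throughout this test trades a CTAS round trip for an inline subquery, so the hash runs in the same query instead of as a separate last step over a temp table. A minimal sketch of the before/after shape, using a hypothetical table t (not part of the test):

-- before: materialize the projection, hash it in a second query, clean up
create table tmp as select c1, c2 from t where c1 > 0;
select sum(hash(*)) from tmp;
drop table tmp;

-- after: hash over an inline subquery; no temp table needed
select sum(hash(*)) from (select c1, c2 from t where c1 > 0) x;
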
http://git-wip-us.apache.org/repos/asf/hive/blob/fdc4bc85/ql/src/test/results/clientpositive/llap/orc_llap.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
index 72dd623..74a6b29 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
@@ -236,192 +236,232 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: default@orc_llap_small
#### A masked pattern was here ####
0
-PREHOOK: query: -- Hash cannot be vectorized, so run hash as the last step on a temp table
-drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: -- Hash cannot be vectorized, so run hash as the last step on a temp table
-drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
+PREHOOK: query: -- Hash cannot be vectorized, but now we have a row-by-row reader, so the subquery still runs in llap, just with the row-by-row reader
+explain
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
PREHOOK: type: QUERY
-POSTHOOK: query: explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
+POSTHOOK: query: -- Hash cannot be vectorized, but now we have a row-by-row reader, so the subquery still runs in llap, just with the row-by-row reader
+explain
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
- Stage-0 is a root stage
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: orc_llap
+ filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
+ Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((cint > 10) and cbigint is not null) (type: boolean)
+ Statistics: Num rows: 40960 Data size: 9693313 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: hash(cint,csmallint,cbigint) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 40960 Data size: 9693313 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Execution mode: llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
- TableScan
- alias: orc_llap
- filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
- Filter Operator
- predicate: ((cint > 10) and cbigint is not null) (type: boolean)
- Select Operator
- expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint)
- outputColumnNames: _col0, _col1, _col2
- ListSink
+ ListSink
-PREHOOK: query: create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-558222259686
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: explain
-select * from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select * from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
- Stage-0 is a root stage
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: orc_llap
+ filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
+ Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((cint > 10) and cbigint is not null) (type: boolean)
+ Statistics: Num rows: 40960 Data size: 9693313 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: hash(ctinyint,csmallint,cint,cbigint,cfloat,cdouble,cstring1,cstring2,ctimestamp1,ctimestamp2,cboolean1,cboolean2) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 40960 Data size: 9693313 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Execution mode: llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
- TableScan
- alias: orc_llap
- filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
- Filter Operator
- predicate: ((cint > 10) and cbigint is not null) (type: boolean)
- Select Operator
- expressions: ctinyint (type: tinyint), csmallint (type: smallint), cint (type: int), cbigint (type: bigint), cfloat (type: float), cdouble (type: double), cstring1 (type: string), cstring2 (type: string), ctimestamp1 (type: timestamp), ctimestamp2 (type: timestamp), cboolean1 (type: boolean), cboolean2 (type: boolean)
- outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
- ListSink
+ ListSink
-PREHOOK: query: create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cboolean1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cboolean2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cdouble SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cdouble, type:double, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cfloat SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cfloat, type:float, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctimestamp1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctimestamp2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctinyint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-197609091139
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: explain
-select cstring2 from orc_llap where cint > 5 and cint < 10
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select cstring2 from orc_llap where cint > 5 and cint < 10
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
- Stage-0 is a root stage
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: orc_llap
+ filterExpr: ((cint > 5) and (cint < 10)) (type: boolean)
+ Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((cint > 5) and (cint < 10)) (type: boolean)
+ Statistics: Num rows: 13653 Data size: 3231025 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: hash(cstring2) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 13653 Data size: 3231025 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Execution mode: llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
- TableScan
- alias: orc_llap
- filterExpr: ((cint > 5) and (cint < 10)) (type: boolean)
- Filter Operator
- predicate: ((cint > 5) and (cint < 10)) (type: boolean)
- Select Operator
- expressions: cstring2 (type: string)
- outputColumnNames: _col0
- ListSink
+ ListSink
-PREHOOK: query: create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
NULL
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -433,6 +473,7 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -459,7 +500,7 @@ STAGE PLANS:
Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
- Execution mode: vectorized, llap
+ Execution mode: llap
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
@@ -467,9 +508,30 @@ STAGE PLANS:
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 61440 Data size: 14539970 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: hash(_col0,_col1,_col2) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 61440 Data size: 14539970 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 61440 Data size: 14539970 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -481,43 +543,20 @@ STAGE PLANS:
Processor Tree:
ListSink
-PREHOOK: query: create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.c2 EXPRESSION [(orc_llap)orc_llap.null, ]
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-201218541193
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -528,7 +567,8 @@ STAGE PLANS:
Tez
#### A masked pattern was here ####
Edges:
- Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -552,7 +592,7 @@ STAGE PLANS:
value expressions: _col2 (type: string)
Execution mode: vectorized, llap
LLAP IO: all inputs
- Map 3
+ Map 4
Map Operator Tree:
TableScan
alias: o2
@@ -585,16 +625,33 @@ STAGE PLANS:
outputColumnNames: _col2, _col5
Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE
Select Operator
- expressions: _col2 (type: string), _col5 (type: string)
- outputColumnNames: _col0, _col1
+ expressions: hash(_col2,_col5) (type: int)
+ outputColumnNames: _col0
Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
- Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
@@ -602,27 +659,13 @@ STAGE PLANS:
Processor Tree:
ListSink
-PREHOOK: query: create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)o2.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-735462183586256
Warning: Map Join MAPJOIN[10][bigTable=?] in task 'Map 1' is a cross product
@@ -662,194 +705,230 @@ POSTHOOK: query: alter table orc_llap concatenate
POSTHOOK: type: ALTER_TABLE_MERGE
POSTHOOK: Input: default@orc_llap
POSTHOOK: Output: default@orc_llap
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
- Stage-0 is a root stage
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: orc_llap
+ filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
+ Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((cint > 10) and cbigint is not null) (type: boolean)
+ Statistics: Num rows: 81920 Data size: 19386626 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: hash(cint,csmallint,cbigint) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 81920 Data size: 19386626 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Execution mode: llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
- TableScan
- alias: orc_llap
- filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
- Filter Operator
- predicate: ((cint > 10) and cbigint is not null) (type: boolean)
- Select Operator
- expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint)
- outputColumnNames: _col0, _col1, _col2
- ListSink
+ ListSink
-PREHOOK: query: create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-1116444519372
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: explain
-select * from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select * from orc_llap where cint > 10 and cbigint is not null
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
- Stage-0 is a root stage
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: orc_llap
+ filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
+ Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((cint > 10) and cbigint is not null) (type: boolean)
+ Statistics: Num rows: 81920 Data size: 19386626 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: hash(ctinyint,csmallint,cint,cbigint,cfloat,cdouble,cstring1,cstring2,ctimestamp1,ctimestamp2,cboolean1,cboolean2) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 81920 Data size: 19386626 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Execution mode: llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
- TableScan
- alias: orc_llap
- filterExpr: ((cint > 10) and cbigint is not null) (type: boolean)
- Filter Operator
- predicate: ((cint > 10) and cbigint is not null) (type: boolean)
- Select Operator
- expressions: ctinyint (type: tinyint), csmallint (type: smallint), cint (type: int), cbigint (type: bigint), cfloat (type: float), cdouble (type: double), cstring1 (type: string), cstring2 (type: string), ctimestamp1 (type: timestamp), ctimestamp2 (type: timestamp), cboolean1 (type: boolean), cboolean2 (type: boolean)
- outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
- ListSink
+ ListSink
-PREHOOK: query: create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select * from orc_llap where cint > 10 and cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cboolean1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cboolean2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cdouble SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cdouble, type:double, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cfloat SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cfloat, type:float, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctimestamp1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctimestamp2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.ctinyint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-395218182278
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: explain
-select cstring2 from orc_llap where cint > 5 and cint < 10
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select cstring2 from orc_llap where cint > 5 and cint < 10
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
- Stage-0 is a root stage
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: orc_llap
+ filterExpr: ((cint > 5) and (cint < 10)) (type: boolean)
+ Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: ((cint > 5) and (cint < 10)) (type: boolean)
+ Statistics: Num rows: 27306 Data size: 6462051 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: hash(cstring2) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 27306 Data size: 6462051 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Execution mode: llap
+ LLAP IO: all inputs
+ Reducer 2
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
- TableScan
- alias: orc_llap
- filterExpr: ((cint > 5) and (cint < 10)) (type: boolean)
- Filter Operator
- predicate: ((cint > 5) and (cint < 10)) (type: boolean)
- Select Operator
- expressions: cstring2 (type: string)
- outputColumnNames: _col0
- ListSink
+ ListSink
-PREHOOK: query: create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cstring2 from orc_llap where cint > 5 and cint < 10
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
NULL
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -861,6 +940,7 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -887,7 +967,7 @@ STAGE PLANS:
Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
- Execution mode: vectorized, llap
+ Execution mode: llap
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
@@ -895,9 +975,30 @@ STAGE PLANS:
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: hash(_col0,_col1,_col2) (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -909,43 +1010,20 @@ STAGE PLANS:
Processor Tree:
ListSink
-PREHOOK: query: create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.c2 EXPRESSION [(orc_llap)orc_llap.null, ]
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-201218418313
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -956,7 +1034,8 @@ STAGE PLANS:
Tez
#### A masked pattern was here ####
Edges:
- Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -980,7 +1059,7 @@ STAGE PLANS:
value expressions: _col2 (type: string)
Execution mode: vectorized, llap
LLAP IO: all inputs
- Map 3
+ Map 4
Map Operator Tree:
TableScan
alias: o2
@@ -1013,16 +1092,33 @@ STAGE PLANS:
outputColumnNames: _col2, _col5
Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE
Select Operator
- expressions: _col2 (type: string), _col5 (type: string)
- outputColumnNames: _col0, _col1
+ expressions: hash(_col2,_col5) (type: int)
+ outputColumnNames: _col0
Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
- Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Group By Operator
+ aggregations: sum(_col0)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Execution mode: vectorized, llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
@@ -1030,37 +1126,15 @@ STAGE PLANS:
Processor Tree:
ListSink
-PREHOOK: query: create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
-PREHOOK: type: CREATETABLE_AS_SELECT
-PREHOOK: Input: default@orc_llap
-PREHOOK: Output: database:default
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: create table llap_temp_table as
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null
-POSTHOOK: type: CREATETABLE_AS_SELECT
-POSTHOOK: Input: default@orc_llap
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@llap_temp_table
-POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring1, type:string, comment:null), ]
-POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)o2.FieldSchema(name:cstring2, type:string, comment:null), ]
-PREHOOK: query: select sum(hash(*)) from llap_temp_table
+PREHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
PREHOOK: type: QUERY
-PREHOOK: Input: default@llap_temp_table
+PREHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-POSTHOOK: query: select sum(hash(*)) from llap_temp_table
+POSTHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) t
POSTHOOK: type: QUERY
-POSTHOOK: Input: default@llap_temp_table
+POSTHOOK: Input: default@orc_llap
#### A masked pattern was here ####
-2941848734345024
-PREHOOK: query: drop table llap_temp_table
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@llap_temp_table
-PREHOOK: Output: default@llap_temp_table
-POSTHOOK: query: drop table llap_temp_table
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@llap_temp_table
-POSTHOOK: Output: default@llap_temp_table
PREHOOK: query: DROP TABLE cross_numbers
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@cross_numbers
http://git-wip-us.apache.org/repos/asf/hive/blob/fdc4bc85/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out b/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out
index 6ac3d35..3f047da 100644
--- a/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out
+++ b/ql/src/test/results/clientpositive/orc_merge_diff_fs.q.out
@@ -67,14 +67,14 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: src
- Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
@@ -144,14 +144,14 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: src
- Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
@@ -260,14 +260,14 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: src
- Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: UDFToInteger(key) (type: int), value (type: string), (hash(key) pmod 2) (type: int)
outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
[6/9] hive git commit: HIVE-14674 Incorrect syntax near the keyword
'with' using MS SQL Server (Eugene Koifman, reviewed by Sergey Shelukhin)
Posted by se...@apache.org.
HIVE-14674 Incorrect syntax near the keyword 'with' using MS SQL Server (Eugene Koifman, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f5dee8c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f5dee8c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f5dee8c
Branch: refs/heads/hive-14535
Commit: 8f5dee8c45ccf249b56609edb4a66d0211e2a00c
Parents: fdc4bc8
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Wed Aug 31 15:34:07 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Wed Aug 31 15:34:07 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/metastore/txn/TxnHandler.java | 11 +++++++++--
.../apache/hadoop/hive/metastore/txn/TestTxnUtils.java | 6 ++++++
2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8f5dee8c/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e8c5fac..a7a1cf9 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3308,6 +3308,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
/**
* Helper class that generates SQL queries with syntax specific to target DB
*/
+ @VisibleForTesting
static final class SQLGenerator {
private final DatabaseProduct dbProduct;
private final HiveConf conf;
@@ -3374,7 +3375,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* Given a {@code selectStatement}, decorate it with FOR UPDATE or a semantically equivalent
* construct. If the DB doesn't support it, return the original select.
*/
- private String addForUpdateClause(String selectStatement) throws MetaException {
+ String addForUpdateClause(String selectStatement) throws MetaException {
switch (dbProduct) {
case DERBY:
//https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
@@ -3390,7 +3391,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
case SQLSERVER:
//https://msdn.microsoft.com/en-us/library/ms189499.aspx
//https://msdn.microsoft.com/en-us/library/ms187373.aspx
- return selectStatement + " with(updlock)";
+ String modifier = " with (updlock)";
+ int wherePos = selectStatement.toUpperCase().indexOf(" WHERE ");
+ if(wherePos < 0) {
+ return selectStatement + modifier;
+ }
+ return selectStatement.substring(0, wherePos) + modifier +
+ selectStatement.substring(wherePos, selectStatement.length());
default:
String msg = "Unrecognized database product name <" + dbProduct + ">";
LOG.error(msg);
http://git-wip-us.apache.org/repos/asf/hive/blob/8f5dee8c/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
index cf0a7d4..ebcbaff 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
@@ -158,6 +158,12 @@ public class TestTxnUtils {
Assert.assertEquals("Number of stmts", 2, sql.size());
Assert.assertEquals("Wrong stmt", "insert into colors(name, category) values('yellow', 1),('red', 2),('orange', 3),('G',0),('G',1),('G',2),('G',3),('G',4),('G',5),('G',6),('G',7),('G',8),('G',9),('G',10),('G',11),('G',12),('G',13),('G',14),('G',15),('G',16),('G',17),('G',18),('G',19),('G',20),('G',21),('G',22),('G',23),('G',24),('G',25),('G',26),('G',27),('G',28),('G',29),('G',30),('G',31),('G',32),('G',33),('G',34),('G',35),('G',36),('G',37),('G',38),('G',39),('G',40),('G',41),('G',42),('G',43),('G',44),('G',45),('G',46),('G',47),('G',48),('G',49),('G',50),('G',51),('G',52),('G',53),('G',54),('G',55),('G',56),('G',57),('G',58),('G',59),('G',60),('G',61),('G',62),('G',63),('G',64),('G',65),('G',66),('G',67),('G',68),('G',69),('G',70),('G',71),('G',72),('G',73),('G',74),('G',75),('G',76),('G',77),('G',78),('G',79),('G',80),('G',81),('G',82),('G',83),('G',84),('G',85),('G',86),('G',87),('G',88),('G',89),('G',90),('G',91),('G',92),('G',93),('G',94),('G',95),('G',96),('G',97),('G',9
8),('G',99),('G',100),('G',101),('G',102),('G',103),('G',104),('G',105),('G',106),('G',107),('G',108),('G',109),('G',110),('G',111),('G',112),('G',113),('G',114),('G',115),('G',116),('G',117),('G',118),('G',119),('G',120),('G',121),('G',122),('G',123),('G',124),('G',125),('G',126),('G',127),('G',128),('G',129),('G',130),('G',131),('G',132),('G',133),('G',134),('G',135),('G',136),('G',137),('G',138),('G',139),('G',140),('G',141),('G',142),('G',143),('G',144),('G',145),('G',146),('G',147),('G',148),('G',149),('G',150),('G',151),('G',152),('G',153),('G',154),('G',155),('G',156),('G',157),('G',158),('G',159),('G',160),('G',161),('G',162),('G',163),('G',164),('G',165),('G',166),('G',167),('G',168),('G',169),('G',170),('G',171),('G',172),('G',173),('G',174),('G',175),('G',176),('G',177),('G',178),('G',179),('G',180),('G',181),('G',182),('G',183),('G',184),('G',185),('G',186),('G',187),('G',188),('G',189),('G',190),('G',191),('G',192),('G',193),('G',194),('G',195),('G',196),('G',197),('G',
198),('G',199),('G',200),('G',201),('G',202),('G',203),('G',204),('G',205),('G',206),('G',207),('G',208),('G',209),('G',210),('G',211),('G',212),('G',213),('G',214),('G',215),('G',216),('G',217),('G',218),('G',219),('G',220),('G',221),('G',222),('G',223),('G',224),('G',225),('G',226),('G',227),('G',228),('G',229),('G',230),('G',231),('G',232),('G',233),('G',234),('G',235),('G',236),('G',237),('G',238),('G',239),('G',240),('G',241),('G',242),('G',243),('G',244),('G',245),('G',246),('G',247),('G',248),('G',249),('G',250),('G',251),('G',252),('G',253),('G',254),('G',255),('G',256),('G',257),('G',258),('G',259),('G',260),('G',261),('G',262),('G',263),('G',264),('G',265),('G',266),('G',267),('G',268),('G',269),('G',270),('G',271),('G',272),('G',273),('G',274),('G',275),('G',276),('G',277),('G',278),('G',279),('G',280),('G',281),('G',282),('G',283),('G',284),('G',285),('G',286),('G',287),('G',288),('G',289),('G',290),('G',291),('G',292),('G',293),('G',294),('G',295),('G',296),('G',297),('
G',298),('G',299),('G',300),('G',301),('G',302),('G',303),('G',304),('G',305),('G',306),('G',307),('G',308),('G',309),('G',310),('G',311),('G',312),('G',313),('G',314),('G',315),('G',316),('G',317),('G',318),('G',319),('G',320),('G',321),('G',322),('G',323),('G',324),('G',325),('G',326),('G',327),('G',328),('G',329),('G',330),('G',331),('G',332),('G',333),('G',334),('G',335),('G',336),('G',337),('G',338),('G',339),('G',340),('G',341),('G',342),('G',343),('G',344),('G',345),('G',346),('G',347),('G',348),('G',349),('G',350),('G',351),('G',352),('G',353),('G',354),('G',355),('G',356),('G',357),('G',358),('G',359),('G',360),('G',361),('G',362),('G',363),('G',364),('G',365),('G',366),('G',367),('G',368),('G',369),('G',370),('G',371),('G',372),('G',373),('G',374),('G',375),('G',376),('G',377),('G',378),('G',379),('G',380),('G',381),('G',382),('G',383),('G',384),('G',385),('G',386),('G',387),('G',388),('G',389),('G',390),('G',391),('G',392),('G',393),('G',394),('G',395),('G',396),('G',397)
,('G',398),('G',399),('G',400),('G',401),('G',402),('G',403),('G',404),('G',405),('G',406),('G',407),('G',408),('G',409),('G',410),('G',411),('G',412),('G',413),('G',414),('G',415),('G',416),('G',417),('G',418),('G',419),('G',420),('G',421),('G',422),('G',423),('G',424),('G',425),('G',426),('G',427),('G',428),('G',429),('G',430),('G',431),('G',432),('G',433),('G',434),('G',435),('G',436),('G',437),('G',438),('G',439),('G',440),('G',441),('G',442),('G',443),('G',444),('G',445),('G',446),('G',447),('G',448),('G',449),('G',450),('G',451),('G',452),('G',453),('G',454),('G',455),('G',456),('G',457),('G',458),('G',459),('G',460),('G',461),('G',462),('G',463),('G',464),('G',465),('G',466),('G',467),('G',468),('G',469),('G',470),('G',471),('G',472),('G',473),('G',474),('G',475),('G',476),('G',477),('G',478),('G',479),('G',480),('G',481),('G',482),('G',483),('G',484),('G',485),('G',486),('G',487),('G',488),('G',489),('G',490),('G',491),('G',492),('G',493),('G',494),('G',495),('G',496),('G',4
97),('G',498),('G',499),('G',500),('G',501),('G',502),('G',503),('G',504),('G',505),('G',506),('G',507),('G',508),('G',509),('G',510),('G',511),('G',512),('G',513),('G',514),('G',515),('G',516),('G',517),('G',518),('G',519),('G',520),('G',521),('G',522),('G',523),('G',524),('G',525),('G',526),('G',527),('G',528),('G',529),('G',530),('G',531),('G',532),('G',533),('G',534),('G',535),('G',536),('G',537),('G',538),('G',539),('G',540),('G',541),('G',542),('G',543),('G',544),('G',545),('G',546),('G',547),('G',548),('G',549),('G',550),('G',551),('G',552),('G',553),('G',554),('G',555),('G',556),('G',557),('G',558),('G',559),('G',560),('G',561),('G',562),('G',563),('G',564),('G',565),('G',566),('G',567),('G',568),('G',569),('G',570),('G',571),('G',572),('G',573),('G',574),('G',575),('G',576),('G',577),('G',578),('G',579),('G',580),('G',581),('G',582),('G',583),('G',584),('G',585),('G',586),('G',587),('G',588),('G',589),('G',590),('G',591),('G',592),('G',593),('G',594),('G',595),('G',596),('G
',597),('G',598),('G',599),('G',600),('G',601),('G',602),('G',603),('G',604),('G',605),('G',606),('G',607),('G',608),('G',609),('G',610),('G',611),('G',612),('G',613),('G',614),('G',615),('G',616),('G',617),('G',618),('G',619),('G',620),('G',621),('G',622),('G',623),('G',624),('G',625),('G',626),('G',627),('G',628),('G',629),('G',630),('G',631),('G',632),('G',633),('G',634),('G',635),('G',636),('G',637),('G',638),('G',639),('G',640),('G',641),('G',642),('G',643),('G',644),('G',645),('G',646),('G',647),('G',648),('G',649),('G',650),('G',651),('G',652),('G',653),('G',654),('G',655),('G',656),('G',657),('G',658),('G',659),('G',660),('G',661),('G',662),('G',663),('G',664),('G',665),('G',666),('G',667),('G',668),('G',669),('G',670),('G',671),('G',672),('G',673),('G',674),('G',675),('G',676),('G',677),('G',678),('G',679),('G',680),('G',681),('G',682),('G',683),('G',684),('G',685),('G',686),('G',687),('G',688),('G',689),('G',690),('G',691),('G',692),('G',693),('G',694),('G',695),('G',696),
('G',697),('G',698),('G',699),('G',700),('G',701),('G',702),('G',703),('G',704),('G',705),('G',706),('G',707),('G',708),('G',709),('G',710),('G',711),('G',712),('G',713),('G',714),('G',715),('G',716),('G',717),('G',718),('G',719),('G',720),('G',721),('G',722),('G',723),('G',724),('G',725),('G',726),('G',727),('G',728),('G',729),('G',730),('G',731),('G',732),('G',733),('G',734),('G',735),('G',736),('G',737),('G',738),('G',739),('G',740),('G',741),('G',742),('G',743),('G',744),('G',745),('G',746),('G',747),('G',748),('G',749),('G',750),('G',751),('G',752),('G',753),('G',754),('G',755),('G',756),('G',757),('G',758),('G',759),('G',760),('G',761),('G',762),('G',763),('G',764),('G',765),('G',766),('G',767),('G',768),('G',769),('G',770),('G',771),('G',772),('G',773),('G',774),('G',775),('G',776),('G',777),('G',778),('G',779),('G',780),('G',781),('G',782),('G',783),('G',784),('G',785),('G',786),('G',787),('G',788),('G',789),('G',790),('G',791),('G',792),('G',793),('G',794),('G',795),('G',79
6),('G',797),('G',798),('G',799),('G',800),('G',801),('G',802),('G',803),('G',804),('G',805),('G',806),('G',807),('G',808),('G',809),('G',810),('G',811),('G',812),('G',813),('G',814),('G',815),('G',816),('G',817),('G',818),('G',819),('G',820),('G',821),('G',822),('G',823),('G',824),('G',825),('G',826),('G',827),('G',828),('G',829),('G',830),('G',831),('G',832),('G',833),('G',834),('G',835),('G',836),('G',837),('G',838),('G',839),('G',840),('G',841),('G',842),('G',843),('G',844),('G',845),('G',846),('G',847),('G',848),('G',849),('G',850),('G',851),('G',852),('G',853),('G',854),('G',855),('G',856),('G',857),('G',858),('G',859),('G',860),('G',861),('G',862),('G',863),('G',864),('G',865),('G',866),('G',867),('G',868),('G',869),('G',870),('G',871),('G',872),('G',873),('G',874),('G',875),('G',876),('G',877),('G',878),('G',879),('G',880),('G',881),('G',882),('G',883),('G',884),('G',885),('G',886),('G',887),('G',888),('G',889),('G',890),('G',891),('G',892),('G',893),('G',894),('G',895),('G'
,896),('G',897),('G',898),('G',899),('G',900),('G',901),('G',902),('G',903),('G',904),('G',905),('G',906),('G',907),('G',908),('G',909),('G',910),('G',911),('G',912),('G',913),('G',914),('G',915),('G',916),('G',917),('G',918),('G',919),('G',920),('G',921),('G',922),('G',923),('G',924),('G',925),('G',926),('G',927),('G',928),('G',929),('G',930),('G',931),('G',932),('G',933),('G',934),('G',935),('G',936),('G',937),('G',938),('G',939),('G',940),('G',941),('G',942),('G',943),('G',944),('G',945),('G',946),('G',947),('G',948),('G',949),('G',950),('G',951),('G',952),('G',953),('G',954),('G',955),('G',956),('G',957),('G',958),('G',959),('G',960),('G',961),('G',962),('G',963),('G',964),('G',965),('G',966),('G',967),('G',968),('G',969),('G',970),('G',971),('G',972),('G',973),('G',974),('G',975),('G',976),('G',977),('G',978),('G',979),('G',980),('G',981),('G',982),('G',983),('G',984),('G',985),('G',986),('G',987),('G',988),('G',989),('G',990),('G',991),('G',992),('G',993),('G',994),('G',995),(
'G',996)", sql.get(0));
Assert.assertEquals("Wrong stmt", "insert into colors(name, category) values('G',997),('G',998),('G',999)", sql.get(1));
+
+ sqlGenerator = new TxnHandler.SQLGenerator(TxnHandler.DatabaseProduct.SQLSERVER, conf);
+ String modSql = sqlGenerator.addForUpdateClause("select nl_next from NEXT_LOCK_ID");
+ Assert.assertEquals("select nl_next from NEXT_LOCK_ID with (updlock)", modSql);
+ modSql = sqlGenerator.addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1='CheckLock' and MT_KEY2=0");
+ Assert.assertEquals("select MT_COMMENT from AUX_TABLE with (updlock) where MT_KEY1='CheckLock' and MT_KEY2=0", modSql);
}
@Before
[4/9] hive git commit: HIVE-14670:
org.apache.hadoop.hive.ql.TestMTQueries failure (Hari Subramaniyan,
reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-14670: org.apache.hadoop.hive.ql.TestMTQueries failure (Hari Subramaniyan, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fa3a8b9b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fa3a8b9b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fa3a8b9b
Branch: refs/heads/hive-14535
Commit: fa3a8b9bd16aad66b869ee30f6ddfa12f2fd5ad1
Parents: 1f6949f
Author: Hari Subramaniyan <ha...@apache.org>
Authored: Wed Aug 31 11:34:23 2016 -0700
Committer: Hari Subramaniyan <ha...@apache.org>
Committed: Wed Aug 31 11:34:23 2016 -0700
----------------------------------------------------------------------
.../src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/fa3a8b9b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java
index c5337ff..198fe48 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java
@@ -33,8 +33,7 @@ public class TestMTQueries extends BaseTestQueries {
}
public void testMTQueries1() throws Exception {
- String[] testNames = new String[] {"join1.q", "join2.q", "groupby1.q",
- "groupby2.q", "join3.q", "input1.q", "input19.q"};
+ String[] testNames = new String[] {"join2.q", "groupby1.q", "input1.q", "input19.q"};
File[] qfiles = setupQFiles(testNames);
QTestUtil[] qts = QTestUtil.queryListRunnerSetup(qfiles, resDir, logDir);