Posted to commits@hive.apache.org by ek...@apache.org on 2016/08/31 03:38:38 UTC
hive git commit: HIVE-14233 - Improve vectorization for ACID by eliminating row-by-row stitching (Saket Saurabh via Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master ab605910e -> 1f6949f95
HIVE-14233 - Improve vectorization for ACID by eliminating row-by-row stitching (Saket Saurabh via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1f6949f9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1f6949f9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1f6949f9
Branch: refs/heads/master
Commit: 1f6949f958d63f5b1552a2417d7c042a416cdbe0
Parents: ab60591
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Aug 30 20:38:28 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Aug 30 20:38:28 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 134 +--
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 7 +-
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 822 +++++++++++++++++++
...ommands2WithSplitUpdateAndVectorization.java | 52 ++
.../TestVectorizedOrcAcidRowBatchReader.java | 263 ++++++
.../queries/clientpositive/acid_vectorization.q | 12 +
.../clientpositive/acid_vectorization.q.out | 62 ++
8 files changed, 1302 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cb0d96f..aa03b63 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1187,6 +1187,12 @@ public class HiveConf extends Configuration {
HIVE_TRANSACTIONAL_TABLE_SCAN("hive.transactional.table.scan", false,
"internal usage only -- do transaction (ACID) table scan.", true),
+ HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY("hive.transactional.events.mem", 10000000,
+ "Vectorized ACID readers can often load all the delete events from all the delete deltas\n"
+ + "into memory to optimize for performance. To prevent out-of-memory errors, this is a rough heuristic\n"
+ + "that limits the total number of delete events that can be loaded into memory at once.\n"
+ + "Roughly it has been set to 10 million delete events per bucket (~160 MB).\n"),
+
HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0,
"A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."),
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index d5172ed..70003ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.orc.impl.InStream;
-import org.apache.orc.impl.SchemaEvolution;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@@ -41,26 +38,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
-import org.apache.orc.ColumnStatistics;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.StripeStatistics;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.OrcTail;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -73,6 +50,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Metastore;
import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -89,22 +67,33 @@ import org.apache.hadoop.hive.ql.io.BatchToRowReader;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
+import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -117,7 +106,17 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.orc.ColumnStatistics;
import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcTail;
+import org.apache.orc.impl.SchemaEvolution;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -318,7 +317,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<OrcProto.Type> types = file.getTypes();
options.include(genIncludedColumns(types, conf, isOriginal));
setSearchArgument(options, types, conf, isOriginal);
- return (RecordReader) file.rowsOptions(options);
+ return file.rowsOptions(options);
}
public static boolean isOriginal(Reader file) {
@@ -1793,17 +1792,28 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
reporter.setStatus(inputSplit.toString());
+ boolean isFastVectorizedReaderAvailable =
+ VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit);
+
+ if (vectorMode && isFastVectorizedReaderAvailable) {
+ // A faster vectorized ACID row batch reader is available that avoids row-by-row stitching.
+ return (org.apache.hadoop.mapred.RecordReader)
+ new VectorizedOrcAcidRowBatchReader(inputSplit, conf, reporter);
+ }
+
Options options = new Options(conf).reporter(reporter);
final RowReader<OrcStruct> inner = getReader(inputSplit, options);
-
- if (vectorMode) {
+ if (vectorMode && !isFastVectorizedReaderAvailable) {
+ // Vectorized regular ACID reader that does row-by-row stitching.
return (org.apache.hadoop.mapred.RecordReader)
new VectorizedOrcAcidRowReader(inner, conf,
Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
} else {
+ // Non-vectorized regular ACID reader.
return new NullKeyRecordReader(inner, conf);
}
}
+
/**
* Return a RecordReader that is compatible with the Hive 0.12 reader
* with NullWritable for the key instead of RecordIdentifier.
@@ -1890,35 +1900,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
: AcidUtils.deserializeDeltas(root, split.getDeltas());
final Configuration conf = options.getConfiguration();
-
- /**
- * Do we have schema on read in the configuration variables?
- */
- TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
-
- final Reader reader;
- final int bucket;
- Reader.Options readOptions = new Reader.Options().schema(schema);
+ final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
+ final int bucket = OrcInputFormat.getBucketForSplit(conf, split);
+ final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
readOptions.range(split.getStart(), split.getLength());
- // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
- final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
- readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
- setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
-
- if (split.hasBase()) {
- bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf)
- .getBucket();
- OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf)
- .maxLength(split.getFileLength());
- if (split.hasFooter()) {
- readerOptions.orcTail(split.getOrcTail());
- }
- reader = OrcFile.createReader(path, readerOptions);
- } else {
- bucket = (int) split.getStart();
- reader = null;
- }
String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
ValidTxnList validTxnList = txnString == null ? new ValidReadTxnList() :
new ValidReadTxnList(txnString);
@@ -1930,7 +1916,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@Override
public ObjectInspector getObjectInspector() {
- return OrcStruct.createObjectInspector(0, schemaTypes);
+ return OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(readOptions.getSchema()));
}
@Override
@@ -1992,6 +1978,44 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
directory);
}
+ static Reader.Options createOptionsForReader(Configuration conf) {
+ /**
+ * Do we have schema on read in the configuration variables?
+ */
+ TypeDescription schema =
+ OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+ Reader.Options readerOptions = new Reader.Options().schema(schema);
+ // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
+ final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
+ readerOptions.include(OrcInputFormat.genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
+ OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
+ return readerOptions;
+ }
+
+ static Reader createOrcReaderForSplit(Configuration conf, OrcSplit orcSplit) throws IOException {
+ Path path = orcSplit.getPath();
+ Reader reader;
+ if (orcSplit.hasBase()) {
+ OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+ readerOptions.maxLength(orcSplit.getFileLength());
+ if (orcSplit.hasFooter()) {
+ readerOptions.orcTail(orcSplit.getOrcTail());
+ }
+ reader = OrcFile.createReader(path, readerOptions);
+ } else {
+ reader = null;
+ }
+ return reader;
+ }
+
+ static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) {
+ if (orcSplit.hasBase()) {
+ return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+ } else {
+ return (int) orcSplit.getStart();
+ }
+ }
+
public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg,
OrcFile.WriterVersion writerVersion, List<OrcProto.Type> types,
List<StripeStatistics> stripeStats, int stripeCount) {
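The three package-private helpers above (createOptionsForReader, createOrcReaderForSplit, getBucketForSplit) factor out logic that getReader() previously inlined, so the new VectorizedOrcAcidRowBatchReader can reuse it. A sketch of the shared call pattern, with error handling elided (note that createOrcReaderForSplit returns null for base-less splits):

    Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
    int bucket = OrcInputFormat.getBucketForSplit(conf, split);
    Reader.Options options = OrcInputFormat.createOptionsForReader(conf);
    options.range(split.getStart(), split.getLength());
    RecordReader rows = reader.rowsOptions(options);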
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index e46ca51..f4e06ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -49,7 +50,6 @@ import org.apache.hadoop.io.Text;
import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
implements RecordReader {
@@ -79,6 +79,10 @@ public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
return true;
}
+ public VectorizedRowBatch createRowBatch() {
+ return this.schema.createRowBatch();
+ }
+
@Override
public long getRowNumber() {
return baseRow + rowInBatch;
@@ -129,6 +133,7 @@ public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
return previous;
}
+ @Override
public boolean nextBatch(VectorizedRowBatch theirBatch) throws IOException {
// If the user hasn't been reading by row, use the fast path.
if (rowInBatch >= batch.size) {
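The newly exposed createRowBatch() pairs with nextBatch() so that callers can pull whole vectorized batches instead of stitching individual rows; a minimal consumption loop, roughly as the new ACID batch reader drives it internally:

    RecordReaderImpl rows = (RecordReaderImpl) reader.rowsOptions(options);
    VectorizedRowBatch batch = rows.createRowBatch();
    while (rows.nextBatch(batch)) {
      // batch.size rows are available; honor batch.selected[] when batch.selectedInUse is true.
    }
    rows.close();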
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
new file mode 100644
index 0000000..75c7680
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -0,0 +1,822 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+/**
+ * A fast vectorized batch reader class for ACID when split-update behavior is enabled.
+ * When split-update is turned on, row-by-row stitching can be avoided when creating the final
+ * version of a row. Essentially, there are only insert and delete events. Insert events can be
+ * read directly from the base files/insert_only deltas in vectorized row batches. The deleted
+ * rows can then be easily indicated via the 'selected' field of the vectorized row batch.
+ * Refer to HIVE-14233 for more details.
+ */
+public class VectorizedOrcAcidRowBatchReader
+ implements org.apache.hadoop.mapred.RecordReader<NullWritable,VectorizedRowBatch> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VectorizedOrcAcidRowBatchReader.class);
+
+ private org.apache.hadoop.hive.ql.io.orc.RecordReader baseReader;
+ private VectorizedRowBatchCtx rbCtx;
+ private VectorizedRowBatch vectorizedRowBatchBase;
+ private long offset;
+ private long length;
+ private float progress = 0.0f;
+ private Object[] partitionValues;
+ private boolean addPartitionCols = true;
+ private ValidTxnList validTxnList;
+ private DeleteEventRegistry deleteEventRegistry;
+
+ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
+ Reporter reporter) throws IOException {
+
+ final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+ final AcidUtils.AcidOperationalProperties acidOperationalProperties
+ = AcidUtils.getAcidOperationalProperties(conf);
+
+ // This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is
+ // enabled for an ACID case and the file format is ORC.
+ boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate()
+ || !(inputSplit instanceof OrcSplit);
+ if (isReadNotAllowed) {
+ OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
+ }
+ final OrcSplit orcSplit = (OrcSplit) inputSplit;
+
+ rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+
+ reporter.setStatus(orcSplit.toString());
+ Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, orcSplit);
+ Reader.Options readerOptions = OrcInputFormat.createOptionsForReader(conf);
+ readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions);
+
+ this.offset = orcSplit.getStart();
+ this.length = orcSplit.getLength();
+
+ // Be careful with the range here: unlike the deltas, we do not want to read the whole base file.
+ this.baseReader = reader.rowsOptions(readerOptions.range(offset, length));
+
+ // VectorizedRowBatchBase schema is picked up from the baseReader because the SchemaEvolution
+ // stuff happens at the ORC layer that understands how to map user schema to acid schema.
+ if (this.baseReader instanceof RecordReaderImpl) {
+ this.vectorizedRowBatchBase = ((RecordReaderImpl) this.baseReader).createRowBatch();
+ } else {
+ throw new IOException("Failed to create vectorized row batch for the reader of type "
+ + this.baseReader.getClass().getName());
+ }
+
+ int partitionColumnCount = (rbCtx != null) ? rbCtx.getPartitionColumnCount() : 0;
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, orcSplit, partitionValues);
+ } else {
+ partitionValues = null;
+ }
+
+ String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+
+ // Clone readerOptions for deleteEvents.
+ Reader.Options deleteEventReaderOptions = readerOptions.clone();
+ // Set the range on the deleteEventReaderOptions to (0, Long.MAX_VALUE) because
+ // we always want to read all the delete delta files.
+ deleteEventReaderOptions.range(0, Long.MAX_VALUE);
+ // Disable SARGs for deleteEventReaders, as SARGs have no meaning.
+ deleteEventReaderOptions.searchArgument(null, null);
+ try {
+ // See if we can load all the delete events from all the delete deltas in memory...
+ this.deleteEventRegistry = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+ } catch (DeleteEventsOverflowMemoryException e) {
+ // If not, then create a set of hanging readers that do sort-merge to find the next smallest
+ // delete event on-demand. Caps the memory consumption to (some_const * no. of readers).
+ this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+ }
+ }
+
+ /**
+ * Returns whether it is possible to create a valid instance of this class for a given split.
+ * @param conf is the job configuration
+ * @param inputSplit is the split for which the reader would be created
+ * @return true if it is possible, else false.
+ */
+ public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) {
+ if (!(inputSplit instanceof OrcSplit)) {
+ return false; // must be an instance of OrcSplit.
+ }
+ // First check if we are reading any original files in the split.
+ // To simplify the vectorization logic, the vectorized acid row batch reader does not handle
+ // original files for now as they have a different schema than a regular ACID file.
+ final OrcSplit split = (OrcSplit) inputSplit;
+ if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) {
+ // When split-update is turned on for ACID, a more optimized vectorized batch reader
+ // can be created. But still only possible when we are *NOT* reading any originals.
+ return true;
+ }
+ return false; // no split-update or possibly reading originals!
+ }
+
+ private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException {
+ Path path = orcSplit.getPath();
+ Path root;
+ if (orcSplit.hasBase()) {
+ if (orcSplit.isOriginal()) {
+ root = path.getParent();
+ } else {
+ root = path.getParent().getParent();
+ }
+ } else {
+ root = path;
+ }
+ return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas());
+ }
+
+ @Override
+ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+ try {
+ // Check and update partition cols if necessary. Ideally, this should be done
+ // in createValue() as the partition is constant per split. But since Hive uses
+ // CombineHiveRecordReader, which does not call createValue() for each new RecordReader
+ // it creates, this check is required in next().
+ if (addPartitionCols) {
+ if (partitionValues != null) {
+ rbCtx.addPartitionColsToBatch(value, partitionValues);
+ }
+ addPartitionCols = false;
+ }
+ if (!baseReader.nextBatch(vectorizedRowBatchBase)) {
+ return false;
+ }
+ } catch (Exception e) {
+ throw new IOException("error iterating", e);
+ }
+
+ // Once we have read the VectorizedRowBatchBase from the file, there are two kinds of cases
+ // for which we might have to discard rows from the batch:
+ // Case 1- when the row is created by a transaction that is not valid, or
+ // Case 2- when the row has been deleted.
+ // We will go through the batch to discover rows which match any of the cases and specifically
+ // remove them from the selected vector. Of course, selectedInUse should also be set.
+
+ BitSet selectedBitSet = new BitSet(vectorizedRowBatchBase.size);
+ if (vectorizedRowBatchBase.selectedInUse) {
+ // When selectedInUse is true, start with every bit set to false and selectively set
+ // certain bits to true based on the selected[] vector.
+ selectedBitSet.set(0, vectorizedRowBatchBase.size, false);
+ for (int j = 0; j < vectorizedRowBatchBase.size; ++j) {
+ int i = vectorizedRowBatchBase.selected[j];
+ selectedBitSet.set(i);
+ }
+ } else {
+ // When selectedInUse is set to false, everything in the batch is selected.
+ selectedBitSet.set(0, vectorizedRowBatchBase.size, true);
+ }
+
+ // Case 1- find rows which belong to transactions that are not valid.
+ findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+
+ // Case 2- find rows which have been deleted.
+ this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet);
+
+ if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) {
+ // None of the cases above matched and everything is selected. Hence, we will use the
+ // same values for the selected and selectedInUse.
+ value.size = vectorizedRowBatchBase.size;
+ value.selected = vectorizedRowBatchBase.selected;
+ value.selectedInUse = vectorizedRowBatchBase.selectedInUse;
+ } else {
+ value.size = selectedBitSet.cardinality();
+ value.selectedInUse = true;
+ value.selected = new int[selectedBitSet.cardinality()];
+ // This loop fills up the selected[] vector with all the index positions that are selected.
+ for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0;
+ setBitIndex >= 0;
+ setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) {
+ value.selected[selectedItr] = setBitIndex;
+ }
+ }
+
+ // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
+ // NOTE: We only link up the user columns and not the ACID metadata columns because this
+ // vectorized code path is not being used in cases of update/delete, when the metadata columns
+ // would be expected to be passed up the operator pipeline. This is because
+ // currently the update/delete specifically disable vectorized code paths.
+ // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
+ StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
+ // Transfer columnVector objects from base batch to outgoing batch.
+ System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
+ progress = baseReader.getProgress();
+ return true;
+ }
+
+ private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) {
+ if (batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) {
+ // When we have repeating values, we can unset the whole bitset at once
+ // if the repeating value is not a valid transaction.
+ long currentTransactionIdForBatch = ((LongColumnVector)
+ batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0];
+ if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) {
+ selectedBitSet.clear(0, batch.size);
+ }
+ return;
+ }
+ long[] currentTransactionVector =
+ ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector;
+ // Loop through the bits that are set to true and mark those rows as false, if their
+ // current transactions are not valid.
+ for (int setBitIndex = selectedBitSet.nextSetBit(0);
+ setBitIndex >= 0;
+ setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
+ if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) {
+ selectedBitSet.clear(setBitIndex);
+ }
+ }
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public VectorizedRowBatch createValue() {
+ return rbCtx.createVectorizedRowBatch();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return offset + (long) (progress * length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ this.baseReader.close();
+ } finally {
+ this.deleteEventRegistry.close();
+ }
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return progress;
+ }
+
+ @VisibleForTesting
+ DeleteEventRegistry getDeleteEventRegistry() {
+ return deleteEventRegistry;
+ }
+
+ /**
+ * An interface that can determine which rows have been deleted
+ * from a given vectorized row batch. Implementations of this interface
+ * will read the delete delta files and will create their own internal
+ * data structures to maintain record ids of the records that got deleted.
+ */
+ static interface DeleteEventRegistry {
+ /**
+ * Modifies the passed bitset to indicate which of the rows in the batch
+ * have been deleted. Assumes that the batch.size is equal to bitset size.
+ * @param batch
+ * @param selectedBitSet
+ * @throws IOException
+ */
+ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) throws IOException;
+
+ /**
+ * The close() method can be called externally to signal the implementing classes
+ * to free up resources.
+ * @throws IOException
+ */
+ public void close() throws IOException;
+ }
+
+ /**
+ * An implementation for DeleteEventRegistry that opens the delete delta files all
+ * at once, and then uses the sort-merge algorithm to maintain a sorted list of
+ * delete events. This internally uses the OrcRawRecordMerger and maintains a constant
+ * amount of memory usage, given the number of delete delta files. Therefore, this
+ * implementation will be picked up when the memory pressure is high.
+ */
+ static class SortMergedDeleteEventRegistry implements DeleteEventRegistry {
+ private OrcRawRecordMerger deleteRecords;
+ private OrcRawRecordMerger.ReaderKey deleteRecordKey;
+ private OrcStruct deleteRecordValue;
+ private boolean isDeleteRecordAvailable = true;
+ private ValidTxnList validTxnList;
+
+ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions)
+ throws IOException {
+ final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
+ if (deleteDeltas.length > 0) {
+ int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+ String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+ this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
+ validTxnList, readerOptions, deleteDeltas);
+ this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
+ this.deleteRecordValue = this.deleteRecords.createValue();
+ // Initialize the first value in the delete reader.
+ this.isDeleteRecordAvailable = this.deleteRecords.next(deleteRecordKey, deleteRecordValue);
+ } else {
+ this.isDeleteRecordAvailable = false;
+ this.deleteRecordKey = null;
+ this.deleteRecordValue = null;
+ this.deleteRecords = null;
+ }
+ }
+
+ @Override
+ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet)
+ throws IOException {
+ if (!isDeleteRecordAvailable) {
+ return;
+ }
+
+ long[] originalTransaction =
+ batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+ long[] bucket =
+ batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector;
+ long[] rowId =
+ batch.cols[OrcRecordUpdater.ROW_ID].isRepeating ? null
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector;
+
+ // The following repeatedX values will be set, if any of the columns are repeating.
+ long repeatedOriginalTransaction = (originalTransaction != null) ? -1
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+ long repeatedBucket = (bucket != null) ? -1
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+ long repeatedRowId = (rowId != null) ? -1
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[0];
+
+
+ // Get the first valid row in the batch still available.
+ int firstValidIndex = selectedBitSet.nextSetBit(0);
+ if (firstValidIndex == -1) {
+ return; // Everything in the batch has already been filtered out.
+ }
+ RecordIdentifier firstRecordIdInBatch =
+ new RecordIdentifier(
+ originalTransaction != null ? originalTransaction[firstValidIndex] : repeatedOriginalTransaction,
+ bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket,
+ rowId != null ? rowId[firstValidIndex] : repeatedRowId);
+
+ // Get the last valid row in the batch still available.
+ int lastValidIndex = selectedBitSet.previousSetBit(batch.size - 1);
+ RecordIdentifier lastRecordIdInBatch =
+ new RecordIdentifier(
+ originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction,
+ bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket,
+ rowId != null ? rowId[lastValidIndex] : repeatedRowId);
+
+ // We must iterate over all the delete records, until we find one record with
+ // deleteRecord >= firstRecordInBatch or until we exhaust all the delete records.
+ while (deleteRecordKey.compareRow(firstRecordIdInBatch) == -1) {
+ isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue);
+ if (!isDeleteRecordAvailable) return; // exhausted all delete records, return.
+ }
+
+ // If we are here, then we have established that firstRecordInBatch <= deleteRecord.
+ // Now continue marking records which have been deleted until we reach the end of the batch
+ // or we exhaust all the delete records.
+
+ int currIndex = firstValidIndex;
+ RecordIdentifier currRecordIdInBatch = new RecordIdentifier();
+ while (isDeleteRecordAvailable && currIndex != -1 && currIndex <= lastValidIndex) {
+ currRecordIdInBatch.setValues(
+ (originalTransaction != null) ? originalTransaction[currIndex] : repeatedOriginalTransaction,
+ (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket,
+ (rowId != null) ? rowId[currIndex] : repeatedRowId);
+
+ if (deleteRecordKey.compareRow(currRecordIdInBatch) == 0) {
+ // When deleteRecordId == currRecordIdInBatch, this record in the batch has been deleted.
+ selectedBitSet.clear(currIndex);
+ currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index.
+ } else if (deleteRecordKey.compareRow(currRecordIdInBatch) == 1) {
+ // When deleteRecordId > currRecordIdInBatch, we have to move on to look at the
+ // next record in the batch.
+ // But before that, can we short-circuit and skip the entire batch itself
+ // by checking if the deleteRecordId > lastRecordInBatch?
+ if (deleteRecordKey.compareRow(lastRecordIdInBatch) == 1) {
+ return; // Yay! We short-circuited, skip everything remaining in the batch and return.
+ }
+ currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index.
+ } else {
+ // We have deleteRecordId < currRecordIdInBatch, we must now move on to find
+ // the next larger deleteRecordId that can possibly match anything in the batch.
+ isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.deleteRecords != null) {
+ this.deleteRecords.close();
+ }
+ }
+ }
+
+ /**
+ * An implementation for DeleteEventRegistry that optimizes for performance by loading
+ * all the delete events into memory at once from all the delete delta files.
+ * It starts by reading all the delete events through a regular sort merge logic
+ * into two vectors- one for original transaction id (otid), and the other for row id.
+ * (In the current version, the bucket id is not stored, since it should be the same for all
+ * the delete deltas.) The otids are likely to be repeated very often, as a single transaction
+ * often deletes thousands of rows. Hence, the otid vector is compressed to only store the
+ * fromIndex and toIndex ranges into the larger row id vector. Querying whether a
+ * record id is deleted or not is then done by performing a binary search on the
+ * compressed otid ranges. If a match is found, a binary search is performed on
+ * the larger rowId vector between the given fromIndex and toIndex. Of course, there is a rough
+ * heuristic that prevents creation of an instance of this class if the memory pressure is high.
+ * The SortMergedDeleteEventRegistry is then the fallback method for such scenarios.
+ */
+ static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry {
+ /**
+ * A simple wrapper class to hold the (otid, rowId) pair.
+ */
+ static class DeleteRecordKey implements Comparable<DeleteRecordKey> {
+ private long originalTransactionId;
+ private long rowId;
+ public DeleteRecordKey() {
+ this.originalTransactionId = -1;
+ this.rowId = -1;
+ }
+ public DeleteRecordKey(long otid, long rowId) {
+ this.originalTransactionId = otid;
+ this.rowId = rowId;
+ }
+ public void set(long otid, long rowId) {
+ this.originalTransactionId = otid;
+ this.rowId = rowId;
+ }
+
+ @Override
+ public int compareTo(DeleteRecordKey other) {
+ if (other == null) {
+ return -1;
+ }
+ if (originalTransactionId != other.originalTransactionId) {
+ return originalTransactionId < other.originalTransactionId ? -1 : 1;
+ }
+ if (rowId != other.rowId) {
+ return rowId < other.rowId ? -1 : 1;
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * This class actually reads the delete delta files in vectorized row batches.
+ * For every call to next(), it returns the next smallest record id in the file if available.
+ * Internally, the next() buffers a row batch and maintains an index pointer, reading the
+ * next batch when the previous batch is exhausted.
+ */
+ static class DeleteReaderValue {
+ private VectorizedRowBatch batch;
+ private final RecordReader recordReader;
+ private int indexPtrInBatch;
+ private final int bucketForSplit; // The bucket value should be same for all the records.
+ private final ValidTxnList validTxnList;
+
+ public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
+ ValidTxnList validTxnList) throws IOException {
+ this.recordReader = deleteDeltaReader.rowsOptions(readerOptions);
+ this.bucketForSplit = bucket;
+ this.batch = deleteDeltaReader.getSchema().createRowBatch();
+ if (!recordReader.nextBatch(batch)) { // Read the first batch.
+ this.batch = null; // Oh! the very first batch was empty; treat this reader as exhausted.
+ }
+ this.indexPtrInBatch = 0;
+ this.validTxnList = validTxnList;
+ }
+
+ public boolean next(DeleteRecordKey deleteRecordKey) throws IOException {
+ if (batch == null) {
+ return false;
+ }
+ boolean isValidNext = false;
+ while (!isValidNext) {
+ if (indexPtrInBatch >= batch.size) {
+ // We have exhausted our current batch, read the next batch.
+ if (recordReader.nextBatch(batch)) {
+ // Whenever we are reading a batch, we must ensure that all the records in the batch
+ // have the same bucket id as the bucket id of the split. If not, throw exception.
+ // NOTE: this assertion might not hold, once virtual bucketing is in place. However,
+ // it should be simple to fix that case. Just replace check for bucket equality with
+ // a check for valid bucket mapping. Until virtual bucketing is added, it means
+ // either the split computation got messed up or we found some corrupted records.
+ long bucketForRecord = ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+ if ((batch.size > 1 && !batch.cols[OrcRecordUpdater.BUCKET].isRepeating)
+ || (bucketForRecord != bucketForSplit)){
+ throw new IOException("Corrupted records with different bucket ids "
+ + "from the containing bucket file found! Expected bucket id "
+ + bucketForSplit + ", however found the bucket id " + bucketForRecord);
+ }
+ indexPtrInBatch = 0; // After reading the batch, reset the pointer to beginning.
+ } else {
+ return false; // no more batches to read, exhausted the reader.
+ }
+ }
+ int originalTransactionIndex =
+ batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
+ long originalTransaction =
+ ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex];
+ long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch];
+ int currentTransactionIndex =
+ batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
+ long currentTransaction =
+ ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex];
+ ++indexPtrInBatch;
+ if (validTxnList.isTxnValid(currentTransaction)) {
+ isValidNext = true;
+ deleteRecordKey.set(originalTransaction, rowId);
+ }
+ }
+ return true;
+ }
+
+ public void close() throws IOException {
+ this.recordReader.close();
+ }
+ }
+
+ /**
+ * A CompressedOtid class stores a compressed representation of the original
+ * transaction ids (otids) read from the delete delta files. Since the record ids
+ * are sorted by (otid, rowId) and otids are highly likely to be repetitive, it is
+ * efficient to compress them as a CompressedOtid that stores the fromIndex and
+ * the toIndex. These fromIndex and toIndex reference the larger vector formed by
+ * concatenating the correspondingly ordered rowIds.
+ */
+ private class CompressedOtid implements Comparable<CompressedOtid> {
+ long originalTransactionId;
+ int fromIndex; // inclusive
+ int toIndex; // exclusive
+
+ public CompressedOtid(long otid, int fromIndex, int toIndex) {
+ this.originalTransactionId = otid;
+ this.fromIndex = fromIndex;
+ this.toIndex = toIndex;
+ }
+
+ @Override
+ public int compareTo(CompressedOtid other) {
+ // When comparing the CompressedOtid, the one with the lesser value is smaller.
+ if (originalTransactionId != other.originalTransactionId) {
+ return originalTransactionId < other.originalTransactionId ? -1 : 1;
+ }
+ return 0;
+ }
+ }
+
+ private TreeMap<DeleteRecordKey, DeleteReaderValue> sortMerger;
+ private long rowIds[];
+ private CompressedOtid compressedOtids[];
+ private ValidTxnList validTxnList;
+
+ public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
+ Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException {
+ int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+ String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+ this.sortMerger = new TreeMap<DeleteRecordKey, DeleteReaderValue>();
+ this.rowIds = null;
+ this.compressedOtids = null;
+ int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY);
+
+ try {
+ final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
+ if (deleteDeltaDirs.length > 0) {
+ int totalDeleteEventCount = 0;
+ for (Path deleteDeltaDir : deleteDeltaDirs) {
+ Path deleteDeltaFile = AcidUtils.createBucketFile(deleteDeltaDir, bucket);
+ FileSystem fs = deleteDeltaFile.getFileSystem(conf);
+ // NOTE: Reading the last flush length below is mostly future-proofing for when we have
+ // streaming deletes. But currently we don't support streaming deletes, and this can
+ // be removed if this becomes a performance issue.
+ long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile);
+ // NOTE: A check for existence of deleteDeltaFile is required because we may not have
+ // deletes for the bucket being taken into consideration for this split processing.
+ if (length != -1 && fs.exists(deleteDeltaFile)) {
+ Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile,
+ OrcFile.readerOptions(conf).maxLength(length));
+ AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader);
+ if (acidStats.deletes == 0) {
+ continue; // just a safe check to ensure that we are not reading empty delete files.
+ }
+ totalDeleteEventCount += acidStats.deletes;
+ if (totalDeleteEventCount > maxEventsInMemory) {
+ // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas
+ // into memory. To prevent out-of-memory errors, this check is a rough heuristic that
+ // prevents creation of an object of this class if the total number of delete events
+ // exceed this value. By default, it has been set to 10 million delete events per bucket.
+ LOG.info("Total number of delete events exceeds the maximum number of delete events "
+ + "that can be loaded into memory for the delete deltas in the directory at : "
+ + Arrays.toString(deleteDeltaDirs) + ". The max limit is currently set at "
+ + maxEventsInMemory + " and can be changed by setting the Hive config variable "
+ + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname);
+ throw new DeleteEventsOverflowMemoryException();
+ }
+ DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
+ readerOptions, bucket, validTxnList);
+ DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+ if (deleteReaderValue.next(deleteRecordKey)) {
+ sortMerger.put(deleteRecordKey, deleteReaderValue);
+ } else {
+ deleteReaderValue.close();
+ }
+ }
+ }
+ if (totalDeleteEventCount > 0) {
+ // Initialize the rowId array when we have some delete events.
+ rowIds = new long[totalDeleteEventCount];
+ readAllDeleteEventsFromDeleteDeltas();
+ }
+ }
+ } catch(IOException|DeleteEventsOverflowMemoryException e) {
+ close(); // close any open readers, if there was some exception during initialization.
+ throw e; // rethrow the exception so that the caller can handle.
+ }
+ }
+
+ private void readAllDeleteEventsFromDeleteDeltas() throws IOException {
+ if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read.
+ int distinctOtids = 0;
+ long lastSeenOtid = -1;
+ long otids[] = new long[rowIds.length];
+ int index = 0;
+ while (!sortMerger.isEmpty()) {
+ // The sortMerger is a heap data structure that stores a pair of
+ // (deleteRecordKey, deleteReaderValue) at each node and is ordered by deleteRecordKey.
+ // The deleteReaderValue is the actual wrapper class that has the reference to the
+ // underlying delta file that is being read, and its corresponding deleteRecordKey
+ // is the smallest record id for that file. In each iteration of this loop, we extract(poll)
+ // the minimum deleteRecordKey pair. Once we have processed that deleteRecordKey, we
+ // advance the pointer for the corresponding deleteReaderValue. If the underlying file
+ // itself has no more records, then we remove that pair from the heap, or else we
+ // add the updated pair back to the heap.
+ Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
+ DeleteRecordKey deleteRecordKey = entry.getKey();
+ DeleteReaderValue deleteReaderValue = entry.getValue();
+ otids[index] = deleteRecordKey.originalTransactionId;
+ rowIds[index] = deleteRecordKey.rowId;
+ ++index;
+ if (lastSeenOtid != deleteRecordKey.originalTransactionId) {
+ ++distinctOtids;
+ lastSeenOtid = deleteRecordKey.originalTransactionId;
+ }
+ if (deleteReaderValue.next(deleteRecordKey)) {
+ sortMerger.put(deleteRecordKey, deleteReaderValue);
+ } else {
+ deleteReaderValue.close(); // Exhausted reading all records, close the reader.
+ }
+ }
+
+ // Once we have processed all the delete events and seen all the distinct otids,
+ // we compress the otids into CompressedOtid data structure that records
+ // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid.
+ this.compressedOtids = new CompressedOtid[distinctOtids];
+ lastSeenOtid = otids[0];
+ int fromIndex = 0, pos = 0;
+ for (int i = 1; i < otids.length; ++i) {
+ if (otids[i] != lastSeenOtid) {
+ compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, i);
+ lastSeenOtid = otids[i];
+ fromIndex = i;
+ ++pos;
+ }
+ }
+ // account for the last distinct otid
+ compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, otids.length);
+ }
+
+ private boolean isDeleted(long otid, long rowId) {
+ if (compressedOtids == null || rowIds == null) {
+ return false;
+ }
+ // To find if a given (otid, rowId) pair is deleted or not, we perform
+ // two binary searches at most. The first binary search is on the
+ // compressed otids. If a match is found, only then we do the next
+ // binary search in the larger rowId vector between the given toIndex & fromIndex.
+
+ // Check if otid is outside the range of all otids present.
+ if (otid < compressedOtids[0].originalTransactionId
+ || otid > compressedOtids[compressedOtids.length - 1].originalTransactionId) {
+ return false;
+ }
+ // Create a dummy key for searching the otid in the compressed otid ranges.
+ CompressedOtid key = new CompressedOtid(otid, -1, -1);
+ int pos = Arrays.binarySearch(compressedOtids, key);
+ if (pos >= 0) {
+ // Otid with the given value found! Searching now for rowId...
+ key = compressedOtids[pos]; // Retrieve the actual CompressedOtid that matched.
+ // Check if rowId is outside the range of all rowIds present for this otid.
+ if (rowId < rowIds[key.fromIndex]
+ || rowId > rowIds[key.toIndex - 1]) {
+ return false;
+ }
+ if (Arrays.binarySearch(rowIds, key.fromIndex, key.toIndex, rowId) >= 0) {
+ return true; // rowId also found!
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet)
+ throws IOException {
+ if (rowIds == null || compressedOtids == null) {
+ return;
+ }
+ // Iterate through the batch and for each (otid, rowid) in the batch
+ // check if it is deleted or not.
+
+ long[] originalTransactionVector =
+ batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+ long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1
+ : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+
+ long[] rowIdVector =
+ ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector;
+
+ for (int setBitIndex = selectedBitSet.nextSetBit(0);
+ setBitIndex >= 0;
+ setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
+ long otid = originalTransactionVector != null ? originalTransactionVector[setBitIndex]
+ : repeatedOriginalTransaction ;
+ long rowId = rowIdVector[setBitIndex];
+ if (isDeleted(otid, rowId)) {
+ selectedBitSet.clear(setBitIndex);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // ColumnizedDeleteEventRegistry reads all the delete events into memory during initialization
+ // and it closes the delete event readers after it. If an exception gets thrown during
+ // initialization, we may have to close any readers that are still left open.
+ while (!sortMerger.isEmpty()) {
+ Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry();
+ entry.getValue().close(); // close the reader for this entry
+ }
+ }
+ }
+
+ static class DeleteEventsOverflowMemoryException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+}
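To make the ColumnizedDeleteEventRegistry encoding concrete, here is a self-contained sketch of its two-step binary search over hypothetical data (the arrays stand in for the registry's internal state; this is an illustration, not the shipped class). Suppose the sort-merge emitted the delete events (otid, rowId) = (5,1), (5,7), (5,9), (8,2), (8,3):

    import java.util.Arrays;

    public class CompressedOtidDemo {
      static final long[] ROW_IDS = {1, 7, 9, 2, 3}; // rowIds, grouped by otid, sorted per group
      static final long[] OTIDS   = {5, 8};          // distinct otids, sorted
      static final int[]  FROM    = {0, 3};          // inclusive start of each otid's slice
      static final int[]  TO      = {3, 5};          // exclusive end of each otid's slice

      static boolean isDeleted(long otid, long rowId) {
        // First binary search: locate the compressed range for this otid.
        int pos = Arrays.binarySearch(OTIDS, otid);
        if (pos < 0) {
          return false; // no delete events from this transaction
        }
        // Second binary search: look for rowId within that otid's slice of ROW_IDS.
        return Arrays.binarySearch(ROW_IDS, FROM[pos], TO[pos], rowId) >= 0;
      }

      public static void main(String[] args) {
        System.out.println(isDeleted(8, 3)); // true: (8,3) was deleted
        System.out.println(isDeleted(5, 2)); // false: (5,2) was never deleted
      }
    }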
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
new file mode 100644
index 0000000..44a9412
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Same as TestTxnCommands2WithSplitUpdate but tests ACID tables with vectorization turned on by
+ * default, and having 'transactional_properties' set to 'default'. This specifically tests the
+ * fast VectorizedOrcAcidRowBatchReader for ACID tables with split-update turned on.
+ */
+public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2WithSplitUpdate {
+
+ public TestTxnCommands2WithSplitUpdateAndVectorization() {
+ super();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ setUpWithTableProperties("'transactional'='true','transactional_properties'='default'");
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ }
+
+ @Override
+ @Test
+ public void testFailureOnAlteringTransactionalProperties() throws Exception {
+ // Override to do nothing, as this test is not related to vectorization.
+ // The parent class creates a temporary table in this test and alters its properties.
+ // To avoid overriding this test, that temporary table would need to be renamed. However,
+ // as mentioned, that serves no purpose, as this test does not relate to vectorization.
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
new file mode 100644
index 0000000..4656ab2
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set
+ * of delete delta files. The split is on an insert delta, and there are multiple delete deltas
+ * whose record ids interleave in the sort order. Correctness is tested by validating that the
+ * correct set of record ids is returned, in sorted order, for valid transactions only.
+ */
+public class TestVectorizedOrcAcidRowBatchReader {
+
+ private static final long NUM_ROWID_PER_OTID = 15000L;
+ private static final long NUM_OTID = 10L;
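+  // The single insert delta created in setup() thus holds NUM_OTID * NUM_ROWID_PER_OTID
+  // = 150,000 rows in total.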
+ private JobConf conf;
+ private FileSystem fs;
+ private Path root;
+
+ static class DummyRow {
+ LongWritable field;
+ RecordIdentifier ROW__ID;
+
+ DummyRow(long val) {
+ field = new LongWritable(val);
+ ROW__ID = null;
+ }
+
+ DummyRow(long val, long rowId, long origTxn, int bucket) {
+ field = new LongWritable(val);
+ ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+ }
+
+ static String getColumnNamesProperty() {
+ return "x";
+ }
+ static String getColumnTypesProperty() {
+ return "bigint";
+ }
+
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new JobConf();
+ conf.set("bucket_count", "1");
+ conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ conf.setBoolean(HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, true);
+ conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
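+    // The 'default' properties set above imply split-update, which the fast vectorized
+    // reader is written for (see testCanCreateVectorizedAcidRowBatchReaderOnSplit below).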
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, DummyRow.getColumnNamesProperty());
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, DummyRow.getColumnTypesProperty());
+ conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+ conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
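+    // The 'BI' strategy above generates per-file splits without reading ORC footers.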
+
+ fs = FileSystem.getLocal(conf);
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ root = new Path(workDir, "TestVectorizedOrcAcidRowBatch.testDump");
+ fs.delete(root, true);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+ int bucket = 0;
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .filesystem(fs)
+ .bucket(bucket)
+ .writingBase(false)
+ .minimumTransactionId(1)
+ .maximumTransactionId(NUM_OTID)
+ .inspector(inspector)
+ .reporter(Reporter.NULL)
+ .recordIdColumn(1)
+ .finalDestination(root);
+ RecordUpdater updater = new OrcRecordUpdater(root, options);
+    // Create a single insert delta with 150,000 rows: 15,000 rowIds per original transaction id.
+ for (long i = 1; i <= NUM_OTID; ++i) {
+ for (long j = 0; j < NUM_ROWID_PER_OTID; ++j) {
+ long payload = (i-1) * NUM_ROWID_PER_OTID + j;
+ updater.insert(i, new DummyRow(payload, j, i, bucket));
+ }
+ }
+ updater.close(false);
+
+    // Now create three types of delete deltas: the first has rowIds divisible by 2 but not by 3,
+    // the second has rowIds divisible by 3 but not by 2, and the third has rowIds divisible by
+    // both 2 and 3. This produces delete deltas whose delete events interleave in the sort
+    // order, thoroughly exercising the sort-merge logic.
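+    // Per original transaction id, rowIds 0..14999 contain 5,000 ids divisible by 2 but not 3,
+    // 2,500 divisible by 3 but not 2, and 2,500 divisible by both; across the 10 transactions
+    // that is 50,000 + 25,000 + 25,000 = 100,000 delete events, leaving 5,000 surviving rows
+    // per transaction.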
+
+ // Create a delete delta that has rowIds divisible by 2 but not by 3. This will produce
+ // a delete delta file with 50,000 delete events.
+ long currTxnId = NUM_OTID + 1;
+ options.minimumTransactionId(currTxnId).maximumTransactionId(currTxnId);
+ updater = new OrcRecordUpdater(root, options);
+ for (long i = 1; i <= NUM_OTID; ++i) {
+ for (long j = 0; j < NUM_ROWID_PER_OTID; j += 1) {
+ if (j % 2 == 0 && j % 3 != 0) {
+ updater.delete(currTxnId, new DummyRow(-1, j, i, bucket));
+ }
+ }
+ }
+ updater.close(false);
+ // Now, create a delete delta that has rowIds divisible by 3 but not by 2. This will produce
+ // a delete delta file with 25,000 delete events.
+ currTxnId = NUM_OTID + 2;
+ options.minimumTransactionId(currTxnId).maximumTransactionId(currTxnId);
+ updater = new OrcRecordUpdater(root, options);
+ for (long i = 1; i <= NUM_OTID; ++i) {
+ for (long j = 0; j < NUM_ROWID_PER_OTID; j += 1) {
+ if (j % 2 != 0 && j % 3 == 0) {
+ updater.delete(currTxnId, new DummyRow(-1, j, i, bucket));
+ }
+ }
+ }
+ updater.close(false);
+ // Now, create a delete delta that has rowIds divisible by both 3 and 2. This will produce
+ // a delete delta file with 25,000 delete events.
+ currTxnId = NUM_OTID + 3;
+ options.minimumTransactionId(currTxnId).maximumTransactionId(currTxnId);
+ updater = new OrcRecordUpdater(root, options);
+ for (long i = 1; i <= NUM_OTID; ++i) {
+ for (long j = 0; j < NUM_ROWID_PER_OTID; j += 1) {
+ if (j % 2 == 0 && j % 3 == 0) {
+ updater.delete(currTxnId, new DummyRow(-1, j, i, bucket));
+ }
+ }
+ }
+ updater.close(false);
+ }
+
+ private List<OrcSplit> getSplits() throws Exception {
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+ OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null);
+ OrcInputFormat.AcidDirInfo adi = gen.call();
+ List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies(
+ null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+ null, null, true);
+ assertEquals(1, splitStrategies.size());
+ List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+ assertEquals(1, splits.size());
+ assertEquals("file://" + root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000",
+ splits.get(0).getPath().toUri().toString());
+ assertFalse(splits.get(0).isOriginal());
+ return splits;
+ }
+
+ @Test
+ public void testVectorizedOrcAcidRowBatchReader() throws Exception {
+ testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName());
+
+ // To test the SortMergedDeleteEventRegistry, we need to explicitly set the
+ // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY constant to a smaller value.
+ int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000);
+ conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000);
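+    // The ~100,000 delete events created in setup() now exceed the in-memory limit, so the
+    // reader is expected to fall back to the sort-merge based registry.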
+ testVectorizedOrcAcidRowBatchReader(SortMergedDeleteEventRegistry.class.getName());
+
+ // Restore the old value.
+ conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue);
+ }
+
+ private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) throws Exception {
+ List<OrcSplit> splits = getSplits();
+ // Mark one of the transactions as an exception to test that invalid transactions
+ // are being handled properly.
+ conf.set(ValidTxnList.VALID_TXNS_KEY, "14:1:1:5"); // Exclude transaction 5
+
+ VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL);
+ if (deleteEventRegistry.equals(ColumnizedDeleteEventRegistry.class.getName())) {
+ assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof ColumnizedDeleteEventRegistry);
+ }
+ if (deleteEventRegistry.equals(SortMergedDeleteEventRegistry.class.getName())) {
+ assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof SortMergedDeleteEventRegistry);
+ }
+ TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+ VectorizedRowBatch vectorizedRowBatch = schema.createRowBatch();
+    vectorizedRowBatch.setPartitionInfo(1, 0); // Set the data column count to 1.
+ long previousPayload = Long.MIN_VALUE;
+ while (vectorizedReader.next(null, vectorizedRowBatch)) {
+ assertTrue(vectorizedRowBatch.selectedInUse);
+ LongColumnVector col = (LongColumnVector) vectorizedRowBatch.cols[0];
+ for (int i = 0; i < vectorizedRowBatch.size; ++i) {
+ int idx = vectorizedRowBatch.selected[i];
+ long payload = col.vector[idx];
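+        // Invert the insert-time mapping payload = (otid - 1) * NUM_ROWID_PER_OTID + rowId.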
+ long otid = (payload / NUM_ROWID_PER_OTID) + 1;
+ long rowId = payload % NUM_ROWID_PER_OTID;
+ assertFalse(rowId % 2 == 0 || rowId % 3 == 0);
+ assertTrue(otid != 5); // Check that txn#5 has been excluded.
+ assertTrue(payload > previousPayload); // Check that the data is in sorted order.
+ previousPayload = payload;
+ }
+ }
+ }
+
+ @Test
+ public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception {
+ OrcSplit mockSplit = Mockito.mock(OrcSplit.class);
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getLegacy().toInt());
+    // Expect false when trying to create a vectorized ACID row batch reader for a legacy table.
+ assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
+
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ Mockito.when(mockSplit.isOriginal()).thenReturn(true);
+    // Expect false when trying to create a vectorized ACID row batch reader over original files.
+ assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
+
+ // A positive test case.
+ Mockito.when(mockSplit.isOriginal()).thenReturn(false);
+ assertTrue(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/queries/clientpositive/acid_vectorization.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_vectorization.q b/ql/src/test/queries/clientpositive/acid_vectorization.q
index 832909b..4c37563 100644
--- a/ql/src/test/queries/clientpositive/acid_vectorization.q
+++ b/ql/src/test/queries/clientpositive/acid_vectorization.q
@@ -15,3 +15,15 @@ set hive.vectorized.execution.enabled=true;
delete from acid_vectorized where b = 'foo';
set hive.vectorized.execution.enabled=true;
select a, b from acid_vectorized order by a, b;
+
+
+CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10;
+set hive.vectorized.execution.enabled=true;
+insert into table acid_fast_vectorized values (1, 'bar');
+set hive.vectorized.execution.enabled=true;
+update acid_fast_vectorized set b = 'foo' where b = 'bar';
+set hive.vectorized.execution.enabled=true;
+delete from acid_fast_vectorized where b = 'foo';
+set hive.vectorized.execution.enabled=true;
+select a, b from acid_fast_vectorized order by a, b;
http://git-wip-us.apache.org/repos/asf/hive/blob/1f6949f9/ql/src/test/results/clientpositive/acid_vectorization.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/acid_vectorization.q.out b/ql/src/test/results/clientpositive/acid_vectorization.q.out
index 1792979..24330bd 100644
--- a/ql/src/test/results/clientpositive/acid_vectorization.q.out
+++ b/ql/src/test/results/clientpositive/acid_vectorization.q.out
@@ -60,3 +60,65 @@ POSTHOOK: Input: default@acid_vectorized
-1070883071 0ruyd6Y50JpdGRf6HqD
-1070551679 iUR3Q
-1069736047 k17Am8uPHWk02cEf1jet
+PREHOOK: query: CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@acid_fast_vectorized
+PREHOOK: query: insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: Lineage: acid_fast_vectorized.a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: acid_fast_vectorized.b SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+PREHOOK: query: insert into table acid_fast_vectorized values (1, 'bar')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: insert into table acid_fast_vectorized values (1, 'bar')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: Lineage: acid_fast_vectorized.a EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: acid_fast_vectorized.b SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: update acid_fast_vectorized set b = 'foo' where b = 'bar'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_fast_vectorized
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: update acid_fast_vectorized set b = 'foo' where b = 'bar'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_fast_vectorized
+POSTHOOK: Output: default@acid_fast_vectorized
+PREHOOK: query: delete from acid_fast_vectorized where b = 'foo'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_fast_vectorized
+PREHOOK: Output: default@acid_fast_vectorized
+POSTHOOK: query: delete from acid_fast_vectorized where b = 'foo'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_fast_vectorized
+POSTHOOK: Output: default@acid_fast_vectorized
+PREHOOK: query: select a, b from acid_fast_vectorized order by a, b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_fast_vectorized
+#### A masked pattern was here ####
+POSTHOOK: query: select a, b from acid_fast_vectorized order by a, b
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_fast_vectorized
+#### A masked pattern was here ####
+-1073279343 oj1YrV5Wa
+-1073051226 A34p7oRr2WvUJNf
+-1072910839 0iqrc5
+-1072081801 dPkN74F7
+-1072076362 2uLyD28144vklju213J1mr
+-1071480828 aw724t8c5558x2xneC624
+-1071363017 Anj0oF
+-1070883071 0ruyd6Y50JpdGRf6HqD
+-1070551679 iUR3Q
+-1069736047 k17Am8uPHWk02cEf1jet