Posted to commits@hive.apache.org by ek...@apache.org on 2017/11/04 17:14:46 UTC
[2/2] hive git commit: HIVE-17458 VectorizedOrcAcidRowBatchReader doesn't handle 'original' files (Eugene Koifman, reviewed by Sergey Shelukhin)
HIVE-17458 VectorizedOrcAcidRowBatchReader doesn't handle 'original' files (Eugene Koifman, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1649c074
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1649c074
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1649c074
Branch: refs/heads/master
Commit: 1649c0741a8c065b4831e8495a2a01b919d8b6d0
Parents: 7006ade
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Nov 4 10:14:04 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Nov 4 10:14:04 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 13 +-
.../test/resources/testconfiguration.properties | 6 +-
.../org/apache/hadoop/hive/ql/QTestUtil.java | 33 +-
.../apache/hadoop/hive/ql/udf/UDFRunWorker.java | 35 +
.../hive/llap/io/api/impl/LlapRecordReader.java | 3 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 51 +-
.../hive/ql/io/orc/OrcRawRecordMerger.java | 2 +-
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 11 +-
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 2 +-
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 352 +++++++--
.../ql/io/orc/VectorizedOrcAcidRowReader.java | 143 ----
.../apache/hadoop/hive/ql/TestTxnCommands.java | 6 +-
.../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 118 +++
.../TestVectorizedOrcAcidRowBatchReader.java | 24 +-
.../acid_vectorization_original.q | 138 ++++
.../acid_vectorization_original_tez.q | 125 ++++
.../llap/acid_vectorization_original.q.out | 726 ++++++++++++++++++
.../tez/acid_vectorization_original_tez.q.out | 737 +++++++++++++++++++
18 files changed, 2232 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cbe4de5..48341a8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1895,15 +1895,10 @@ public class HiveConf extends Configuration {
"hive.lock.numretries and hive.lock.sleep.between.retries."),
HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1,
- "Sets the operational properties that control the appropriate behavior for various\n"
- + "versions of the Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
- + "for future versions of ACID. (See HIVE-14035 for details.)\n"
- + "0: Turn on the legacy mode for ACID\n"
- + "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n"
- + "2: Hash-based merge, which combines delta files using GRACE hash join based approach (not implemented)\n"
- + "3: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n"
- + "This is intended to be used as an internal property for future versions of ACID. (See\n" +
- "HIVE-14035 for details.)"),
+ "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" +
+ "4: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" +
+ "This is intended to be used as an internal property for future versions of ACID. (See\n" +
+ "HIVE-14035 for details.)"),
HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
"current open transactions reach this limit, future open transaction requests will be \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 462f332..9642697 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1,3 +1,5 @@
+# Note: the *.shared groups also run on TestCliDriver
+
# NOTE: files should be listed in alphabetical order
minimr.query.files=infer_bucket_sort_map_operators.q,\
infer_bucket_sort_dyn_part.q,\
@@ -50,7 +52,8 @@ minitez.query.files.shared=delete_orig_table.q,\
# NOTE: Add tests to minitez only if it is very
# specific to tez and cannot be added to minillap.
-minitez.query.files=explainuser_3.q,\
+minitez.query.files=acid_vectorization_original_tez.q,\
+ explainuser_3.q,\
explainanalyze_1.q,\
explainanalyze_2.q,\
explainanalyze_3.q,\
@@ -474,6 +477,7 @@ minillaplocal.query.files=\
acid_no_buckets.q, \
acid_globallimit.q,\
acid_vectorization_missing_cols.q,\
+ acid_vectorization_original.q,\
alter_merge_stats_orc.q,\
authorization_view_8.q,\
auto_join30.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 4477954..6c34c08 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -85,9 +85,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hive.cli.CliDriver;
import org.apache.hadoop.hive.cli.CliSessionState;
@@ -1561,6 +1559,16 @@ public class QTestUtil {
}
}
}
+ else {
+ for (PatternReplacementPair prp : partialPlanMask) {
+ matcher = prp.pattern.matcher(line);
+ if (matcher.find()) {
+ line = line.replaceAll(prp.pattern.pattern(), prp.replacement);
+ partialMaskWasMatched = true;
+ break;
+ }
+ }
+ }
if (!partialMaskWasMatched) {
for (Pair<Pattern, String> pair : patternsWithMaskComments) {
@@ -1650,7 +1658,26 @@ public class QTestUtil {
"data/warehouse/(.*?/)+\\.hive-staging" // the directory might be db/table/partition
//TODO: add more expected test result here
});
-
+ /**
+ * Pattern to match and (partial) replacement text.
+ * For example, {"transaction":76,"bucketid":8249877}. We just want to mask 76 but a regex that
+ * matches just 76 will match a lot of other things.
+ */
+ private final static class PatternReplacementPair {
+ private final Pattern pattern;
+ private final String replacement;
+ PatternReplacementPair(Pattern p, String r) {
+ pattern = p;
+ replacement = r;
+ }
+ }
+ private final PatternReplacementPair[] partialPlanMask;
+ {
+ ArrayList<PatternReplacementPair> ppm = new ArrayList<>();
+ ppm.add(new PatternReplacementPair(Pattern.compile("\\{\"transactionid\":[1-9][0-9]*,\"bucketid\":"),
+ "{\"transactionid\":### Masked txnid ###,\"bucketid\":"));
+ partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]);
+ }
/* This list may be modified by specific cli drivers to mask strings that change on every test */
private final List<Pair<Pattern, String>> patternsWithMaskComments = new ArrayList<Pair<Pattern, String>>() {{
add(toPatternPair("(pblob|s3.?|swift|wasb.?).*hive-staging.*","### BLOBSTORE_STAGING_PATH ###"));
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java
new file mode 100644
index 0000000..de6aca2
--- /dev/null
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf;
+
+import org.apache.hadoop.hive.ql.TestTxnCommands2;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A UDF for testing that launches a compaction Worker.
+ */
+@Description(name = "runWorker",
+ value = "_FUNC_() - UDF launching Compaction Worker")
+public class UDFRunWorker extends UDF {
+ public void evaluate() throws Exception {
+ TestTxnCommands2.runWorker(SessionState.get().getConf());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index d66fac2..5f010be 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -277,7 +278,7 @@ class LlapRecordReader
acidVrb.cols = cvb.cols;
acidVrb.size = cvb.size;
final VectorizedOrcAcidRowBatchReader acidReader =
- new VectorizedOrcAcidRowBatchReader(split, jobConf, Reporter.NULL,
+ new VectorizedOrcAcidRowBatchReader((OrcSplit)split, jobConf, Reporter.NULL,
new RecordReader<NullWritable, VectorizedRowBatch>() {
@Override
public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index c364343..1e5b841 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1849,6 +1849,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
getRecordReader(InputSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
+ //CombineHiveInputFormat may produce a FileSplit that is not an OrcSplit
boolean vectorMode = Utilities.getUseVectorizedInputFileFormat(conf);
boolean isAcidRead = isAcidRead(conf, inputSplit);
if (!isAcidRead) {
@@ -1868,27 +1869,19 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
reporter.setStatus(inputSplit.toString());
+ //if we are here, this is an Acid read, so the split must be an OrcSplit. CombineHiveInputFormat is
+ // disabled for the acid path
+ OrcSplit split = (OrcSplit) inputSplit;
- boolean isFastVectorizedReaderAvailable =
- VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit);
-
- if (vectorMode && isFastVectorizedReaderAvailable) {
- // Faster vectorized ACID row batch reader is available that avoids row-by-row stitching.
+ if (vectorMode) {
return (org.apache.hadoop.mapred.RecordReader)
- new VectorizedOrcAcidRowBatchReader(inputSplit, conf, reporter);
+ new VectorizedOrcAcidRowBatchReader(split, conf, reporter);
}
Options options = new Options(conf).reporter(reporter);
final RowReader<OrcStruct> inner = getReader(inputSplit, options);
- if (vectorMode && !isFastVectorizedReaderAvailable) {
- // Vectorized regular ACID reader that does row-by-row stitching.
- return (org.apache.hadoop.mapred.RecordReader)
- new VectorizedOrcAcidRowReader(inner, conf,
- Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
- } else {
- // Non-vectorized regular ACID reader.
- return new NullKeyRecordReader(inner, conf);
- }
+ // Non-vectorized regular ACID reader.
+ return new NullKeyRecordReader(inner, conf);
}
/**
@@ -1945,38 +1938,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
throws IOException {
final OrcSplit split = (OrcSplit) inputSplit;
- final Path path = split.getPath();
- Path root;
- if (split.hasBase()) {
- if (split.isOriginal()) {
- root = split.getRootDir();
- } else {
- root = path.getParent().getParent();
- assert root.equals(split.getRootDir()) : "root mismatch: baseDir=" + split.getRootDir() +
- " path.p.p=" + root;
- }
- } else {
- throw new IllegalStateException("Split w/o base: " + path);
- }
// Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat.
AcidUtils.AcidOperationalProperties acidOperationalProperties
= AcidUtils.getAcidOperationalProperties(options.getConfiguration());
+ if(!acidOperationalProperties.isSplitUpdate()) {
+ throw new IllegalStateException("Expected SpliUpdate table: " + split.getPath());
+ }
- // The deltas are decided based on whether split-update has been turned on for the table or not.
- // When split-update is turned off, everything in the delta_x_y/ directory should be treated
- // as delta. However if split-update is turned on, only the files in delete_delta_x_y/ directory
- // need to be considered as delta, because files in delta_x_y/ will be processed as base files
- // since they only have insert events in them.
- final Path[] deltas =
- acidOperationalProperties.isSplitUpdate() ?
- AcidUtils.deserializeDeleteDeltas(root, split.getDeltas())
- : AcidUtils.deserializeDeltas(root, split.getDeltas());
+ final Path[] deltas = VectorizedOrcAcidRowBatchReader.getDeleteDeltaDirsFromSplit(split);
final Configuration conf = options.getConfiguration();
final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
- mergerOptions.rootPath(root);
+ mergerOptions.rootPath(split.getRootDir());
final int bucket;
if (split.hasBase()) {
AcidOutputFormat.Options acidIOOptions =
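The removed comment explained the split-update layout this method now assumes: under split-update only delete_delta_x_y/ directories carry delete events, while delta_x_y/ directories contain only inserts and are read like base files. A minimal sketch of that directory-name distinction, not part of the patch and independent of AcidUtils (names below are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeleteDeltaFilterSketch {
  public static void main(String[] args) {
    List<String> dirs = Arrays.asList("base_0000010", "delta_0000011_0000011",
        "delete_delta_0000012_0000012", "delta_0000012_0000012");
    List<String> deleteDeltas = new ArrayList<>();
    for (String d : dirs) {
      if (d.startsWith("delete_delta_")) {   // only these contribute delete events
        deleteDeltas.add(d);
      }
    }
    System.out.println(deleteDeltas);  // [delete_delta_0000012_0000012]
  }
}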
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index eed6d22..95a60dc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -433,7 +433,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
AcidOutputFormat.Options bucketOptions =
AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
if (bucketOptions.getBucketId() != bucketId) {
- continue;
+ continue;//todo: HIVE-16952
}
if (haveSeenCurrentFile) {
//if here we already saw current file and now found another file for the same bucket
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 1e19a91..315cc1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -62,21 +62,24 @@ public class OrcRecordUpdater implements RecordUpdater {
private static final Logger LOG = LoggerFactory.getLogger(OrcRecordUpdater.class);
- public static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index";
- public static final String ACID_FORMAT = "_orc_acid_version";
- public static final int ORC_ACID_VERSION = 0;
+ static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index";
+ private static final String ACID_FORMAT = "_orc_acid_version";
+ private static final int ORC_ACID_VERSION = 0;
final static int INSERT_OPERATION = 0;
final static int UPDATE_OPERATION = 1;
final static int DELETE_OPERATION = 2;
-
+ //column indexes of corresponding data in storage layer
final static int OPERATION = 0;
final static int ORIGINAL_TRANSACTION = 1;
final static int BUCKET = 2;
final static int ROW_ID = 3;
final static int CURRENT_TRANSACTION = 4;
final static int ROW = 5;
+ /**
+ * total number of fields (above)
+ */
final static int FIELDS = 6;
final static int DELTA_BUFFER_SIZE = 16 * 1024;
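The constants above are the column indexes of an ACID event struct: <operation, originalTransaction, bucket, rowId, currentTransaction, row>. A minimal standalone sketch of that layout, not part of the patch (the values are made up for illustration):

public class AcidEventLayoutSketch {
  static final int OPERATION = 0, ORIGINAL_TRANSACTION = 1, BUCKET = 2,
      ROW_ID = 3, CURRENT_TRANSACTION = 4, ROW = 5, FIELDS = 6;

  public static void main(String[] args) {
    Object[] event = new Object[FIELDS];
    event[OPERATION] = 0;                 // insert
    event[ORIGINAL_TRANSACTION] = 7L;     // txn that first wrote the row
    event[BUCKET] = 536870912L;           // encoded bucket property
    event[ROW_ID] = 3L;                   // position within (otid, bucket)
    event[CURRENT_TRANSACTION] = 7L;      // txn of this event
    event[ROW] = new Object[] { /* user columns f1..fn */ };
    System.out.println("ROW__ID = {" + event[ORIGINAL_TRANSACTION] + ", "
        + event[BUCKET] + ", " + event[ROW_ID] + "}");
  }
}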
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 260a5ac..58638b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -29,7 +29,6 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.ColumnarSplit;
@@ -238,6 +237,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
final AcidUtils.AcidOperationalProperties acidOperationalProperties
= AcidUtils.getAcidOperationalProperties(conf);
final boolean isSplitUpdate = acidOperationalProperties.isSplitUpdate();
+ assert isSplitUpdate : "should be true in Hive 3.0";
if (isOriginal) {
if (!isAcidRead && !hasDelta) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 1e16f09..bcde4fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -24,6 +24,7 @@ import java.util.BitSet;
import java.util.Map.Entry;
import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -37,9 +38,12 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -51,10 +55,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
- * A fast vectorized batch reader class for ACID when split-update behavior is enabled.
- * When split-update is turned on, row-by-row stitching could be avoided to create the final
- * version of a row. Essentially, there are only insert and delete events. Insert events can be
- * directly read from the base files/insert_only deltas in vectorized row batches. The deleted
+ * A fast vectorized batch reader class for ACID. Insert events are read directly
+ * from the base files/insert_only deltas in vectorized row batches. The deleted
* rows can then be easily indicated via the 'selected' field of the vectorized row batch.
* Refer HIVE-14233 for more details.
*/
@@ -63,26 +65,51 @@ public class VectorizedOrcAcidRowBatchReader
private static final Logger LOG = LoggerFactory.getLogger(VectorizedOrcAcidRowBatchReader.class);
- public org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader;
- protected VectorizedRowBatchCtx rbCtx;
- protected VectorizedRowBatch vectorizedRowBatchBase;
+ private org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader;
+ private final VectorizedRowBatchCtx rbCtx;
+ private VectorizedRowBatch vectorizedRowBatchBase;
private long offset;
private long length;
protected float progress = 0.0f;
protected Object[] partitionValues;
- protected boolean addPartitionCols = true;
- private ValidTxnList validTxnList;
- protected DeleteEventRegistry deleteEventRegistry;
- protected StructColumnVector recordIdColumnVector;
- private org.apache.orc.Reader.Options readerOptions;
+ private boolean addPartitionCols = true;
+ private final ValidTxnList validTxnList;
+ private final DeleteEventRegistry deleteEventRegistry;
+ /**
+ * {@link RecordIdentifier}/{@link VirtualColumn#ROWID} information
+ */
+ private final StructColumnVector recordIdColumnVector;
+ private final Reader.Options readerOptions;
+ private final boolean isOriginal;
+ /**
+ * something further in the data pipeline wants {@link VirtualColumn#ROWID}
+ */
+ private final boolean rowIdProjected;
+ /**
+ * partition/table root
+ */
+ private final Path rootPath;
+ /**
+ * for reading "original" files
+ */
+ private final OffsetAndBucketProperty syntheticProps;
+ /**
+ * To have access to {@link RecordReader#getRowNumber()} in the underlying file
+ */
+ private RecordReader innerReader;
- public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
- Reporter reporter) throws IOException {
- this.init(inputSplit, conf, reporter, Utilities.getVectorizedRowBatchCtx(conf));
+ VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf,
+ Reporter reporter) throws IOException {
+ this(inputSplit, conf,reporter, null);
+ }
+ @VisibleForTesting
+ VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf,
+ Reporter reporter, VectorizedRowBatchCtx rbCtx) throws IOException {
+ this(conf, inputSplit, reporter, rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx);
final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit);
// Careful with the range here now, we do not want to read the whole base file like deltas.
- final RecordReader innerReader = reader.rowsOptions(readerOptions.range(offset, length));
+ innerReader = reader.rowsOptions(readerOptions.range(offset, length));
baseReader = new org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch>() {
@Override
@@ -117,16 +144,19 @@ public class VectorizedOrcAcidRowBatchReader
};
this.vectorizedRowBatchBase = ((RecordReaderImpl) innerReader).createRowBatch();
}
-
- public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, Reporter reporter,
+ /**
+ * LLAP IO c'tor
+ */
+ public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter,
org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader,
VectorizedRowBatchCtx rbCtx) throws IOException {
- this.init(inputSplit, conf, reporter, rbCtx);
+ this(conf, inputSplit, reporter, rbCtx);
this.baseReader = baseReader;
+ this.innerReader = null;
this.vectorizedRowBatchBase = baseReader.createValue();
}
- private void init(InputSplit inputSplit, JobConf conf, Reporter reporter,
+ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Reporter reporter,
VectorizedRowBatchCtx rowBatchCtx) throws IOException {
this.rbCtx = rowBatchCtx;
final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
@@ -143,8 +173,7 @@ public class VectorizedOrcAcidRowBatchReader
final OrcSplit orcSplit = (OrcSplit) inputSplit;
reporter.setStatus(orcSplit.toString());
- readerOptions = OrcInputFormat.createOptionsForReader(conf);
- readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions);
+ readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf));
this.offset = orcSplit.getStart();
this.length = orcSplit.getLength();
@@ -167,55 +196,161 @@ public class VectorizedOrcAcidRowBatchReader
deleteEventReaderOptions.range(0, Long.MAX_VALUE);
// Disable SARGs for deleteEventReaders, as SARGs have no meaning.
deleteEventReaderOptions.searchArgument(null, null);
+ DeleteEventRegistry der;
try {
// See if we can load all the delete events from all the delete deltas in memory...
- this.deleteEventRegistry = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+ der = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
} catch (DeleteEventsOverflowMemoryException e) {
// If not, then create a set of hanging readers that do sort-merge to find the next smallest
// delete event on-demand. Caps the memory consumption to (some_const * no. of readers).
- this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
+ der = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
}
-
- recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null);
+ this.deleteEventRegistry = der;
+ isOriginal = orcSplit.isOriginal();
+ if(isOriginal) {
+ recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+ new LongColumnVector(), new LongColumnVector(), new LongColumnVector());
+ }
+ else {
+ //will swap in the Vectors from underlying row batch
+ recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null);
+ }
+ rowIdProjected = areRowIdsProjected(rbCtx);
+ rootPath = orcSplit.getRootDir();
+ syntheticProps = computeOffsetAndBucket(orcSplit, conf, validTxnList);
}
/**
- * Returns whether it is possible to create a valid instance of this class for a given split.
- * @param conf is the job configuration
- * @param inputSplit
- * @return true if it is possible, else false.
+ * Used for generating synthetic ROW__IDs for reading "original" files
+ */
+ private static final class OffsetAndBucketProperty {
+ private final long rowIdOffset;
+ private final int bucketProperty;
+ private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) {
+ this.rowIdOffset = rowIdOffset;
+ this.bucketProperty = bucketProperty;
+ }
+ }
+ /**
+ * See {@link #next(NullWritable, VectorizedRowBatch)} first and
+ * {@link OrcRawRecordMerger.OriginalReaderPair}.
+ * Used when reading a split of an "original" file and we need to decorate data with ROW__ID.
+ * This requires treating multiple files that are part of the same bucket (tranche for unbucketed
+ * tables) as a single logical file to number rowids consistently.
+ *
+ * todo: This logic is executed per split of every "original" file. The computed result is the
+ * same for every split from the same file, so this could be optimized by moving it to
+ * before/during split computation and passing the info in the split. (HIVE-17917)
+ */
+ private OffsetAndBucketProperty computeOffsetAndBucket(
+ OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException {
+ if(!needSyntheticRowIds(split, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+ return new OffsetAndBucketProperty(0,0);
+ }
+ long rowIdOffset = 0;
+ int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
+ int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId));
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf,
+ validTxnList, false, true);
+ for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+ AcidOutputFormat.Options bucketOptions =
+ AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+ if (bucketOptions.getBucketId() != bucketId) {
+ continue;//HIVE-16952
+ }
+ if (f.getFileStatus().getPath().equals(split.getPath())) {
+ //'f' is the file this split comes from
+ break;
+ }
+ Reader reader = OrcFile.createReader(f.getFileStatus().getPath(),
+ OrcFile.readerOptions(conf));
+ rowIdOffset += reader.getNumberOfRows();
+ }
+ return new OffsetAndBucketProperty(rowIdOffset, bucketProperty);
+ }
+ /**
+ * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables.
+ * In some cases this cannot be used from LLAP IO elevator because
+ * {@link RecordReader#getRowNumber()} is not (currently) available there but is required to
+ * generate ROW__IDs for "original" files
+ * @param hasDeletes - if there are any deletes that apply to this split
+ * todo: HIVE-17944
*/
- public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) {
- if (!(inputSplit instanceof OrcSplit)) {
- return false; // must be an instance of OrcSplit.
- }
- // First check if we are reading any original files in the split.
- // To simplify the vectorization logic, the vectorized acid row batch reader does not handle
- // original files for now as they have a different schema than a regular ACID file.
- final OrcSplit split = (OrcSplit) inputSplit;
- if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) {
- // When split-update is turned on for ACID, a more optimized vectorized batch reader
- // can be created. But still only possible when we are *NOT* reading any originals.
+ static boolean canUseLlapForAcid(OrcSplit split, boolean hasDeletes, Configuration conf) {
+ if(!split.isOriginal()) {
return true;
}
- return false; // no split-update or possibly reading originals!
+ VectorizedRowBatchCtx rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+ if(rbCtx == null) {
+ throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + split.getPath());
+ }
+ return !needSyntheticRowIds(split, hasDeletes, areRowIdsProjected(rbCtx));
}
- private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException {
+ /**
+ * Does this reader need to decorate rows with ROW__IDs (for "original" reads).
+ * Even if ROW__ID is not projected you still need to decorate the rows with them to see if
+ * any of the delete events apply.
+ */
+ private static boolean needSyntheticRowIds(OrcSplit split, boolean hasDeletes, boolean rowIdProjected) {
+ return split.isOriginal() && (hasDeletes || rowIdProjected);
+ }
+ private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) {
+ if(rbCtx.getVirtualColumnCount() == 0) {
+ return false;
+ }
+ for(VirtualColumn vc : rbCtx.getNeededVirtualColumns()) {
+ if(vc == VirtualColumn.ROWID) {
+ //The query needs ROW__ID: maybe explicitly asked, maybe it's part of
+ // Update/Delete statement.
+ //Either way, we need to decorate "original" rows with row__id
+ return true;
+ }
+ }
+ return false;
+ }
+ static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException {
Path path = orcSplit.getPath();
Path root;
if (orcSplit.hasBase()) {
if (orcSplit.isOriginal()) {
- root = path.getParent();
+ root = orcSplit.getRootDir();
} else {
root = path.getParent().getParent();
+ assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() +
+ " path.p.p=" + root;
}
} else {
- root = path;
+ throw new IllegalStateException("Split w/o base w/Acid 2.0??: " + path);
}
return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas());
}
+ /**
+ * There are 2 types of schema from the {@link #baseReader} that this handles. In the case
+ * the data was written to a transactional table from the start, every row is decorated with
+ * transaction related info and looks like <op, otid, writerId, rowid, ctid, <f1, ... fn>>.
+ *
+ * The other case is when data was written to a non-transactional table and thus only has the user
+ * data: <f1, ... fn>. The table was then converted to a transactional table but the data
+ * files are not changed until major compaction. These are the "original" files.
+ *
+ * In this case we may need to decorate the outgoing data with transactional column values at
+ * read time. (It's done somewhat out of band via VectorizedRowBatchCtx - ask Teddy Choi).
+ * The "otid, writerId, rowid" columns represent {@link RecordIdentifier}. They are assigned
+ * each time the table is read in a way that needs to project {@link VirtualColumn#ROWID}.
+ * Major compaction will attach these values to each row permanently.
+ * It's critical that these generated column values are assigned exactly the same way by each
+ * read of the same row and by the Compactor.
+ * See {@link org.apache.hadoop.hive.ql.txn.compactor.CompactorMR} and
+ * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} for the Compactor read path.
+ * (Longer term should make compactor use this class)
+ *
+ * This only decorates original rows with metadata if something above is requesting these values
+ * or if there are Delete events to apply.
+ *
+ * @return false where there is no more data, i.e. {@code value} is empty
+ */
@Override
public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
try {
@@ -257,12 +392,60 @@ public class VectorizedOrcAcidRowBatchReader
// When selectedInUse is set to false, everything in the batch is selected.
selectedBitSet.set(0, vectorizedRowBatchBase.size, true);
}
-
- // Case 1- find rows which belong to transactions that are not valid.
- findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+ ColumnVector[] innerRecordIdColumnVector = vectorizedRowBatchBase.cols;
+ if(isOriginal) {
+ /*
+ * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order
+ * to see if any deletes apply
+ */
+ if(rowIdProjected || !deleteEventRegistry.isEmpty()) {
+ if(innerReader == null) {
+ throw new IllegalStateException(getClass().getName() + " requires " +
+ org.apache.orc.RecordReader.class +
+ " to handle original files that require ROW__IDs: " + rootPath);
+ }
+ /**
+ * {@link RecordIdentifier#getTransactionId()}
+ */
+ recordIdColumnVector.fields[0].noNulls = true;
+ recordIdColumnVector.fields[0].isRepeating = true;
+ //all "original" is considered written by txnid:0 which committed
+ ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0;
+ /**
+ * This is {@link RecordIdentifier#getBucketProperty()}
+ * Also see {@link BucketCodec}
+ */
+ recordIdColumnVector.fields[1].noNulls = true;
+ recordIdColumnVector.fields[1].isRepeating = true;
+ ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = syntheticProps.bucketProperty;
+ /**
+ * {@link RecordIdentifier#getRowId()}
+ */
+ recordIdColumnVector.fields[2].noNulls = true;
+ recordIdColumnVector.fields[2].isRepeating = false;
+ long[] rowIdVector = ((LongColumnVector)recordIdColumnVector.fields[2]).vector;
+ for(int i = 0; i < vectorizedRowBatchBase.size; i++) {
+ //baseReader.getRowNumber() seems to point at the start of the batch todo: validate
+ rowIdVector[i] = syntheticProps.rowIdOffset + innerReader.getRowNumber() + i;
+ }
+ //Now populate a structure to use to apply delete events
+ innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS];
+ innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0];
+ innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1];
+ innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2];
+ }
+ }
+ else {
+ // Case 1- find rows which belong to transactions that are not valid.
+ findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+ /**
+ * All "original" data belongs to txnid:0 and is always valid/committed for every reader
+ * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal
+ */
+ }
// Case 2- find rows which have been deleted.
- this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase.cols,
+ this.deleteEventRegistry.findDeletedRecords(innerRecordIdColumnVector,
vectorizedRowBatchBase.size, selectedBitSet);
if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) {
@@ -283,30 +466,39 @@ public class VectorizedOrcAcidRowBatchReader
}
}
- // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
- // NOTE: We only link up the user columns and not the ACID metadata columns because this
- // vectorized code path is not being used in cases of update/delete, when the metadata columns
- // would be expected to be passed up the operator pipeline. This is because
- // currently the update/delete specifically disable vectorized code paths.
- // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
- StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
- // Transfer columnVector objects from base batch to outgoing batch.
- System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
- if (rbCtx != null) {
- recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION];
- recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
- recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
+ if(isOriginal) {
+ /*Just copy the payload. {@link recordIdColumnVector} has already been populated*/
+ System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0,
+ value.getDataColumnCount());
+ }
+ else {
+ // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
+ // NOTE: We only link up the user columns and not the ACID metadata columns because this
+ // vectorized code path is not being used in cases of update/delete, when the metadata columns
+ // would be expected to be passed up the operator pipeline. This is because
+ // currently the update/delete specifically disable vectorized code paths.
+ // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
+ StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
+ // Transfer columnVector objects from base batch to outgoing batch.
+ System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
+ if(rowIdProjected) {
+ recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION];
+ recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
+ recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
+ }
+ }
+ if(rowIdProjected) {
rbCtx.setRecordIdColumnVector(recordIdColumnVector);
}
progress = baseReader.getProgress();
return true;
}
- protected void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) {
+ private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) {
findRecordsWithInvalidTransactionIds(batch.cols, batch.size, selectedBitSet);
}
- protected void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) {
+ private void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) {
if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) {
// When we have repeating values, we can unset the whole bitset at once
// if the repeating value is not a valid transaction.
@@ -387,6 +579,11 @@ public class VectorizedOrcAcidRowBatchReader
* @throws IOException
*/
public void close() throws IOException;
+
+ /**
+ * @return {@code true} if no delete events were found
+ */
+ boolean isEmpty();
}
/**
@@ -400,10 +597,10 @@ public class VectorizedOrcAcidRowBatchReader
private OrcRawRecordMerger deleteRecords;
private OrcRawRecordMerger.ReaderKey deleteRecordKey;
private OrcStruct deleteRecordValue;
- private boolean isDeleteRecordAvailable = true;
+ private Boolean isDeleteRecordAvailable = null;
private ValidTxnList validTxnList;
- public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions)
+ SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions)
throws IOException {
final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
if (deleteDeltas.length > 0) {
@@ -428,6 +625,13 @@ public class VectorizedOrcAcidRowBatchReader
}
@Override
+ public boolean isEmpty() {
+ if(isDeleteRecordAvailable == null) {
+ throw new IllegalStateException("Not yet initialized");
+ }
+ return !isDeleteRecordAvailable;
+ }
+ @Override
public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet)
throws IOException {
if (!isDeleteRecordAvailable) {
@@ -546,7 +750,7 @@ public class VectorizedOrcAcidRowBatchReader
*/
private int bucketProperty;
private long rowId;
- public DeleteRecordKey() {
+ DeleteRecordKey() {
this.originalTransactionId = -1;
this.rowId = -1;
}
@@ -596,7 +800,7 @@ public class VectorizedOrcAcidRowBatchReader
private boolean isBucketPropertyRepeating;
private final boolean isBucketedTable;
- public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
+ DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
ValidTxnList validTxnList, boolean isBucketedTable) throws IOException {
this.recordReader = deleteDeltaReader.rowsOptions(readerOptions);
this.bucketForSplit = bucket;
@@ -741,8 +945,9 @@ public class VectorizedOrcAcidRowBatchReader
private long rowIds[];
private CompressedOtid compressedOtids[];
private ValidTxnList validTxnList;
+ private Boolean isEmpty = null;
- public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
+ ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException {
int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
@@ -804,6 +1009,7 @@ public class VectorizedOrcAcidRowBatchReader
readAllDeleteEventsFromDeleteDeltas();
}
}
+ isEmpty = compressedOtids == null || rowIds == null;
} catch(IOException|DeleteEventsOverflowMemoryException e) {
close(); // close any open readers, if there was some exception during initialization.
throw e; // rethrow the exception so that the caller can handle.
@@ -910,7 +1116,13 @@ public class VectorizedOrcAcidRowBatchReader
}
return false;
}
-
+ @Override
+ public boolean isEmpty() {
+ if(isEmpty == null) {
+ throw new IllegalStateException("Not yet initialized");
+ }
+ return isEmpty;
+ }
@Override
public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet)
throws IOException {
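To summarize the synthetic ROW__ID scheme this reader now applies to "original" files: the transaction id is always 0, the bucket property is encoded once per split via BucketCodec.V1, and the row id is rowIdOffset (rows in earlier original files of the same bucket, from computeOffsetAndBucket) plus the reader's row number within the current file. A minimal sketch, not part of the patch, with illustrative numbers:

public class SyntheticRowIdSketch {
  public static void main(String[] args) {
    long rowIdOffset = 12000;   // rows in earlier original files of the same bucket
    long batchStartRow = 3072;  // innerReader.getRowNumber() at the start of the batch
    int batchSize = 1024;
    long[] rowIds = new long[batchSize];
    for (int i = 0; i < batchSize; i++) {
      rowIds[i] = rowIdOffset + batchStartRow + i;  // synthetic RecordIdentifier.rowId
    }
    // transactionid is always 0 for original data; bucketid comes from the BucketCodec.V1 encoding
    System.out.println("first ROW__ID = {0, <bucketProperty>, " + rowIds[0] + "}");
  }
}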
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
deleted file mode 100644
index 885ef83..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.AcidInputFormat;
-import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.*;
-
-import java.io.IOException;
-
-/**
- * Implement a RecordReader that stitches together base and delta files to
- * support tables and partitions stored in the ACID format. It works by using
- * the non-vectorized ACID reader and moving the data into a vectorized row
- * batch.
- */
-public class VectorizedOrcAcidRowReader
- implements org.apache.hadoop.mapred.RecordReader<NullWritable,
- VectorizedRowBatch> {
- private final AcidInputFormat.RowReader<OrcStruct> innerReader;
- private final RecordIdentifier key;
- private final OrcStruct value;
- private VectorizedRowBatchCtx rbCtx;
- private Object[] partitionValues;
- private final ObjectInspector objectInspector;
- private final DataOutputBuffer buffer = new DataOutputBuffer();
- private final StructColumnVector recordIdColumnVector;
- private final LongColumnVector transactionColumnVector;
- private final LongColumnVector bucketColumnVector;
- private final LongColumnVector rowIdColumnVector;
-
- public VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner,
- Configuration conf, VectorizedRowBatchCtx vectorizedRowBatchCtx, FileSplit split)
- throws IOException {
- this.innerReader = inner;
- this.key = inner.createKey();
- rbCtx = vectorizedRowBatchCtx;
- int partitionColumnCount = rbCtx.getPartitionColumnCount();
- if (partitionColumnCount > 0) {
- partitionValues = new Object[partitionColumnCount];
- rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
- }
- this.value = inner.createValue();
- this.objectInspector = inner.getObjectInspector();
- this.transactionColumnVector = new LongColumnVector();
- this.bucketColumnVector = new LongColumnVector();
- this.rowIdColumnVector = new LongColumnVector();
- this.recordIdColumnVector =
- new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
- transactionColumnVector, bucketColumnVector, rowIdColumnVector);
- }
-
- @Override
- public boolean next(NullWritable nullWritable,
- VectorizedRowBatch vectorizedRowBatch
- ) throws IOException {
- vectorizedRowBatch.reset();
- buffer.reset();
- if (!innerReader.next(key, value)) {
- return false;
- }
- if (partitionValues != null) {
- rbCtx.addPartitionColsToBatch(vectorizedRowBatch, partitionValues);
- }
- try {
- VectorizedBatchUtil.acidAddRowToBatch(value,
- (StructObjectInspector) objectInspector,
- vectorizedRowBatch.size, vectorizedRowBatch, rbCtx, buffer);
- addRecordId(vectorizedRowBatch.size, key);
- vectorizedRowBatch.size++;
- while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length &&
- innerReader.next(key, value)) {
- VectorizedBatchUtil.acidAddRowToBatch(value,
- (StructObjectInspector) objectInspector,
- vectorizedRowBatch.size, vectorizedRowBatch, rbCtx, buffer);
- addRecordId(vectorizedRowBatch.size, key);
- vectorizedRowBatch.size++;
- }
- rbCtx.setRecordIdColumnVector(recordIdColumnVector);
- } catch (Exception e) {
- throw new IOException("error iterating", e);
- }
- return true;
- }
-
- private void addRecordId(int index, RecordIdentifier key) {
- transactionColumnVector.vector[index] = key.getTransactionId();
- bucketColumnVector.vector[index] = key.getBucketProperty();
- rowIdColumnVector.vector[index] = key.getRowId();
- }
-
- @Override
- public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override
- public VectorizedRowBatch createValue() {
- return rbCtx.createVectorizedRowBatch();
- }
-
- @Override
- public long getPos() throws IOException {
- return innerReader.getPos();
- }
-
- @Override
- public void close() throws IOException {
- innerReader.close();
- }
-
- @Override
- public float getProgress() throws IOException {
- return innerReader.getProgress();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 662462c..2c76f79 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -56,9 +56,9 @@ import java.util.concurrent.TimeUnit;
* test AC=true, and AC=false with commit/rollback/exception and test resulting data.
*
* Can also test, calling commit in AC=true mode, etc, toggling AC...
- *
- * Tests here are for multi-statement transactions (WIP) and those that don't need to
- * run with Acid 2.0 (see subclasses of TestTxnCommands2)
+ *
+ * Tests here are for multi-statement transactions (WIP) and others
+ * Mostly uses bucketed tables
*/
public class TestTxnCommands extends TxnCommandsBaseForTests {
static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index c827dc4..f0d9ff2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.hive.ql;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -522,5 +526,119 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
// Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode());
Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support"));
}
+ /**
+ * Tests to check that we are able to use vectorized acid reader,
+ * VectorizedOrcAcidRowBatchReader, when reading "original" files,
+ * i.e. those that were written before the table was converted to acid.
+ * See also acid_vectorization_original*.q
+ */
+ @Test
+ public void testNonAcidToAcidVectorzied() throws Exception {
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+ //this enables vectorization of ROW__ID
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ROW_IDENTIFIER_ENABLED, true);//HIVE-12631
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("create table T(a int, b int) stored as orc");
+ int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+ runStatementOnDriver("insert into T(a, b) " + makeValuesClause(values));
+ //, 'transactional_properties'='default'
+ runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')");
+ //Execution mode: vectorized
+ //this uses VectorizedOrcAcidRowBatchReader
+ String query = "select a from T where b > 6 order by a";
+ List<String> rs = runStatementOnDriver(query);
+ String[][] expected = {
+ {"6", ""},
+ {"9", ""},
+ };
+ checkExpected(rs, expected, "After conversion");
+ Assert.assertEquals(Integer.toString(6), rs.get(0));
+ Assert.assertEquals(Integer.toString(9), rs.get(1));
+ assertVectorized(true, query);
+
+ //PPD is working, but the storage layer only filters at row group level, not row level
+ //this uses VectorizedOrcAcidRowBatchReader
+ query = "select ROW__ID, a from T where b > 6 order by a";
+ rs = runStatementOnDriver(query);
+ String[][] expected1 = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"}
+ };
+ checkExpected(rs, expected1, "After conversion with VC1");
+ assertVectorized(true, query);
+
+ //this uses VectorizedOrcAcidRowBatchReader
+ query = "select ROW__ID, a from T where b > 0 order by a";
+ rs = runStatementOnDriver(query);
+ String[][] expected2 = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}", "1"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}", "2"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}", "5"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"}
+ };
+ checkExpected(rs, expected2, "After conversion with VC2");
+ assertVectorized(true, query);
+
+ //doesn't vectorize (uses neither of the Vectorized Acid readers)
+ query = "select ROW__ID, a, INPUT__FILE__NAME from T where b > 6 order by a";
+ rs = runStatementOnDriver(query);
+ Assert.assertEquals("", 2, rs.size());
+ String[][] expected3 = {
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t6", "warehouse/t/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9", "warehouse/t/000000_0"}
+ };
+ checkExpected(rs, expected3, "After non-vectorized read");
+ Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+ //not vectorized because there is INPUT__FILE__NAME
+ assertVectorized(false, query);
+
+ runStatementOnDriver("update T set b = 17 where a = 1");
+ //this should use VectorizedOrcAcidRowBatchReader
+ query = "select ROW__ID, b from T where b > 0 order by a";
+ rs = runStatementOnDriver(query);
+ String[][] expected4 = {
+ {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":0}","17"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}","4"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}","6"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}","8"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}","10"}
+ };
+ checkExpected(rs, expected4, "After conversion with VC4");
+ assertVectorized(true, query);
+
+ runStatementOnDriver("alter table T compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+ ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+ Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+ Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+ //this should not vectorize at all
+ query = "select ROW__ID, a, b, INPUT__FILE__NAME from T where b > 0 order by a, b";
+ rs = runStatementOnDriver(query);
+ String[][] expected5 = {//the row__ids are the same after compaction
+ {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "warehouse/t/base_0000025/bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t2\t4", "warehouse/t/base_0000025/bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "warehouse/t/base_0000025/bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t6\t8", "warehouse/t/base_0000025/bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", "warehouse/t/base_0000025/bucket_00000"}
+ };
+ checkExpected(rs, expected5, "After major compaction");
+ //not vectorized because INPUT__FILE__NAME is requested
+ assertVectorized(false, query);
+ }
+ private void assertVectorized(boolean vectorized, String query) throws Exception {
+ List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query);
+ for(String line : rs) {
+ if(line != null && line.contains("Execution mode: vectorized")) {
+ Assert.assertTrue("Was vectorized when it wasn't expected", vectorized);
+ return;
+ }
+ }
+ Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 43e0a4a..b2ac687 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
@@ -100,10 +101,11 @@ public class TestVectorizedOrcAcidRowBatchReader {
conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
- fs = FileSystem.getLocal(conf);
Path workDir = new Path(System.getProperty("test.tmp.dir",
"target" + File.separator + "test" + File.separator + "tmp"));
root = new Path(workDir, "TestVectorizedOrcAcidRowBatch.testDump");
+ fs = root.getFileSystem(conf);
+ root = fs.makeQualified(root);
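+ // deriving the FileSystem from the path and qualifying root up front lets the split-path
+ // assertion below compare full URIs without hard-coding a "file://" scheme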
fs.delete(root, true);
ObjectInspector inspector;
synchronized (TestOrcFile.class) {
@@ -189,7 +191,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
assertEquals(1, splitStrategies.size());
List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
assertEquals(1, splits.size());
- assertEquals("file://" + root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000",
+ assertEquals(root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000",
splits.get(0).getPath().toUri().toString());
assertFalse(splits.get(0).isOriginal());
return splits;
@@ -216,7 +218,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
// are being handled properly.
conf.set(ValidTxnList.VALID_TXNS_KEY, "14:1:1:5"); // Exclude transaction 5
- VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL);
+ VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL, new VectorizedRowBatchCtx());
if (deleteEventRegistry.equals(ColumnizedDeleteEventRegistry.class.getName())) {
assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof ColumnizedDeleteEventRegistry);
}
@@ -242,20 +244,4 @@ public class TestVectorizedOrcAcidRowBatchReader {
}
}
}
-
- @Test
- public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception {
- OrcSplit mockSplit = Mockito.mock(OrcSplit.class);
-
- conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
- AcidUtils.AcidOperationalProperties.getDefault().toInt());
- Mockito.when(mockSplit.isOriginal()).thenReturn(true);
- // Test false when trying to create a vectorized ACID row batch reader when reading originals.
- assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
-
- // A positive test case.
- Mockito.when(mockSplit.isOriginal()).thenReturn(false);
- assertTrue(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
- }
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/queries/clientpositive/acid_vectorization_original.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_vectorization_original.q b/ql/src/test/queries/clientpositive/acid_vectorization_original.q
new file mode 100644
index 0000000..ddf138d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_vectorization_original.q
@@ -0,0 +1,138 @@
+set hive.mapred.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=true;
+-- enables vectorization of VirtualColumn.ROWID
+set hive.vectorized.row.identifier.enabled=true;
+-- enable ppd
+set hive.optimize.index.filter=true;
+
+set hive.explain.user=false;
+
+-- needed for TestCliDriver but not TestMiniTezCliDriver
+-- set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+-- attempts to get compaction to run - see below
+-- set yarn.scheduler.maximum-allocation-mb=2024;
+-- set hive.tez.container.size=500;
+-- set mapreduce.map.memory.mb=500;
+-- set mapreduce.reduce.memory.mb=500;
+-- set mapred.job.map.memory.mb=500;
+-- set mapred.job.reduce.memory.mb=500;
+
+
+
+CREATE TEMPORARY FUNCTION runWorker AS 'org.apache.hadoop.hive.ql.udf.UDFRunWorker';
+create table mydual(a int);
+insert into mydual values(1);
+
+CREATE TABLE over10k(t tinyint,
+ si smallint,
+ i int,
+ b bigint,
+ f float,
+ d double,
+ bo boolean,
+ s string,
+ ts timestamp,
+ `dec` decimal(4,2),
+ bin binary)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+STORED AS TEXTFILE;
+
+-- oddly this has 9999 rows, not > 10K
+LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE over10k;
+
+CREATE TABLE over10k_orc_bucketed(t tinyint,
+ si smallint,
+ i int,
+ b bigint,
+ f float,
+ d double,
+ bo boolean,
+ s string,
+ ts timestamp,
+ `dec` decimal(4,2),
+ bin binary) CLUSTERED BY(si) INTO 4 BUCKETS STORED AS ORC;
+
+-- this produces about 250 distinct values across all 4 equivalence classes
+select distinct si, si%4 from over10k order by si;
+
+-- explain insert into over10k_orc_bucketed select * from over10k cluster by si;
+-- w/o "cluster by" all data is written to 000000_0
+insert into over10k_orc_bucketed select * from over10k cluster by si;
+
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed;
+-- create copy_N files
+insert into over10k_orc_bucketed select * from over10k cluster by si;
+
+-- the output of this is masked in .out - it is visible in .orig
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed;
+
+--this actually shows the data files in the .out on Tez but not LLAP
+select distinct 7 as seven, INPUT__FILE__NAME from over10k_orc_bucketed;
+
+-- convert table to acid
+alter table over10k_orc_bucketed set TBLPROPERTIES ('transactional'='true');
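+-- the bucket files written above (including the copy_N files) become 'original' files of the acid table;
+-- ROW__IDs for their rows are synthesized at read time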
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+-- Execution mode: vectorized, llap
+-- LLAP IO: may be used (ACID table)
+explain select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i;
+select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i;
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+-- Execution mode: vectorized, llap
+-- LLAP IO: may be used (ACID table)
+explain select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+-- HIVE-17943
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+-- Execution mode: vectorized, llap
+-- LLAP IO: may be used (ACID table)
+explain update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100;
+update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100;
+
+-- this should produce the same result (data) as the previous time this exact query ran
+-- ROW__ID will be different (same bucketProperty)
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- The idea below was to run checksum queries to ensure that ROW__IDs are unique,
+-- to run compaction, and to check that ROW__IDs are the same before and after compaction
+-- (for rows w/o applicable delete events).
+-- Much of what follows is commented out because it doesn't work yet:
+-- group by ROW__ID produces wrong results
+-- compactor doesn't run - see error below...
+
+-- this doesn't vectorize but uses llap, which is perhaps the problem if it's using the cache
+-- use explain VECTORIZATION DETAIL to see
+-- notVectorizedReason: Key expression for GROUPBY operator: Vectorizing complex type STRUCT not supported
+explain select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+
+-- this tests that there are no duplicate ROW__IDs, so it should produce no output
+-- on LLAP this produces "NULL, 6"; on tez it produces nothing: HIVE-17921
+select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+-- this produces nothing (as it should)
+select ROW__ID, * from over10k_orc_bucketed where ROW__ID is null;
+
+-- schedule compactor
+-- alter table over10k_orc_bucketed compact 'major' WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="500","compactor.hive.tez.container.size"="500");;
+
+
+-- run compactor - this currently fails with
+-- Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512
+-- HIVE-17922
+-- select runWorker() from mydual;
+
+-- show compactions;
+
+-- this should produce the same result (data+ ROW__ID) as previous time this exact query ran
+-- select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- this tests that there are no duplicate ROW__IDs, so it should produce no output
+-- select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+
+-- select ROW__ID, * from over10k_orc_bucketed where ROW__ID is null;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q b/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q
new file mode 100644
index 0000000..4d93662
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q
@@ -0,0 +1,125 @@
+set hive.mapred.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=true;
+-- enables vectorization of VirtualColumn.ROWID
+set hive.vectorized.row.identifier.enabled=true;
+-- enable ppd
+set hive.optimize.index.filter=true;
+
+set hive.explain.user=false;
+
+-- needed for TestCliDriver but not TestMiniTezCliDriver
+-- set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+-- attempts to get compaction to run - see below
+-- set yarn.scheduler.maximum-allocation-mb=2024;
+-- set hive.tez.container.size=500;
+-- set mapreduce.map.memory.mb=500;
+-- set mapreduce.reduce.memory.mb=500;
+-- set mapred.job.map.memory.mb=500;
+-- set mapred.job.reduce.memory.mb=500;
+
+
+
+CREATE TEMPORARY FUNCTION runWorker AS 'org.apache.hadoop.hive.ql.udf.UDFRunWorker';
+create table mydual(a int);
+insert into mydual values(1);
+
+CREATE TABLE over10k(t tinyint,
+ si smallint,
+ i int,
+ b bigint,
+ f float,
+ d double,
+ bo boolean,
+ s string,
+ ts timestamp,
+ `dec` decimal(4,2),
+ bin binary)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+STORED AS TEXTFILE;
+
+-- oddly this has 9999 rows, not > 10K
+LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE over10k;
+
+CREATE TABLE over10k_orc_bucketed(t tinyint,
+ si smallint,
+ i int,
+ b bigint,
+ f float,
+ d double,
+ bo boolean,
+ s string,
+ ts timestamp,
+ `dec` decimal(4,2),
+ bin binary) CLUSTERED BY(si) INTO 4 BUCKETS STORED AS ORC;
+
+-- this produces about 250 distinct values across all 4 equivalence classes
+select distinct si, si%4 from over10k order by si;
+
+-- explain insert into over10k_orc_bucketed select * from over10k cluster by si;
+-- w/o "cluster by" all data is written to 000000_0
+insert into over10k_orc_bucketed select * from over10k cluster by si;
+
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed;
+-- create copy_N files
+insert into over10k_orc_bucketed select * from over10k cluster by si;
+
+-- the output of this is masked in .out - it is visible in .orig
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed;
+
+--this actually shows the data files in the .out on Tez but not LLAP
+select distinct 7 as seven, INPUT__FILE__NAME from over10k_orc_bucketed;
+
+-- convert table to acid
+alter table over10k_orc_bucketed set TBLPROPERTIES ('transactional'='true');
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+-- Execution mode: vectorized (both Map and Reducer)
+explain select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i;
+select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i;
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+-- Execution mode: vectorized
+explain select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- this should vectorize (and push predicate to storage: filterExpr in TableScan )
+-- same as above
+explain update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100;
+update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100;
+
+-- this should produce the same result (data) as the previous time this exact query ran
+-- ROW__ID will be different (same bucketProperty)
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- The idea below was to run checksum queries to ensure that ROW__IDs are unique,
+-- to run compaction, and to check that ROW__IDs are the same before and after compaction
+-- (for rows w/o applicable delete events)
+
+-- this doesn't vectorize
+-- use explain VECTORIZATION DETAIL to see
+-- notVectorizedReason: Key expression for GROUPBY operator: Vectorizing complex type STRUCT not supported
+explain select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+
+-- this tests that there are no duplicate ROW__IDs, so it should produce no output
+select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
+
+-- schedule compactor
+alter table over10k_orc_bucketed compact 'major' WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="500","compactor.hive.tez.container.size"="500");
+
+
+-- run compactor - this currently fails with
+-- Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512
+-- select runWorker() from mydual;
+
+-- show compactions;
+
+-- this should produce the same (data + ROW__ID) as before compaction
+select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID;
+
+-- this tests that there are no duplicate ROW__IDs, so it should produce no output
+select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;