Posted to commits@hive.apache.org by ek...@apache.org on 2018/10/06 21:11:00 UTC
hive git commit: HIVE-20635: VectorizedOrcAcidRowBatchReader doesn't filter delete events for original files (Saurabh Seth via Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master f122e258b -> 39ed52c48
HIVE-20635: VectorizedOrcAcidRowBatchReader doesn't filter delete events for original files (Saurabh Seth via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/39ed52c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/39ed52c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/39ed52c4
Branch: refs/heads/master
Commit: 39ed52c48d6970e4ae83d423fe6cf5ced914a69c
Parents: f122e25
Author: Saurabh Seth <sa...@gmail.com>
Authored: Sat Oct 6 14:10:48 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Sat Oct 6 14:10:48 2018 -0700
----------------------------------------------------------------------
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 83 ++++++++-
.../TestVectorizedOrcAcidRowBatchReader.java | 177 +++++++++++++++++++
2 files changed, 252 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/39ed52c4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 0cefeee..66280b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -228,6 +228,8 @@ public class VectorizedOrcAcidRowBatchReader
LOG.info("Read ValidWriteIdList: " + this.validWriteIdList.toString()
+ ":" + orcSplit);
+ this.syntheticProps = orcSplit.getSyntheticAcidProps();
+
// Clone readerOptions for deleteEvents.
Reader.Options deleteEventReaderOptions = readerOptions.clone();
// Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because
@@ -257,7 +259,6 @@ public class VectorizedOrcAcidRowBatchReader
}
rowIdProjected = areRowIdsProjected(rbCtx);
rootPath = orcSplit.getRootDir();
- syntheticProps = orcSplit.getSyntheticAcidProps();
/**
* This could be optimized by moving dir type/write id based checks are
@@ -393,6 +394,13 @@ public class VectorizedOrcAcidRowBatchReader
LOG.debug("findMinMaxKeys() " + ConfVars.FILTER_DELETE_EVENTS + "=false");
return new OrcRawRecordMerger.KeyInterval(null, null);
}
+
+ //todo: since we already have OrcSplit.orcTail, should somehow use it to
+ // get the acid.index, stats, etc rather than fetching the footer again
+ // though it seems that orcTail is mostly null....
+ Reader reader = OrcFile.createReader(orcSplit.getPath(),
+ OrcFile.readerOptions(conf));
+
if(orcSplit.isOriginal()) {
/**
* Among originals we may have files with _copy_N suffix. To properly
@@ -403,14 +411,11 @@ public class VectorizedOrcAcidRowBatchReader
* Kind of chicken-and-egg - deal with this later.
* See {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int,
* Reader.Options, Configuration, OrcRawRecordMerger.Options)}*/
- LOG.debug("findMinMaxKeys(original split) - ignoring");
- return new OrcRawRecordMerger.KeyInterval(null, null);
+ LOG.debug("findMinMaxKeys(original split)");
+
+ return findOriginalMinMaxKeys(orcSplit, reader, deleteEventReaderOptions);
}
- //todo: since we already have OrcSplit.orcTail, should somehow use it to
- // get the acid.index, stats, etc rather than fetching the footer again
- // though it seems that orcTail is mostly null....
- Reader reader = OrcFile.createReader(orcSplit.getPath(),
- OrcFile.readerOptions(conf));
+
List<StripeInformation> stripes = reader.getStripes();
final long splitStart = orcSplit.getStart();
final long splitEnd = splitStart + orcSplit.getLength();
@@ -578,6 +583,68 @@ public class VectorizedOrcAcidRowBatchReader
return keyInterval;
}
+ private OrcRawRecordMerger.KeyInterval findOriginalMinMaxKeys(OrcSplit orcSplit, Reader reader,
+ Reader.Options deleteEventReaderOptions) {
+
+ // This method returns the minimum and maximum synthetic row ids present in this split.
+ // Both bounds are inclusive, since the min and max keys are treated as inclusive when filtering out the delete delta records.
+
+ if (syntheticProps == null) {
+ // syntheticProps containing the synthetic rowid offset is computed if there are delete delta files.
+ // If there aren't any delete delta files, then we don't need this anyway.
+ return new OrcRawRecordMerger.KeyInterval(null, null);
+ }
+
+ long splitStart = orcSplit.getStart();
+ long splitEnd = orcSplit.getStart() + orcSplit.getLength();
+
+ long minRowId = syntheticProps.getRowIdOffset();
+ long maxRowId = syntheticProps.getRowIdOffset();
+
+ for(StripeInformation stripe: reader.getStripes()) {
+ if (splitStart > stripe.getOffset()) {
+ // This stripe starts before the current split starts. This stripe is not included in this split.
+ minRowId += stripe.getNumberOfRows();
+ }
+
+ if (splitEnd > stripe.getOffset()) {
+ // This stripe starts before the current split ends.
+ maxRowId += stripe.getNumberOfRows();
+ } else {
+ // The split ends before (or exactly where) this stripe starts.
+ // Remaining stripes are not included in this split.
+ break;
+ }
+ }
+
+ RecordIdentifier minKey = new RecordIdentifier(syntheticProps.getSyntheticWriteId(),
+ syntheticProps.getBucketProperty(), minRowId);
+
+ RecordIdentifier maxKey = new RecordIdentifier(syntheticProps.getSyntheticWriteId(),
+ syntheticProps.getBucketProperty(), maxRowId > 0? maxRowId - 1: 0);
+
+ OrcRawRecordMerger.KeyInterval keyIntervalTmp = new OrcRawRecordMerger.KeyInterval(minKey, maxKey);
+
+ if (minRowId >= maxRowId) {
+ /**
+ * The split lies entirely within a single stripe. In this case, the reader for this split will not read any data.
+ * See {@link org.apache.orc.impl.RecordReaderImpl#RecordReaderImpl}.
+ * We can return the min/max key interval as is (it will not read any of the delete delta records into memory).
+ */
+
+ LOG.info("findOriginalMinMaxKeys(): This split starts and ends in the same stripe.");
+ }
+
+ LOG.info("findOriginalMinMaxKeys(): " + keyIntervalTmp);
+
+ // Using min/max ROW__ID from original will work for ppd to the delete deltas because the writeid is the same in
+ // the min and the max ROW__ID
+ setSARG(keyIntervalTmp, deleteEventReaderOptions, minKey.getBucketProperty(), maxKey.getBucketProperty(),
+ minKey.getRowId(), maxKey.getRowId());
+
+ return keyIntervalTmp;
+ }
+
/**
* See {@link #next(NullWritable, VectorizedRowBatch)} first and
* {@link OrcRawRecordMerger.OriginalReaderPair}.
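
The core of the change is the interval arithmetic in findOriginalMinMaxKeys(): given the split's synthetic row-id offset and the stripe boundaries from the ORC footer, it derives the inclusive [min, max] synthetic ROW__ID range for the split, which is then pushed down to the delete-delta readers. A minimal standalone sketch of that arithmetic follows; it is not part of the patch. StripeInfo, SyntheticProps, RowId and KeyInterval are simplified stand-ins for the real Hive/ORC types, the numbers in main() are illustrative only, and the sketch assumes a recent JDK (it uses records).

// Sketch of the interval arithmetic used by findOriginalMinMaxKeys() above (illustrative only).
import java.util.List;

public class OriginalKeyIntervalSketch {

  record StripeInfo(long offset, long numberOfRows) {}
  record SyntheticProps(long rowIdOffset, long syntheticWriteId, int bucketProperty) {}
  record RowId(long writeId, int bucketProperty, long rowId) {}
  record KeyInterval(RowId min, RowId max) {}

  static KeyInterval findOriginalMinMaxKeys(long splitStart, long splitLength,
                                            List<StripeInfo> stripes, SyntheticProps props) {
    long splitEnd = splitStart + splitLength;
    // Row ids of an original file are synthetic: they start at the file's rowIdOffset.
    long minRowId = props.rowIdOffset();
    long maxRowId = props.rowIdOffset();

    for (StripeInfo stripe : stripes) {
      if (splitStart > stripe.offset()) {
        // Stripe starts before the split starts: all of its rows precede the split.
        minRowId += stripe.numberOfRows();
      }
      if (splitEnd > stripe.offset()) {
        // Stripe starts before the split ends: its rows can belong to the split.
        maxRowId += stripe.numberOfRows();
      } else {
        // Remaining stripes start at or after splitEnd and are not part of this split.
        break;
      }
    }

    // maxRowId currently points one past the last row, so make the upper bound inclusive.
    return new KeyInterval(
        new RowId(props.syntheticWriteId(), props.bucketProperty(), minRowId),
        new RowId(props.syntheticWriteId(), props.bucketProperty(),
            maxRowId > 0 ? maxRowId - 1 : 0));
  }

  public static void main(String[] args) {
    // Illustrative numbers: a single-stripe original file holding 3 rows whose synthetic
    // row ids start at 3 (e.g. the second of three copy_N files in the test below),
    // read as one split covering the whole file. Expected inclusive interval: rows 3..5.
    List<StripeInfo> stripes = List.of(new StripeInfo(3L, 3L)); // stripes start after the 3-byte ORC header
    SyntheticProps props = new SyntheticProps(3L, 0L, 0 /* illustrative bucket property */);
    System.out.println(findOriginalMinMaxKeys(0L, 1_000L, stripes, props));
  }
}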
http://git-wip-us.apache.org/repos/asf/hive/blob/39ed52c4/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 8f477f4..0a499b1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.hive.ql.io.orc;
import java.io.File;
import java.util.List;
+import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.Columniz
import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.io.LongWritable;
@@ -63,6 +66,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
private FileSystem fs;
private Path root;
private ObjectInspector inspector;
+ private ObjectInspector originalInspector;
public static class DummyRow {
LongWritable field;
@@ -88,6 +92,24 @@ public class TestVectorizedOrcAcidRowBatchReader {
}
+ /**
+ * Dummy row for original files.
+ */
+ public static class DummyOriginalRow {
+ LongWritable field;
+
+ DummyOriginalRow(long val) {
+ field = new LongWritable(val);
+ }
+
+ static String getColumnNamesProperty() {
+ return "field";
+ }
+ static String getColumnTypesProperty() {
+ return "bigint";
+ }
+ }
+
@Before
public void setup() throws Exception {
conf = new JobConf();
@@ -110,6 +132,9 @@ public class TestVectorizedOrcAcidRowBatchReader {
synchronized (TestOrcFile.class) {
inspector = ObjectInspectorFactory.getReflectionObjectInspector
(DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+ originalInspector = ObjectInspectorFactory.getReflectionObjectInspector(DummyOriginalRow.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
}
@Test
@@ -370,6 +395,158 @@ public class TestVectorizedOrcAcidRowBatchReader {
}
}
+
+ @Test
+ public void testDeleteEventOriginalFilteringOn() throws Exception {
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
+ testDeleteEventOriginalFiltering();
+ }
+
+ @Test
+ public void testDeleteEventOriginalFilteringOff() throws Exception {
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, false);
+ testDeleteEventOriginalFiltering();
+ }
+
+ public void testDeleteEventOriginalFiltering() throws Exception {
+ boolean filterOn =
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS);
+
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
+
+ // Create 3 original files with 3 rows each
+ Properties properties = new Properties();
+ properties.setProperty("columns", DummyOriginalRow.getColumnNamesProperty());
+ properties.setProperty("columns.types", DummyOriginalRow.getColumnTypesProperty());
+
+ OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(properties, conf);
+ writerOptions.inspector(originalInspector);
+
+ Path testFilePath = new Path(root, "000000_0");
+ Writer writer = OrcFile.createWriter(testFilePath, writerOptions);
+
+ writer.addRow(new DummyOriginalRow(0));
+ writer.addRow(new DummyOriginalRow(0));
+ writer.addRow(new DummyOriginalRow(0));
+ writer.close();
+
+ testFilePath = new Path(root, "000000_0_copy_1");
+
+ writer = OrcFile.createWriter(testFilePath, writerOptions);
+ writer.addRow(new DummyOriginalRow(0));
+ writer.addRow(new DummyOriginalRow(0));
+ writer.addRow(new DummyOriginalRow(0));
+ writer.close();
+
+ testFilePath = new Path(root, "000000_0_copy_2");
+
+ writer = OrcFile.createWriter(testFilePath, writerOptions);
+ writer.addRow(new DummyOriginalRow(0));
+ writer.addRow(new DummyOriginalRow(0));
+ writer.addRow(new DummyOriginalRow(0));
+ writer.close();
+
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+
+ int bucket = 0;
+
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .filesystem(fs)
+ .bucket(bucket)
+ .writingBase(false)
+ .minimumWriteId(1)
+ .maximumWriteId(1)
+ .inspector(inspector)
+ .reporter(Reporter.NULL)
+ .recordIdColumn(1)
+ .finalDestination(root);
+
+ int bucketProperty = BucketCodec.V1.encode(options);
+
+ RecordUpdater updater = new OrcRecordUpdater(root, options);
+
+ //delete 1 row from each of the original files
+ // Delete the last record in this split to test boundary conditions. It should not be present in the delete event
+ // registry for the next split
+ updater.delete(options.getMinimumWriteId(), new DummyRow(-1, 2, 0, bucket));
+ // Delete the first record in this split to test boundary conditions. It should not be present in the delete event
+ // registry for the previous split
+ updater.delete(options.getMinimumWriteId(), new DummyRow(-1, 3, 0, bucket));
+ updater.delete(options.getMinimumWriteId(), new DummyRow(-1, 7, 0, bucket));
+ updater.close(false);
+
+ //HWM is not important - just make sure deltas created above are read as if committed
+ conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, "tbl:2:" + Long.MAX_VALUE + "::");
+
+ // Set vector mode to true in the map work so that we recognize this as a vector mode execution during the split
+ // generation. Without this we will not compute the offset for the synthetic row ids.
+ MapWork mapWork = new MapWork();
+ mapWork.setVectorMode(true);
+ VectorizedRowBatchCtx vrbContext = new VectorizedRowBatchCtx();
+ mapWork.setVectorizedRowBatchCtx(vrbContext);
+ HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+ Utilities.setMapWork(conf, mapWork);
+
+ // now we have 3 delete events total, but for each split we should only
+ // load 1 into DeleteRegistry (if filtering is on)
+ List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+ assertEquals(1, splitStrategies.size());
+ List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+ assertEquals(3, splits.size());
+ assertEquals(root.toUri().toString() + File.separator + "000000_0",
+ splits.get(0).getPath().toUri().toString());
+ assertTrue(splits.get(0).isOriginal());
+
+ assertEquals(root.toUri().toString() + File.separator + "000000_0_copy_1",
+ splits.get(1).getPath().toUri().toString());
+ assertTrue(splits.get(1).isOriginal());
+
+ assertEquals(root.toUri().toString() + File.separator + "000000_0_copy_2",
+ splits.get(2).getPath().toUri().toString());
+ assertTrue(splits.get(2).isOriginal());
+
+ VectorizedOrcAcidRowBatchReader vectorizedReader =
+ new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL, vrbContext);
+ ColumnizedDeleteEventRegistry deleteEventRegistry =
+ (ColumnizedDeleteEventRegistry) vectorizedReader.getDeleteEventRegistry();
+ assertEquals("number of delete events for stripe 1", filterOn ? 1 : 3, deleteEventRegistry.size());
+ OrcRawRecordMerger.KeyInterval keyInterval = vectorizedReader.getKeyInterval();
+ if(filterOn) {
+ assertEquals(new OrcRawRecordMerger.KeyInterval(
+ new RecordIdentifier(0, bucketProperty, 0),
+ new RecordIdentifier(0, bucketProperty, 2)),
+ keyInterval);
+ } else {
+ assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+ }
+
+ vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(1), conf, Reporter.NULL, vrbContext);
+ deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader.getDeleteEventRegistry();
+ assertEquals("number of delete events for stripe 2", filterOn ? 1 : 3, deleteEventRegistry.size());
+ keyInterval = vectorizedReader.getKeyInterval();
+ if(filterOn) {
+ assertEquals(new OrcRawRecordMerger.KeyInterval(
+ new RecordIdentifier(0, bucketProperty, 3),
+ new RecordIdentifier(0, bucketProperty, 5)),
+ keyInterval);
+ } else {
+ assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+ }
+
+ vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(2), conf, Reporter.NULL, vrbContext);
+ deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader.getDeleteEventRegistry();
+ assertEquals("number of delete events for stripe 3", filterOn ? 1 : 3, deleteEventRegistry.size());
+ keyInterval = vectorizedReader.getKeyInterval();
+ if(filterOn) {
+ assertEquals(new OrcRawRecordMerger.KeyInterval(
+ new RecordIdentifier(0, bucketProperty, 6),
+ new RecordIdentifier(0, bucketProperty, 8)), keyInterval);
+ } else {
+ assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+ }
+ }
+
@Test
public void testVectorizedOrcAcidRowBatchReader() throws Exception {
conf.set("bucket_count", "1");