Posted to commits@hive.apache.org by ek...@apache.org on 2018/10/06 21:11:00 UTC

hive git commit: HIVE-20635: VectorizedOrcAcidRowBatchReader doesn't filter delete events for original files (Saurabh Seth via Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/master f122e258b -> 39ed52c48


HIVE-20635: VectorizedOrcAcidRowBatchReader doesn't filter delete events for original files (Saurabh Seth via Eugene Koifman)

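The fix computes, for each split over an original (pre-acid) file, the inclusive range of synthetic ROW__IDs that the split covers, and pushes that range down so that only the matching delete events are loaded into the delete event registry. A rough standalone sketch of the interval arithmetic (an illustrative helper, not code from this patch; the real logic is in findOriginalMinMaxKeys() in the diff below):

// Illustrative only: mirrors the arithmetic of the new findOriginalMinMaxKeys().
// rowIdOffset is the synthetic row id of the file's first row; stripeOffsets and
// stripeRows describe the file's stripes; splitStart/splitEnd are byte offsets.
final class SyntheticRowIdIntervalSketch {
  static long[] interval(long rowIdOffset, long[] stripeOffsets, long[] stripeRows,
      long splitStart, long splitEnd) {
    long minRowId = rowIdOffset;
    long maxRowId = rowIdOffset;
    for (int i = 0; i < stripeOffsets.length; i++) {
      if (splitStart > stripeOffsets[i]) {
        minRowId += stripeRows[i];      // stripe lies entirely before this split
      }
      if (splitEnd > stripeOffsets[i]) {
        maxRowId += stripeRows[i];      // stripe starts before this split ends
      } else {
        break;                          // remaining stripes start at or after the split end
      }
    }
    // keys are inclusive, so the last covered row id is maxRowId - 1
    return new long[] {minRowId, Math.max(maxRowId - 1, 0)};
  }
}

With three original copy files of three rows each and synthetic row-id offsets 0, 3 and 6 (the scenario in the new test below), the intervals come out as [0,2], [3,5] and [6,8], which is exactly what the added testDeleteEventOriginalFiltering() asserts.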

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/39ed52c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/39ed52c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/39ed52c4

Branch: refs/heads/master
Commit: 39ed52c48d6970e4ae83d423fe6cf5ced914a69c
Parents: f122e25
Author: Saurabh Seth <sa...@gmail.com>
Authored: Sat Oct 6 14:10:48 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Sat Oct 6 14:10:48 2018 -0700

----------------------------------------------------------------------
 .../io/orc/VectorizedOrcAcidRowBatchReader.java |  83 ++++++++-
 .../TestVectorizedOrcAcidRowBatchReader.java    | 177 +++++++++++++++++++
 2 files changed, 252 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/39ed52c4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 0cefeee..66280b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -228,6 +228,8 @@ public class VectorizedOrcAcidRowBatchReader
     LOG.info("Read ValidWriteIdList: " + this.validWriteIdList.toString()
             + ":" + orcSplit);
 
+    this.syntheticProps = orcSplit.getSyntheticAcidProps();
+
     // Clone readerOptions for deleteEvents.
     Reader.Options deleteEventReaderOptions = readerOptions.clone();
     // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because
@@ -257,7 +259,6 @@ public class VectorizedOrcAcidRowBatchReader
     }
     rowIdProjected = areRowIdsProjected(rbCtx);
     rootPath = orcSplit.getRootDir();
-    syntheticProps = orcSplit.getSyntheticAcidProps();
 
     /**
      * This could be optimized by moving dir type/write id based checks are
@@ -393,6 +394,13 @@ public class VectorizedOrcAcidRowBatchReader
       LOG.debug("findMinMaxKeys() " + ConfVars.FILTER_DELETE_EVENTS + "=false");
       return new OrcRawRecordMerger.KeyInterval(null, null);
     }
+
+    //todo: since we already have OrcSplit.orcTail, should somehow use it to
+    // get the acid.index, stats, etc rather than fetching the footer again
+    // though it seems that orcTail is mostly null....
+    Reader reader = OrcFile.createReader(orcSplit.getPath(),
+        OrcFile.readerOptions(conf));
+
     if(orcSplit.isOriginal()) {
       /**
        * Among originals we may have files with _copy_N suffix.  To properly
@@ -403,14 +411,11 @@ public class VectorizedOrcAcidRowBatchReader
        * Kind of chicken-and-egg - deal with this later.
        * See {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int,
        * Reader.Options, Configuration, OrcRawRecordMerger.Options)}*/
-      LOG.debug("findMinMaxKeys(original split) - ignoring");
-      return new OrcRawRecordMerger.KeyInterval(null, null);
+      LOG.debug("findMinMaxKeys(original split)");
+
+      return findOriginalMinMaxKeys(orcSplit, reader, deleteEventReaderOptions);
     }
-    //todo: since we already have OrcSplit.orcTail, should somehow use it to
-    // get the acid.index, stats, etc rather than fetching the footer again
-    // though it seems that orcTail is mostly null....
-    Reader reader = OrcFile.createReader(orcSplit.getPath(),
-        OrcFile.readerOptions(conf));
+
     List<StripeInformation> stripes = reader.getStripes();
     final long splitStart = orcSplit.getStart();
     final long splitEnd = splitStart + orcSplit.getLength();
@@ -578,6 +583,68 @@ public class VectorizedOrcAcidRowBatchReader
     return keyInterval;
   }
 
+  private OrcRawRecordMerger.KeyInterval findOriginalMinMaxKeys(OrcSplit orcSplit, Reader reader,
+      Reader.Options deleteEventReaderOptions) {
+
+    // This method returns the minimum and maximum synthetic row ids that are present in this split,
+    // since the min and max keys are both treated as inclusive when filtering out the delete delta records.
+
+    if (syntheticProps == null) {
+      // syntheticProps, which contains the synthetic rowid offset, is only computed if there are delete delta files.
+      // If there aren't any delete delta files, then we don't need it anyway.
+      return new OrcRawRecordMerger.KeyInterval(null, null);
+    }
+
+    long splitStart = orcSplit.getStart();
+    long splitEnd = orcSplit.getStart() + orcSplit.getLength();
+
+    long minRowId = syntheticProps.getRowIdOffset();
+    long maxRowId = syntheticProps.getRowIdOffset();
+
+    for(StripeInformation stripe: reader.getStripes()) {
+      if (splitStart > stripe.getOffset()) {
+        // This stripe starts before the current split starts. This stripe is not included in this split.
+        minRowId += stripe.getNumberOfRows();
+      }
+
+      if (splitEnd > stripe.getOffset()) {
+        // This stripe starts before the current split ends.
+        maxRowId += stripe.getNumberOfRows();
+      } else {
+        // The split ends before (or exactly where) this stripe starts.
+        // Remaining stripes are not included in this split.
+        break;
+      }
+    }
+
+    RecordIdentifier minKey = new RecordIdentifier(syntheticProps.getSyntheticWriteId(),
+        syntheticProps.getBucketProperty(), minRowId);
+
+    RecordIdentifier maxKey = new RecordIdentifier(syntheticProps.getSyntheticWriteId(),
+        syntheticProps.getBucketProperty(), maxRowId > 0? maxRowId - 1: 0);
+
+    OrcRawRecordMerger.KeyInterval keyIntervalTmp = new OrcRawRecordMerger.KeyInterval(minKey, maxKey);
+
+    if (minRowId >= maxRowId) {
+      /**
+       * The split lies entirely within a single stripe. In this case, the reader for this split will not read any data.
+       * See {@link org.apache.orc.impl.RecordReaderImpl#RecordReaderImpl}.
+       * We can return the min/max key interval as is (it will not read any of the delete delta records into memory).
+       */
+
+      LOG.info("findOriginalMinMaxKeys(): This split starts and ends in the same stripe.");
+    }
+
+    LOG.info("findOriginalMinMaxKeys(): " + keyIntervalTmp);
+
+    // Using min/max ROW__ID from the original will work for predicate pushdown (ppd) to the delete deltas because
+    // the write id is the same in the min and the max ROW__ID
+    setSARG(keyIntervalTmp, deleteEventReaderOptions, minKey.getBucketProperty(), maxKey.getBucketProperty(),
+        minKey.getRowId(), maxKey.getRowId());
+
+    return keyIntervalTmp;
+  }
+
   /**
    * See {@link #next(NullWritable, VectorizedRowBatch)} first and
    * {@link OrcRawRecordMerger.OriginalReaderPair}.

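The interval computed above is handed to setSARG() (called at the end of findOriginalMinMaxKeys() and defined elsewhere in VectorizedOrcAcidRowBatchReader) so the delete-delta readers can skip row groups whose keys fall outside the split. The sketch below is a hedged approximation of that predicate using the public SearchArgument builder; the column names ("bucket", "rowId") are assumptions for illustration, not copied from the patch. Since the synthetic write id is identical at both ends of the interval (as the comment above notes), the bucket and rowId bounds are what actually narrow the delete-delta scan.

import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

// Illustrative only: a range predicate over the synthetic key columns, similar in
// spirit to what setSARG() pushes into the delete event Reader.Options.
final class DeleteEventSargSketch {
  static SearchArgument rowIdRange(long minBucketProperty, long maxBucketProperty,
      long minRowId, long maxRowId) {
    return SearchArgumentFactory.newBuilder()
        .startAnd()
          .between("bucket", PredicateLeaf.Type.LONG, minBucketProperty, maxBucketProperty)
          .between("rowId", PredicateLeaf.Type.LONG, minRowId, maxRowId)
        .end()
        .build();
  }
}
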
http://git-wip-us.apache.org/repos/asf/hive/blob/39ed52c4/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 8f477f4..0a499b1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.File;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.Columniz
 import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry;
 
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.io.LongWritable;
@@ -63,6 +66,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
   private FileSystem fs;
   private Path root;
   private ObjectInspector inspector;
+  private ObjectInspector originalInspector;
 
   public static class DummyRow {
     LongWritable field;
@@ -88,6 +92,24 @@ public class TestVectorizedOrcAcidRowBatchReader {
 
   }
 
+  /**
+   * Dummy row for original files.
+   */
+  public static class DummyOriginalRow {
+    LongWritable field;
+
+    DummyOriginalRow(long val) {
+      field = new LongWritable(val);
+    }
+
+    static String getColumnNamesProperty() {
+      return "field";
+    }
+    static String getColumnTypesProperty() {
+      return "bigint";
+    }
+  }
+
   @Before
   public void setup() throws Exception {
     conf = new JobConf();
@@ -110,6 +132,9 @@ public class TestVectorizedOrcAcidRowBatchReader {
     synchronized (TestOrcFile.class) {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+      originalInspector = ObjectInspectorFactory.getReflectionObjectInspector(DummyOriginalRow.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
   }
   @Test
@@ -370,6 +395,158 @@ public class TestVectorizedOrcAcidRowBatchReader {
     }
 
   }
+
+  @Test
+  public void testDeleteEventOriginalFilteringOn() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
+    testDeleteEventOriginalFiltering();
+  }
+
+  @Test
+  public void testDeleteEventOriginalFilteringOff() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, false);
+    testDeleteEventOriginalFiltering();
+  }
+
+  public void testDeleteEventOriginalFiltering() throws Exception {
+    boolean filterOn =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS);
+
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
+
+    // Create 3 original files with 3 rows each
+    Properties properties = new Properties();
+    properties.setProperty("columns", DummyOriginalRow.getColumnNamesProperty());
+    properties.setProperty("columns.types", DummyOriginalRow.getColumnTypesProperty());
+
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(properties, conf);
+    writerOptions.inspector(originalInspector);
+
+    Path testFilePath = new Path(root, "000000_0");
+    Writer writer = OrcFile.createWriter(testFilePath, writerOptions);
+
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.close();
+
+    testFilePath = new Path(root, "000000_0_copy_1");
+
+    writer = OrcFile.createWriter(testFilePath, writerOptions);
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.close();
+
+    testFilePath = new Path(root, "000000_0_copy_2");
+
+    writer = OrcFile.createWriter(testFilePath, writerOptions);
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.addRow(new DummyOriginalRow(0));
+    writer.close();
+
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+
+    int bucket = 0;
+
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .filesystem(fs)
+        .bucket(bucket)
+        .writingBase(false)
+        .minimumWriteId(1)
+        .maximumWriteId(1)
+        .inspector(inspector)
+        .reporter(Reporter.NULL)
+        .recordIdColumn(1)
+        .finalDestination(root);
+
+    int bucketProperty = BucketCodec.V1.encode(options);
+
+    RecordUpdater updater = new OrcRecordUpdater(root, options);
+
+    // Delete 1 row from each of the original files.
+    // Delete the last record in this split to test boundary conditions. It should not be present in the delete event
+    // registry for the next split
+    updater.delete(options.getMinimumWriteId(), new DummyRow(-1, 2, 0, bucket));
+    // Delete the first record in this split to test boundary conditions. It should not be present in the delete event
+    // registry for the previous split
+    updater.delete(options.getMinimumWriteId(), new DummyRow(-1, 3, 0, bucket));
+    updater.delete(options.getMinimumWriteId(), new DummyRow(-1, 7, 0, bucket));
+    updater.close(false);
+
+    //HWM is not important - just make sure deltas created above are read as if committed
+    conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, "tbl:2:" + Long.MAX_VALUE + "::");
+
+    // Set vector mode to true in the map work so that we recognize this as a vector mode execution during the split
+    // generation. Without this we will not compute the offset for the synthetic row ids.
+    MapWork mapWork = new MapWork();
+    mapWork.setVectorMode(true);
+    VectorizedRowBatchCtx vrbContext = new VectorizedRowBatchCtx();
+    mapWork.setVectorizedRowBatchCtx(vrbContext);
+    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+    Utilities.setMapWork(conf, mapWork);
+
+    // now we have 3 delete events total, but for each split we should only
+    // load 1 into DeleteRegistry (if filtering is on)
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+    assertEquals(1, splitStrategies.size());
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+    assertEquals(3, splits.size());
+    assertEquals(root.toUri().toString() + File.separator + "000000_0",
+        splits.get(0).getPath().toUri().toString());
+    assertTrue(splits.get(0).isOriginal());
+
+    assertEquals(root.toUri().toString() + File.separator + "000000_0_copy_1",
+        splits.get(1).getPath().toUri().toString());
+    assertTrue(splits.get(1).isOriginal());
+
+    assertEquals(root.toUri().toString() + File.separator + "000000_0_copy_2",
+        splits.get(2).getPath().toUri().toString());
+    assertTrue(splits.get(2).isOriginal());
+
+    VectorizedOrcAcidRowBatchReader vectorizedReader =
+        new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL, vrbContext);
+    ColumnizedDeleteEventRegistry deleteEventRegistry =
+        (ColumnizedDeleteEventRegistry) vectorizedReader.getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 1", filterOn ? 1 : 3, deleteEventRegistry.size());
+    OrcRawRecordMerger.KeyInterval keyInterval = vectorizedReader.getKeyInterval();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+              new RecordIdentifier(0, bucketProperty, 0),
+              new RecordIdentifier(0, bucketProperty, 2)),
+          keyInterval);
+    } else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+    }
+
+    vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(1), conf, Reporter.NULL, vrbContext);
+    deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader.getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 2", filterOn ? 1 : 3, deleteEventRegistry.size());
+    keyInterval = vectorizedReader.getKeyInterval();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+              new RecordIdentifier(0, bucketProperty, 3),
+              new RecordIdentifier(0, bucketProperty, 5)),
+          keyInterval);
+    } else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+    }
+
+    vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(2), conf, Reporter.NULL, vrbContext);
+    deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader.getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 3", filterOn ? 1 : 3, deleteEventRegistry.size());
+    keyInterval = vectorizedReader.getKeyInterval();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+          new RecordIdentifier(0, bucketProperty, 6),
+          new RecordIdentifier(0, bucketProperty, 8)), keyInterval);
+    } else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+    }
+  }
+
     @Test
   public void testVectorizedOrcAcidRowBatchReader() throws Exception {
     conf.set("bucket_count", "1");