Posted to commits@hive.apache.org by dk...@apache.org on 2022/09/16 09:00:20 UTC

[hive] branch master updated: HIVE-26496: Improvement to the fetch operator to scan only delete_delta folders containing transactions relevant to the current split (Dmitriy Fingerman, reviewed by Denys Kuzmenko, Laszlo Bodor)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 74e0877298 HIVE-26496: Improvement to the fetch operator to scan only delete_delta folders containing transactions relevant to the current split (Dmitriy Fingerman, reviewed by Denys Kuzmenko, Laszlo Bodor)
74e0877298 is described below

commit 74e087729830babe4e1e846e053d52ad0716e314
Author: Dmitriy Fingerman <dm...@gmail.com>
AuthorDate: Fri Sep 16 05:00:07 2022 -0400

    HIVE-26496: Improvement to the fetch operator to scan only delete_delta folders containing transactions relevant to the current split (Dmitriy Fingerman, reviewed by Denys Kuzmenko, Laszlo Bodor)
    
    Closes #3559
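
In short, the OrcSplit constructor now drops delete_delta directories whose maximum write id is lower than the split's own minimum write id (such deltas can only describe deletes committed before this split's data was written), on top of the existing per-bucket filtering. The following is a minimal, self-contained sketch of that idea; the DeltaMetaData class below is a simplified stand-in, not the actual AcidInputFormat.DeltaMetaData.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class DeleteDeltaFilterSketch {

      /** Simplified stand-in for AcidInputFormat.DeltaMetaData. */
      static final class DeltaMetaData {
        final long minWriteId;
        final long maxWriteId;

        DeltaMetaData(long minWriteId, long maxWriteId) {
          this.minWriteId = minWriteId;
          this.maxWriteId = maxWriteId;
        }

        @Override
        public String toString() {
          return "delete_delta_" + minWriteId + "_" + maxWriteId;
        }
      }

      // Keep only the delete deltas that can affect a split whose data was written with
      // write id splitMinWriteId: a delete delta whose max write id is below that value
      // cannot contain delete events for this split's rows.
      static List<DeltaMetaData> filterForSplit(List<DeltaMetaData> deltas, long splitMinWriteId) {
        return deltas.stream()
            .filter(d -> d.maxWriteId >= splitMinWriteId)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        // Same shape as the new TestOrcSplit test: inserts at write ids 1, 3, 5; deletes at 2, 4, 6.
        List<DeltaMetaData> deleteDeltas = Arrays.asList(
            new DeltaMetaData(2, 2),
            new DeltaMetaData(4, 4),
            new DeltaMetaData(6, 6));

        System.out.println(filterForSplit(deleteDeltas, 1)); // all three delete deltas
        System.out.println(filterForSplit(deleteDeltas, 3)); // delete deltas 4 and 6
        System.out.println(filterForSplit(deleteDeltas, 5)); // only delete delta 6
      }
    }

The fetch operator therefore no longer scans delete_delta folders that cannot affect the rows in the current split, which is what the new TestOrcSplit test below asserts.
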
---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |   2 +-
 .../org/apache/hadoop/hive/ql/io/orc/OrcSplit.java |  25 +--
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java    |   2 +-
 .../apache/hadoop/hive/ql/io/orc/TestOrcSplit.java | 184 +++++++++++++++++++++
 .../orc/TestVectorizedOrcAcidRowBatchReader.java   |   4 +-
 5 files changed, 202 insertions(+), 15 deletions(-)
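
The split's minimum write id in the OrcSplit change below is obtained by parsing the bucket file's parent delta directory name with AcidUtils.parseBaseOrDeltaBucketFilename (which this change also relieves of its unused "throws IOException", hence the test signature cleanups). As a rough illustration only, and not the real AcidUtils implementation, a directory name such as delta_0000003_0000003_0000 or delete_delta_000012_000012 encodes the write id range, and the minimum can be recovered like this:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DeltaPathSketch {

      // delta / delete_delta directory names carry the write id range and an optional statement id.
      private static final Pattern DELTA_DIR =
          Pattern.compile("(?:delete_)?delta_(\\d+)_(\\d+)(?:_\\d+)?");

      // Returns the minimum write id encoded in the bucket file's parent delta directory,
      // or -1 when the parent is not a delta directory (the real AcidUtils also handles
      // base directories and files of non-ACID origin).
      static long minWriteIdOf(String bucketFilePath) {
        String[] parts = bucketFilePath.split("/");
        if (parts.length < 2) {
          return -1;
        }
        Matcher m = DELTA_DIR.matcher(parts[parts.length - 2]);
        return m.matches() ? Long.parseLong(m.group(1)) : -1;
      }

      public static void main(String[] args) {
        System.out.println(minWriteIdOf("/warehouse/t/delta_0000003_0000003_0000/bucket_00000")); // 3
        System.out.println(minWriteIdOf("/warehouse/t/base_0000010/bucket_00000"));               // -1
      }
    }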

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2811763c06..4582878817 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -537,7 +537,7 @@ public class AcidUtils {
    */
   public static AcidOutputFormat.Options
                     parseBaseOrDeltaBucketFilename(Path bucketFile,
-                                                   Configuration conf) throws IOException {
+                                                   Configuration conf) {
     AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
     String filename = bucketFile.getName();
     int bucket = parseBucketId(bucketFile);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index b525d6feb4..a85b4bd7a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -35,7 +36,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
@@ -48,7 +48,6 @@ import org.apache.orc.impl.OrcTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * OrcFileSplit. Holds file meta info
  *
@@ -104,7 +103,15 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     this.isOriginal = isOriginal;
     this.hasBase = hasBase;
     this.rootDir = rootDir;
-    this.deltas.addAll(filterDeltasByBucketId(deltas, AcidUtils.parseBucketId(path)));
+    int bucketId = AcidUtils.parseBucketId(path);
+    long minWriteId = !deltas.isEmpty() ?
+            AcidUtils.parseBaseOrDeltaBucketFilename(path, null).getMinimumWriteId() : -1;
+    this.deltas.addAll(
+            deltas.stream()
+            // drop delete deltas whose transactions all happened before this split's minimum write id
+            .filter(delta -> delta.getMaxWriteId() >= minWriteId)
+            .flatMap(delta -> filterDeltasByBucketId(delta, bucketId))
+            .collect(Collectors.toList()));
     this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
     // setting file length to Long.MAX_VALUE will let orc reader read file length from file system
     this.fileLen = fileLen <= 0 ? Long.MAX_VALUE : fileLen;
@@ -114,18 +121,16 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   /**
    * For every split we only want to keep the delete deltas that contain files for that bucket.
    * If we filter out files, we might need to filter out statementIds from multistatement transactions.
-   * @param deltas
+   * @param dmd metadata of a single delete delta directory
    * @param bucketId
    * @return
    */
-  private List<AcidInputFormat.DeltaMetaData> filterDeltasByBucketId(List<AcidInputFormat.DeltaMetaData> deltas, int bucketId) {
-    List<AcidInputFormat.DeltaMetaData> results = new ArrayList<>();
-    for (AcidInputFormat.DeltaMetaData dmd : deltas) {
+  private Stream<AcidInputFormat.DeltaMetaData> filterDeltasByBucketId(AcidInputFormat.DeltaMetaData dmd, int bucketId) {
       Map<Integer, AcidInputFormat.DeltaFileMetaData> bucketFilesbyStmtId =
           dmd.getDeltaFiles().stream().filter(deltaFileMetaData -> deltaFileMetaData.getBucketId() == bucketId)
               .collect(Collectors.toMap(AcidInputFormat.DeltaFileMetaData::getStmtId, Function.identity()));
       if (bucketFilesbyStmtId.isEmpty()) {
-        continue;
+        return Stream.empty();
       }
       // Keep only the relevant stmtIds
       List<Integer> stmtIds = dmd.getStmtIds().stream()
@@ -137,10 +142,8 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
               file.getFileId(), stmtIds.size() > 1 ? file.getStmtId() : null, bucketId))
           .collect(Collectors.toList());
 
-      results.add(new AcidInputFormat.DeltaMetaData(dmd.getMinWriteId(), dmd.getMaxWriteId(), stmtIds,
+      return Stream.of(new AcidInputFormat.DeltaMetaData(dmd.getMinWriteId(), dmd.getMaxWriteId(), stmtIds,
           dmd.getVisibilityTxnId(), bucketFiles));
-    }
-    return results;
   }
 
   @Override
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index b6357093f4..070ad4b88a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -111,7 +111,7 @@ public class TestAcidUtils {
   }
 
   @Test
-  public void testParsing() throws Exception {
+  public void testParsing() {
     Configuration conf = new Configuration();
     MockFileSystem fs = new MockFileSystem(conf,
         //new MockFile("mock:/tmp/base_000123/bucket_00001", 500, new byte[0]),
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplit.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplit.java
new file mode 100644
index 0000000000..83d24a1880
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplit.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.*;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.OrcConf;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.BitSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the OrcSplit class.
+ */
+public class TestOrcSplit {
+
+  private JobConf conf;
+  private FileSystem fs;
+  private Path root;
+  private ObjectInspector inspector;
+  public static class DummyRow {
+    LongWritable field;
+    RecordIdentifier ROW__ID;
+
+    DummyRow(long val, long rowId, long origTxn, int bucket) {
+      field = new LongWritable(val);
+      bucket = BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(bucket));
+      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+    }
+
+    static String getColumnNamesProperty() {
+      return "field";
+    }
+    static String getColumnTypesProperty() {
+      return "bigint";
+    }
+
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new JobConf();
+    conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    conf.setBoolean(HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, true);
+    conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, DummyRow.getColumnNamesProperty());
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, DummyRow.getColumnTypesProperty());
+    conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+    conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+    OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 1);
+
+    Path workDir = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp"));
+    root = new Path(workDir, "TestOrcSplit.testDump");
+    fs = root.getFileSystem(conf);
+    root = fs.makeQualified(root);
+    fs.delete(root, true);
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+  }
+
+  private List<OrcInputFormat.SplitStrategy<?>> getSplitStrategies() throws Exception {
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+            AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
+            context, () -> fs, root, false, null);
+    Directory adi = gen.call();
+    return OrcInputFormat.determineSplitStrategies(
+            null, context, adi.getFs(), adi.getPath(), adi.getFiles(), adi.getDeleteDeltas(),
+            null, null, true);
+  }
+
+  /**
+   * This test checks that a split filters out delete_delta folders that only contain transactions
+   * that happened before the split's own transaction.
+   */
+  @Test
+  public void testDeleteDeltasFiltering() throws Exception {
+
+    int bucket = 0;
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+            .filesystem(fs)
+            .bucket(bucket)
+            .writingBase(false)
+            .minimumWriteId(1)
+            .maximumWriteId(1)
+            .inspector(inspector)
+            .reporter(Reporter.NULL)
+            .recordIdColumn(1)
+            .finalDestination(root);
+
+    RecordUpdater updater = new OrcRecordUpdater(root, options);
+
+    // Inserting a new record and then deleting it, 3 times.
+    // Every insertion/deletion is in a separate transaction.
+    // When reading, this will generate 3 splits, one per insert event.
+    for (int i = 1; i <= 5; i = i + 2) {
+      // Transaction that inserts one row with value i
+      options.minimumWriteId(i)
+              .maximumWriteId(i);
+      updater = new OrcRecordUpdater(root, options);
+      updater.insert(options.getMinimumWriteId(),
+              new DummyRow(i, 0, options.getMinimumWriteId(), bucket));
+      updater.close(false);
+
+      // Transaction that deletes previously inserted row
+      options.minimumWriteId(i+1)
+              .maximumWriteId(i+1);
+      updater = new OrcRecordUpdater(root, options);
+      updater.delete(options.getMinimumWriteId(),
+              new DummyRow(-1, 0, i, bucket));
+      updater.close(false);
+    }
+
+    conf.set(ValidTxnList.VALID_TXNS_KEY,
+            new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
+    conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY,
+            "tbl:6:" + Long.MAX_VALUE + "::");
+
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+    assertEquals(1, splitStrategies.size());
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+    assertEquals(3, splits.size());
+
+    // Split for transaction #1 should include all 3 delete deltas
+    assertEquals(3, splits.get(0).getDeltas().size());
+    assertEquals(2, splits.get(0).getDeltas().get(0).getMinWriteId());
+    assertEquals(2, splits.get(0).getDeltas().get(0).getMaxWriteId());
+    assertEquals(4, splits.get(0).getDeltas().get(1).getMinWriteId());
+    assertEquals(4, splits.get(0).getDeltas().get(1).getMaxWriteId());
+    assertEquals(6, splits.get(0).getDeltas().get(2).getMinWriteId());
+    assertEquals(6, splits.get(0).getDeltas().get(2).getMaxWriteId());
+
+    // Split for transaction #3 should include only the 2 delete deltas for transactions #4 & #6
+    assertEquals(2, splits.get(1).getDeltas().size());
+    assertEquals(4, splits.get(1).getDeltas().get(0).getMinWriteId());
+    assertEquals(4, splits.get(1).getDeltas().get(0).getMaxWriteId());
+    assertEquals(6, splits.get(1).getDeltas().get(1).getMinWriteId());
+    assertEquals(6, splits.get(1).getDeltas().get(1).getMaxWriteId());
+
+    // Split for transaction #5 should include only the delete delta for transaction #6
+    assertEquals(1, splits.get(2).getDeltas().size());
+    assertEquals(6, splits.get(2).getDeltas().get(0).getMinWriteId());
+    assertEquals(6, splits.get(2).getDeltas().get(0).getMaxWriteId());
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index b7fd6acc27..b192da437e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -1185,7 +1185,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
   }
 
   @Test
-  public void testIsQualifiedDeleteDeltaForSplit() throws IOException {
+  public void testIsQualifiedDeleteDeltaForSplit() {
     // Original file
     checkPath("00000_0", "delete_delta_000012_000012_0000", true);
     checkPath("00000_0", "delete_delta_000001_000001", true);
@@ -1231,7 +1231,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
     checkPath("delta_0000002_0000002/bucket_00001", "delete_delta_0000002_0000002_0001", true);
   }
 
-  private void checkPath(String splitPath, String deleteDeltaPath, boolean expected) throws IOException {
+  private void checkPath(String splitPath, String deleteDeltaPath, boolean expected) {
     String tableDir = "";//hdfs://localhost:59316/base/warehouse/acid_test/";
     AcidOutputFormat.Options ao = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(tableDir + splitPath), conf);
     ParsedDeltaLight parsedDelta = ParsedDeltaLight.parse(new Path(tableDir + deleteDeltaPath));