Posted to commits@hive.apache.org by dk...@apache.org on 2022/09/16 09:00:20 UTC
[hive] branch master updated: HIVE-26496: Improvement to the fetch operator to scan only delete_delta folders containing transactions relevant to the current split (Dmitriy Fingerman, reviewed by Denys Kuzmenko, Laszlo Bodor)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 74e0877298 HIVE-26496: Improvement to the fetch operator to scan only delete_delta folders containing transactions relevant to the current split (Dmitriy Fingerman, reviewed by Denys Kuzmenko, Laszlo Bodor)
74e0877298 is described below
commit 74e087729830babe4e1e846e053d52ad0716e314
Author: Dmitriy Fingerman <dm...@gmail.com>
AuthorDate: Fri Sep 16 05:00:07 2022 -0400
HIVE-26496: Improvement to the fetch operator to scan only delete_delta folders containing transactions relevant to the current split (Dmitriy Fingerman, reviewed by Denys Kuzmenko, Laszlo Bodor)
Closes #3559
---
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 2 +-
.../org/apache/hadoop/hive/ql/io/orc/OrcSplit.java | 25 +--
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 2 +-
.../apache/hadoop/hive/ql/io/orc/TestOrcSplit.java | 184 +++++++++++++++++++++
.../orc/TestVectorizedOrcAcidRowBatchReader.java | 4 +-
5 files changed, 202 insertions(+), 15 deletions(-)
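For readers skimming the diff, the gist of the change is a write-id interval check applied when an OrcSplit is constructed: a delete_delta_min_max directory can only describe deletes of rows whose write ids fall inside [min, max], so any delete delta whose max write id is below the split's own write id can be skipped. The sketch below is mine, not part of the commit, and uses stand-in names (DeltaRange, relevantDeleteDeltas) rather than Hive's classes.

// Illustrative sketch only; the types and method names here are hypothetical.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DeleteDeltaFilterSketch {

    // Minimal stand-in for a delete delta's metadata: just its write id range.
    static final class DeltaRange {
        final long minWriteId;
        final long maxWriteId;
        DeltaRange(long minWriteId, long maxWriteId) {
            this.minWriteId = minWriteId;
            this.maxWriteId = maxWriteId;
        }
        @Override public String toString() {
            return "delete_delta_" + minWriteId + "_" + maxWriteId;
        }
    }

    // Keep only delete deltas that can contain deletes of rows written at or after splitMinWriteId.
    static List<DeltaRange> relevantDeleteDeltas(List<DeltaRange> deltas, long splitMinWriteId) {
        return deltas.stream()
                .filter(d -> d.maxWriteId >= splitMinWriteId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Three delete deltas, matching the write ids used by the new TestOrcSplit test below.
        List<DeltaRange> deltas = Arrays.asList(
                new DeltaRange(2, 2), new DeltaRange(4, 4), new DeltaRange(6, 6));
        // A split built from delta_5_5 only needs delete_delta_6_6.
        System.out.println(relevantDeleteDeltas(deltas, 5)); // [delete_delta_6_6]
    }
}

In the actual patch the split's minimum write id comes from AcidUtils.parseBaseOrDeltaBucketFilename(path, null).getMinimumWriteId(), and the comparison is applied in the OrcSplit constructor, as shown in the OrcSplit.java hunk below.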
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2811763c06..4582878817 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -537,7 +537,7 @@ public class AcidUtils {
*/
public static AcidOutputFormat.Options
parseBaseOrDeltaBucketFilename(Path bucketFile,
- Configuration conf) throws IOException {
+ Configuration conf) {
AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
String filename = bucketFile.getName();
int bucket = parseBucketId(bucketFile);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index b525d6feb4..a85b4bd7a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -35,7 +36,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.ColumnarSplit;
import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
@@ -48,7 +48,6 @@ import org.apache.orc.impl.OrcTail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* OrcFileSplit. Holds file meta info
*
@@ -104,7 +103,15 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
this.isOriginal = isOriginal;
this.hasBase = hasBase;
this.rootDir = rootDir;
- this.deltas.addAll(filterDeltasByBucketId(deltas, AcidUtils.parseBucketId(path)));
+ int bucketId = AcidUtils.parseBucketId(path);
+ long minWriteId = !deltas.isEmpty() ?
+ AcidUtils.parseBaseOrDeltaBucketFilename(path, null).getMinimumWriteId() : -1;
+ this.deltas.addAll(
+ deltas.stream()
+ // filter out delete deltas whose transactions happened before the transactions of this split
+ .filter(delta -> delta.getMaxWriteId() >= minWriteId)
+ .flatMap(delta -> filterDeltasByBucketId(delta, bucketId))
+ .collect(Collectors.toList()));
this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
// setting file length to Long.MAX_VALUE will let orc reader read file length from file system
this.fileLen = fileLen <= 0 ? Long.MAX_VALUE : fileLen;
@@ -114,18 +121,16 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
/**
* For every split we only want to keep the delete deltas that contain files for that bucket.
* If we filter out files, we might need to filter out statementIds from multistatement transactions.
- * @param deltas
+ * @param dmd
* @param bucketId
* @return
*/
- private List<AcidInputFormat.DeltaMetaData> filterDeltasByBucketId(List<AcidInputFormat.DeltaMetaData> deltas, int bucketId) {
- List<AcidInputFormat.DeltaMetaData> results = new ArrayList<>();
- for (AcidInputFormat.DeltaMetaData dmd : deltas) {
+ private Stream<AcidInputFormat.DeltaMetaData> filterDeltasByBucketId(AcidInputFormat.DeltaMetaData dmd, int bucketId) {
Map<Integer, AcidInputFormat.DeltaFileMetaData> bucketFilesbyStmtId =
dmd.getDeltaFiles().stream().filter(deltaFileMetaData -> deltaFileMetaData.getBucketId() == bucketId)
.collect(Collectors.toMap(AcidInputFormat.DeltaFileMetaData::getStmtId, Function.identity()));
if (bucketFilesbyStmtId.isEmpty()) {
- continue;
+ return Stream.empty();
}
// Keep only the relevant stmtIds
List<Integer> stmtIds = dmd.getStmtIds().stream()
@@ -137,10 +142,8 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
file.getFileId(), stmtIds.size() > 1 ? file.getStmtId() : null, bucketId))
.collect(Collectors.toList());
- results.add(new AcidInputFormat.DeltaMetaData(dmd.getMinWriteId(), dmd.getMaxWriteId(), stmtIds,
+ return Stream.of(new AcidInputFormat.DeltaMetaData(dmd.getMinWriteId(), dmd.getMaxWriteId(), stmtIds,
dmd.getVisibilityTxnId(), bucketFiles));
- }
- return results;
}
@Override
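A side effect of the hunk above is that filterDeltasByBucketId now handles one DeltaMetaData at a time and returns a Stream, so both filters compose in a single pipeline (filter by max write id, then flatMap by bucket) instead of building an intermediate list. Below is a minimal sketch of that shape, again with hypothetical stand-in types rather than Hive's AcidInputFormat classes.

// Hedged sketch of the refactor's shape; not Hive code.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PerDeltaFilterSketch {

    // Stand-in for a delete delta's metadata: write id range plus the bucket ids of its files.
    static final class Delta {
        final long maxWriteId;
        final List<Integer> bucketIds;
        Delta(long maxWriteId, List<Integer> bucketIds) {
            this.maxWriteId = maxWriteId;
            this.bucketIds = bucketIds;
        }
    }

    // One delta in, zero or one delta out: keep it (restricted to the split's bucket) only if it
    // actually has a file for that bucket. Returning a Stream lets this plug into flatMap.
    static Stream<Delta> filterByBucket(Delta d, int bucketId) {
        return d.bucketIds.contains(bucketId)
                ? Stream.of(new Delta(d.maxWriteId, Arrays.asList(bucketId)))
                : Stream.empty();
    }

    static List<Delta> select(List<Delta> deltas, long splitMinWriteId, int bucketId) {
        return deltas.stream()
                .filter(d -> d.maxWriteId >= splitMinWriteId)   // the new write-id check
                .flatMap(d -> filterByBucket(d, bucketId))      // the pre-existing bucket check
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Delta> deltas = Arrays.asList(
                new Delta(2, Arrays.asList(0)),
                new Delta(4, Arrays.asList(1)),      // wrong bucket, dropped by filterByBucket
                new Delta(6, Arrays.asList(0, 1)));
        System.out.println(select(deltas, 3, 0).size()); // prints 1: only the (6, bucket 0) delta survives
    }
}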
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index b6357093f4..070ad4b88a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -111,7 +111,7 @@ public class TestAcidUtils {
}
@Test
- public void testParsing() throws Exception {
+ public void testParsing() {
Configuration conf = new Configuration();
MockFileSystem fs = new MockFileSystem(conf,
//new MockFile("mock:/tmp/base_000123/bucket_00001", 500, new byte[0]),
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplit.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplit.java
new file mode 100644
index 0000000000..83d24a1880
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplit.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.*;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.OrcConf;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.BitSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for OrcSplit class
+ */
+public class TestOrcSplit {
+
+ private JobConf conf;
+ private FileSystem fs;
+ private Path root;
+ private ObjectInspector inspector;
+ public static class DummyRow {
+ LongWritable field;
+ RecordIdentifier ROW__ID;
+
+ DummyRow(long val, long rowId, long origTxn, int bucket) {
+ field = new LongWritable(val);
+ bucket = BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(bucket));
+ ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+ }
+
+ static String getColumnNamesProperty() {
+ return "field";
+ }
+ static String getColumnTypesProperty() {
+ return "bigint";
+ }
+
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new JobConf();
+ conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ conf.setBoolean(HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, true);
+ conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, DummyRow.getColumnNamesProperty());
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, DummyRow.getColumnTypesProperty());
+ conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+ conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+ OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 1);
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ root = new Path(workDir, "TestOrcSplit.testDump");
+ fs = root.getFileSystem(conf);
+ root = fs.makeQualified(root);
+ fs.delete(root, true);
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+ }
+
+ private List<OrcInputFormat.SplitStrategy<?>> getSplitStrategies() throws Exception {
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+ OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
+ context, () -> fs, root, false, null);
+ Directory adi = gen.call();
+ return OrcInputFormat.determineSplitStrategies(
+ null, context, adi.getFs(), adi.getPath(), adi.getFiles(), adi.getDeleteDeltas(),
+ null, null, true);
+ }
+
+ /**
+ * This test checks that a split filters out delete_delta folders which only have transactions that happened
+ * in the past relative to the current split
+ */
+ @Test
+ public void testDeleteDeltasFiltering() throws Exception {
+
+ int bucket = 0;
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .filesystem(fs)
+ .bucket(bucket)
+ .writingBase(false)
+ .minimumWriteId(1)
+ .maximumWriteId(1)
+ .inspector(inspector)
+ .reporter(Reporter.NULL)
+ .recordIdColumn(1)
+ .finalDestination(root);
+
+ RecordUpdater updater = new OrcRecordUpdater(root, options);
+
+ // Inserting a new record and then deleting it, 3 times.
+ // Every insertion/deletion is in a separate transaction.
+ // When reading, this will generate 3 splits, one per insert event.
+ for (int i = 1; i <= 5; i = i + 2) {
+ // Transaction that inserts one row with value i
+ options.minimumWriteId(i)
+ .maximumWriteId(i);
+ updater = new OrcRecordUpdater(root, options);
+ updater.insert(options.getMinimumWriteId(),
+ new DummyRow(i, 0, options.getMinimumWriteId(), bucket));
+ updater.close(false);
+
+ // Transaction that deletes previously inserted row
+ options.minimumWriteId(i+1)
+ .maximumWriteId(i+1);
+ updater = new OrcRecordUpdater(root, options);
+ updater.delete(options.getMinimumWriteId(),
+ new DummyRow(-1, 0, i, bucket));
+ updater.close(false);
+ }
+
+ conf.set(ValidTxnList.VALID_TXNS_KEY,
+ new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
+ conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY,
+ "tbl:6:" + Long.MAX_VALUE + "::");
+
+ List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+ assertEquals(1, splitStrategies.size());
+ List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+ assertEquals(3, splits.size());
+
+ // Split for transaction #1 should include all 3 delete deltas
+ assertEquals(3, splits.get(0).getDeltas().size());
+ assertEquals(2, splits.get(0).getDeltas().get(0).getMinWriteId());
+ assertEquals(2, splits.get(0).getDeltas().get(0).getMaxWriteId());
+ assertEquals(4, splits.get(0).getDeltas().get(1).getMinWriteId());
+ assertEquals(4, splits.get(0).getDeltas().get(1).getMaxWriteId());
+ assertEquals(6, splits.get(0).getDeltas().get(2).getMinWriteId());
+ assertEquals(6, splits.get(0).getDeltas().get(2).getMaxWriteId());
+
+ // Split for transaction #3 should include only 2 deltas with transactions #4 & #6
+ assertEquals(2, splits.get(1).getDeltas().size());
+ assertEquals(4, splits.get(1).getDeltas().get(0).getMinWriteId());
+ assertEquals(4, splits.get(1).getDeltas().get(0).getMaxWriteId());
+ assertEquals(6, splits.get(1).getDeltas().get(1).getMinWriteId());
+ assertEquals(6, splits.get(1).getDeltas().get(1).getMaxWriteId());
+
+ // Split for transaction #5 should include only 1 delta with transaction #6
+ assertEquals(1, splits.get(2).getDeltas().size());
+ assertEquals(6, splits.get(2).getDeltas().get(0).getMinWriteId());
+ assertEquals(6, splits.get(2).getDeltas().get(0).getMaxWriteId());
+ }
+}
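In directory terms, the loop in the new test above writes delta_1_1, delta_3_3 and delta_5_5 (inserts) plus delete_delta_2_2, delete_delta_4_4 and delete_delta_6_6 (deletes). The small self-contained check below (mine, not part of the commit; directory names are simplified and not zero-padded like the real layout) applies the same interval rule the patch adds and reproduces the three assertions.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TestOrcSplitExpectations {
    public static void main(String[] args) {
        List<Long> deleteDeltaWriteIds = Arrays.asList(2L, 4L, 6L);   // delete_delta_2_2, _4_4, _6_6
        for (long splitWriteId : new long[] {1, 3, 5}) {              // splits from delta_1_1, _3_3, _5_5
            List<String> kept = deleteDeltaWriteIds.stream()
                    .filter(id -> id >= splitWriteId)                 // same comparison the patch adds
                    .map(id -> "delete_delta_" + id + "_" + id)
                    .collect(Collectors.toList());
            System.out.println("delta_" + splitWriteId + "_" + splitWriteId + " -> " + kept);
        }
    }
}

Running it prints 3, 2 and 1 surviving delete deltas for the three splits respectively, matching the assertEquals calls in testDeleteDeltasFiltering.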
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index b7fd6acc27..b192da437e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -1185,7 +1185,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
}
@Test
- public void testIsQualifiedDeleteDeltaForSplit() throws IOException {
+ public void testIsQualifiedDeleteDeltaForSplit() {
// Original file
checkPath("00000_0", "delete_delta_000012_000012_0000", true);
checkPath("00000_0", "delete_delta_000001_000001", true);
@@ -1231,7 +1231,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
checkPath("delta_0000002_0000002/bucket_00001", "delete_delta_0000002_0000002_0001", true);
}
- private void checkPath(String splitPath, String deleteDeltaPath, boolean expected) throws IOException {
+ private void checkPath(String splitPath, String deleteDeltaPath, boolean expected) {
String tableDir = "";//hdfs://localhost:59316/base/warehouse/acid_test/";
AcidOutputFormat.Options ao = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(tableDir + splitPath), conf);
ParsedDeltaLight parsedDelta = ParsedDeltaLight.parse(new Path(tableDir + deleteDeltaPath));