You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/12/16 13:11:17 UTC
[iceberg] branch master updated: Hive: ORC vectorization fails when split offsets are considered during split generation (#3748)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 339866d Hive: ORC vectorization fails when split offsets are considered during split generation (#3748)
339866d is described below
commit 339866d72784a3e90625037d3a9de4fdafc5fcef
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Thu Dec 16 14:11:00 2021 +0100
Hive: ORC vectorization fails when split offsets are considered during split generation (#3748)
---
.../apache/iceberg/data/GenericAppenderHelper.java | 27 ++++++++---
hive3/build.gradle | 1 +
.../mr/hive/vector/HiveVectorizedReader.java | 14 ++++--
.../apache/iceberg/orc/VectorizedReadUtils.java | 52 ++++++++++++++++++++++
.../java/org/apache/iceberg/mr/TestHelper.java | 2 +-
.../TestHiveIcebergStorageHandlerWithEngine.java | 33 ++++++++++++++
6 files changed, 119 insertions(+), 10 deletions(-)
diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
index 299135a..7d9d7ad 100644
--- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
+++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.data;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
@@ -39,14 +40,22 @@ import org.junit.rules.TemporaryFolder;
*/
public class GenericAppenderHelper {
+ private static final String ORC_CONFIG_PREFIX = "^orc.*"; // regex (not a plain prefix) used with getValByRegex
+
private final Table table;
private final FileFormat fileFormat;
private final TemporaryFolder tmp;
+ private final Configuration conf; // may be null: ORC settings are then not pushed down
- public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) {
+ public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp, Configuration conf) {
this.table = table;
this.fileFormat = fileFormat;
this.tmp = tmp;
+ this.conf = conf;
+ }
+
+ public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) {
+ this(table, fileFormat, tmp, null);
}
public void appendToTable(DataFile... dataFiles) {
@@ -73,13 +82,20 @@ public class GenericAppenderHelper {
Preconditions.checkNotNull(table, "table not set");
File file = tmp.newFile();
Assert.assertTrue(file.delete());
- return appendToLocalFile(table, file, fileFormat, partition, records);
+ return appendToLocalFile(table, file, fileFormat, partition, records, conf);
}
- private static DataFile appendToLocalFile(
- Table table, File file, FileFormat format, StructLike partition, List<Record> records)
+ private static DataFile appendToLocalFile(Table table, File file, FileFormat format, StructLike partition,
+ List<Record> records, Configuration conf)
throws IOException {
- FileAppender<Record> appender = new GenericAppenderFactory(table.schema()).newAppender(
+ GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema());
+
+ // Push ORC-related settings (keys matching ORC_CONFIG_PREFIX) down to the appender, if there are any
+ if (FileFormat.ORC.equals(format) && conf != null) {
+ appenderFactory.setAll(conf.getValByRegex(ORC_CONFIG_PREFIX));
+ }
+
+ FileAppender<Record> appender = appenderFactory.newAppender(
Files.localOutput(file), format);
try (FileAppender<Record> fileAppender = appender) {
fileAppender.addAll(records);
@@ -92,6 +108,7 @@ public class GenericAppenderHelper {
.withMetrics(appender.metrics())
.withFormat(format)
.withPartition(partition)
+ .withSplitOffsets(appender.splitOffsets()) // lets split generation align splits with the file's split offsets
.build();
}
}
diff --git a/hive3/build.gradle b/hive3/build.gradle
index 75743d9..5dddf8a 100644
--- a/hive3/build.gradle
+++ b/hive3/build.gradle
@@ -49,6 +49,7 @@ project(':iceberg-hive3') {
compileOnly project(':iceberg-hive3-orc-bundle')
compileOnly project(':iceberg-mr')
compileOnly project(':iceberg-data')
+ compileOnly project(':iceberg-orc')
compileOnly("org.apache.hadoop:hadoop-client:${hadoopVersion}") {
exclude group: 'org.apache.avro', module: 'avro'
diff --git a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 5c831e5..9677a33 100644
--- a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -42,7 +42,9 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.orc.VectorizedReadUtils;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.impl.OrcTail;
/**
* Utility class to create vectorized readers for Hive.
@@ -98,11 +100,15 @@ public class HiveVectorizedReader {
try {
switch (format) {
case ORC:
- InputSplit split = new OrcSplit(path, null, task.start(), task.length(), (String[]) null, null,
- false, false, Lists.newArrayList(), 0, task.length(), path.getParent());
- RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
- recordReader = new VectorizedOrcInputFormat().getRecordReader(split, job, reporter);
+ // The file tail (ORC metadata) has to be passed along in the OrcSplit. Without it, the vectorized reader
+ // assumes the ORC file ends at the task's start + length, and may fail to read the tail.
+ OrcTail orcTail = VectorizedReadUtils.getOrcTail(inputFile, job);
+ // NOTE(review): the tail is re-read for every split of the same file; consider caching it if this is costly.
+ InputSplit split = new OrcSplit(path, null, task.start(), task.length(), (String[]) null, orcTail,
+ false, false, Lists.newArrayList(), 0, task.length(), path.getParent());
+ RecordReader<NullWritable, VectorizedRowBatch> recordReader =
+ new VectorizedOrcInputFormat().getRecordReader(split, job, reporter);
return createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues);
default:
diff --git a/hive3/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/hive3/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
new file mode 100644
index 0000000..2a7fd8b
--- /dev/null
+++ b/hive3/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.io.InputFile;
+import org.apache.orc.impl.OrcTail;
+import org.apache.orc.impl.ReaderImpl;
+
+/**
+ * Utilities for ORC vectorization that need access to Iceberg code in the org.apache.iceberg.orc package.
+ */
+public class VectorizedReadUtils {
+
+ private VectorizedReadUtils() {
+ // static utility class; not meant to be instantiated
+ }
+
+ /**
+ * Opens the ORC inputFile and reads its metadata to construct the OrcTail content.
+ * The reader API does not expose the OrcTail directly, so the serialized file footer is read and parsed back
+ * into an OrcTail via ReaderImpl.extractFileTail.
+ * @param inputFile - the ORC file
+ * @param job - JobConf instance for the current task
+ * @return the OrcTail of the file; the underlying ORC reader is closed before returning
+ * @throws IOException - errors relating to accessing the ORC file
+ */
+ public static OrcTail getOrcTail(InputFile inputFile, JobConf job) throws IOException {
+ try (ReaderImpl orcFileReader = (ReaderImpl) ORC.newFileReader(inputFile, job)) {
+ return ReaderImpl.extractFileTail(orcFileReader.getSerializedFileFooter());
+ }
+ }
+
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
index 394b747..3635707 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -115,7 +115,7 @@ public class TestHelper {
}
private GenericAppenderHelper appender() {
- return new GenericAppenderHelper(table, fileFormat, tmp);
+ return new GenericAppenderHelper(table, fileFormat, tmp, conf); // pass conf so its orc.* settings reach ORC appenders
}
public static class RecordsBuilder {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 577cef7..4626d19 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -59,6 +60,7 @@ import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.Assume.assumeTrue;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;
@@ -777,6 +779,37 @@ public class TestHiveIcebergStorageHandlerWithEngine {
Assert.assertTrue(stats.startsWith("{\"BASIC_STATS\":\"true\"")); // it's followed by column stats in Hive3
}
+ /**
+ * Tests that the vectorized ORC read path correctly handles an ORC file that is split into multiple parts.
+ * The split offsets and length will not always cover the file tail that contains the metadata, so the
+ * vectorized reader has to read the tail regardless of the offsets. If this is not done correctly, the
+ * SELECT query at the end of this test fails.
+ * @throws Exception - any test error
+ */
+ @Test
+ public void testVectorizedOrcMultipleSplits() throws Exception {
+ assumeTrue(isVectorized && FileFormat.ORC.equals(fileFormat));
+
+ // This data will be held by a ~870kB ORC file
+ List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ 20000, 0L);
+
+ // To make the ORC file splittable, the stripe size has to be set to a small value. ORC does not seem to write
+ // stripes smaller than ~220kB, so with this setting the file is expected to contain 4 stripes.
+ shell.setHiveSessionValue("orc.stripe.size", "210000");
+
+ testTables.createTable(shell, "targettab", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ fileFormat, records);
+
+ // Requests 4 splits, aligned exactly on the stripe boundaries within the ORC file
+ // (5 splits would be requested if split generation were not aware of the split (i.e. stripe) offsets).
+ shell.setHiveSessionValue(InputFormatConfig.SPLIT_SIZE, "210000");
+ List<Object[]> result = shell.executeStatement("SELECT * FROM targettab ORDER BY last_name");
+
+ Assert.assertEquals(20000, result.size());
+
+ }
+
private void testComplexTypeWrite(Schema schema, List<Record> records) throws IOException {
String tableName = "complex_table";
Table table = testTables.createTable(shell, "complex_table", schema, fileFormat, ImmutableList.of());