You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/12/16 13:11:17 UTC

[iceberg] branch master updated: Hive: ORC vectorization fails when split offsets are considered during split generation (#3748)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 339866d  Hive: ORC vectorization fails when split offsets are considered during split generation (#3748)
339866d is described below

commit 339866d72784a3e90625037d3a9de4fdafc5fcef
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Thu Dec 16 14:11:00 2021 +0100

    Hive: ORC vectorization fails when split offsets are considered during split generation (#3748)
---
 .../apache/iceberg/data/GenericAppenderHelper.java | 27 ++++++++---
 hive3/build.gradle                                 |  1 +
 .../mr/hive/vector/HiveVectorizedReader.java       | 14 ++++--
 .../apache/iceberg/orc/VectorizedReadUtils.java    | 52 ++++++++++++++++++++++
 .../java/org/apache/iceberg/mr/TestHelper.java     |  2 +-
 .../TestHiveIcebergStorageHandlerWithEngine.java   | 33 ++++++++++++++
 6 files changed, 119 insertions(+), 10 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
index 299135a..7d9d7ad 100644
--- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
+++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.data;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -39,14 +40,22 @@ import org.junit.rules.TemporaryFolder;
  */
 public class GenericAppenderHelper {
 
+  private static final String ORC_CONFIG_PREFIX = "^orc.*";
+
   private final Table table;
   private final FileFormat fileFormat;
   private final TemporaryFolder tmp;
+  private final Configuration conf;
 
-  public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) {
+  public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp, Configuration conf) {
     this.table = table;
     this.fileFormat = fileFormat;
     this.tmp = tmp;
+    this.conf = conf;
+  }
+
+  public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) {
+    this(table, fileFormat, tmp, null);
   }
 
   public void appendToTable(DataFile... dataFiles) {
@@ -73,13 +82,20 @@ public class GenericAppenderHelper {
     Preconditions.checkNotNull(table, "table not set");
     File file = tmp.newFile();
     Assert.assertTrue(file.delete());
-    return appendToLocalFile(table, file, fileFormat, partition, records);
+    return appendToLocalFile(table, file, fileFormat, partition, records, conf);
   }
 
-  private static DataFile appendToLocalFile(
-      Table table, File file, FileFormat format, StructLike partition, List<Record> records)
+  private static DataFile appendToLocalFile(Table table, File file, FileFormat format, StructLike partition,
+      List<Record> records, Configuration conf)
       throws IOException {
-    FileAppender<Record> appender = new GenericAppenderFactory(table.schema()).newAppender(
+    GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema());
+
+    // Push down ORC related settings to appender if there are any
+    if (FileFormat.ORC.equals(format) && conf != null) {
+      appenderFactory.setAll(conf.getValByRegex(ORC_CONFIG_PREFIX));
+    }
+
+    FileAppender<Record> appender = appenderFactory.newAppender(
         Files.localOutput(file), format);
     try (FileAppender<Record> fileAppender = appender) {
       fileAppender.addAll(records);
@@ -92,6 +108,7 @@ public class GenericAppenderHelper {
         .withMetrics(appender.metrics())
         .withFormat(format)
         .withPartition(partition)
+        .withSplitOffsets(appender.splitOffsets())
         .build();
   }
 }
diff --git a/hive3/build.gradle b/hive3/build.gradle
index 75743d9..5dddf8a 100644
--- a/hive3/build.gradle
+++ b/hive3/build.gradle
@@ -49,6 +49,7 @@ project(':iceberg-hive3') {
     compileOnly project(':iceberg-hive3-orc-bundle')
     compileOnly project(':iceberg-mr')
     compileOnly project(':iceberg-data')
+    compileOnly project(':iceberg-orc')
 
     compileOnly("org.apache.hadoop:hadoop-client:${hadoopVersion}") {
       exclude group: 'org.apache.avro', module: 'avro'
diff --git a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 5c831e5..9677a33 100644
--- a/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/hive3/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -42,7 +42,9 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.orc.VectorizedReadUtils;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.impl.OrcTail;
 
 /**
  * Utility class to create vectorized readers for Hive.
@@ -98,11 +100,15 @@ public class HiveVectorizedReader {
     try {
       switch (format) {
         case ORC:
-          InputSplit split = new OrcSplit(path, null, task.start(), task.length(), (String[]) null, null,
-              false, false, Lists.newArrayList(), 0, task.length(), path.getParent());
-          RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
 
-          recordReader = new VectorizedOrcInputFormat().getRecordReader(split, job, reporter);
+          // Metadata information has to be passed along in the OrcSplit. Without specifying this, the vectorized
+          // reader will assume that the ORC file ends at the task's start + length, and might fail reading the tail.
+          OrcTail orcTail = VectorizedReadUtils.getOrcTail(inputFile, job);
+
+          InputSplit split = new OrcSplit(path, null, task.start(), task.length(), (String[]) null, orcTail,
+              false, false, Lists.newArrayList(), 0, task.length(), path.getParent());
+          RecordReader<NullWritable, VectorizedRowBatch> recordReader =
+              new VectorizedOrcInputFormat().getRecordReader(split, job, reporter);
           return createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues);
 
         default:
diff --git a/hive3/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/hive3/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
new file mode 100644
index 0000000..2a7fd8b
--- /dev/null
+++ b/hive3/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.io.InputFile;
+import org.apache.orc.impl.OrcTail;
+import org.apache.orc.impl.ReaderImpl;
+
+/**
+ * Utilities that rely on Iceberg code from org.apache.iceberg.orc package and are required for ORC vectorization.
+ */
+public class VectorizedReadUtils {
+
+  private VectorizedReadUtils() {
+
+  }
+
+  /**
+   * Opens the ORC inputFile and reads the metadata information to construct the OrcTail content.
+   * Unfortunately the API doesn't allow simple access to OrcTail, so we need the serialization trick.
+   * @param inputFile - the ORC file
+   * @param job - JobConf instance for the current task
+   * @throws IOException - errors relating to accessing the ORC file
+   */
+  public static OrcTail getOrcTail(InputFile inputFile, JobConf job) throws IOException {
+
+    try (ReaderImpl orcFileReader = (ReaderImpl) ORC.newFileReader(inputFile, job)) {
+      return ReaderImpl.extractFileTail(orcFileReader.getSerializedFileFooter());
+    }
+
+  }
+
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
index 394b747..3635707 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -115,7 +115,7 @@ public class TestHelper {
   }
 
   private GenericAppenderHelper appender() {
-    return new GenericAppenderHelper(table, fileFormat, tmp);
+    return new GenericAppenderHelper(table, fileFormat, tmp, conf);
   }
 
   public static class RecordsBuilder {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 577cef7..4626d19 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hadoop.ConfigProperties;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -59,6 +60,7 @@ import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.Assume.assumeTrue;
 import static org.junit.runners.Parameterized.Parameter;
 import static org.junit.runners.Parameterized.Parameters;
 
@@ -777,6 +779,37 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     Assert.assertTrue(stats.startsWith("{\"BASIC_STATS\":\"true\"")); // it's followed by column stats in Hive3
   }
 
+  /**
+   * Tests that vectorized ORC reading code path correctly handles when the same ORC file is split into multiple parts.
+   * Although the split offsets and length will not always include the file tail that contains the metadata, the
+   * vectorized reader needs to make sure to handle the tail reading regardless of the offsets. If this is not done
+   * correctly, the last SELECT query will fail.
+   * @throws Exception - any test error
+   */
+  @Test
+  public void testVectorizedOrcMultipleSplits() throws Exception {
+    assumeTrue(isVectorized && FileFormat.ORC.equals(fileFormat));
+
+    // This data will be held by a ~870kB ORC file
+    List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        20000, 0L);
+
+    // To support splitting the ORC file, we need to set the stripe size to a small value. It looks like the minimum
+    // value is about 220kB; ORC does not write smaller stripes. Anyway, this setting will produce 4 stripes.
+    shell.setHiveSessionValue("orc.stripe.size", "210000");
+
+    testTables.createTable(shell, "targettab", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, records);
+
+    // Will request 4 splits, separated on the exact stripe boundaries within the ORC file.
+    // (It would request 5 if ORC split generation were not aware of the split (aka stripe) offsets.)
+    shell.setHiveSessionValue(InputFormatConfig.SPLIT_SIZE, "210000");
+    List<Object[]> result = shell.executeStatement("SELECT * FROM targettab ORDER BY last_name");
+
+    Assert.assertEquals(20000, result.size());
+
+  }
+
   private void testComplexTypeWrite(Schema schema, List<Record> records) throws IOException {
     String tableName = "complex_table";
     Table table = testTables.createTable(shell, "complex_table", schema, fileFormat, ImmutableList.of());