Posted to commits@hive.apache.org by sz...@apache.org on 2022/11/07 13:05:32 UTC

[hive] branch master updated: HIVE-26694: Populate file row position information during vectorized Iceberg reads (#3732) (Adam Szita, reviewed by Laszlo Pinter)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 63b6134b970 HIVE-26694: Populate file row position information during vectorized Iceberg reads (#3732) (Adam Szita, reviewed by Laszlo Pinter)
63b6134b970 is described below

commit 63b6134b97036d6d31924ad0ec323fca2016dace
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Mon Nov 7 14:05:19 2022 +0100

    HIVE-26694: Populate file row position information during vectorized Iceberg reads (#3732) (Adam Szita, reviewed by Laszlo Pinter)
---
 .../iceberg/mr/hive/vector/HiveBatchContext.java   | 71 ++++++++++++++++++++++
 ...owBatchIterator.java => HiveBatchIterator.java} | 15 +++--
 .../vector/HiveIcebergVectorizedRecordReader.java  |  3 +-
 .../mr/hive/vector/HiveVectorizedReader.java       | 16 ++---
 .../io/RowPositionAwareVectorizedRecordReader.java | 30 +++++++++
 .../hive/ql/io/orc/VectorizedOrcInputFormat.java   |  8 ++-
 .../vector/VectorizedParquetRecordReader.java      | 31 +++++++++-
 .../ql/io/parquet/TestVectorizedColumnReader.java  |  5 ++
 .../io/parquet/VectorizedColumnReaderTestBase.java | 20 ++++++
 9 files changed, 185 insertions(+), 14 deletions(-)
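
At a high level, the patch teaches the format-specific vectorized record readers
to report the file row position of each batch they produce, and threads that
position up to the Iceberg layer (where it is needed, e.g. for matching
positional deletes). A simplified sketch of the resulting flow, using names
from the diff below (illustrative, not verbatim patch code):

    // Per batch, the (renamed) HiveBatchIterator asks the reader where the
    // batch starts, if the reader is able to tell:
    if (recordReader instanceof RowPositionAwareVectorizedRecordReader) {
      rowOffset = ((RowPositionAwareVectorizedRecordReader) recordReader).getRowNumber();
    }
    // ...and hands the batch and its offset to the consumer together:
    return new HiveBatchContext(batch, vrbCtx, rowOffset);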

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
new file mode 100644
index 00000000000..08d2f732555
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Wraps a Hive VRB and holds corresponding metadata about it, such as the VRB context (e.g. type infos) and the
+ * file row offset.
+ */
+public class HiveBatchContext {
+
+  private final VectorizedRowBatch batch;
+  private final VectorizedRowBatchCtx vrbCtx;
+  /**
+   * File row position of the first row in this batch. Long.MIN_VALUE if unknown.
+   */
+  private final long fileRowOffset;
+
+  public HiveBatchContext(VectorizedRowBatch batch, VectorizedRowBatchCtx vrbCtx, long fileRowOffset) {
+    this.batch = batch;
+    this.vrbCtx = vrbCtx;
+    this.fileRowOffset = fileRowOffset;
+  }
+
+  public VectorizedRowBatch getBatch() {
+    return batch;
+  }
+
+  public RowIterator rowIterator() throws IOException {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  // TODO: implement row iterator
+  class RowIterator implements CloseableIterator {
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public boolean hasNext() {
+      return false;
+    }
+
+    @Override
+    public Object next() {
+      return null;
+    }
+  }
+}
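
The row iterator over the wrapped batch is left as a TODO in this commit. One
possible shape, honoring the VRB's selected/selectedInUse semantics (purely
illustrative, not part of the patch):

    class RowIterator implements CloseableIterator<Object> {
      private int pos = 0;

      @Override
      public boolean hasNext() {
        return pos < batch.size;
      }

      @Override
      public Object next() {
        // Map the logical position through the selection vector when it is in use
        int rowIdx = batch.selectedInUse ? batch.selected[pos] : pos;
        pos++;
        // A full implementation would materialize the row's values from batch.cols
        return rowIdx;
      }

      @Override
      public void close() throws IOException {
      }
    }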
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
similarity index 83%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
index 1ad5180e58c..22a42f2953e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -30,7 +31,7 @@ import org.apache.iceberg.io.CloseableIterator;
 /**
  * Iterator wrapper around Hive's VectorizedRowBatch-producing (MRv1) record readers.
  */
-public final class VectorizedRowBatchIterator implements CloseableIterator<VectorizedRowBatch> {
+public final class HiveBatchIterator implements CloseableIterator<HiveBatchContext> {
 
   private final RecordReader<NullWritable, VectorizedRowBatch> recordReader;
   private final NullWritable key;
@@ -39,8 +40,9 @@ public final class VectorizedRowBatchIterator implements CloseableIterator<Vecto
   private final int[] partitionColIndices;
   private final Object[] partitionValues;
   private boolean advanced = false;
+  private long rowOffset = Long.MIN_VALUE;
 
-  VectorizedRowBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
+  HiveBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
       int[] partitionColIndices, Object[] partitionValues) {
     this.recordReader = recordReader;
     this.key = recordReader.createKey();
@@ -62,6 +64,11 @@ public final class VectorizedRowBatchIterator implements CloseableIterator<Vecto
         if (!recordReader.next(key, batch)) {
           batch.size = 0;
         }
+
+        if (recordReader instanceof RowPositionAwareVectorizedRecordReader) {
+          rowOffset = ((RowPositionAwareVectorizedRecordReader) recordReader).getRowNumber();
+        }
+
         // Fill partition values
         if (partitionColIndices != null) {
           for (int i = 0; i < partitionColIndices.length; ++i) {
@@ -86,9 +93,9 @@ public final class VectorizedRowBatchIterator implements CloseableIterator<Vecto
   }
 
   @Override
-  public VectorizedRowBatch next() {
+  public HiveBatchContext next() {
     advance();
     advanced = false;
-    return batch;
+    return new HiveBatchContext(batch, vrbCtx, rowOffset);
   }
 }
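
A consumer of the renamed iterator now receives one HiveBatchContext per batch
instead of a bare VectorizedRowBatch. A hypothetical usage sketch (the patch
only adds the fileRowOffset field to HiveBatchContext; the getFileRowOffset()
accessor below is assumed for illustration):

    CloseableIterable<HiveBatchContext> batches =
        HiveVectorizedReader.reader(path, task, idToConstant, context, residual);
    for (HiveBatchContext batchContext : batches) {
      VectorizedRowBatch batch = batchContext.getBatch();
      long firstRowPos = batchContext.getFileRowOffset(); // hypothetical accessor
      // firstRowPos is Long.MIN_VALUE when the underlying reader could not report it
    }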
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
index 412f7478090..a70a37c2224 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
@@ -46,7 +46,8 @@ public final class HiveIcebergVectorizedRecordReader extends AbstractMapredIcebe
   public boolean next(Void key, VectorizedRowBatch value) throws IOException {
     try {
       if (innerReader.nextKeyValue()) {
-        VectorizedRowBatch newBatch = (VectorizedRowBatch) innerReader.getCurrentValue();
+        HiveBatchContext currentValue = (HiveBatchContext) innerReader.getCurrentValue();
+        VectorizedRowBatch newBatch = currentValue.getBatch();
         value.cols = newBatch.cols;
         value.endOfFile = newBatch.endOfFile;
         value.numCols = newBatch.numCols;
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 2e0d11e59c5..22131062ddf 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -76,8 +76,9 @@ public class HiveVectorizedReader {
 
   }
 
-  public static <D> CloseableIterable<D> reader(Path path, FileScanTask task, Map<Integer, ?> idToConstant,
-      TaskAttemptContext context, Expression residual) {
+  public static CloseableIterable<HiveBatchContext> reader(Path path, FileScanTask task,
+      Map<Integer, ?> idToConstant, TaskAttemptContext context, Expression residual) {
+
     // Tweaks on jobConf here are relevant for this task only, so we need to copy it first as the context's conf is reused.
     JobConf job = new JobConf(context.getConfiguration());
     FileFormat format = task.file().format();
@@ -171,8 +172,9 @@ public class HiveVectorizedReader {
         VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema(), residual);
 
     // If LLAP is enabled, try to retrieve an LLAP record reader - this might yield null in some special cases
+    // TODO: add support for reading files with positional deletes with LLAP (LLAP would need to provide file row num)
     if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
-        LlapProxy.getIo() != null) {
+        LlapProxy.getIo() != null && task.deletes().isEmpty()) {
       recordReader = LlapProxy.getIo().llapVectorizedOrcReaderForPath(fileId, path, null, readColumnIds,
           job, start, length, reporter);
     }
@@ -220,14 +222,14 @@ public class HiveVectorizedReader {
     return inputFormat.getRecordReader(split, job, reporter);
   }
 
-  private static <D> CloseableIterable<D> createVectorizedRowBatchIterable(
+  private static CloseableIterable<HiveBatchContext> createVectorizedRowBatchIterable(
       RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf job, int[] partitionColIndices,
       Object[] partitionValues) {
 
-    VectorizedRowBatchIterator iterator =
-        new VectorizedRowBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);
+    HiveBatchIterator iterator =
+        new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);
 
-    return new CloseableIterable<D>() {
+    return new CloseableIterable<HiveBatchContext>() {
 
       @Override
       public CloseableIterator iterator() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RowPositionAwareVectorizedRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RowPositionAwareVectorizedRecordReader.java
new file mode 100644
index 00000000000..1775787f187
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RowPositionAwareVectorizedRecordReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+public interface RowPositionAwareVectorizedRecordReader {
+  /**
+   * Returns the row position (in the file) of the first row in the last returned batch.
+   * @return row position of the first row in the last returned batch
+   * @throws IOException if the position cannot be obtained from the underlying reader
+   */
+  long getRowNumber() throws IOException;
+}
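
Any MRv1 vectorized record reader can opt into position reporting by
implementing this interface. A minimal sketch of a reader that tracks the
position itself (illustrative only; the actual ORC and Parquet implementations
follow below):

    public class PositionTrackingReader
        implements RecordReader<NullWritable, VectorizedRowBatch>,
        RowPositionAwareVectorizedRecordReader {

      private long batchStartRow = 0;  // first row of the last returned batch
      private long rowsReturned = 0;   // rows handed out so far

      @Override
      public boolean next(NullWritable key, VectorizedRowBatch batch) throws IOException {
        batchStartRow = rowsReturned;
        // ... fill the batch from the underlying file and set batch.size ...
        rowsReturned += batch.size;
        return batch.size > 0;
      }

      @Override
      public long getRowNumber() {
        return batchStartRow;
      }

      @Override
      public NullWritable createKey() {
        return NullWritable.get();
      }

      @Override
      public VectorizedRowBatch createValue() {
        return new VectorizedRowBatch(1);
      }

      @Override
      public long getPos() {
        return rowsReturned;
      }

      @Override
      public float getProgress() {
        return 0f;
      }

      @Override
      public void close() throws IOException {
      }
    }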
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index b7b48efbc74..aea4f37180b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketIdentifier;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.NullWritable;
@@ -55,7 +56,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
     SelfDescribingInputFormatInterface {
 
   static class VectorizedOrcRecordReader
-      implements RecordReader<NullWritable, VectorizedRowBatch> {
+      implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
     private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
     private final long offset;
     private final long length;
@@ -171,6 +172,11 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
     public float getProgress() throws IOException {
       return progress;
     }
+
+    @Override
+    public long getRowNumber() throws IOException {
+      return reader.getRowNumber();
+    }
   }
 
   public VectorizedOrcInputFormat() {
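
For ORC no extra bookkeeping is needed: the underlying
org.apache.hadoop.hive.ql.io.orc.RecordReader already exposes getRowNumber(),
so the vectorized wrapper simply delegates. Per the new interface's contract, a
caller can then bound each batch's file positions, e.g. for positional-delete
matching (hypothetical caller-side sketch, assuming an already-initialized
VectorizedOrcRecordReader orcReader):

    VectorizedRowBatch batch = orcReader.createValue();
    while (orcReader.next(NullWritable.get(), batch)) {
      long first = orcReader.getRowNumber(); // first row of the batch just read
      // rows of this batch occupy file positions [first, first + batch.size)
    }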
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index f7b13cb3d6a..c1c0a120686 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.BucketIdentifier;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
@@ -78,8 +79,10 @@ import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -90,7 +93,7 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FI
  * from Apache Spark and Apache Parquet.
  */
 public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
-  implements RecordReader<NullWritable, VectorizedRowBatch> {
+  implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
   public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);
 
   private List<Integer> colsToInclude;
@@ -128,6 +131,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private ZoneId writerTimezone;
   private final BucketIdentifier bucketIdentifier;
 
+  // number of rows returned with the last batch
+  private int lastReturnedRowCount = -1;
+
+  // row number (within the current row group) of the first row returned in the last batch
+  private long currentRowNumInRowGroup = -1;
+
+  // index of the current row group, incremented after each reader.readNextRowGroup() call
+  private int currentRowGroupIndex = -1;
+
+  // file row position of the first row of each row group, keyed by row group index
+  private Map<Integer, Long> rowGroupNumToRowPos = new HashMap<>();
+
   // LLAP cache integration
   // TODO: also support fileKey in splits, like OrcSplit does
   private Object cacheKey = null;
@@ -216,7 +230,11 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       offsets.add(offset);
     }
     blocks = new ArrayList<>();
+    long allRowsInFile = 0;
+    int blockIndex = 0;
     for (BlockMetaData block : parquetMetadata.getBlocks()) {
+      rowGroupNumToRowPos.put(blockIndex++, allRowsInFile);
+      allRowsInFile += block.getRowCount();
       if (offsets.contains(block.getStartingPos())) {
         blocks.add(block);
       }
@@ -365,10 +383,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     return 0;
   }
 
+  @Override
+  public long getRowNumber() throws IOException {
+    return rowGroupNumToRowPos.get(currentRowGroupIndex) + currentRowNumInRowGroup;
+  }
+
   /**
    * Advances to the next batch of rows. Returns false if there are no more.
    */
   private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
+    currentRowNumInRowGroup += lastReturnedRowCount;
+
     columnarBatch.reset();
     if (rowsReturned >= totalRowCount) {
       return false;
@@ -391,6 +416,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
             columnTypesList.get(colsToInclude.get(i)));
       }
     }
+    lastReturnedRowCount = num;
     rowsReturned += num;
     columnarBatch.size = num;
     return true;
@@ -430,6 +456,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       }
     }
 
+    currentRowNumInRowGroup = 0;
+    currentRowGroupIndex++;
+
     totalCountLoadedSoFar += pages.getRowCount();
   }
 
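On the Parquet side the position is derived from two pieces of bookkeeping:
rowGroupNumToRowPos maps each row group index to the file position of its first
row (computed once from the file metadata), and currentRowNumInRowGroup tracks
where the last batch started within the current row group; getRowNumber()
simply adds the two. A worked example with assumed sizes (hypothetical numbers;
batches hold at most VectorizedRowBatch.DEFAULT_SIZE = 1024 rows and never span
row groups):

    // File with two row groups of 3000 and 2000 rows:
    //   rowGroupNumToRowPos = {0 -> 0L, 1 -> 3000L}
    // Row group 0:
    //   batch 1 (1024 rows): currentRowNumInRowGroup = 0    -> getRowNumber() = 0 + 0    = 0
    //   batch 2 (1024 rows): currentRowNumInRowGroup = 1024 -> getRowNumber() = 0 + 1024 = 1024
    //   batch 3 ( 952 rows): currentRowNumInRowGroup = 2048 -> getRowNumber() = 0 + 2048 = 2048
    // Row group 1 (readNextRowGroup() resets the in-group counter):
    //   batch 4 (1024 rows): currentRowNumInRowGroup = 0    -> getRowNumber() = 3000 + 0 = 3000
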
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index e290e332e7f..0a0867fff9f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -119,6 +119,11 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
     stringReadDecimal(isDictionaryEncoding);
   }
 
+  @Test
+  public void verifyBatchOffsets() throws Exception {
+    super.verifyBatchOffsets();
+  }
+
   private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
     public TestVectorizedParquetRecordReader(
         org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) throws IOException {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
index e29b6f0c766..df78eb69758 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
@@ -1067,4 +1067,24 @@ public class VectorizedColumnReaderTestBase {
       reader.close();
     }
   }
+
+  protected void verifyBatchOffsets() throws Exception {
+    Configuration c = new Configuration();
+    c.set(IOConstants.COLUMNS, "int64_field");
+    c.set(IOConstants.COLUMNS_TYPES, "bigint");
+    c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+        createTestParquetReader("message test { required int64 int64_field;}", c);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      int batchCount = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        assertEquals(VectorizedRowBatch.DEFAULT_SIZE * batchCount++, reader.getRowNumber());
+      }
+      assertEquals(nElements, reader.getRowNumber());
+    } finally {
+      reader.close();
+    }
+  }
 }