You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2022/11/07 13:05:32 UTC
[hive] branch master updated: HIVE-26694: Populate file row position information during vectorized Iceberg reads (#3732) (Adam Szita, reviewed by Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 63b6134b970 HIVE-26694: Populate file row position information during vectorized Iceberg reads (#3732) (Adam Szita, reviewed by Laszlo Pinter)
63b6134b970 is described below
commit 63b6134b97036d6d31924ad0ec323fca2016dace
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Mon Nov 7 14:05:19 2022 +0100
HIVE-26694: Populate file row position information during vectorized Iceberg reads (#3732) (Adam Szita, reviewed by Laszlo Pinter)
---
.../iceberg/mr/hive/vector/HiveBatchContext.java | 71 ++++++++++++++++++++++
...owBatchIterator.java => HiveBatchIterator.java} | 15 +++--
.../vector/HiveIcebergVectorizedRecordReader.java | 3 +-
.../mr/hive/vector/HiveVectorizedReader.java | 16 ++---
.../io/RowPositionAwareVectorizedRecordReader.java | 30 +++++++++
.../hive/ql/io/orc/VectorizedOrcInputFormat.java | 8 ++-
.../vector/VectorizedParquetRecordReader.java | 31 +++++++++-
.../ql/io/parquet/TestVectorizedColumnReader.java | 5 ++
.../io/parquet/VectorizedColumnReaderTestBase.java | 20 ++++++
9 files changed, 185 insertions(+), 14 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
new file mode 100644
index 00000000000..08d2f732555
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Wraps a Hive VRB and holds corresponding metadata information about it, such as VRB context (e.g. type infos) and
+ * file row offset.
+ */
+public class HiveBatchContext { // all fields are final and assigned once in the constructor
+
+ private final VectorizedRowBatch batch; // exposed via getBatch()
+ private final VectorizedRowBatchCtx vrbCtx; // not exposed by any accessor in this class yet
+ /**
+ * File row position of the first row in this batch, or Long.MIN_VALUE if unknown. Not exposed by any accessor yet.
+ */
+ private final long fileRowOffset;
+
+ public HiveBatchContext(VectorizedRowBatch batch, VectorizedRowBatchCtx vrbCtx, long fileRowOffset) {
+ this.batch = batch;
+ this.vrbCtx = vrbCtx;
+ this.fileRowOffset = fileRowOffset;
+ }
+
+ public VectorizedRowBatch getBatch() {
+ return batch;
+ }
+
+ public RowIterator rowIterator() throws IOException { // always throws UnsupportedOperationException until implemented
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ // TODO: implement row iterator; this placeholder yields nothing (hasNext() is always false, next() returns null)
+ class RowIterator implements CloseableIterator { // NOTE(review): raw CloseableIterator type; parameterize once the element type is decided
+
+ @Override
+ public void close() throws IOException { // nothing to release yet
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public Object next() {
+ return null;
+ }
+ }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
similarity index 83%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
index 1ad5180e58c..22a42f2953e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/VectorizedRowBatchIterator.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -30,7 +31,7 @@ import org.apache.iceberg.io.CloseableIterator;
/**
* Iterator wrapper around Hive's VectorizedRowBatch producer (MRv1 implementing) record readers.
*/
-public final class VectorizedRowBatchIterator implements CloseableIterator<VectorizedRowBatch> {
+public final class HiveBatchIterator implements CloseableIterator<HiveBatchContext> {
private final RecordReader<NullWritable, VectorizedRowBatch> recordReader;
private final NullWritable key;
@@ -39,8 +40,9 @@ public final class VectorizedRowBatchIterator implements CloseableIterator<Vecto
private final int[] partitionColIndices;
private final Object[] partitionValues;
private boolean advanced = false;
+ private long rowOffset = Long.MIN_VALUE;
- VectorizedRowBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
+ HiveBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
int[] partitionColIndices, Object[] partitionValues) {
this.recordReader = recordReader;
this.key = recordReader.createKey();
@@ -62,6 +64,11 @@ public final class VectorizedRowBatchIterator implements CloseableIterator<Vecto
if (!recordReader.next(key, batch)) {
batch.size = 0;
}
+
+ if (recordReader instanceof RowPositionAwareVectorizedRecordReader) {
+ rowOffset = ((RowPositionAwareVectorizedRecordReader) recordReader).getRowNumber();
+ }
+
// Fill partition values
if (partitionColIndices != null) {
for (int i = 0; i < partitionColIndices.length; ++i) {
@@ -86,9 +93,9 @@ public final class VectorizedRowBatchIterator implements CloseableIterator<Vecto
}
@Override
- public VectorizedRowBatch next() {
+ public HiveBatchContext next() {
advance();
advanced = false;
- return batch;
+ return new HiveBatchContext(batch, vrbCtx, rowOffset);
}
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
index 412f7478090..a70a37c2224 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveIcebergVectorizedRecordReader.java
@@ -46,7 +46,8 @@ public final class HiveIcebergVectorizedRecordReader extends AbstractMapredIcebe
public boolean next(Void key, VectorizedRowBatch value) throws IOException {
try {
if (innerReader.nextKeyValue()) {
- VectorizedRowBatch newBatch = (VectorizedRowBatch) innerReader.getCurrentValue();
+ HiveBatchContext currentValue = (HiveBatchContext) innerReader.getCurrentValue();
+ VectorizedRowBatch newBatch = currentValue.getBatch();
value.cols = newBatch.cols;
value.endOfFile = newBatch.endOfFile;
value.numCols = newBatch.numCols;
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 2e0d11e59c5..22131062ddf 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -76,8 +76,9 @@ public class HiveVectorizedReader {
}
- public static <D> CloseableIterable<D> reader(Path path, FileScanTask task, Map<Integer, ?> idToConstant,
- TaskAttemptContext context, Expression residual) {
+ public static CloseableIterable<HiveBatchContext> reader(Path path, FileScanTask task,
+ Map<Integer, ?> idToConstant, TaskAttemptContext context, Expression residual) {
+
// Tweaks on jobConf here are relevant for this task only, so we need to copy it first as context's conf is reused..
JobConf job = new JobConf(context.getConfiguration());
FileFormat format = task.file().format();
@@ -171,8 +172,9 @@ public class HiveVectorizedReader {
VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema(), residual);
// If LLAP enabled, try to retrieve an LLAP record reader - this might yield to null in some special cases
+ // TODO: add support for reading files with positional deletes with LLAP (LLAP would need to provide file row num)
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
- LlapProxy.getIo() != null) {
+ LlapProxy.getIo() != null && task.deletes().isEmpty()) {
recordReader = LlapProxy.getIo().llapVectorizedOrcReaderForPath(fileId, path, null, readColumnIds,
job, start, length, reporter);
}
@@ -220,14 +222,14 @@ public class HiveVectorizedReader {
return inputFormat.getRecordReader(split, job, reporter);
}
- private static <D> CloseableIterable<D> createVectorizedRowBatchIterable(
+ private static CloseableIterable<HiveBatchContext> createVectorizedRowBatchIterable(
RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf job, int[] partitionColIndices,
Object[] partitionValues) {
- VectorizedRowBatchIterator iterator =
- new VectorizedRowBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);
+ HiveBatchIterator iterator =
+ new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);
- return new CloseableIterable<D>() {
+ return new CloseableIterable<HiveBatchContext>() {
@Override
public CloseableIterator iterator() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RowPositionAwareVectorizedRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RowPositionAwareVectorizedRecordReader.java
new file mode 100644
index 00000000000..1775787f187
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RowPositionAwareVectorizedRecordReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+public interface RowPositionAwareVectorizedRecordReader { // implemented by vectorized record readers that can report file row positions
+ /**
+ * Returns the row position (in the file) of the first row in the last returned batch.
+ * @return zero-based row position of that first row within the file
+ * @throws IOException if the underlying reader cannot report its position
+ */
+ long getRowNumber() throws IOException;
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index b7b48efbc74..aea4f37180b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketIdentifier;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.NullWritable;
@@ -55,7 +56,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
SelfDescribingInputFormatInterface {
static class VectorizedOrcRecordReader
- implements RecordReader<NullWritable, VectorizedRowBatch> {
+ implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
private final long offset;
private final long length;
@@ -171,6 +172,11 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
public float getProgress() throws IOException {
return progress;
}
+
+ @Override
+ public long getRowNumber() throws IOException {
+ return reader.getRowNumber(); // NOTE(review): assumes the ORC reader reports the first row of the batch just returned by next(); if it reports the next row to be read instead, this is off by the batch size - confirm
+ }
}
public VectorizedOrcInputFormat() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index f7b13cb3d6a..c1c0a120686 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.BucketIdentifier;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
@@ -78,8 +79,10 @@ import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -90,7 +93,7 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FI
* from Apache Spark and Apache Parquet.
*/
public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
- implements RecordReader<NullWritable, VectorizedRowBatch> {
+ implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);
private List<Integer> colsToInclude;
@@ -128,6 +131,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private ZoneId writerTimezone;
private final BucketIdentifier bucketIdentifier;
+ // number of rows returned with the last batch
+ private int lastReturnedRowCount = -1;
+
+ // row number (in the file) of the first row returned in the last batch
+ private long currentRowNumInRowGroup = -1;
+
+ // index of the current rowgroup, incremented after reader.readNextRowGroup() calls
+ private int currentRowGroupIndex = -1;
+
+ private Map<Integer, Long> rowGroupNumToRowPos = new HashMap<>();
+
// LLAP cache integration
// TODO: also support fileKey in splits, like OrcSplit does
private Object cacheKey = null;
@@ -216,7 +230,11 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
offsets.add(offset);
}
blocks = new ArrayList<>();
+ long allRowsInFile = 0;
+ int blockIndex = 0;
for (BlockMetaData block : parquetMetadata.getBlocks()) {
+ rowGroupNumToRowPos.put(blockIndex++, allRowsInFile);
+ allRowsInFile += block.getRowCount();
if (offsets.contains(block.getStartingPos())) {
blocks.add(block);
}
@@ -365,10 +383,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
return 0;
}
+ @Override
+ public long getRowNumber() throws IOException { // NOTE(review): before any row group is read currentRowGroupIndex is -1, get(-1) returns null and unboxing throws NPE (e.g. empty split)
+ return rowGroupNumToRowPos.get(currentRowGroupIndex) + currentRowNumInRowGroup; // NOTE(review): map keys index ALL row groups in the file, but currentRowGroupIndex presumably counts only row groups read for this split; offsets may be wrong if the split skips leading row groups - verify
+ }
+
/**
* Advances to the next batch of rows. Returns false if there are no more.
*/
private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
+ currentRowNumInRowGroup += lastReturnedRowCount;
+
columnarBatch.reset();
if (rowsReturned >= totalRowCount) {
return false;
@@ -391,6 +416,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
columnTypesList.get(colsToInclude.get(i)));
}
}
+ lastReturnedRowCount = num;
rowsReturned += num;
columnarBatch.size = num;
return true;
@@ -430,6 +456,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
}
}
+ currentRowNumInRowGroup = 0;
+ currentRowGroupIndex++;
+
totalCountLoadedSoFar += pages.getRowCount();
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index e290e332e7f..0a0867fff9f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -119,6 +119,11 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
stringReadDecimal(isDictionaryEncoding);
}
+ @Test
+ public void verifyBatchOffsets() throws Exception { // runs the shared row-offset check from VectorizedColumnReaderTestBase
+ super.verifyBatchOffsets();
+ }
+
private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
public TestVectorizedParquetRecordReader(
org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) throws IOException {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
index e29b6f0c766..df78eb69758 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
@@ -1067,4 +1067,24 @@ public class VectorizedColumnReaderTestBase {
reader.close();
}
}
+
+ protected void verifyBatchOffsets() throws Exception { // checks getRowNumber() reports the file position of each batch's first row
+ Configuration c = new Configuration();
+ c.set(IOConstants.COLUMNS, "int64_field");
+ c.set(IOConstants.COLUMNS_TYPES, "bigint");
+ c.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ c.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createTestParquetReader("message test { required int64 int64_field;}", c);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ int batchCount = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ assertEquals(VectorizedRowBatch.DEFAULT_SIZE * batchCount++, reader.getRowNumber()); // assumes every batch except the last holds exactly DEFAULT_SIZE rows
+ }
+ assertEquals(reader.getRowNumber(), nElements); // expects the position after the final next() to equal the total row count; NOTE(review): expected/actual arguments are swapped
+ } finally {
+ reader.close();
+ }
+ }
}