You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/02/06 18:14:25 UTC
[iceberg] branch master updated: Flink 1.14: Add ArrayBatchRecords and RecordAndPosition (#3865)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 333414b Flink 1.14: Add ArrayBatchRecords and RecordAndPosition (#3865)
333414b is described below
commit 333414bfb08180955dcbc24013acc1e8eb89e0c5
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Sun Feb 6 10:14:01 2022 -0800
Flink 1.14: Add ArrayBatchRecords and RecordAndPosition (#3865)
---
.../flink/source/reader/ArrayBatchRecords.java | 159 +++++++++++++++++++++
.../flink/source/reader/RecordAndPosition.java | 87 +++++++++++
.../flink/source/reader/TestArrayBatchRecords.java | 63 ++++++++
3 files changed, 309 insertions(+)
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayBatchRecords.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayBatchRecords.java
new file mode 100644
index 0000000..f4e23a0
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayBatchRecords.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * {@link RecordsWithSplitIds} is used to pass a batch of records from fetcher to source reader.
+ * Batching is to improve the efficiency for records handover.
+ *
+ * {@link RecordsWithSplitIds} interface can encapsulate batches from multiple splits.
+ * This is the case for Kafka source where fetchers can retrieve records from multiple
+ * Kafka partitions at the same time.
+ *
+ * For file-based sources like Iceberg, readers always read one split/file at a time.
+ * Hence, we will only have a batch of records for one split here.
+ *
+ * This class uses array to store a batch of records from the same file (with the same fileOffset).
+ */
+class ArrayBatchRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+ @Nullable
+ private String splitId;
+ @Nullable
+ private final Pool.Recycler<T[]> recycler;
+ @Nullable
+ private final T[] records;
+ private final int numberOfRecords;
+ private final Set<String> finishedSplits;
+ private final RecordAndPosition<T> recordAndPosition;
+
+ // point to current read position within the records array
+ private int position;
+
+ private ArrayBatchRecords(
+ @Nullable String splitId, @Nullable Pool.Recycler<T[]> recycler, @Nullable T[] records,
+ int numberOfRecords, int fileOffset, long startingRecordOffset, Set<String> finishedSplits) {
+ Preconditions.checkArgument(numberOfRecords >= 0, "numberOfRecords can't be negative");
+ Preconditions.checkArgument(fileOffset >= 0, "fileOffset can't be negative");
+ Preconditions.checkArgument(startingRecordOffset >= 0, "numberOfRecords can't be negative");
+
+ this.splitId = splitId;
+ this.recycler = recycler;
+ this.records = records;
+ this.numberOfRecords = numberOfRecords;
+ this.finishedSplits = Preconditions.checkNotNull(finishedSplits, "finishedSplits can be empty but not null");
+ this.recordAndPosition = new RecordAndPosition<>();
+
+ recordAndPosition.set(null, fileOffset, startingRecordOffset);
+ this.position = 0;
+ }
+
+ @Nullable
+ @Override
+ public String nextSplit() {
+ String nextSplit = this.splitId;
+ // set the splitId to null to indicate no more splits
+ // this class only contains record for one split
+ this.splitId = null;
+ return nextSplit;
+ }
+
+ @Nullable
+ @Override
+ public RecordAndPosition<T> nextRecordFromSplit() {
+ if (position < numberOfRecords) {
+ recordAndPosition.record(records[position]);
+ position++;
+ return recordAndPosition;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * This method is called when all records from this batch has been emitted.
+ * If recycler is set, it should be called to return the records array back to pool.
+ */
+ @Override
+ public void recycle() {
+ if (recycler != null) {
+ recycler.recycle(records);
+ }
+ }
+
+ @Override
+ public Set<String> finishedSplits() {
+ return finishedSplits;
+ }
+
+ @VisibleForTesting
+ T[] records() {
+ return records;
+ }
+
+ @VisibleForTesting
+ int numberOfRecords() {
+ return numberOfRecords;
+ }
+
+ /**
+ * Create a ArrayBatchRecords backed up an array with records from the same file
+ *
+ * @param splitId Iceberg source only read from one split a time.
+ * We never have multiple records from multiple splits.
+ * @param recycler Because {@link DataIterator} with {@link RowData} returns an iterator of reused RowData object,
+ * we need to clone RowData eagerly when constructing a batch of records.
+ * We can use object pool to reuse the RowData array object which can be expensive to create.
+ * This recycler can be provided to recycle the array object back to pool after read is exhausted.
+ * If the {@link DataIterator} returns an iterator of non-reused objects,
+ * we don't need to clone objects. It is cheap to just create the batch array.
+ * Hence, we don't need object pool and recycler can be set to null.
+ * @param records an array (maybe reused) holding a batch of records
+ * @param numberOfRecords actual number of records in the array
+ * @param fileOffset fileOffset for all records in this batch
+ * @param startingRecordOffset starting recordOffset
+ * @param <T> record type
+ */
+ public static <T> ArrayBatchRecords<T> forRecords(
+ String splitId, Pool.Recycler<T[]> recycler, T[] records, int numberOfRecords,
+ int fileOffset, long startingRecordOffset) {
+ return new ArrayBatchRecords<>(splitId, recycler, records, numberOfRecords,
+ fileOffset, startingRecordOffset, Collections.emptySet());
+ }
+
+ /**
+ * Create ab ArrayBatchRecords with only finished split id
+ *
+ * @param splitId for the split that is just exhausted
+ */
+ public static <T> ArrayBatchRecords<T> finishedSplit(String splitId) {
+ return new ArrayBatchRecords<>(null, null, null,
+ 0, 0, 0, Collections.singleton(splitId));
+ }
+}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java
new file mode 100644
index 0000000..39b097f
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Pairs a record with the reader position that is stored in the checkpoint.
+ *
+ * <p>The position refers to the point in the reader just AFTER the record. Processing the record
+ * and updating the checkpointed state happen atomically, so the position is exactly where the
+ * reader should resume once this record has been processed.
+ *
+ * <p>The object is mutable. Whenever only one {@code RecordAndPosition} is needed at a time, a
+ * single instance can be reused instead of allocating a new one per record.
+ */
+@Internal
+public class RecordAndPosition<T> {
+  private T record;
+  private int fileOffset;
+  private long recordOffset;
+
+  /** Creates an instance holding the given record at the given position. */
+  public RecordAndPosition(T record, int fileOffset, long recordOffset) {
+    this.fileOffset = fileOffset;
+    this.recordOffset = recordOffset;
+    this.record = record;
+  }
+
+  /** Creates an instance with a null record and both offsets at zero. */
+  public RecordAndPosition() {
+  }
+
+  // ------------------------------------------------------------------------
+
+  /** Returns the record currently held. */
+  public T record() {
+    return this.record;
+  }
+
+  /** Returns the offset of the file the current record belongs to. */
+  public int fileOffset() {
+    return this.fileOffset;
+  }
+
+  /** Returns the record offset, i.e. the position right after the current record. */
+  public long recordOffset() {
+    return this.recordOffset;
+  }
+
+  /** Overwrites both the record and its position. */
+  public void set(T newRecord, int newFileOffset, long newRecordOffset) {
+    fileOffset = newFileOffset;
+    recordOffset = newRecordOffset;
+    record = newRecord;
+  }
+
+  /** Moves the position without touching the record. */
+  public void position(int newFileOffset, long newRecordOffset) {
+    recordOffset = newRecordOffset;
+    fileOffset = newFileOffset;
+  }
+
+  /** Advances to the next record of a sequence, bumping {@code recordOffset} by one. */
+  public void record(T nextRecord) {
+    recordOffset += 1;
+    record = nextRecord;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "%s @ %d + %d",
+        record, fileOffset, recordOffset);
+  }
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayBatchRecords.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayBatchRecords.java
new file mode 100644
index 0000000..4e46f33
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayBatchRecords.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ArrayBatchRecords}: iteration over full/partial/empty ranges, position
+ * reporting, recycling behavior, and the finished-split marker factory.
+ */
+public class TestArrayBatchRecords {
+
+  @Test
+  public void testFullRange() {
+    String[] elements = new String[]{"0", "1", "2", "3"};
+    testArray(elements, elements.length, 2, 119);
+  }
+
+  @Test
+  public void testSubRange() {
+    String[] elements = new String[]{"0", "1", "2", "3"};
+    testArray(elements, 2, 0, 0);
+  }
+
+  @Test
+  public void testEmptyRange() {
+    String[] elements = new String[]{"0", "1", "2", "3"};
+    testArray(elements, 0, 1, 7);
+  }
+
+  @Test
+  public void testNullRecycler() {
+    String splitId = "iceberg_split_1";
+    String[] elements = new String[]{"0", "1"};
+    ArrayBatchRecords<String> batch =
+        ArrayBatchRecords.forRecords(splitId, null, elements, elements.length, 0, 0);
+
+    Assert.assertEquals(splitId, batch.nextSplit());
+    Assert.assertEquals("0", batch.nextRecordFromSplit().record());
+    Assert.assertEquals("1", batch.nextRecordFromSplit().record());
+    Assert.assertNull(batch.nextRecordFromSplit());
+    // without a recycler, recycle() must be a harmless no-op
+    batch.recycle();
+  }
+
+  @Test
+  public void testFinishedSplit() {
+    String splitId = "iceberg_split_1";
+    ArrayBatchRecords<String> batch = ArrayBatchRecords.finishedSplit(splitId);
+
+    // a finished-split marker exposes no split to read from and no records
+    Assert.assertNull(batch.nextSplit());
+    Assert.assertNull(batch.nextRecordFromSplit());
+    Assert.assertEquals(1, batch.finishedSplits().size());
+    Assert.assertTrue(batch.finishedSplits().contains(splitId));
+    // no recycler is attached, so recycle() must not fail
+    batch.recycle();
+  }
+
+  private void testArray(String[] elements, int numberOfRecords, int fileOffset, long startingRecordOffset) {
+    String splitId = "iceberg_split_1";
+    AtomicBoolean recycled = new AtomicBoolean();
+
+    ArrayBatchRecords<String> recordsWithSplitIds = ArrayBatchRecords.forRecords(splitId,
+        ignored -> recycled.set(true), elements, numberOfRecords, fileOffset, startingRecordOffset);
+
+    Assert.assertEquals(splitId, recordsWithSplitIds.nextSplit());
+    // a data batch never reports finished splits
+    Assert.assertTrue(recordsWithSplitIds.finishedSplits().isEmpty());
+
+    for (int i = 0; i < numberOfRecords; i++) {
+      RecordAndPosition<String> recAndPos = recordsWithSplitIds.nextRecordFromSplit();
+      Assert.assertEquals(elements[i], recAndPos.record());
+      Assert.assertEquals(fileOffset, recAndPos.fileOffset());
+      // recordOffset points to the position after this one
+      Assert.assertEquals(startingRecordOffset + i + 1, recAndPos.recordOffset());
+    }
+
+    Assert.assertNull(recordsWithSplitIds.nextRecordFromSplit());
+    Assert.assertNull(recordsWithSplitIds.nextSplit());
+    Assert.assertFalse("Should not be recycled before recycle() is called", recycled.get());
+    recordsWithSplitIds.recycle();
+    Assert.assertTrue(recycled.get());
+  }
+}