You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/02/06 18:14:25 UTC

[iceberg] branch master updated: Flink 1.14: Add ArrayBatchRecords and RecordAndPosition (#3865)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 333414b  Flink 1.14: Add ArrayBatchRecords and RecordAndPosition (#3865)
333414b is described below

commit 333414bfb08180955dcbc24013acc1e8eb89e0c5
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Sun Feb 6 10:14:01 2022 -0800

    Flink 1.14: Add ArrayBatchRecords and RecordAndPosition (#3865)
---
 .../flink/source/reader/ArrayBatchRecords.java     | 159 +++++++++++++++++++++
 .../flink/source/reader/RecordAndPosition.java     |  87 +++++++++++
 .../flink/source/reader/TestArrayBatchRecords.java |  63 ++++++++
 3 files changed, 309 insertions(+)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayBatchRecords.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayBatchRecords.java
new file mode 100644
index 0000000..f4e23a0
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayBatchRecords.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * {@link RecordsWithSplitIds} is used to pass a batch of records from fetcher to source reader.
+ * Batching is to improve the efficiency for records handover.
+ *
+ * {@link RecordsWithSplitIds} interface can encapsulate batches from multiple splits.
+ * This is the case for Kafka source where fetchers can retrieve records from multiple
+ * Kafka partitions at the same time.
+ *
+ * For file-based sources like Iceberg, readers always read one split/file at a time.
+ * Hence, we will only have a batch of records for one split here.
+ *
+ * This class uses array to store a batch of records from the same file (with the same fileOffset).
+ */
+class ArrayBatchRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+  @Nullable
+  private String splitId;
+  @Nullable
+  private final Pool.Recycler<T[]> recycler;
+  @Nullable
+  private final T[] records;
+  private final int numberOfRecords;
+  private final Set<String> finishedSplits;
+  private final RecordAndPosition<T> recordAndPosition;
+
+  // point to current read position within the records array
+  private int position;
+
+  private ArrayBatchRecords(
+      @Nullable String splitId, @Nullable Pool.Recycler<T[]> recycler, @Nullable T[] records,
+      int numberOfRecords, int fileOffset, long startingRecordOffset, Set<String> finishedSplits) {
+    Preconditions.checkArgument(numberOfRecords >= 0, "numberOfRecords can't be negative");
+    Preconditions.checkArgument(fileOffset >= 0, "fileOffset can't be negative");
+    Preconditions.checkArgument(startingRecordOffset >= 0, "numberOfRecords can't be negative");
+
+    this.splitId = splitId;
+    this.recycler = recycler;
+    this.records = records;
+    this.numberOfRecords = numberOfRecords;
+    this.finishedSplits = Preconditions.checkNotNull(finishedSplits, "finishedSplits can be empty but not null");
+    this.recordAndPosition = new RecordAndPosition<>();
+
+    recordAndPosition.set(null, fileOffset, startingRecordOffset);
+    this.position = 0;
+  }
+
+  @Nullable
+  @Override
+  public String nextSplit() {
+    String nextSplit = this.splitId;
+    // set the splitId to null to indicate no more splits
+    // this class only contains record for one split
+    this.splitId = null;
+    return nextSplit;
+  }
+
+  @Nullable
+  @Override
+  public RecordAndPosition<T> nextRecordFromSplit() {
+    if (position < numberOfRecords) {
+      recordAndPosition.record(records[position]);
+      position++;
+      return recordAndPosition;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * This method is called when all records from this batch has been emitted.
+   * If recycler is set, it should be called to return the records array back to pool.
+   */
+  @Override
+  public void recycle() {
+    if (recycler != null) {
+      recycler.recycle(records);
+    }
+  }
+
+  @Override
+  public Set<String> finishedSplits() {
+    return finishedSplits;
+  }
+
+  @VisibleForTesting
+  T[] records() {
+    return records;
+  }
+
+  @VisibleForTesting
+  int numberOfRecords() {
+    return numberOfRecords;
+  }
+
+  /**
+   * Create a ArrayBatchRecords backed up an array with records from the same file
+   *
+   * @param splitId Iceberg source only read from one split a time.
+   *                We never have multiple records from multiple splits.
+   * @param recycler Because {@link DataIterator} with {@link RowData} returns an iterator of reused RowData object,
+   *                 we need to clone RowData eagerly when constructing a batch of records.
+   *                 We can use object pool to reuse the RowData array object which can be expensive to create.
+   *                 This recycler can be provided to recycle the array object back to pool after read is exhausted.
+   *                 If the {@link DataIterator} returns an iterator of non-reused objects,
+   *                 we don't need to clone objects. It is cheap to just create the batch array.
+   *                 Hence, we don't need object pool and recycler can be set to null.
+   * @param records an array (maybe reused) holding a batch of records
+   * @param numberOfRecords actual number of records in the array
+   * @param fileOffset fileOffset for all records in this batch
+   * @param startingRecordOffset starting recordOffset
+   * @param <T> record type
+   */
+  public static <T> ArrayBatchRecords<T> forRecords(
+      String splitId, Pool.Recycler<T[]> recycler, T[] records, int numberOfRecords,
+      int fileOffset, long startingRecordOffset) {
+    return new ArrayBatchRecords<>(splitId, recycler, records, numberOfRecords,
+        fileOffset, startingRecordOffset, Collections.emptySet());
+  }
+
+  /**
+   * Create ab ArrayBatchRecords with only finished split id
+   *
+   * @param splitId for the split that is just exhausted
+   */
+  public static <T> ArrayBatchRecords<T> finishedSplit(String splitId) {
+    return new ArrayBatchRecords<>(null, null, null,
+        0, 0, 0, Collections.singleton(splitId));
+  }
+}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java
new file mode 100644
index 0000000..39b097f
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordAndPosition.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A record along with the reader position to be stored in the checkpoint.
+ *
+ * <p>The position defines the point in the reader AFTER the record. Record processing and updating
+ * checkpointed state happens atomically. The position points to where the reader should resume
+ * after this record is processed.
+ *
+ * <p>This mutable object is useful in cases where only one instance of a {@code RecordAndPosition}
+ * is needed at a time. Then the same instance of RecordAndPosition can be reused.
+ */
+@Internal
+public class RecordAndPosition<T> {
+  private T record;
+  private int fileOffset;
+  private long recordOffset;
+
+  public RecordAndPosition(T record, int fileOffset, long recordOffset) {
+    this.record = record;
+    this.fileOffset = fileOffset;
+    this.recordOffset = recordOffset;
+  }
+
+  public RecordAndPosition() {
+  }
+
+  // ------------------------------------------------------------------------
+
+  public T record() {
+    return record;
+  }
+
+  public int fileOffset() {
+    return fileOffset;
+  }
+
+  public long recordOffset() {
+    return recordOffset;
+  }
+
+  /** Updates the record and position in this object. */
+  public void set(T newRecord, int newFileOffset, long newRecordOffset) {
+    this.record = newRecord;
+    this.fileOffset = newFileOffset;
+    this.recordOffset = newRecordOffset;
+  }
+
+  /** Sets the position without setting a record. */
+  public void position(int newFileOffset, long newRecordOffset) {
+    this.fileOffset = newFileOffset;
+    this.recordOffset = newRecordOffset;
+  }
+
+  /** Sets the next record of a sequence. This increments the {@code recordOffset} by one. */
+  public void record(T nextRecord) {
+    this.record = nextRecord;
+    this.recordOffset++;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s @ %d + %d", record, fileOffset, recordOffset);
+  }
+
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayBatchRecords.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayBatchRecords.java
new file mode 100644
index 0000000..4e46f33
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestArrayBatchRecords.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestArrayBatchRecords {
+
+  @Test
+  public void testFullRange() {
+    String[] elements = new String[]{"0", "1", "2", "3"};
+    testArray(elements, elements.length, 2, 119);
+  }
+
+  @Test
+  public void testSubRange() {
+    String[] elements = new String[]{"0", "1", "2", "3"};
+    testArray(elements, 2, 0, 0);
+  }
+
+  private void testArray(String[] elements, int numberOfRecords, int fileOffset, long startingRecordOffset) {
+    String splitId = "iceberg_split_1";
+    AtomicBoolean recycled = new AtomicBoolean();
+
+    ArrayBatchRecords<String> recordsWithSplitIds = ArrayBatchRecords.forRecords(splitId,
+        ignored -> recycled.set(true), elements, numberOfRecords, fileOffset, startingRecordOffset);
+
+    Assert.assertEquals(splitId, recordsWithSplitIds.nextSplit());
+
+    for (int i = 0; i < numberOfRecords; i++) {
+      RecordAndPosition<String> recAndPos = recordsWithSplitIds.nextRecordFromSplit();
+      Assert.assertEquals(elements[i], recAndPos.record());
+      Assert.assertEquals(fileOffset, recAndPos.fileOffset());
+      // recordOffset points to the position after this one
+      Assert.assertEquals(startingRecordOffset + i + 1, recAndPos.recordOffset());
+    }
+
+    Assert.assertNull(recordsWithSplitIds.nextRecordFromSplit());
+    Assert.assertNull(recordsWithSplitIds.nextSplit());
+    recordsWithSplitIds.recycle();
+    Assert.assertTrue(recycled.get());
+  }
+
+}