You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/20 15:55:57 UTC
[iceberg] branch master updated: Flink: Port #4986, FLIP-27 source enumerator to 1.14 module (#5078)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a64417c5ce Flink: Port #4986, FLIP-27 source enumerator to 1.14 module (#5078)
a64417c5ce is described below
commit a64417c5ce6ae9aac2cf6e5f5675d224a9846afd
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Mon Jun 20 08:55:51 2022 -0700
Flink: Port #4986, FLIP-27 source enumerator to 1.14 module (#5078)
---
flink/v1.14/build.gradle | 1 +
.../enumerator/AbstractIcebergEnumerator.java | 1 +
.../enumerator/ContinuousIcebergEnumerator.java | 127 ++++++++++++++++
.../IcebergEnumeratorPositionSerializer.java | 89 +++++++++++
.../source/enumerator/IcebergEnumeratorState.java | 54 +++++++
.../IcebergEnumeratorStateSerializer.java | 115 +++++++++++++++
.../source/enumerator/StaticIcebergEnumerator.java | 2 +-
.../TestContinuousIcebergEnumerator.java | 164 +++++++++++++++++++++
.../TestIcebergEnumeratorStateSerializer.java | 89 +++++++++++
.../enumerator/AbstractIcebergEnumerator.java | 1 +
.../source/enumerator/StaticIcebergEnumerator.java | 2 +-
11 files changed, 643 insertions(+), 2 deletions(-)
diff --git a/flink/v1.14/build.gradle b/flink/v1.14/build.gradle
index a0e01c8a4c..537c977e19 100644
--- a/flink/v1.14/build.gradle
+++ b/flink/v1.14/build.gradle
@@ -60,6 +60,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'org.apache.hive', module: 'hive-storage-api'
}
+ testImplementation "org.apache.flink:flink-connector-test-utils:${flinkVersion}"
testImplementation "org.apache.flink:flink-core:${flinkVersion}"
testImplementation "org.apache.flink:flink-runtime:${flinkVersion}"
testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
similarity index 99%
copy from flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
copy to flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 98c5fa3ede..8c9be862bf 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -150,6 +150,7 @@ abstract class AbstractIcebergEnumerator implements SplitEnumerator<IcebergSourc
if (availableFuture.get() != null) {
return;
}
+
CompletableFuture<Void> future = assigner.isAvailable()
.thenAccept(ignore ->
// Must run assignSplits in coordinator thread
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
new file mode 100644
index 0000000000..e2b94b8c3e
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContinuousIcebergEnumerator.class);
+
+ private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
+ private final SplitAssigner assigner;
+ private final ScanContext scanContext;
+ private final ContinuousSplitPlanner splitPlanner;
+
+ /**
+ * snapshotId for the last enumerated snapshot. next incremental enumeration
+ * should be based off this as the starting position.
+ */
+ private final AtomicReference<IcebergEnumeratorPosition> enumeratorPosition;
+
+ public ContinuousIcebergEnumerator(
+ SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+ SplitAssigner assigner,
+ ScanContext scanContext,
+ ContinuousSplitPlanner splitPlanner,
+ @Nullable IcebergEnumeratorState enumState) {
+ super(enumeratorContext, assigner);
+
+ this.enumeratorContext = enumeratorContext;
+ this.assigner = assigner;
+ this.scanContext = scanContext;
+ this.splitPlanner = splitPlanner;
+ this.enumeratorPosition = new AtomicReference<>();
+ if (enumState != null) {
+ this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
+ }
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ enumeratorContext.callAsync(
+ this::discoverSplits,
+ this::processDiscoveredSplits,
+ 0L,
+ scanContext.monitorInterval().toMillis());
+ }
+
+ @Override
+ public void close() throws IOException {
+ splitPlanner.close();
+ super.close();
+ }
+
+ @Override
+ protected boolean shouldWaitForMoreSplits() {
+ return true;
+ }
+
+ @Override
+ public IcebergEnumeratorState snapshotState(long checkpointId) {
+ return new IcebergEnumeratorState(enumeratorPosition.get(), assigner.state());
+ }
+
+ /**
+ * This method is executed in an IO thread pool.
+ */
+ private ContinuousEnumerationResult discoverSplits() {
+ return splitPlanner.planSplits(enumeratorPosition.get());
+ }
+
+ /**
+ * This method is executed in a single coordinator thread.
+ */
+ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
+ if (error == null) {
+ if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
+ // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O thread pool.
+ // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the thread
+ // pool is busy and multiple discovery actions are executed concurrently. Discovery result should
+ // only be accepted if the starting position matches the enumerator position (like compare-and-swap).
+ LOG.info("Skip {} discovered splits because the scan starting position doesn't match " +
+ "the current enumerator position: enumerator position = {}, scan starting position = {}",
+ result.splits().size(), enumeratorPosition.get(), result.fromPosition());
+ } else {
+ assigner.onDiscoveredSplits(result.splits());
+ LOG.info("Added {} splits discovered between ({}, {}] to the assigner",
+ result.splits().size(), result.fromPosition(), result.toPosition());
+ // update the enumerator position even if there is no split discovered
+ // or the toPosition is empty (e.g. for empty table).
+ enumeratorPosition.set(result.toPosition());
+ LOG.info("Update enumerator position to {}", result.toPosition());
+ }
+ } else {
+ LOG.error("Failed to discover new splits", error);
+ }
+ }
+}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
new file mode 100644
index 0000000000..83b230e80e
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+class IcebergEnumeratorPositionSerializer implements SimpleVersionedSerializer<IcebergEnumeratorPosition> {
+
+ public static final IcebergEnumeratorPositionSerializer INSTANCE = new IcebergEnumeratorPositionSerializer();
+
+ private static final int VERSION = 1;
+
+ private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+ ThreadLocal.withInitial(() -> new DataOutputSerializer(128));
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public byte[] serialize(IcebergEnumeratorPosition position) throws IOException {
+ return serializeV1(position);
+ }
+
+ @Override
+ public IcebergEnumeratorPosition deserialize(int version, byte[] serialized) throws IOException {
+ switch (version) {
+ case 1:
+ return deserializeV1(serialized);
+ default:
+ throw new IOException("Unknown version: " + version);
+ }
+ }
+
+ private byte[] serializeV1(IcebergEnumeratorPosition position) throws IOException {
+ DataOutputSerializer out = SERIALIZER_CACHE.get();
+ out.writeBoolean(position.snapshotId() != null);
+ if (position.snapshotId() != null) {
+ out.writeLong(position.snapshotId());
+ }
+ out.writeBoolean(position.snapshotTimestampMs() != null);
+ if (position.snapshotTimestampMs() != null) {
+ out.writeLong(position.snapshotTimestampMs());
+ }
+ byte[] result = out.getCopyOfBuffer();
+ out.clear();
+ return result;
+ }
+
+ private IcebergEnumeratorPosition deserializeV1(byte[] serialized) throws IOException {
+ DataInputDeserializer in = new DataInputDeserializer(serialized);
+ Long snapshotId = null;
+ if (in.readBoolean()) {
+ snapshotId = in.readLong();
+ }
+
+ Long snapshotTimestampMs = null;
+ if (in.readBoolean()) {
+ snapshotTimestampMs = in.readLong();
+ }
+
+ if (snapshotId != null) {
+ return IcebergEnumeratorPosition.of(snapshotId, snapshotTimestampMs);
+ } else {
+ return IcebergEnumeratorPosition.empty();
+ }
+ }
+}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
new file mode 100644
index 0000000000..bd2f44c005
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+
+/**
+ * Checkpointed state of an Iceberg split enumerator: the optional position of the last enumerated
+ * snapshot plus the pending splits, each paired with its status.
+ */
+public class IcebergEnumeratorState implements Serializable {
+
+  /** Last enumerated position, or {@code null} when no position is tracked. */
+  @Nullable
+  private final IcebergEnumeratorPosition lastEnumeratedPosition;
+
+  /** Splits still tracked by the enumerator, together with their status. */
+  private final Collection<IcebergSourceSplitState> pendingSplits;
+
+  /**
+   * Creates a state that carries pending splits but no enumerated position.
+   *
+   * @param pendingSplits splits still tracked by the enumerator
+   */
+  public IcebergEnumeratorState(Collection<IcebergSourceSplitState> pendingSplits) {
+    this(null, pendingSplits);
+  }
+
+  /**
+   * @param lastEnumeratedPosition position of the last enumerated snapshot, or {@code null}
+   * @param pendingSplits splits still tracked by the enumerator
+   */
+  public IcebergEnumeratorState(
+      @Nullable IcebergEnumeratorPosition lastEnumeratedPosition,
+      Collection<IcebergSourceSplitState> pendingSplits) {
+    this.pendingSplits = pendingSplits;
+    this.lastEnumeratedPosition = lastEnumeratedPosition;
+  }
+
+  /** Returns the last enumerated position, or {@code null} if none was recorded. */
+  @Nullable
+  public IcebergEnumeratorPosition lastEnumeratedPosition() {
+    return lastEnumeratedPosition;
+  }
+
+  /** Returns the pending splits exactly as passed to the constructor (no copy is made). */
+  public Collection<IcebergSourceSplitState> pendingSplits() {
+    return pendingSplits;
+  }
+}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
new file mode 100644
index 0000000000..8f020bbe53
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Versioned serializer for {@link IcebergEnumeratorState}.
+ *
+ * <p>V1 layout:
+ * <ol>
+ *   <li>boolean: whether a last enumerated position is present; if so, the position serializer
+ *       version (int), the byte length (int) and the serialized position bytes</li>
+ *   <li>split serializer version (int) and the number of pending splits (int)</li>
+ *   <li>per split: byte length (int), serialized split bytes, status name (UTF)</li>
+ * </ol>
+ */
+@Internal
+public class IcebergEnumeratorStateSerializer implements SimpleVersionedSerializer<IcebergEnumeratorState> {
+
+  public static final IcebergEnumeratorStateSerializer INSTANCE = new IcebergEnumeratorStateSerializer();
+
+  private static final int VERSION = 1;
+
+  /**
+   * Per-thread reusable output buffer. It is cleared after every use, including failed ones, so a
+   * partially written state can never be prepended to the next checkpoint on the same thread.
+   */
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));
+
+  private final IcebergEnumeratorPositionSerializer positionSerializer = IcebergEnumeratorPositionSerializer.INSTANCE;
+  private final IcebergSourceSplitSerializer splitSerializer = IcebergSourceSplitSerializer.INSTANCE;
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergEnumeratorState enumState) throws IOException {
+    return serializeV1(enumState);
+  }
+
+  @Override
+  public IcebergEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
+    switch (version) {
+      case 1:
+        return deserializeV1(serialized);
+      default:
+        throw new IOException("Unknown version: " + version);
+    }
+  }
+
+  private byte[] serializeV1(IcebergEnumeratorState enumState) throws IOException {
+    DataOutputSerializer out = SERIALIZER_CACHE.get();
+    try {
+      IcebergEnumeratorPosition position = enumState.lastEnumeratedPosition();
+      out.writeBoolean(position != null);
+      if (position != null) {
+        out.writeInt(positionSerializer.getVersion());
+        byte[] positionBytes = positionSerializer.serialize(position);
+        out.writeInt(positionBytes.length);
+        out.write(positionBytes);
+      }
+
+      out.writeInt(splitSerializer.getVersion());
+      out.writeInt(enumState.pendingSplits().size());
+      for (IcebergSourceSplitState splitState : enumState.pendingSplits()) {
+        byte[] splitBytes = splitSerializer.serialize(splitState.split());
+        out.writeInt(splitBytes.length);
+        out.write(splitBytes);
+        out.writeUTF(splitState.status().name());
+      }
+
+      return out.getCopyOfBuffer();
+    } finally {
+      // reset the shared buffer even if a split failed to serialize part-way through
+      out.clear();
+    }
+  }
+
+  private IcebergEnumeratorState deserializeV1(byte[] serialized) throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+    IcebergEnumeratorPosition enumeratorPosition = null;
+    if (in.readBoolean()) {
+      int positionSerializerVersion = in.readInt();
+      byte[] positionBytes = new byte[in.readInt()];
+      // readFully throws EOFException on truncated input instead of silently leaving zero bytes
+      in.readFully(positionBytes);
+      enumeratorPosition = positionSerializer.deserialize(positionSerializerVersion, positionBytes);
+    }
+
+    int splitSerializerVersion = in.readInt();
+    int splitCount = in.readInt();
+    Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayListWithCapacity(splitCount);
+    for (int i = 0; i < splitCount; ++i) {
+      byte[] splitBytes = new byte[in.readInt()];
+      in.readFully(splitBytes);
+      IcebergSourceSplit split = splitSerializer.deserialize(splitSerializerVersion, splitBytes);
+      IcebergSourceSplitStatus status = IcebergSourceSplitStatus.valueOf(in.readUTF());
+      pendingSplits.add(new IcebergSourceSplitState(split, status));
+    }
+
+    return new IcebergEnumeratorState(enumeratorPosition, pendingSplits);
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
similarity index 98%
copy from flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
copy to flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
index 0f287864be..8d33b6f073 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * One-time split enumeration at the start-up
+ * One-time split enumeration at the start-up for batch execution
*/
@Internal
public class StaticIcebergEnumerator extends AbstractIcebergEnumerator {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
new file mode 100644
index 0000000000..5e7f926e3a
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestContinuousIcebergEnumerator {
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  /** Reader subtask that every test registers and requests splits from. */
+  private static final int SUBTASK_ID = 2;
+
+  @Test
+  public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = new TestingSplitEnumeratorContext<>(4);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, incrementalScanContext(), splitPlanner);
+
+    // nothing has been discovered yet
+    Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = createSingleSplit();
+    publishSplits(splitPlanner, enumeratorContext, splits);
+
+    // with no reader to hand it to, the split stays pending as UNASSIGNED
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testDiscoverWhenReaderRegistered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = new TestingSplitEnumeratorContext<>(4);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, incrementalScanContext(), splitPlanner);
+
+    // register one reader, and let it request a split
+    registerReaderAndRequestSplit(enumeratorContext, enumerator);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = createSingleSplit();
+    publishSplits(splitPlanner, enumeratorContext, splits);
+
+    // the waiting reader receives the split, so nothing remains pending
+    Assert.assertTrue(enumerator.snapshotState(1).pendingSplits().isEmpty());
+    MatcherAssert.assertThat(enumeratorContext.getSplitAssignments().get(SUBTASK_ID).getAssignedSplits(),
+        CoreMatchers.hasItem(splits.get(0)));
+  }
+
+  @Test
+  public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = new TestingSplitEnumeratorContext<>(4);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, incrementalScanContext(), splitPlanner);
+
+    // register one reader, and let it request a split
+    registerReaderAndRequestSplit(enumeratorContext, enumerator);
+
+    // remove the reader (like in a failure)
+    enumeratorContext.registeredReaders().remove(SUBTASK_ID);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = createSingleSplit();
+    Assert.assertEquals(1, splits.size());
+    publishSplits(splitPlanner, enumeratorContext, splits);
+
+    // the split cannot be handed out and therefore stays pending
+    Assert.assertFalse(enumeratorContext.getSplitAssignments().containsKey(SUBTASK_ID));
+    List<String> pendingSplitIds = enumerator.snapshotState(1).pendingSplits().stream()
+        .map(splitState -> splitState.split().splitId())
+        .collect(Collectors.toList());
+    Assert.assertEquals(splits.size(), pendingSplitIds.size());
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplitIds.get(0));
+
+    // register the reader again, and let it request a split
+    registerReaderAndRequestSplit(enumeratorContext, enumerator);
+
+    Assert.assertTrue(enumerator.snapshotState(2).pendingSplits().isEmpty());
+    MatcherAssert.assertThat(enumeratorContext.getSplitAssignments().get(SUBTASK_ID).getAssignedSplits(),
+        CoreMatchers.hasItem(splits.get(0)));
+  }
+
+  /** Streaming scan that starts with a table scan and then reads incrementally. */
+  private static ScanContext incrementalScanContext() {
+    return ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+  }
+
+  /** Creates exactly one split backed by a transient Hadoop table in the shared temp folder. */
+  private static List<IcebergSourceSplit> createSingleSplit() throws Exception {
+    return SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+  }
+
+  /** Hands the splits to the manual planner and runs the enumerator's scheduled discovery actions. */
+  private static void publishSplits(
+      ManualContinuousSplitPlanner splitPlanner,
+      TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      List<IcebergSourceSplit> splits) throws Exception {
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+  }
+
+  /** Registers {@link #SUBTASK_ID} as a reader and sends a split request on its behalf. */
+  private static void registerReaderAndRequestSplit(
+      TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      ContinuousIcebergEnumerator enumerator) {
+    enumeratorContext.registerReader(SUBTASK_ID, "localhost");
+    enumerator.addReader(SUBTASK_ID);
+    enumerator.handleSourceEvent(SUBTASK_ID, new SplitRequestEvent());
+  }
+
+  private static ContinuousIcebergEnumerator createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> context,
+      ScanContext scanContext,
+      ContinuousSplitPlanner splitPlanner) {
+
+    ContinuousIcebergEnumerator enumerator =
+        new ContinuousIcebergEnumerator(
+            context,
+            new SimpleSplitAssigner(Collections.emptyList()),
+            scanContext,
+            splitPlanner,
+            null);
+    enumerator.start();
+    return enumerator;
+  }
+
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
new file mode 100644
index 0000000000..05abe7bc17
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergEnumeratorStateSerializer {
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final IcebergEnumeratorStateSerializer serializer = IcebergEnumeratorStateSerializer.INSTANCE;
+
+  @Test
+  public void testEmptySnapshotIdAndPendingSplits() throws Exception {
+    assertRoundTrip(new IcebergEnumeratorState(Collections.emptyList()));
+  }
+
+  @Test
+  public void testSomeSnapshotIdAndEmptyPendingSplits() throws Exception {
+    IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(1L, System.currentTimeMillis());
+    assertRoundTrip(new IcebergEnumeratorState(position, Collections.emptyList()));
+  }
+
+  @Test
+  public void testSomeSnapshotIdAndPendingSplits() throws Exception {
+    IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(2L, System.currentTimeMillis());
+    List<IcebergSourceSplit> splits = SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 3, 1);
+    // one split per status so every enum value goes through the round trip
+    Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayList(
+        new IcebergSourceSplitState(splits.get(0), IcebergSourceSplitStatus.UNASSIGNED),
+        new IcebergSourceSplitState(splits.get(1), IcebergSourceSplitStatus.ASSIGNED),
+        new IcebergSourceSplitState(splits.get(2), IcebergSourceSplitStatus.COMPLETED));
+
+    assertRoundTrip(new IcebergEnumeratorState(position, pendingSplits));
+  }
+
+  /** Serializes the state, deserializes it with the current version and compares it to the input. */
+  private void assertRoundTrip(IcebergEnumeratorState original) throws Exception {
+    byte[] bytes = serializer.serialize(original);
+    IcebergEnumeratorState restored = serializer.deserialize(serializer.getVersion(), bytes);
+    assertEnumeratorStateEquals(original, restored);
+  }
+
+  private void assertEnumeratorStateEquals(IcebergEnumeratorState expected, IcebergEnumeratorState actual) {
+    Assert.assertEquals(expected.lastEnumeratedPosition(), actual.lastEnumeratedPosition());
+    Assert.assertEquals(expected.pendingSplits().size(), actual.pendingSplits().size());
+
+    // sizes are equal, so the actual iterator advances in lock-step with the expected splits
+    Iterator<IcebergSourceSplitState> actualSplits = actual.pendingSplits().iterator();
+    for (IcebergSourceSplitState expectedSplitState : expected.pendingSplits()) {
+      IcebergSourceSplitState actualSplitState = actualSplits.next();
+      Assert.assertEquals(expectedSplitState.split().splitId(), actualSplitState.split().splitId());
+      Assert.assertEquals(expectedSplitState.split().fileOffset(), actualSplitState.split().fileOffset());
+      Assert.assertEquals(expectedSplitState.split().recordOffset(), actualSplitState.split().recordOffset());
+      Assert.assertEquals(expectedSplitState.status(), actualSplitState.status());
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 98c5fa3ede..8c9be862bf 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -150,6 +150,7 @@ abstract class AbstractIcebergEnumerator implements SplitEnumerator<IcebergSourc
if (availableFuture.get() != null) {
return;
}
+
CompletableFuture<Void> future = assigner.isAvailable()
.thenAccept(ignore ->
// Must run assignSplits in coordinator thread
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
index 0f287864be..8d33b6f073 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * One-time split enumeration at the start-up
+ * One-time split enumeration at the start-up for batch execution
*/
@Internal
public class StaticIcebergEnumerator extends AbstractIcebergEnumerator {