You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/04/21 06:56:44 UTC
[incubator-seatunnel] branch dev updated: [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 64760eed4 [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638)
64760eed4 is described below
commit 64760eed4d532690989042ea37959edddc7749d5
Author: hailin0 <wa...@apache.org>
AuthorDate: Fri Apr 21 14:56:38 2023 +0800
[Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638)
* [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error
---
release-note.md | 1 +
.../iceberg/source/enumerator/AbstractSplitEnumerator.java | 4 ++++
.../source/enumerator/IcebergStreamSplitEnumerator.java | 10 ++++++----
3 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/release-note.md b/release-note.md
index f2dec47af..1030df0e6 100644
--- a/release-note.md
+++ b/release-note.md
@@ -70,6 +70,7 @@
- [Kudu] Fix connector source snapshot state NPE #4027
- [Maxcompute] Fix some data type parse fail #3894
- [Doris] Fix content-length header already present #4277
+- [Iceberg] Fix iceberg source stream mode init error #4638
### Zeta Engine
- [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED #3808
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
index d0329b879..199c56eb1 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFil
import org.apache.iceberg.Table;
+import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -45,6 +46,7 @@ public abstract class AbstractSplitEnumerator
protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;
protected IcebergTableLoader icebergTableLoader;
+ @Getter private volatile boolean isOpen = false;
public AbstractSplitEnumerator(
@NonNull SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context,
@@ -59,6 +61,7 @@ public abstract class AbstractSplitEnumerator
public void open() {
icebergTableLoader = IcebergTableLoader.create(sourceConfig);
icebergTableLoader.open();
+ isOpen = true;
}
@Override
@@ -70,6 +73,7 @@ public abstract class AbstractSplitEnumerator
@Override
public void close() throws IOException {
icebergTableLoader.close();
+ isOpen = false;
}
@Override
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
index b74bf7a9b..59bffc22e 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
@@ -62,11 +62,13 @@ public class IcebergStreamSplitEnumerator extends AbstractSplitEnumerator {
@Override
public void handleSplitRequest(int subtaskId) {
- synchronized (this) {
- if (pendingSplits.isEmpty() || pendingSplits.get(subtaskId) == null) {
- refreshPendingSplits();
+ if (isOpen()) {
+ synchronized (this) {
+ if (pendingSplits.isEmpty() || pendingSplits.get(subtaskId) == null) {
+ refreshPendingSplits();
+ }
+ assignPendingSplits(Collections.singleton(subtaskId));
}
- assignPendingSplits(Collections.singleton(subtaskId));
}
}