You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/04/21 06:56:44 UTC

[incubator-seatunnel] branch dev updated: [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 64760eed4 [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638)
64760eed4 is described below

commit 64760eed4d532690989042ea37959edddc7749d5
Author: hailin0 <wa...@apache.org>
AuthorDate: Fri Apr 21 14:56:38 2023 +0800

    [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638)
    
    * [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error
---
 release-note.md                                                |  1 +
 .../iceberg/source/enumerator/AbstractSplitEnumerator.java     |  4 ++++
 .../source/enumerator/IcebergStreamSplitEnumerator.java        | 10 ++++++----
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/release-note.md b/release-note.md
index f2dec47af..1030df0e6 100644
--- a/release-note.md
+++ b/release-note.md
@@ -70,6 +70,7 @@
 - [Kudu] Fix connector source snapshot state NPE #4027
 - [Maxcompute] Fix some data type parse fail #3894
 - [Doris] Fix content-length header already present #4277
+- [Iceberg] Fix iceberg source stream mode init error #4638
 
 ### Zeta Engine
 - [Checkpoint] Fix Checkpoint Continue Trigger After Job CANCELED #3808
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
index d0329b879..199c56eb1 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFil
 
 import org.apache.iceberg.Table;
 
+import lombok.Getter;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
@@ -45,6 +46,7 @@ public abstract class AbstractSplitEnumerator
     protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;
 
     protected IcebergTableLoader icebergTableLoader;
+    @Getter private volatile boolean isOpen = false;
 
     public AbstractSplitEnumerator(
             @NonNull SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context,
@@ -59,6 +61,7 @@ public abstract class AbstractSplitEnumerator
     public void open() {
         icebergTableLoader = IcebergTableLoader.create(sourceConfig);
         icebergTableLoader.open();
+        isOpen = true;
     }
 
     @Override
@@ -70,6 +73,7 @@ public abstract class AbstractSplitEnumerator
     @Override
     public void close() throws IOException {
         icebergTableLoader.close();
+        isOpen = false;
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
index b74bf7a9b..59bffc22e 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
@@ -62,11 +62,13 @@ public class IcebergStreamSplitEnumerator extends AbstractSplitEnumerator {
 
     @Override
     public void handleSplitRequest(int subtaskId) {
-        synchronized (this) {
-            if (pendingSplits.isEmpty() || pendingSplits.get(subtaskId) == null) {
-                refreshPendingSplits();
+        if (isOpen()) {
+            synchronized (this) {
+                if (pendingSplits.isEmpty() || pendingSplits.get(subtaskId) == null) {
+                    refreshPendingSplits();
+                }
+                assignPendingSplits(Collections.singleton(subtaskId));
             }
-            assignPendingSplits(Collections.singleton(subtaskId));
         }
     }