You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/05/12 01:59:07 UTC

[incubator-seatunnel] branch dev updated: [Feature][Shade][typesafe-config] Make Config can be serialized (#4586)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9725a4251 [Feature][Shade][typesafe-config] Make Config can be serialized (#4586)
9725a4251 is described below

commit 9725a425177ecca19aa4ce3f614e57db5f7cadf0
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Fri May 12 09:58:59 2023 +0800

    [Feature][Shade][typesafe-config] Make Config can be serialized (#4586)
    
    
    
    ---------
    
    Co-authored-by: Hisoka <fa...@qq.com>
    Co-authored-by: gaojun <ga...@gmail.com>
---
 .github/workflows/backend.yml                      |  6 +--
 seatunnel-config/seatunnel-config-base/pom.xml     |  1 +
 .../shade/com/typesafe/config/ConfigMergeable.java | 28 ++++++++++++
 .../org/apache/seatunnel/config/SerializeTest.java | 52 ++++++++++++++++++++++
 .../src/test/resources/seatunnel/serialize.conf    | 34 ++++++++++++++
 .../engine/client/SeaTunnelClientTest.java         | 20 ++++++---
 .../engine/server/dag/physical/PhysicalVertex.java | 13 ++++++
 .../engine/server/dag/physical/SubPlan.java        |  4 ++
 8 files changed, 149 insertions(+), 9 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index b7eb76964..6359fde52 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -281,14 +281,14 @@ jobs:
       - name: run all modules unit test
         if: needs.changes.outputs.api == 'true'
         run: |
-          ./mvnw -B -T 1C clean verify -D"maven.test.skip"=false -D"license.skipAddThirdParty"=true --no-snapshot-updates
+          ./mvnw -B -T 1 clean verify -D"maven.test.skip"=false -D"license.skipAddThirdParty"=true --no-snapshot-updates
         env:
           MAVEN_OPTS: -Xmx4096m
 
       - name: run updated modules unit test
         if: needs.changes.outputs.api == 'false' && needs.changes.outputs.ut-modules != ''
         run: |
-          ./mvnw -B -T 1C clean verify -D"maven.test.skip"=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl ${{needs.changes.outputs.ut-modules}} -am -Pci
+          ./mvnw -B -T 1 clean verify -D"maven.test.skip"=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl ${{needs.changes.outputs.ut-modules}} -am -Pci
         env:
           MAVEN_OPTS: -Xmx4096m
 
@@ -366,7 +366,7 @@ jobs:
       - name: run seatunnel zeta integration test
         if: needs.changes.outputs.api == 'true'
         run: |
-          ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base -am -Pci
+          ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base -am -Pci
         env:
           MAVEN_OPTS: -Xmx4096m
 
diff --git a/seatunnel-config/seatunnel-config-base/pom.xml b/seatunnel-config/seatunnel-config-base/pom.xml
index 99c958369..677b1d57e 100644
--- a/seatunnel-config/seatunnel-config-base/pom.xml
+++ b/seatunnel-config/seatunnel-config-base/pom.xml
@@ -67,6 +67,7 @@
                                 <exclude>META-INF/MANIFEST.MF</exclude>
                                 <exclude>META-INF/NOTICE</exclude>
                                 <exclude>com/typesafe/config/ConfigParseOptions.class</exclude>
+                                <exclude>com/typesafe/config/ConfigMergeable.class</exclude>
                                 <exclude>com/typesafe/config/impl/ConfigParser.class</exclude>
                                 <exclude>com/typesafe/config/impl/ConfigNodePath.class</exclude>
                                 <exclude>com/typesafe/config/impl/PathParser.class</exclude>
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigMergeable.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigMergeable.java
new file mode 100644
index 000000000..e1855c0c3
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigMergeable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config;
+
+import java.io.Serializable;
+
+/**
+ * Copy of {@link com.typesafe.config.ConfigMergeable} that additionally extends
+ * {@link Serializable}, so that {@link Config} can be serialized.
+ */
+public interface ConfigMergeable extends Serializable {
+    ConfigMergeable withFallback(ConfigMergeable configMergeable);
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/SerializeTest.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/SerializeTest.java
new file mode 100644
index 000000000..457908ea9
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/SerializeTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.config.utils.FileUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/** Verifies that a parsed {@link Config} can be written and read back via Java serialization. */
+public class SerializeTest {
+    @Test
+    void testSerialize(@TempDir Path tempDir)
+            throws URISyntaxException, IOException, ClassNotFoundException {
+        Config config =
+                ConfigFactory.parseFile(
+                        FileUtils.getFileFromResources("/seatunnel/serialize.conf"));
+        Path path = tempDir.resolve("test.config.ser");
+        // try-with-resources closes each stream even when (de)serialization throws.
+        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(path))) {
+            out.writeObject(config);
+        }
+        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(path))) {
+            in.readObject();
+        }
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/serialize.conf b/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/serialize.conf
new file mode 100644
index 000000000..39ebfe9fc
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/serialize.conf
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+env {
+  job.mode = BATCH
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    schema {
+      fields {
+        name = string
+        age = int
+      }
+    }
+  }
+}
+
+sink {
+  Console {}
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index a15dad5e3..369120673 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -273,12 +273,20 @@ public class SeaTunnelClientTest {
 
             await().atMost(180000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
-                            () ->
-                                    Assertions.assertTrue(
-                                            jobClient.getJobDetailStatus(jobId).contains("FINISHED")
-                                                    && jobClient
-                                                            .listJobStatus(true)
-                                                            .contains("FINISHED")));
+                            () -> {
+                                Thread.sleep(1000);
+                                System.out.println(
+                                        "======================job status:"
+                                                + jobClient.getJobDetailStatus(jobId));
+                                System.out.println(
+                                        "======================list job status:"
+                                                + jobClient.listJobStatus(true));
+                                Assertions.assertTrue(
+                                        jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+                                                && jobClient
+                                                        .listJobStatus(true)
+                                                        .contains("FINISHED"));
+                            });
             // Finished
             JobDAGInfo jobInfo = jobClient.getJobInfo(jobId);
             Assertions.assertTrue(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 24ed0340a..362e2074f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -358,6 +358,10 @@ public class PhysicalVertex {
 
     public boolean updateTaskState(
             @NonNull ExecutionState current, @NonNull ExecutionState targetState) {
+        LOGGER.fine(
+                String.format(
+                        "Try to update the task %s state from %s to %s",
+                        taskFullName, current, targetState));
         synchronized (this) {
             // consistency check
             if (current.isEndState()) {
@@ -397,6 +401,10 @@ public class PhysicalVertex {
                                 taskFullName, current, targetState));
                 return true;
             } else {
+                LOGGER.warning(
+                        String.format(
+                                "The task %s state in Imap is %s, not equals expected state %s",
+                                taskFullName, runningJobStateIMap.get(taskGroupLocation), current));
                 return false;
             }
         }
@@ -411,6 +419,11 @@ public class PhysicalVertex {
         } else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) {
             noticeTaskExecutionServiceCancel();
         }
+
+        LOGGER.info(
+                String.format(
+                        "can not cancel task %s because it is in state %s ",
+                        taskFullName, getExecutionState()));
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index d11ad63ee..55a3de226 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -413,6 +413,10 @@ public class SubPlan {
                     },
                     executorService);
         }
+        LOGGER.info(
+                String.format(
+                        "can not cancel task %s because it is in state %s ",
+                        task.getTaskFullName(), task.getExecutionState()));
         return null;
     }