You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/05/12 01:59:07 UTC
[incubator-seatunnel] branch dev updated: [Feature][Shade][typesafe-config] Make Config can be serialized (#4586)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9725a4251 [Feature][Shade][typesafe-config] Make Config can be serialized (#4586)
9725a4251 is described below
commit 9725a425177ecca19aa4ce3f614e57db5f7cadf0
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Fri May 12 09:58:59 2023 +0800
[Feature][Shade][typesafe-config] Make Config can be serialized (#4586)
---------
Co-authored-by: Hisoka <fa...@qq.com>
Co-authored-by: gaojun <ga...@gmail.com>
---
.github/workflows/backend.yml | 6 +--
seatunnel-config/seatunnel-config-base/pom.xml | 1 +
.../shade/com/typesafe/config/ConfigMergeable.java | 28 ++++++++++++
.../org/apache/seatunnel/config/SerializeTest.java | 52 ++++++++++++++++++++++
.../src/test/resources/seatunnel/serialize.conf | 34 ++++++++++++++
.../engine/client/SeaTunnelClientTest.java | 20 ++++++---
.../engine/server/dag/physical/PhysicalVertex.java | 13 ++++++
.../engine/server/dag/physical/SubPlan.java | 4 ++
8 files changed, 149 insertions(+), 9 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index b7eb76964..6359fde52 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -281,14 +281,14 @@ jobs:
- name: run all modules unit test
if: needs.changes.outputs.api == 'true'
run: |
- ./mvnw -B -T 1C clean verify -D"maven.test.skip"=false -D"license.skipAddThirdParty"=true --no-snapshot-updates
+ ./mvnw -B -T 1 clean verify -D"maven.test.skip"=false -D"license.skipAddThirdParty"=true --no-snapshot-updates
env:
MAVEN_OPTS: -Xmx4096m
- name: run updated modules unit test
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.ut-modules != ''
run: |
- ./mvnw -B -T 1C clean verify -D"maven.test.skip"=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl ${{needs.changes.outputs.ut-modules}} -am -Pci
+ ./mvnw -B -T 1 clean verify -D"maven.test.skip"=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl ${{needs.changes.outputs.ut-modules}} -am -Pci
env:
MAVEN_OPTS: -Xmx4096m
@@ -366,7 +366,7 @@ jobs:
- name: run seatunnel zeta integration test
if: needs.changes.outputs.api == 'true'
run: |
- ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base -am -Pci
+ ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base -am -Pci
env:
MAVEN_OPTS: -Xmx4096m
diff --git a/seatunnel-config/seatunnel-config-base/pom.xml b/seatunnel-config/seatunnel-config-base/pom.xml
index 99c958369..677b1d57e 100644
--- a/seatunnel-config/seatunnel-config-base/pom.xml
+++ b/seatunnel-config/seatunnel-config-base/pom.xml
@@ -67,6 +67,7 @@
<exclude>META-INF/MANIFEST.MF</exclude>
<exclude>META-INF/NOTICE</exclude>
<exclude>com/typesafe/config/ConfigParseOptions.class</exclude>
+ <exclude>com/typesafe/config/ConfigMergeable.class</exclude>
<exclude>com/typesafe/config/impl/ConfigParser.class</exclude>
<exclude>com/typesafe/config/impl/ConfigNodePath.class</exclude>
<exclude>com/typesafe/config/impl/PathParser.class</exclude>
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigMergeable.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigMergeable.java
new file mode 100644
index 000000000..e1855c0c3
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigMergeable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.shade.com.typesafe.config;
+
+import java.io.Serializable;
+
+/**
+ * Copy of {@link com.typesafe.config.ConfigMergeable} that also extends {@link Serializable}, so
+ * that {@link Config} can be serialized.
+ */
+public interface ConfigMergeable extends Serializable {
+ ConfigMergeable withFallback(ConfigMergeable configMergeable);
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/SerializeTest.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/SerializeTest.java
new file mode 100644
index 000000000..457908ea9
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/SerializeTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.config.utils.FileUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/** Verifies that a {@link Config} parsed from a file survives a Java serialization round trip. */
+public class SerializeTest {
+
+    /**
+     * Parses {@code /seatunnel/serialize.conf}, writes the resulting {@link Config} to a file
+     * under {@code tempDir} with an {@link ObjectOutputStream}, reads it back with an {@link
+     * ObjectInputStream} and checks that the restored object equals the original.
+     *
+     * @param tempDir temporary directory supplied by JUnit through {@link TempDir}
+     * @throws URISyntaxException presumably raised by the resource lookup in {@code FileUtils}
+     * @throws IOException if the serialized file cannot be written or read
+     * @throws ClassNotFoundException if a serialized class cannot be resolved while reading
+     */
+    @Test
+    void testSerialize(@TempDir Path tempDir)
+            throws URISyntaxException, IOException, ClassNotFoundException {
+        Config config =
+                ConfigFactory.parseFile(
+                        FileUtils.getFileFromResources("/seatunnel/serialize.conf"));
+        Path path = tempDir.resolve("test.config.ser");
+
+        // try-with-resources closes each stream even when writeObject/readObject throws.
+        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(path))) {
+            out.writeObject(config);
+        }
+
+        Object restored;
+        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(path))) {
+            restored = in.readObject();
+        }
+
+        // Reading back without an exception is not enough: the content must survive as well.
+        Assertions.assertEquals(config, restored);
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/serialize.conf b/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/serialize.conf
new file mode 100644
index 000000000..39ebfe9fc
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/test/resources/seatunnel/serialize.conf
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+env { # job-level settings; only job.mode is set in this fixture
+ job.mode = BATCH
+}
+
+source { # a single FakeSource entry with a row count and a nested two-field schema
+ FakeSource {
+ row.num = 100
+ schema {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ }
+}
+
+sink { # a single Console entry with an empty option object
+ Console {}
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index a15dad5e3..369120673 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -273,12 +273,20 @@ public class SeaTunnelClientTest {
await().atMost(180000, TimeUnit.MILLISECONDS)
.untilAsserted(
- () ->
- Assertions.assertTrue(
- jobClient.getJobDetailStatus(jobId).contains("FINISHED")
- && jobClient
- .listJobStatus(true)
- .contains("FINISHED")));
+ () -> {
+ Thread.sleep(1000);
+ System.out.println(
+ "======================job status:"
+ + jobClient.getJobDetailStatus(jobId));
+ System.out.println(
+ "======================list job status:"
+ + jobClient.listJobStatus(true));
+ Assertions.assertTrue(
+ jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+ && jobClient
+ .listJobStatus(true)
+ .contains("FINISHED"));
+ });
// Finished
JobDAGInfo jobInfo = jobClient.getJobInfo(jobId);
Assertions.assertTrue(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 24ed0340a..362e2074f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -358,6 +358,10 @@ public class PhysicalVertex {
public boolean updateTaskState(
@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
+ LOGGER.fine(
+ String.format(
+ "Try to update the task %s state from %s to %s",
+ taskFullName, current, targetState));
synchronized (this) {
// consistency check
if (current.isEndState()) {
@@ -397,6 +401,10 @@ public class PhysicalVertex {
taskFullName, current, targetState));
return true;
} else {
+ LOGGER.warning(
+ String.format(
+ "The task %s state in Imap is %s, not equals expected state %s",
+ taskFullName, runningJobStateIMap.get(taskGroupLocation), current));
return false;
}
}
@@ -411,6 +419,11 @@ public class PhysicalVertex {
} else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) {
noticeTaskExecutionServiceCancel();
}
+
+ LOGGER.info(
+ String.format(
+ "can not cancel task %s because it is in state %s ",
+ taskFullName, getExecutionState()));
}
@SuppressWarnings("checkstyle:MagicNumber")
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index d11ad63ee..55a3de226 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -413,6 +413,10 @@ public class SubPlan {
},
executorService);
}
+ LOGGER.info(
+ String.format(
+ "can not cancel task %s because it is in state %s ",
+ task.getTaskFullName(), task.getExecutionState()));
return null;
}