You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/30 05:51:25 UTC

[incubator-seatunnel] branch dev updated: [Improve] [E2E] Change E2E To support ClusterFaultToleranceIT (#3976)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e538903ca [Improve] [E2E] Change E2E To support ClusterFaultToleranceIT (#3976)
e538903ca is described below

commit e538903ca20463db5fccff8c279619c366d48568
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Jan 30 13:51:17 2023 +0800

    [Improve] [E2E] Change E2E To support ClusterFaultToleranceIT (#3976)
    
    * [Improve] [E2E] Change E2E To support ClusterFaultToleranceIT
---
 .github/workflows/backend.yml                      | 28 ++++++++++++++++++++--
 .../connector-seatunnel-e2e-base/pom.xml           |  6 +++++
 .../engine/e2e/ClusterFaultToleranceIT.java        | 22 +----------------
 .../e2e/ClusterFaultToleranceTwoPipelineIT.java    |  2 --
 .../server/checkpoint/CheckpointCoordinator.java   |  3 ++-
 .../server/task/operation/DeployTaskOperation.java |  2 +-
 .../imap-storage-plugins/imap-storage-file/pom.xml |  5 ----
 .../imap/storage/file/IMapFileStorageFactory.java  |  2 ++
 8 files changed, 38 insertions(+), 32 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index fb81649b5..70df79f9e 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -337,7 +337,7 @@ jobs:
         env:
           MAVEN_OPTS: -Xmx2048m
 
-  engine-and-transform-v2-it:
+  engine-v2-it:
     needs: [ changes, sanity-check ]
     if: needs.changes.outputs.api == 'true'
     runs-on: ${{ matrix.os }}
@@ -357,7 +357,31 @@ jobs:
       - name: run some modules integration test
         if: needs.changes.outputs.api == 'true'
         run: |
-          ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e,:connector-seatunnel-e2e-base -am -Pci
+          ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base -am -Pci
+        env:
+          MAVEN_OPTS: -Xmx4096m
+
+  transform-v2-it:
+    needs: [ changes, sanity-check ]
+    if: needs.changes.outputs.api == 'true'
+    runs-on: ${{ matrix.os }}
+    strategy:
+      matrix:
+        java: [ '8', '11' ]
+        os: [ 'ubuntu-latest' ]
+    timeout-minutes: 90
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK ${{ matrix.java }}
+        uses: actions/setup-java@v3
+        with:
+          java-version: ${{ matrix.java }}
+          distribution: 'temurin'
+          cache: 'maven'
+      - name: run some modules integration test
+        if: needs.changes.outputs.api == 'true'
+        run: |
+          ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e -am -Pci
         env:
           MAVEN_OPTS: -Xmx4096m
 
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index 22a3fa6ac..19ee88692 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -51,6 +51,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>imap-storage-file</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index ffcd4452f..269b69524 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -39,7 +39,6 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
 
@@ -54,7 +53,6 @@ import java.util.concurrent.TimeUnit;
  * Cluster fault tolerance test. Test the job recovery capability and data consistency assurance capability in case of cluster node failure
  */
 @Slf4j
-@Disabled
 public class ClusterFaultToleranceIT {
 
     public static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";
@@ -108,7 +106,6 @@ public class ClusterFaultToleranceIT {
             ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
 
             CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-
             Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> {
                     Thread.sleep(2000);
@@ -613,24 +610,7 @@ public class ClusterFaultToleranceIT {
         SeaTunnelClient engineClient = null;
 
         try {
-            String yaml = "#\n" +
-                "# Licensed to the Apache Software Foundation (ASF) under one or more\n" +
-                "# contributor license agreements.  See the NOTICE file distributed with\n" +
-                "# this work for additional information regarding copyright ownership.\n" +
-                "# The ASF licenses this file to You under the Apache License, Version 2.0\n" +
-                "# (the \"License\"); you may not use this file except in compliance with\n" +
-                "# the License.  You may obtain a copy of the License at\n" +
-                "#\n" +
-                "#    http://www.apache.org/licenses/LICENSE-2.0\n" +
-                "#\n" +
-                "# Unless required by applicable law or agreed to in writing, software\n" +
-                "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
-                "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
-                "# See the License for the specific language governing permissions and\n" +
-                "# limitations under the License.\n" +
-                "#\n" +
-                "\n" +
-                "hazelcast:\n" +
+            String yaml = "hazelcast:\n" +
                 "  cluster-name: seatunnel\n" +
                 "  network:\n" +
                 "    rest-api:\n" +
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index 9d386a749..aa3286a4c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -38,7 +38,6 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
 
@@ -53,7 +52,6 @@ import java.util.concurrent.TimeUnit;
  * Cluster fault tolerance test. Test the job which have two pipelines can recovery capability and data consistency assurance capability in case of cluster node failure
  */
 @Slf4j
-@Disabled
 public class ClusterFaultToleranceTwoPipelineIT {
 
     public static final String TEST_TEMPLATE_FILE_NAME = "cluster_batch_fake_to_localfile_two_pipeline_template.conf";
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 4d8229377..155b68cb2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -258,11 +258,12 @@ public class CheckpointCoordinator {
     }
 
     protected void restoreCoordinator(boolean alreadyStarted) {
+        LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted);
         cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
         shutdown = false;
         if (alreadyStarted) {
-            tryTriggerPendingCheckpoint();
             isAllTaskReady = true;
+            tryTriggerPendingCheckpoint();
         } else {
             isAllTaskReady = false;
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 40416f480..336ef59a9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -47,7 +47,7 @@ public class DeployTaskOperation extends Operation implements IdentifiedDataSeri
     public void run() throws Exception {
         SeaTunnelServer server = getService();
         server.getSlotService().getSlotContext(slotProfile)
-            .getTaskExecutionService().deployTask(taskImmutableInformation).get();
+            .getTaskExecutionService().deployTask(taskImmutableInformation);
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml
index fba2d5603..0d80f8d17 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml
@@ -31,11 +31,6 @@
     <artifactId>imap-storage-file</artifactId>
 
     <dependencies>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>imap-storage-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>serializer-protobuf</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java
index 9920d3c3f..dccd52e7b 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java
@@ -26,10 +26,12 @@ import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
 import org.apache.seatunnel.engine.imap.storage.api.IMapStorageFactory;
 import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
 
+import com.google.auto.service.AutoService;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.Map;
 
+@AutoService(IMapStorageFactory.class)
 public class IMapFileStorageFactory implements IMapStorageFactory {
     @Override
     public String factoryIdentifier() {