You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/30 05:51:25 UTC
[incubator-seatunnel] branch dev updated: [Improve] [E2E] Change E2E To support ClusterFaultToleranceIT (#3976)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e538903ca [Improve] [E2E] Change E2E To support ClusterFaultToleranceIT (#3976)
e538903ca is described below
commit e538903ca20463db5fccff8c279619c366d48568
Author: Hisoka <fa...@qq.com>
AuthorDate: Mon Jan 30 13:51:17 2023 +0800
[Improve] [E2E] Change E2E To support ClusterFaultToleranceIT (#3976)
* [Improve] [E2E] Change E2E To support ClusterFaultToleranceIT
---
.github/workflows/backend.yml | 28 ++++++++++++++++++++--
.../connector-seatunnel-e2e-base/pom.xml | 6 +++++
.../engine/e2e/ClusterFaultToleranceIT.java | 22 +----------------
.../e2e/ClusterFaultToleranceTwoPipelineIT.java | 2 --
.../server/checkpoint/CheckpointCoordinator.java | 3 ++-
.../server/task/operation/DeployTaskOperation.java | 2 +-
.../imap-storage-plugins/imap-storage-file/pom.xml | 5 ----
.../imap/storage/file/IMapFileStorageFactory.java | 2 ++
8 files changed, 38 insertions(+), 32 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index fb81649b5..70df79f9e 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -337,7 +337,7 @@ jobs:
env:
MAVEN_OPTS: -Xmx2048m
- engine-and-transform-v2-it:
+ engine-v2-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
runs-on: ${{ matrix.os }}
@@ -357,7 +357,31 @@ jobs:
- name: run some modules integration test
if: needs.changes.outputs.api == 'true'
run: |
- ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e,:connector-seatunnel-e2e-base -am -Pci
+ ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base -am -Pci
+ env:
+ MAVEN_OPTS: -Xmx4096m
+
+ transform-v2-it:
+ needs: [ changes, sanity-check ]
+ if: needs.changes.outputs.api == 'true'
+ runs-on: ${{ matrix.os }}
+ strategy:
+ matrix:
+ java: [ '8', '11' ]
+ os: [ 'ubuntu-latest' ]
+ timeout-minutes: 90
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK ${{ matrix.java }}
+ uses: actions/setup-java@v3
+ with:
+ java-version: ${{ matrix.java }}
+ distribution: 'temurin'
+ cache: 'maven'
+ - name: run some modules integration test
+ if: needs.changes.outputs.api == 'true'
+ run: |
+ ./mvnw -T 1C -B verify -DskipUT=true -DskipIT=false -D"checkstyle.skip"=true -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :seatunnel-transforms-v2-e2e -am -Pci
env:
MAVEN_OPTS: -Xmx4096m
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index 22a3fa6ac..19ee88692 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -51,6 +51,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>imap-storage-file</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index ffcd4452f..269b69524 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -39,7 +39,6 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
@@ -54,7 +53,6 @@ import java.util.concurrent.TimeUnit;
* Cluster fault tolerance test. Verifies that a job can recover and that its data stays consistent when a cluster node fails.
*/
@Slf4j
-@Disabled
public class ClusterFaultToleranceIT {
public static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";
@@ -108,7 +106,6 @@ public class ClusterFaultToleranceIT {
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-
Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Thread.sleep(2000);
@@ -613,24 +610,7 @@ public class ClusterFaultToleranceIT {
SeaTunnelClient engineClient = null;
try {
- String yaml = "#\n" +
- "# Licensed to the Apache Software Foundation (ASF) under one or more\n" +
- "# contributor license agreements. See the NOTICE file distributed with\n" +
- "# this work for additional information regarding copyright ownership.\n" +
- "# The ASF licenses this file to You under the Apache License, Version 2.0\n" +
- "# (the \"License\"); you may not use this file except in compliance with\n" +
- "# the License. You may obtain a copy of the License at\n" +
- "#\n" +
- "# http://www.apache.org/licenses/LICENSE-2.0\n" +
- "#\n" +
- "# Unless required by applicable law or agreed to in writing, software\n" +
- "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
- "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
- "# See the License for the specific language governing permissions and\n" +
- "# limitations under the License.\n" +
- "#\n" +
- "\n" +
- "hazelcast:\n" +
+ String yaml = "hazelcast:\n" +
" cluster-name: seatunnel\n" +
" network:\n" +
" rest-api:\n" +
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index 9d386a749..aa3286a4c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -38,7 +38,6 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
@@ -53,7 +52,6 @@ import java.util.concurrent.TimeUnit;
* Cluster fault tolerance test. Verifies that a job with two pipelines can recover and that its data stays consistent when a cluster node fails.
*/
@Slf4j
-@Disabled
public class ClusterFaultToleranceTwoPipelineIT {
public static final String TEST_TEMPLATE_FILE_NAME = "cluster_batch_fake_to_localfile_two_pipeline_template.conf";
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 4d8229377..155b68cb2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -258,11 +258,12 @@ public class CheckpointCoordinator {
}
protected void restoreCoordinator(boolean alreadyStarted) {
+ LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted);
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
shutdown = false;
if (alreadyStarted) {
- tryTriggerPendingCheckpoint();
isAllTaskReady = true;
+ tryTriggerPendingCheckpoint();
} else {
isAllTaskReady = false;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 40416f480..336ef59a9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -47,7 +47,7 @@ public class DeployTaskOperation extends Operation implements IdentifiedDataSeri
public void run() throws Exception {
SeaTunnelServer server = getService();
server.getSlotService().getSlotContext(slotProfile)
- .getTaskExecutionService().deployTask(taskImmutableInformation).get();
+ .getTaskExecutionService().deployTask(taskImmutableInformation);
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml
index fba2d5603..0d80f8d17 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml
@@ -31,11 +31,6 @@
<artifactId>imap-storage-file</artifactId>
<dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>imap-storage-api</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>serializer-protobuf</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java
index 9920d3c3f..dccd52e7b 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java
@@ -26,10 +26,12 @@ import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
import org.apache.seatunnel.engine.imap.storage.api.IMapStorageFactory;
import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
+import com.google.auto.service.AutoService;
import org.apache.hadoop.conf.Configuration;
import java.util.Map;
+@AutoService(IMapStorageFactory.class)
public class IMapFileStorageFactory implements IMapStorageFactory {
@Override
public String factoryIdentifier() {