You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/04 15:16:48 UTC

[flink] branch master updated (70a3f66 -> 6f05439)

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 70a3f66  [FLINK-25261][state] Truncate changelog upon materialization
     new fc44a6d  [FLINK-25340][testutils] Remove JM temp files before stopping container to avoid file permission issues
     new 55a0bad  [FLINK-25263][e2e/kafka] Re-enable temporarily disabled KafkaSourceE2ECase
     new 6f05439  [hotfix][connector/kafka] Add Kafka connector dependency in Kafka E2E test module

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink-end-to-end-tests-common-kafka/pom.xml    |  5 ++
 .../flink/tests/util/kafka/KafkaSourceE2ECase.java |  2 -
 .../util/flink/container/FlinkContainers.java      | 53 ++++++++++++++++++++--
 .../flink/container/FlinkContainersBuilder.java    | 38 ++--------------
 4 files changed, 58 insertions(+), 40 deletions(-)

[flink] 02/03: [FLINK-25263][e2e/kafka] Re-enable temporarily disabled KafkaSourceE2ECase

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 55a0bada0a1afbadd2978934ef40df3208408cc2
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Thu Dec 16 16:03:55 2021 +0800

    [FLINK-25263][e2e/kafka] Re-enable temporarily disabled KafkaSourceE2ECase
---
 .../test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
index ac43d9e..9f5e9f7 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
@@ -29,12 +29,10 @@ import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
 import org.apache.flink.util.DockerImageVersions;
 
-import org.junit.jupiter.api.Disabled;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
 /** Kafka E2E test based on connector testing framework. */
-@Disabled("FLINK-25263")
 public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> {
     private static final String KAFKA_HOSTNAME = "kafka";
 

[flink] 01/03: [FLINK-25340][testutils] Remove JM temp files before stopping container to avoid file permission issues

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc44a6dee8338c4e4af1e3cecf4512c8bf11ae67
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Thu Dec 16 16:02:38 2021 +0800

    [FLINK-25340][testutils] Remove JM temp files before stopping container to avoid file permission issues
    
    Co-authored-by: Fabian Paul <fa...@ververica.com>
---
 .../util/flink/container/FlinkContainers.java      | 53 ++++++++++++++++++++--
 .../flink/container/FlinkContainersBuilder.java    | 38 ++--------------
 2 files changed, 53 insertions(+), 38 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
index 693ab0d..5d4fa8e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainers.java
@@ -20,7 +20,9 @@ package org.apache.flink.tests.util.flink.container;
 
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
@@ -30,6 +32,8 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.tests.util.flink.SQLJobSubmission;
 import org.apache.flink.util.function.RunnableWithException;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.Extension;
@@ -49,8 +53,11 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -113,7 +120,6 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
     private final List<GenericContainer<?>> taskManagers;
     private final GenericContainer<?> haService;
     private final Configuration conf;
-    private final Runnable cleanupHook;
 
     @Nullable private RestClusterClient<StandaloneClusterId> restClusterClient;
     private boolean isStarted = false;
@@ -127,13 +133,11 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
             GenericContainer<?> jobManager,
             List<GenericContainer<?>> taskManagers,
             @Nullable GenericContainer<?> haService,
-            Configuration conf,
-            Runnable cleanupHook) {
+            Configuration conf) {
         this.jobManager = jobManager;
         this.taskManagers = taskManagers;
         this.haService = haService;
         this.conf = conf;
-        this.cleanupHook = cleanupHook;
     }
 
     /** Starts all containers. */
@@ -160,11 +164,11 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
             restClusterClient.close();
         }
         this.taskManagers.forEach(GenericContainer::stop);
+        deleteJobManagerTemporaryFiles();
         this.jobManager.stop();
         if (this.haService != null) {
             this.haService.stop();
         }
-        cleanupHook.run();
     }
 
     /** Gets the running state of the cluster. */
@@ -317,4 +321,43 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
                 DEFAULT_TIMEOUT,
                 "TaskManagers are not ready within 30 seconds");
     }
+
+    private void deleteJobManagerTemporaryFiles() {
+        final String checkpointDir = conf.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+        final String haDir = conf.get(HighAvailabilityOptions.HA_STORAGE_PATH);
+        final Collection<String> usedPaths =
+                Lists.newArrayList(checkpointDir, haDir).stream()
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList());
+        if (usedPaths.isEmpty()) {
+            return;
+        }
+        final StringBuilder deletionBaseCommand = new StringBuilder("rm -rf");
+        usedPaths.forEach(p -> deletionBaseCommand.append(formatFilePathForDeletion(p)));
+        final String[] command = {"bash", "-c", deletionBaseCommand.toString()};
+        final Container.ExecResult result;
+        try {
+            result = jobManager.execInContainer(command);
+            if (result.getExitCode() != 0) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Command \"%s\" returned non-zero exit code %d. \nSTDOUT: %s\nSTDERR: %s",
+                                String.join(" ", command),
+                                result.getExitCode(),
+                                result.getStdout(),
+                                result.getStderr()));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to delete temporary files generated by the flink cluster.", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    "Failed to delete temporary files generated by the flink cluster.", e);
+        }
+    }
+
+    private String formatFilePathForDeletion(String path) {
+        return " " + Paths.get(path).toString().split("file:")[1] + "/*";
+    }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
index 2179860..bb9f190 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
@@ -165,13 +164,10 @@ public class FlinkContainersBuilder {
                 CheckpointingOptions.CHECKPOINTS_DIRECTORY,
                 CHECKPOINT_PATH.toAbsolutePath().toUri().toString());
 
-        final List<Path> temporaryPaths = new ArrayList<>();
-
         // Create temporary directory for building Flink image
         final Path imageBuildingTempDir;
         try {
             imageBuildingTempDir = Files.createTempDirectory("flink-image-build");
-            temporaryPaths.add(imageBuildingTempDir);
         } catch (IOException e) {
             throw new RuntimeException("Failed to create temporary directory", e);
         }
@@ -185,22 +181,13 @@ public class FlinkContainersBuilder {
 
         // Mount HA storage to JobManager
         if (enableZookeeperHA) {
-            final Path haStorage =
-                    createTempDirAndMountToContainer("flink-recovery", HA_STORAGE_PATH, jobManager);
-            temporaryPaths.add(haStorage);
+            createTempDirAndMountToContainer("flink-recovery", HA_STORAGE_PATH, jobManager);
         }
 
         // Mount checkpoint storage to JobManager
-        final Path checkpointPath =
-                createTempDirAndMountToContainer("flink-checkpoint", CHECKPOINT_PATH, jobManager);
-        temporaryPaths.add(checkpointPath);
-
-        return new FlinkContainers(
-                jobManager,
-                taskManagers,
-                zookeeper,
-                conf,
-                () -> deleteTemporaryPaths(temporaryPaths));
+        createTempDirAndMountToContainer("flink-checkpoint", CHECKPOINT_PATH, jobManager);
+
+        return new FlinkContainers(jobManager, taskManagers, zookeeper, conf);
     }
 
     // --------------------------- Helper Functions -------------------------------------
@@ -292,30 +279,15 @@ public class FlinkContainersBuilder {
         conf.set(HighAvailabilityOptions.HA_STORAGE_PATH, HA_STORAGE_PATH.toUri().toString());
     }
 
-    private Path createTempDirAndMountToContainer(
+    private void createTempDirAndMountToContainer(
             String tempDirPrefix, Path containerPath, GenericContainer<?> container) {
         try {
             Path tempDirPath = Files.createTempDirectory(tempDirPrefix);
             container.withFileSystemBind(
                     tempDirPath.toAbsolutePath().toString(),
                     containerPath.toAbsolutePath().toString());
-            return tempDirPath;
         } catch (IOException e) {
             throw new IllegalStateException("Failed to create temporary recovery directory", e);
         }
     }
-
-    private void deleteTemporaryPaths(List<Path> temporaryPaths) {
-        temporaryPaths.forEach(
-                (path) -> {
-                    try {
-                        FileUtils.deleteDirectory(path.toFile());
-                    } catch (IOException e) {
-                        throw new RuntimeException(
-                                String.format(
-                                        "Failed to delete path \"%s\"", path.toAbsolutePath()),
-                                e);
-                    }
-                });
-    }
 }

[flink] 03/03: [hotfix][connector/kafka] Add Kafka connector dependency in Kafka E2E test module

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f054398268f69169cd066ff0905479d103df7b1
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Tue Jan 4 16:27:34 2022 +0800

    [hotfix][connector/kafka] Add Kafka connector dependency in Kafka E2E test module
---
 flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
index 12fa08f..b526600 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -45,6 +45,11 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils-junit</artifactId>
 			<scope>compile</scope>
 		</dependency>