Posted to commits@flink.apache.org by se...@apache.org on 2021/11/18 19:10:24 UTC

[flink] branch master updated (ad16e2c -> 7a10982)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ad16e2c  [FLINK-24889][table] Support casting MULTISET to STRING
     new c2f510f  [hotfix][coordination] Improve hint to build akka RPC system.
     new e6798c3  [FLINK-24366][runtime] Don't log error for failed task restore if the task is already canceled.
     new 57253c5  [FLINK-24255][tests] Test environments respect configuration when being instantiated.
     new 7a10982  [FLINK-23842][coordination] Add logging statements in SourceCoordinators for reader registration and split requests.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/rpc/akka/AkkaRpcSystemLoader.java      |  2 +-
 .../source/coordinator/SourceCoordinator.java      | 29 ++++++++++++++++------
 .../api/operators/BackendRestorerProcedure.java    | 12 ++++-----
 .../streaming/util/TestStreamEnvironment.java      | 13 +++++++---
 .../MiniClusterPipelineExecutorServiceLoader.java  | 25 ++++++++++++++++---
 .../apache/flink/test/util/TestEnvironment.java    |  4 ++-
 6 files changed, 63 insertions(+), 22 deletions(-)

[flink] 03/04: [FLINK-24255][tests] Test environments respect configuration when being instantiated.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57253c5fc880fff880526a8954446c8189ac7c72
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Aug 27 02:27:10 2021 +0200

    [FLINK-24255][tests] Test environments respect configuration when being instantiated.
    
    This closes #17240
---
 .../streaming/util/TestStreamEnvironment.java      | 13 ++++++++---
 .../MiniClusterPipelineExecutorServiceLoader.java  | 25 +++++++++++++++++++---
 .../apache/flink/test/util/TestEnvironment.java    |  4 +++-
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index b59f598..00f5692 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -48,19 +48,26 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 
     public TestStreamEnvironment(
             MiniCluster miniCluster,
+            Configuration config,
             int parallelism,
             Collection<Path> jarFiles,
             Collection<URL> classPaths) {
         super(
                 new MiniClusterPipelineExecutorServiceLoader(miniCluster),
-                MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
+                MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster(
+                        config, jarFiles, classPaths),
                 null);
 
         setParallelism(parallelism);
     }
 
     public TestStreamEnvironment(MiniCluster miniCluster, int parallelism) {
-        this(miniCluster, parallelism, Collections.emptyList(), Collections.emptyList());
+        this(
+                miniCluster,
+                new Configuration(),
+                parallelism,
+                Collections.emptyList(),
+                Collections.emptyList());
     }
 
     /**
@@ -83,7 +90,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
                 conf -> {
                     TestStreamEnvironment env =
                             new TestStreamEnvironment(
-                                    miniCluster, parallelism, jarFiles, classpaths);
+                                    miniCluster, conf, parallelism, jarFiles, classpaths);
 
                     randomizeConfiguration(miniCluster, conf);
 
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
index 9bc2e60..b181d51 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.util;
 
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
@@ -36,6 +37,9 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -48,6 +52,10 @@ import java.util.stream.Stream;
  * PipelineExecutors} that use a given {@link MiniCluster}.
  */
 public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MiniClusterPipelineExecutorServiceLoader.class);
+
     public static final String NAME = "minicluster";
 
     private final MiniCluster miniCluster;
@@ -60,9 +68,14 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto
      * Populates a {@link Configuration} that is compatible with this {@link
      * MiniClusterPipelineExecutorServiceLoader}.
      */
-    public static Configuration createConfiguration(
-            Collection<Path> jarFiles, Collection<URL> classPaths) {
-        Configuration config = new Configuration();
+    public static Configuration updateConfigurationForMiniCluster(
+            Configuration config, Collection<Path> jarFiles, Collection<URL> classPaths) {
+
+        checkOverridesOption(config, PipelineOptions.JARS);
+        checkOverridesOption(config, PipelineOptions.CLASSPATHS);
+        checkOverridesOption(config, DeploymentOptions.TARGET);
+        checkOverridesOption(config, DeploymentOptions.ATTACHED);
+
         ConfigUtils.encodeCollectionToConfig(
                 config,
                 PipelineOptions.JARS,
@@ -75,6 +88,12 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto
         return config;
     }
 
+    private static void checkOverridesOption(Configuration config, ConfigOption<?> option) {
+        if (config.contains(option)) {
+            LOG.warn("Overriding config setting '{}' for MiniCluster.", option.key());
+        }
+    }
+
     private static String getAbsoluteURL(Path path) {
         FileSystem fs;
         try {
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index ac7c311..b63d9c4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.util.Preconditions;
@@ -46,7 +47,8 @@ public class TestEnvironment extends ExecutionEnvironment {
             Collection<URL> classPaths) {
         super(
                 new MiniClusterPipelineExecutorServiceLoader(miniCluster),
-                MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
+                MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster(
+                        new Configuration(), jarFiles, classPaths),
                 null);
 
         this.miniCluster = Preconditions.checkNotNull(miniCluster);
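
A minimal usage sketch (not part of this commit's diff; the MiniCluster setup and the example
config key are assumptions) of how a test can now pass its own Configuration through the new
TestStreamEnvironment constructor, which routes it through updateConfigurationForMiniCluster(...)
instead of always starting from an empty Configuration:

    import java.util.Collections;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
    import org.apache.flink.streaming.util.TestStreamEnvironment;

    public class TestStreamEnvironmentConfigSketch {
        public static void main(String[] args) throws Exception {
            // Start a small MiniCluster for the test (hypothetical setup).
            MiniCluster miniCluster =
                    new MiniCluster(
                            new MiniClusterConfiguration.Builder()
                                    .setNumTaskManagers(1)
                                    .setNumSlotsPerTaskManager(2)
                                    .build());
            miniCluster.start();

            // The caller-provided Configuration now survives into the environment.
            Configuration config = new Configuration();
            config.setString("state.backend", "hashmap"); // example setting

            TestStreamEnvironment env =
                    new TestStreamEnvironment(
                            miniCluster,
                            config,
                            2,                        // parallelism
                            Collections.emptyList(),  // jar files
                            Collections.emptyList()); // additional class paths

            // ... build and execute a test job against 'env' here ...

            miniCluster.close();
        }
    }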

[flink] 02/04: [FLINK-24366][runtime] Don't log error for failed task restore if the task is already canceled.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e6798c3e40bffeccfc35c84021e63aae41a3ac48
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Sep 23 17:47:13 2021 +0200

    [FLINK-24366][runtime] Don't log error for failed task restore if the task is already canceled.
---
 .../streaming/api/operators/BackendRestorerProcedure.java    | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index d15ed63..5a0cea9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -137,6 +137,12 @@ public class BackendRestorerProcedure<T extends Closeable & Disposable, S extend
 
                 collectedException = ExceptionUtils.firstOrSuppressed(ex, collectedException);
 
+                if (backendCloseableRegistry.isClosed()) {
+                    throw new FlinkException(
+                            "Stopping restore attempts for already cancelled task.",
+                            collectedException);
+                }
+
                 LOG.warn(
                         "Exception while restoring {} from alternative ({}/{}), will retry while more "
                                 + "alternatives are available.",
@@ -144,12 +150,6 @@ public class BackendRestorerProcedure<T extends Closeable & Disposable, S extend
                         alternativeIdx,
                         restoreOptions.size(),
                         ex);
-
-                if (backendCloseableRegistry.isClosed()) {
-                    throw new FlinkException(
-                            "Stopping restore attempts for already cancelled task.",
-                            collectedException);
-                }
             }
         }
 
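A toy illustration of the ordering this commit establishes (a sketch only, not the actual
BackendRestorerProcedure; the helper method and the plain stderr warning are assumptions):
the closed-registry check now runs before the retry warning, so a task that was already
canceled stops with a FlinkException instead of first logging a misleading warning.

    import java.util.List;
    import java.util.function.Supplier;

    import org.apache.flink.core.fs.CloseableRegistry;
    import org.apache.flink.util.ExceptionUtils;
    import org.apache.flink.util.FlinkException;

    public class RestoreRetryOrderSketch {

        static String createAndRestore(
                List<Supplier<String>> restoreOptions, CloseableRegistry backendCloseableRegistry)
                throws Exception {

            Exception collectedException = null;
            for (int alternativeIdx = 0; alternativeIdx < restoreOptions.size(); alternativeIdx++) {
                try {
                    return restoreOptions.get(alternativeIdx).get();
                } catch (Exception ex) {
                    collectedException = ExceptionUtils.firstOrSuppressed(ex, collectedException);

                    // New order: a closed registry means the task was canceled, so stop quietly ...
                    if (backendCloseableRegistry.isClosed()) {
                        throw new FlinkException(
                                "Stopping restore attempts for already cancelled task.",
                                collectedException);
                    }

                    // ... and only a still-running task warns that the next alternative is tried.
                    System.err.printf(
                            "Restore alternative %d/%d failed, retrying with the next one.%n",
                            alternativeIdx + 1, restoreOptions.size());
                }
            }
            throw new FlinkException("No restore alternative succeeded.", collectedException);
        }
    }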

[flink] 01/04: [hotfix][coordination] Improve hint to build akka RPC system.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c2f510f670cc26ba85d183d822ef8c41ff6bd068
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Nov 18 15:31:29 2021 +0100

    [hotfix][coordination] Improve hint to build akka RPC system.
---
 .../java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
index 216769e..735f739 100644
--- a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
+++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
@@ -47,7 +47,7 @@ public class AkkaRpcSystemLoader implements RpcSystemLoader {
     private static final String FLINK_RPC_AKKA_FAT_JAR = "flink-rpc-akka.jar";
 
     static final String HINT_USAGE =
-            "mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader";
+            "mvn clean package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader -DskipTests";
 
     @Override
     public RpcSystem loadRpcSystem(Configuration config) {

[flink] 04/04: [FLINK-23842][coordination] Add logging statements in SourceCoordinators for reader registration and split requests.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a109823a21150c7bea9ba41fc22203cbaf7094f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Aug 17 18:31:54 2021 +0200

    [FLINK-23842][coordination] Add logging statements in SourceCoordinators for reader registration and split requests.
    
    This closes #16867
---
 .../source/coordinator/SourceCoordinator.java      | 29 ++++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 5ba4160..85a767e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -163,19 +164,31 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
     public void handleEventFromOperator(int subtask, OperatorEvent event) {
         runInEventLoop(
                 () -> {
-                    LOG.debug(
-                            "Handling event from subtask {} of source {}: {}",
-                            subtask,
-                            operatorName,
-                            event);
                     if (event instanceof RequestSplitEvent) {
+                        LOG.info(
+                                "Source {} received split request from parallel task {}",
+                                operatorName,
+                                subtask);
                         enumerator.handleSplitRequest(
                                 subtask, ((RequestSplitEvent) event).hostName());
                     } else if (event instanceof SourceEventWrapper) {
-                        enumerator.handleSourceEvent(
-                                subtask, ((SourceEventWrapper) event).getSourceEvent());
+                        final SourceEvent sourceEvent =
+                                ((SourceEventWrapper) event).getSourceEvent();
+                        LOG.debug(
+                                "Source {} received custom event from parallel task {}: {}",
+                                operatorName,
+                                subtask,
+                                sourceEvent);
+                        enumerator.handleSourceEvent(subtask, sourceEvent);
                     } else if (event instanceof ReaderRegistrationEvent) {
-                        handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+                        final ReaderRegistrationEvent registrationEvent =
+                                (ReaderRegistrationEvent) event;
+                        LOG.info(
+                                "Source {} registering reader for parallel task {} @ {}",
+                                operatorName,
+                                subtask,
+                                registrationEvent.location());
+                        handleReaderRegistrationEvent(registrationEvent);
                     } else {
                         throw new FlinkException("Unrecognized Operator Event: " + event);
                     }
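
Net effect of this change (illustration only; the operator name, subtask index, and host below are
hypothetical, and the exact line layout depends on the logging configuration): reader registrations
and split requests now surface at INFO level, while custom source events stay at DEBUG, roughly:

    INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator - Source MySource registering reader for parallel task 0 @ host-1
    INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator - Source MySource received split request from parallel task 0
    DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator - Source MySource received custom event from parallel task 0: MySourceEvent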