You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/11 12:57:40 UTC

[flink] branch release-1.14 updated: [FLINK-24455][tests]FallbackAkkaRpcSystemLoader checks maven exit code

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 2b68658  [FLINK-24455][tests]FallbackAkkaRpcSystemLoader checks maven exit code
2b68658 is described below

commit 2b686582840d4c29df873702a78cb7945cffa3c4
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Thu Nov 11 20:56:12 2021 +0800

    [FLINK-24455][tests]FallbackAkkaRpcSystemLoader checks maven exit code
---
 .../runtime/rpc/akka/AkkaRpcSystemLoader.java      | 19 ++++++++---
 .../rpc/akka/FallbackAkkaRpcSystemLoader.java      | 26 +++++++++++----
 .../org/apache/flink/runtime/rpc/RpcSystem.java    |  3 +-
 .../runtime/rpc/exceptions/RpcLoaderException.java | 37 ++++++++++++++++++++++
 4 files changed, 72 insertions(+), 13 deletions(-)

diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
index ed8977a..216769e 100644
--- a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
+++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.core.classloading.SubmoduleClassLoader;
 import org.apache.flink.runtime.rpc.RpcSystem;
 import org.apache.flink.runtime.rpc.RpcSystemLoader;
+import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
 import org.apache.flink.util.IOUtils;
 
 import java.io.IOException;
@@ -42,6 +43,12 @@ import java.util.UUID;
  */
 public class AkkaRpcSystemLoader implements RpcSystemLoader {
 
+    /** The name of the akka dependency jar, bundled with flink-rpc-akka-loader module artifact. */
+    private static final String FLINK_RPC_AKKA_FAT_JAR = "flink-rpc-akka.jar";
+
+    static final String HINT_USAGE =
+            "mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader";
+
     @Override
     public RpcSystem loadRpcSystem(Configuration config) {
         try {
@@ -54,12 +61,14 @@ public class AkkaRpcSystemLoader implements RpcSystemLoader {
                             tmpDirectory.resolve("flink-rpc-akka_" + UUID.randomUUID() + ".jar"));
 
             final InputStream resourceStream =
-                    flinkClassLoader.getResourceAsStream("flink-rpc-akka.jar");
+                    flinkClassLoader.getResourceAsStream(FLINK_RPC_AKKA_FAT_JAR);
             if (resourceStream == null) {
-                throw new RuntimeException(
-                        "Akka RPC system could not be found. If this happened while running a test in the IDE,"
-                                + "run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line,"
-                                + "or add a test dependency on the flink-rpc-akka-loader test-jar.");
+                throw new RpcLoaderException(
+                        String.format(
+                                "Akka RPC system could not be found. If this happened while running a test in the IDE, "
+                                        + "run '%s' on the command-line, "
+                                        + "or add a test dependency on the flink-rpc-akka-loader test-jar.",
+                                HINT_USAGE));
             }
 
             IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
diff --git a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java
index a8624bb..1b8fc88 100644
--- a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java
+++ b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.classloading.SubmoduleClassLoader;
 import org.apache.flink.runtime.rpc.RpcSystem;
 import org.apache.flink.runtime.rpc.RpcSystemLoader;
+import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
 import org.apache.flink.util.OperatingSystem;
 
 import org.slf4j.Logger;
@@ -46,6 +47,9 @@ import java.util.stream.Stream;
 public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
     private static final Logger LOG = LoggerFactory.getLogger(FallbackAkkaRpcSystemLoader.class);
 
+    private static final String MODULE_FLINK_RPC = "flink-rpc";
+    private static final String MODULE_FLINK_RPC_AKKA = "flink-rpc-akka";
+
     @Override
     public RpcSystem loadRpcSystem(Configuration config) {
         try {
@@ -67,7 +71,13 @@ public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
                     akkaRpcModuleDirectory.resolve(Paths.get("target", "dependencies"));
 
             if (!Files.exists(akkaRpcModuleDependenciesDirectory)) {
-                downloadDependencies(akkaRpcModuleDirectory, akkaRpcModuleDependenciesDirectory);
+                int exitCode =
+                        downloadDependencies(
+                                akkaRpcModuleDirectory, akkaRpcModuleDependenciesDirectory);
+                if (exitCode != 0) {
+                    throw new RpcLoaderException(
+                            "Could not download dependencies of flink-rpc-akka, please see the log output for details.");
+                }
             } else {
                 LOG.debug(
                         "Re-using previously downloaded flink-rpc-akka dependencies. If you are experiencing strange issues, try clearing '{}'.",
@@ -95,8 +105,10 @@ public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
                     submoduleClassLoader,
                     null);
         } catch (Exception e) {
-            throw new RuntimeException(
-                    "Could not initialize RPC system. Run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line.",
+            throw new RpcLoaderException(
+                    String.format(
+                            "Could not initialize RPC system. Run '%s' on the command-line instead.",
+                            AkkaRpcSystemLoader.HINT_USAGE),
                     e);
         }
     }
@@ -109,18 +121,18 @@ public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
         try (Stream<Path> directoryContents = Files.list(currentParentCandidate)) {
             final Optional<Path> flinkRpcModuleDirectory =
                     directoryContents
-                            .filter(path -> path.getFileName().toString().equals("flink-rpc"))
+                            .filter(path -> path.getFileName().toString().equals(MODULE_FLINK_RPC))
                             .findFirst();
             if (flinkRpcModuleDirectory.isPresent()) {
                 return flinkRpcModuleDirectory
-                        .map(path -> path.resolve(Paths.get("flink-rpc-akka")))
+                        .map(path -> path.resolve(Paths.get(MODULE_FLINK_RPC_AKKA)))
                         .get();
             }
         }
         return findAkkaRpcModuleDirectory(currentParentCandidate.getParent());
     }
 
-    private static void downloadDependencies(Path workingDirectory, Path targetDirectory)
+    private static int downloadDependencies(Path workingDirectory, Path targetDirectory)
             throws IOException, InterruptedException {
 
         final String mvnExecutable = OperatingSystem.isWindows() ? "mvn.bat" : "mvn";
@@ -134,6 +146,6 @@ public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
                                 "-DincludeScope=runtime", // excludes provided/test dependencies
                                 "-DoutputDirectory=" + targetDirectory)
                         .redirectOutput(ProcessBuilder.Redirect.INHERIT);
-        mvn.start().waitFor();
+        return mvn.start().waitFor();
     }
 }
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
index e6eab2d..cced3ca 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
 import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nullable;
@@ -102,7 +103,7 @@ public interface RpcSystem extends RpcSystemUtils, AutoCloseable {
                 loadError = ExceptionUtils.firstOrSuppressed(e, loadError);
             }
         }
-        throw new RuntimeException("Could not load RpcSystem.", loadError);
+        throw new RpcLoaderException("Could not load RpcSystem.", loadError);
     }
 
     /** Descriptor for creating a fork-join thread-pool. */
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java
new file mode 100644
index 0000000..70449c5
--- /dev/null
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+/** Base class for RPC loader exceptions. */
+public class RpcLoaderException extends RuntimeException {
+
+    private static final long serialVersionUID = 7787884485642531050L;
+
+    public RpcLoaderException(String message) {
+        super(message);
+    }
+
+    public RpcLoaderException(Throwable cause) {
+        super(cause);
+    }
+
+    public RpcLoaderException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}