You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/11 12:57:40 UTC
[flink] branch release-1.14 updated:
[FLINK-24455][tests]FallbackAkkaRpcSystemLoader checks maven exit code
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 2b68658 [FLINK-24455][tests]FallbackAkkaRpcSystemLoader checks maven exit code
2b68658 is described below
commit 2b686582840d4c29df873702a78cb7945cffa3c4
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Thu Nov 11 20:56:12 2021 +0800
[FLINK-24455][tests]FallbackAkkaRpcSystemLoader checks maven exit code
---
.../runtime/rpc/akka/AkkaRpcSystemLoader.java | 19 ++++++++---
.../rpc/akka/FallbackAkkaRpcSystemLoader.java | 26 +++++++++++----
.../org/apache/flink/runtime/rpc/RpcSystem.java | 3 +-
.../runtime/rpc/exceptions/RpcLoaderException.java | 37 ++++++++++++++++++++++
4 files changed, 72 insertions(+), 13 deletions(-)
diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
index ed8977a..216769e 100644
--- a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
+++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.core.classloading.SubmoduleClassLoader;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcSystemLoader;
+import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
import org.apache.flink.util.IOUtils;
import java.io.IOException;
@@ -42,6 +43,12 @@ import java.util.UUID;
*/
public class AkkaRpcSystemLoader implements RpcSystemLoader {
+ /** The name of the akka dependency jar, bundled with flink-rpc-akka-loader module artifact. */
+ private static final String FLINK_RPC_AKKA_FAT_JAR = "flink-rpc-akka.jar";
+
+ static final String HINT_USAGE =
+ "mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader";
+
@Override
public RpcSystem loadRpcSystem(Configuration config) {
try {
@@ -54,12 +61,14 @@ public class AkkaRpcSystemLoader implements RpcSystemLoader {
tmpDirectory.resolve("flink-rpc-akka_" + UUID.randomUUID() + ".jar"));
final InputStream resourceStream =
- flinkClassLoader.getResourceAsStream("flink-rpc-akka.jar");
+ flinkClassLoader.getResourceAsStream(FLINK_RPC_AKKA_FAT_JAR);
if (resourceStream == null) {
- throw new RuntimeException(
- "Akka RPC system could not be found. If this happened while running a test in the IDE,"
- + "run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line,"
- + "or add a test dependency on the flink-rpc-akka-loader test-jar.");
+ throw new RpcLoaderException(
+ String.format(
+ "Akka RPC system could not be found. If this happened while running a test in the IDE, "
+ + "run '%s' on the command-line, "
+ + "or add a test dependency on the flink-rpc-akka-loader test-jar.",
+ HINT_USAGE));
}
IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
diff --git a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java
index a8624bb..1b8fc88 100644
--- a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java
+++ b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.classloading.SubmoduleClassLoader;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcSystemLoader;
+import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
import org.apache.flink.util.OperatingSystem;
import org.slf4j.Logger;
@@ -46,6 +47,9 @@ import java.util.stream.Stream;
public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
private static final Logger LOG = LoggerFactory.getLogger(FallbackAkkaRpcSystemLoader.class);
+ private static final String MODULE_FLINK_RPC = "flink-rpc";
+ private static final String MODULE_FLINK_RPC_AKKA = "flink-rpc-akka";
+
@Override
public RpcSystem loadRpcSystem(Configuration config) {
try {
@@ -67,7 +71,13 @@ public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
akkaRpcModuleDirectory.resolve(Paths.get("target", "dependencies"));
if (!Files.exists(akkaRpcModuleDependenciesDirectory)) {
- downloadDependencies(akkaRpcModuleDirectory, akkaRpcModuleDependenciesDirectory);
+ int exitCode =
+ downloadDependencies(
+ akkaRpcModuleDirectory, akkaRpcModuleDependenciesDirectory);
+ if (exitCode != 0) {
+ throw new RpcLoaderException(
+ "Could not download dependencies of flink-rpc-akka, please see the log output for details.");
+ }
} else {
LOG.debug(
"Re-using previously downloaded flink-rpc-akka dependencies. If you are experiencing strange issues, try clearing '{}'.",
@@ -95,8 +105,10 @@ public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
submoduleClassLoader,
null);
} catch (Exception e) {
- throw new RuntimeException(
- "Could not initialize RPC system. Run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line.",
+ throw new RpcLoaderException(
+ String.format(
+ "Could not initialize RPC system. Run '%s' on the command-line instead.",
+ AkkaRpcSystemLoader.HINT_USAGE),
e);
}
}
@@ -109,18 +121,18 @@ public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
try (Stream<Path> directoryContents = Files.list(currentParentCandidate)) {
final Optional<Path> flinkRpcModuleDirectory =
directoryContents
- .filter(path -> path.getFileName().toString().equals("flink-rpc"))
+ .filter(path -> path.getFileName().toString().equals(MODULE_FLINK_RPC))
.findFirst();
if (flinkRpcModuleDirectory.isPresent()) {
return flinkRpcModuleDirectory
- .map(path -> path.resolve(Paths.get("flink-rpc-akka")))
+ .map(path -> path.resolve(Paths.get(MODULE_FLINK_RPC_AKKA)))
.get();
}
}
return findAkkaRpcModuleDirectory(currentParentCandidate.getParent());
}
- private static void downloadDependencies(Path workingDirectory, Path targetDirectory)
+ private static int downloadDependencies(Path workingDirectory, Path targetDirectory)
throws IOException, InterruptedException {
final String mvnExecutable = OperatingSystem.isWindows() ? "mvn.bat" : "mvn";
@@ -134,6 +146,6 @@ public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
"-DincludeScope=runtime", // excludes provided/test dependencies
"-DoutputDirectory=" + targetDirectory)
.redirectOutput(ProcessBuilder.Redirect.INHERIT);
- mvn.start().waitFor();
+ return mvn.start().waitFor();
}
}
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
index e6eab2d..cced3ca 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
import org.apache.flink.util.ExceptionUtils;
import javax.annotation.Nullable;
@@ -102,7 +103,7 @@ public interface RpcSystem extends RpcSystemUtils, AutoCloseable {
loadError = ExceptionUtils.firstOrSuppressed(e, loadError);
}
}
- throw new RuntimeException("Could not load RpcSystem.", loadError);
+ throw new RpcLoaderException("Could not load RpcSystem.", loadError);
}
/** Descriptor for creating a fork-join thread-pool. */
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java
new file mode 100644
index 0000000..70449c5
--- /dev/null
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+/** Base class for RPC loader exceptions. */
+public class RpcLoaderException extends RuntimeException {
+
+ private static final long serialVersionUID = 7787884485642531050L;
+
+ public RpcLoaderException(String message) {
+ super(message);
+ }
+
+ public RpcLoaderException(Throwable cause) {
+ super(cause);
+ }
+
+ public RpcLoaderException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}