Posted to issues@flink.apache.org by "zentol (via GitHub)" <gi...@apache.org> on 2023/03/27 09:01:40 UTC

[GitHub] [flink] zentol commented on a diff in pull request #22271: [FLINK-28372][rpc] Migrate to Akka Artery

zentol commented on code in PR #22271:
URL: https://github.com/apache/flink/pull/22271#discussion_r1148989474


##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java:
##########
@@ -46,7 +46,7 @@ class MessageSerializationTest {
     private static RpcService akkaRpcService1;
     private static RpcService akkaRpcService2;
 
-    private static final int maxFrameSize = 32000;
+    private static final int maxFrameSize = 32768;

Review Comment:
   Does Artery require this to be a power of 2?
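
For reference, the property being asked about can be checked with a one-liner. This is only a sketch of the arithmetic; it says nothing about what Artery actually enforces, and `FrameSizeCheck` is a hypothetical helper, not part of Flink or Akka.

```java
/** Hypothetical helper for illustration; not part of Flink or Akka. */
final class FrameSizeCheck {

    /** Returns true if the value is positive and has exactly one bit set. */
    static boolean isPowerOfTwo(int value) {
        return value > 0 && Integer.bitCount(value) == 1;
    }

    public static void main(String[] args) {
        // The old and new test values from the diff above.
        System.out.println("32000 is a power of two: " + isPowerOfTwo(32000));
        System.out.println("32768 is a power of two: " + isPowerOfTwo(32768));
    }
}
```

The new value 32768 equals 2^15, while the old value 32000 is not a power of two, which is what prompts the question.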



##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -206,79 +203,30 @@ private static void addBaseRemoteAkkaConfig(
             Configuration configuration,
             int port,
             int externalPort) {
-        final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
-
-        final String startupTimeout =
-                TimeUtils.getStringInMillis(
-                        TimeUtils.parseDuration(
-                                configuration.getString(
-                                        AkkaOptions.STARTUP_TIMEOUT,
-                                        TimeUtils.getStringInMillis(
-                                                akkaAskTimeout.multipliedBy(10L)))));
-
         final String akkaTCPTimeout =
                 TimeUtils.getStringInMillis(
                         TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
 
         final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE);
 
-        final int clientSocketWorkerPoolPoolSizeMin =
-                configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
-        final int clientSocketWorkerPoolPoolSizeMax =
-                configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
-        final double clientSocketWorkerPoolPoolSizeFactor =
-                configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
-        final int serverSocketWorkerPoolPoolSizeMin =
-                configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
-        final int serverSocketWorkerPoolPoolSizeMax =
-                configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
-        final double serverSocketWorkerPoolPoolSizeFactor =
-                configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
-
-        final String logLifecycleEvents =
-                booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
-
-        final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+        final String outboundRestartBackoff =
+                configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF);
 
         akkaConfigBuilder
                 .add("akka {")
                 .add("  actor {")
-                .add("    provider = \"akka.remote.RemoteActorRefProvider\"")
+                .add("    provider = remote")
                 .add("  }")
-                .add("  remote.artery.enabled = false")
-                .add("  remote.startup-timeout = " + startupTimeout)
                 .add("  remote.warn-about-direct-use = off")
                 .add("  remote.use-unsafe-remote-features-outside-cluster = on")
-                .add("  remote.classic {")
-                .add("    # disable the transport failure detector by setting very high values")
-                .add("    transport-failure-detector{")
-                .add("      acceptable-heartbeat-pause = 6000 s")
-                .add("      heartbeat-interval = 1000 s")
-                .add("      threshold = 300")
+                .add("  remote.artery {")
+                .add("    canonical.port = " + externalPort)
+                .add("    bind.port = " + port)
+                .add("    advanced {")
+                .add("      maximum-frame-size = " + akkaFramesize)
+                .add("      outbound-restart-backoff = " + outboundRestartBackoff)
+                .add("      tcp.connection-timeout = " + akkaTCPTimeout)
                 .add("    }")
-                .add("    enabled-transports = [\"akka.remote.classic.netty.tcp\"]")
-                .add("    netty {")
-                .add("      tcp {")
-                .add("        transport-class = \"akka.remote.transport.netty.NettyTransport\"")
-                .add("        port = " + externalPort)
-                .add("        bind-port = " + port)
-                .add("        connection-timeout = " + akkaTCPTimeout)
-                .add("        maximum-frame-size = " + akkaFramesize)
-                .add("        tcp-nodelay = on")
-                .add("        client-socket-worker-pool {")
-                .add("          pool-size-min = " + clientSocketWorkerPoolPoolSizeMin)
-                .add("          pool-size-max = " + clientSocketWorkerPoolPoolSizeMax)
-                .add("          pool-size-factor = " + clientSocketWorkerPoolPoolSizeFactor)
-                .add("        }")
-                .add("        server-socket-worker-pool {")
-                .add("          pool-size-min = " + serverSocketWorkerPoolPoolSizeMin)
-                .add("          pool-size-max = " + serverSocketWorkerPoolPoolSizeMax)
-                .add("          pool-size-factor = " + serverSocketWorkerPoolPoolSizeFactor)
-                .add("        }")
-                .add("      }")
-                .add("    }")
-                .add("    log-remote-lifecycle-events = " + logLifecycleEvents)

Review Comment:
   What happened to this option (`log-remote-lifecycle-events`)?



##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -362,26 +308,18 @@ private static void addSslRemoteAkkaConfig(
 
         akkaConfigBuilder
                 .add("akka {")
-                .add("  remote.classic {")
-                .add("    enabled-transports = [\"akka.remote.classic.netty.ssl\"]")
-                .add("    netty {")
-                .add("      ssl = ${akka.remote.classic.netty.tcp}")
-                .add("      ssl {")
-                .add("        enable-ssl = " + akkaEnableSSL)
-                .add("        ssl-engine-provider = " + sslEngineProviderName)
-                .add("        security {")
-                .add("          key-store = \"" + akkaSSLKeyStore + "\"")
-                .add("          key-store-password = \"" + akkaSSLKeyStorePassword + "\"")
-                .add("          key-password = \"" + akkaSSLKeyPassword + "\"")
-                .add("          trust-store = \"" + akkaSSLTrustStore + "\"")
-                .add("          trust-store-password = \"" + akkaSSLTrustStorePassword + "\"")
-                .add("          protocol = " + akkaSSLProtocol + "")
-                .add("          enabled-algorithms = " + akkaSSLAlgorithms + "")
-                .add("          random-number-generator = \"\"")
-                .add("          require-mutual-authentication = on")

Review Comment:
   Is this (`require-mutual-authentication`) now always on, or what happened to it?



##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java:
##########
@@ -46,7 +46,7 @@ private AkkaRpcServiceConfiguration(
             boolean captureAskCallStack,
             boolean forceRpcInvocationSerialization) {
 
-        checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
+        checkArgument(maximumFramesize >= 32768L, "Maximum framesize must be at least 32 KiB.");

Review Comment:
   Is this a new minimum enforced by Artery?
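
The changed precondition can be exercised in isolation. The following is a minimal sketch that mirrors the new check with a plain `IllegalArgumentException` instead of Flink's `checkArgument`; the class and method names are hypothetical, and the 32 KiB bound is taken from the diff above, not from the Artery documentation.

```java
/** Hypothetical stand-in for the precondition in AkkaRpcServiceConfiguration. */
final class FrameSizeValidation {

    // Assumption: lower bound copied from the new check in the diff (32768 bytes).
    static final long MIN_FRAME_SIZE_BYTES = 32L * 1024L;

    /** Returns the frame size unchanged if it is at least 32 KiB, otherwise throws. */
    static long requireValidFrameSize(long maximumFramesize) {
        if (maximumFramesize < MIN_FRAME_SIZE_BYTES) {
            throw new IllegalArgumentException(
                    "Maximum framesize must be at least 32 KiB, but was "
                            + maximumFramesize
                            + " bytes.");
        }
        return maximumFramesize;
    }

    public static void main(String[] args) {
        System.out.println(requireValidFrameSize(32768L));
        try {
            // The previous test value would now be rejected.
            requireValidFrameSize(32000L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this check the old test value of 32000 is rejected, which would explain why `MessageSerializationTest` had to move to 32768.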



##########
flink-rpc/flink-rpc-akka/pom.xml:
##########
@@ -96,11 +96,6 @@ under the License.
 			<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
 			<version>${akka.version}</version>
 		</dependency>
-		<dependency>
-			<groupId>io.netty</groupId>
-			<artifactId>netty</artifactId>
-			<version>3.10.6.Final</version>

Review Comment:
   Removing the Netty dependency needs a NOTICE update.



##########
flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java:
##########
@@ -178,13 +170,12 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
                     .defaultValue(true)
                     .withDescription("Exit JVM on fatal Akka errors.");
 
-    /** Milliseconds a gate should be closed for after a remote connection was disconnected. */
-    public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
-            ConfigOptions.key("akka.retry-gate-closed-for")
-                    .longType()
-                    .defaultValue(50L)
-                    .withDescription(
-                            "Milliseconds a gate should be closed for after a remote connection was disconnected.");
+    /** Retry outbound connection only after this backoff. */
+    public static final ConfigOption<String> OUTBOUND_RESTART_BACKOFF =

Review Comment:
   Is this a de facto replacement for `RETRY_GATE_CLOSED_FOR`?
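
If the new option is indeed meant to take over from the old one, a legacy value could be carried across roughly as follows. This is a sketch under two assumptions: that `OUTBOUND_RESTART_BACKOFF` accepts a HOCON-style duration string such as `50 ms` (it is concatenated directly into the Akka config in `AkkaUtils`), and that a one-to-one mapping of the value is appropriate. `LegacyBackoffMigration` is a hypothetical helper, not something in the PR.

```java
/** Hypothetical migration helper for illustration; not part of the PR. */
final class LegacyBackoffMigration {

    /**
     * Converts a legacy akka.retry-gate-closed-for value (milliseconds, Long)
     * into a duration string (assumed format: "<millis> ms").
     */
    static String legacyMillisToDurationString(long retryGateClosedForMillis) {
        if (retryGateClosedForMillis < 0) {
            throw new IllegalArgumentException(
                    "Backoff must not be negative: " + retryGateClosedForMillis);
        }
        return retryGateClosedForMillis + " ms";
    }

    public static void main(String[] args) {
        // 50 is the default of the removed RETRY_GATE_CLOSED_FOR option.
        System.out.println(legacyMillisToDurationString(50L));
    }
}
```

Whether the two settings are semantically equivalent is exactly the open question in this comment, so the mapping should be read as illustrative only.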



##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -206,79 +203,30 @@ private static void addBaseRemoteAkkaConfig(
             Configuration configuration,
             int port,
             int externalPort) {
-        final Duration akkaAskTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
-
-        final String startupTimeout =
-                TimeUtils.getStringInMillis(
-                        TimeUtils.parseDuration(
-                                configuration.getString(
-                                        AkkaOptions.STARTUP_TIMEOUT,
-                                        TimeUtils.getStringInMillis(
-                                                akkaAskTimeout.multipliedBy(10L)))));
-
         final String akkaTCPTimeout =
                 TimeUtils.getStringInMillis(
                         TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
 
         final String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE);
 
-        final int clientSocketWorkerPoolPoolSizeMin =
-                configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
-        final int clientSocketWorkerPoolPoolSizeMax =
-                configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
-        final double clientSocketWorkerPoolPoolSizeFactor =
-                configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
-        final int serverSocketWorkerPoolPoolSizeMin =
-                configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
-        final int serverSocketWorkerPoolPoolSizeMax =
-                configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
-        final double serverSocketWorkerPoolPoolSizeFactor =
-                configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
-
-        final String logLifecycleEvents =
-                booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
-
-        final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+        final String outboundRestartBackoff =
+                configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF);
 
         akkaConfigBuilder
                 .add("akka {")
                 .add("  actor {")
-                .add("    provider = \"akka.remote.RemoteActorRefProvider\"")
+                .add("    provider = remote")
                 .add("  }")
-                .add("  remote.artery.enabled = false")
-                .add("  remote.startup-timeout = " + startupTimeout)
                 .add("  remote.warn-about-direct-use = off")
                 .add("  remote.use-unsafe-remote-features-outside-cluster = on")
-                .add("  remote.classic {")
-                .add("    # disable the transport failure detector by setting very high values")
-                .add("    transport-failure-detector{")
-                .add("      acceptable-heartbeat-pause = 6000 s")
-                .add("      heartbeat-interval = 1000 s")
-                .add("      threshold = 300")
+                .add("  remote.artery {")
+                .add("    canonical.port = " + externalPort)
+                .add("    bind.port = " + port)
+                .add("    advanced {")
+                .add("      maximum-frame-size = " + akkaFramesize)
+                .add("      outbound-restart-backoff = " + outboundRestartBackoff)
+                .add("      tcp.connection-timeout = " + akkaTCPTimeout)
                 .add("    }")
-                .add("    enabled-transports = [\"akka.remote.classic.netty.tcp\"]")
-                .add("    netty {")
-                .add("      tcp {")
-                .add("        transport-class = \"akka.remote.transport.netty.NettyTransport\"")
-                .add("        port = " + externalPort)
-                .add("        bind-port = " + port)
-                .add("        connection-timeout = " + akkaTCPTimeout)
-                .add("        maximum-frame-size = " + akkaFramesize)
-                .add("        tcp-nodelay = on")
-                .add("        client-socket-worker-pool {")

Review Comment:
   With Artery, is there no additional thread pool? Do we potentially need to adjust the Akka thread pools to accommodate this?



##########
flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java:
##########
@@ -235,6 +254,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
                                     .text("Min number of threads to cap factor-based number to.")
                                     .build());
 
+    /** @deprecated Don't use this option anymore. It has no effect on Flink. */
+    @Deprecated

Review Comment:
   The docs need to be regenerated.


