You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/05/29 11:42:45 UTC

[flink] branch release-1.11 updated: [FLINK-16144] get client.timeout for the client, with a fallback to the akka.client.timeout.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 8b03294  [FLINK-16144] get client.timeout for the client, with a fallback to the akka.client.timeout.
8b03294 is described below

commit 8b032941888079d5015fb40ad755429aa5297d90
Author: wangtong <wt...@163.com>
AuthorDate: Fri May 15 21:56:42 2020 +0800

    [FLINK-16144] get client.timeout for the client, with a fallback to the akka.client.timeout.
    
    This closes #12179.
---
 docs/_includes/generated/akka_configuration.html   |  6 ---
 docs/_includes/generated/client_configuration.html | 24 ++++++++++
 .../generated/execution_configuration.html         | 12 -----
 docs/ops/config.md                                 |  4 ++
 docs/ops/config.zh.md                              |  4 ++
 .../org/apache/flink/client/cli/CliFrontend.java   |  3 +-
 .../org/apache/flink/client/cli/ClientOptions.java | 45 ++++++++++++++++++
 .../ApplicationDispatcherBootstrap.java            |  6 +--
 .../application/executors/EmbeddedExecutor.java    |  4 +-
 .../executors/EmbeddedExecutorFactory.java         |  4 +-
 .../apache/flink/client/cli/ClientOptionsTest.java | 53 ++++++++++++++++++++++
 .../apache/flink/configuration/AkkaOptions.java    |  6 ++-
 .../flink/configuration/ConfigConstants.java       |  4 +-
 .../flink/configuration/ExecutionOptions.java      | 15 ------
 .../configuration/ConfigOptionsDocGenerator.java   |  3 +-
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  |  4 --
 .../apache/flink/runtime/akka/AkkaUtilsTest.scala  |  2 +-
 17 files changed, 148 insertions(+), 51 deletions(-)

diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html
index 2baeb6f..3e59c13 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -39,12 +39,6 @@
             <td>Min number of threads to cap factor-based number to.</td>
         </tr>
         <tr>
-            <td><h5>akka.client.timeout</h5></td>
-            <td style="word-wrap: break-word;">"60 s"</td>
-            <td>String</td>
-            <td>Timeout for all blocking calls on the client side.</td>
-        </tr>
-        <tr>
             <td><h5>akka.fork-join-executor.parallelism-factor</h5></td>
             <td style="word-wrap: break-word;">2.0</td>
             <td>Double</td>
diff --git a/docs/_includes/generated/client_configuration.html b/docs/_includes/generated/client_configuration.html
new file mode 100644
index 0000000..c623dde
--- /dev/null
+++ b/docs/_includes/generated/client_configuration.html
@@ -0,0 +1,24 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>client.retry-period</h5></td>
+            <td style="word-wrap: break-word;">2 s</td>
+            <td>Duration</td>
+            <td>The interval (in ms) between consecutive retries of failed attempts to execute commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).</td>
+        </tr>
+        <tr>
+            <td><h5>client.timeout</h5></td>
+            <td style="word-wrap: break-word;">1 min</td>
+            <td>Duration</td>
+            <td>Timeout on the client side.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/_includes/generated/execution_configuration.html b/docs/_includes/generated/execution_configuration.html
index f893a75..037960c 100644
--- a/docs/_includes/generated/execution_configuration.html
+++ b/docs/_includes/generated/execution_configuration.html
@@ -20,17 +20,5 @@
             <td>Boolean</td>
             <td>Tells if we should use compression for the state snapshot data or not</td>
         </tr>
-        <tr>
-            <td><h5>execution.embedded-rpc-retry-period</h5></td>
-            <td style="word-wrap: break-word;">2 s</td>
-            <td>Duration</td>
-            <td>The retry period (in ms) between consecutive attempts to get the job status when executing applications in "Application Mode".</td>
-        </tr>
-        <tr>
-            <td><h5>execution.embedded-rpc-timeout</h5></td>
-            <td style="word-wrap: break-word;">1 h</td>
-            <td>Duration</td>
-            <td>The rpc timeout (in ms) when executing applications in "Application Mode". This affects all rpc's available through the Job Client and job submission.</td>
-        </tr>
     </tbody>
 </table>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index f33d03f..9432f5d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -423,6 +423,10 @@ These options may be removed in a future release.
 
 # Backup
 
+#### Client
+
+{% include generated/client_configuration.html %}
+
 #### Execution
 
 {% include generated/deployment_configuration.html %}
diff --git a/docs/ops/config.zh.md b/docs/ops/config.zh.md
index 80244fd..a4dd3d7 100644
--- a/docs/ops/config.zh.md
+++ b/docs/ops/config.zh.md
@@ -423,6 +423,10 @@ These options may be removed in a future release.
 
 # Backup
 
+#### Client
+
+{% include generated/client_configuration.html %}
+
 #### Execution
 
 {% include generated/deployment_configuration.html %}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 7218977..1e2bae0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -46,7 +46,6 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -141,7 +140,7 @@ public class CliFrontend {
 			customCommandLine.addRunOptions(customCommandLineOptions);
 		}
 
-		this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
+		this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
 		this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
new file mode 100644
index 0000000..063421e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/**
+ * Describes a client configuration parameter.
+ */
+@PublicEvolving
+public class ClientOptions {
+
+	public static final ConfigOption<Duration> CLIENT_TIMEOUT =
+			ConfigOptions.key("client.timeout")
+					.durationType()
+					.defaultValue(Duration.ofSeconds(60))
+					.withDeprecatedKeys("akka.client.timeout") // the deprecated AkkaOptions.CLIENT_TIMEOUT
+					.withDescription("Timeout on the client side.");
+
+	public static final ConfigOption<Duration> CLIENT_RETRY_PERIOD =
+			ConfigOptions.key("client.retry-period")
+					.durationType()
+					.defaultValue(Duration.ofMillis(2000))
+					.withDescription("The interval (in ms) between consecutive retries of failed attempts to execute " +
+							"commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).");
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
index 9ff1f54..2b4ddf3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
@@ -23,11 +23,11 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
 import org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobCancellationException;
@@ -263,8 +263,8 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap
 			final JobID jobId,
 			final ScheduledExecutor scheduledExecutor) {
 
-		final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
-		final Time retryPeriod = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_RETRY_PERIOD).toMillis());
+		final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
+		final Time retryPeriod = Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
 
 		return JobStatusPollingUtils.getJobResult(
 						dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index febf3ae..aa2db40 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -22,9 +22,9 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutor;
@@ -103,7 +103,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
 	}
 
 	private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
-		final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
+		final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
 
 		final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
 		final JobID actualJobId = jobGraph.getJobID();
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
index 4c11786..51d8bbc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
@@ -21,9 +21,9 @@ package org.apache.flink.client.deployment.application.executors;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.client.deployment.application.EmbeddedJobClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -80,7 +80,7 @@ public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
 				submittedJobIds,
 				dispatcherGateway,
 				jobId -> {
-					final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
+					final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
 					return new EmbeddedJobClient(jobId, dispatcherGateway, retryExecutor, timeout);
 				});
 	}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/ClientOptionsTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/ClientOptionsTest.java
new file mode 100644
index 0000000..9db564f
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/ClientOptionsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * unit test for ClientOptions.
+ */
+@RunWith(JUnit4.class)
+public class ClientOptionsTest {
+
+	@Test
+	public void testGetClientTimeout() {
+		Configuration configuration = new Configuration();
+		configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(10));
+
+		assertEquals(configuration.get(ClientOptions.CLIENT_TIMEOUT), Duration.ofSeconds(10));
+
+		configuration = new Configuration();
+		configuration.set(AkkaOptions.CLIENT_TIMEOUT, "20 s");
+		assertEquals(configuration.get(ClientOptions.CLIENT_TIMEOUT), Duration.ofSeconds(20));
+
+		configuration = new Configuration();
+		assertEquals(configuration.get(ClientOptions.CLIENT_TIMEOUT), ClientOptions.CLIENT_TIMEOUT.defaultValue());
+	}
+
+}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index e9b3002..c553e0b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -156,12 +156,16 @@ public class AkkaOptions {
 
 	/**
 	 * Timeout for all blocking calls on the client side.
+	 *
+	 * @deprecated Use the {@code ClientOptions.CLIENT_TIMEOUT} instead.
 	 */
+	@Deprecated
 	public static final ConfigOption<String> CLIENT_TIMEOUT = ConfigOptions
 		.key("akka.client.timeout")
 		.stringType()
 		.defaultValue("60 s")
-		.withDescription("Timeout for all blocking calls on the client side.");
+		.withDescription("DEPRECATED: Use the \"client.timeout\" instead." +
+				" Timeout for all blocking calls on the client side.");
 
 	/**
 	 * Exit JVM on fatal Akka errors.
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f1e2de8..4566211 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -939,7 +939,7 @@ public final class ConfigConstants {
 	/**
 	 * Timeout for all blocking calls on the client side.
 	 *
-	 * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
+	 * @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead.
 	 */
 	@Deprecated
 	public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout";
@@ -1787,7 +1787,7 @@ public final class ConfigConstants {
 	public static final String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
 
 	/**
-	 * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
+	 * @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead.
 	 */
 	@Deprecated
 	public static final String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
index cb34583..60f6cd8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
@@ -55,19 +55,4 @@ public class ExecutionOptions {
 						"throughput")
 				)
 				.build());
-
-	public static final ConfigOption<Duration> EMBEDDED_RPC_TIMEOUT =
-		ConfigOptions.key("execution.embedded-rpc-timeout")
-			.durationType()
-			.defaultValue(Duration.ofMillis(60 * 60 * 1000))
-			.withDescription("The rpc timeout (in ms) when executing applications in \"Application Mode\". " +
-					"This affects all rpc's available through the Job Client and job submission.");
-
-	public static final ConfigOption<Duration> EMBEDDED_RPC_RETRY_PERIOD =
-		ConfigOptions.key("execution.embedded-rpc-retry-period")
-			.durationType()
-			.defaultValue(Duration.ofMillis(2000))
-			.withDescription("The retry period (in ms) between consecutive attempts to get the job status " +
-					"when executing applications in \"Application Mode\".");
-
 }
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index b98b848..ac6b933 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -77,7 +77,8 @@ public class ConfigOptionsDocGenerator {
 		new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state"),
 		new OptionsClassLocation("flink-table/flink-table-api-java", "org.apache.flink.table.api.config"),
 		new OptionsClassLocation("flink-python", "org.apache.flink.python"),
-		new OptionsClassLocation("flink-kubernetes", "org.apache.flink.kubernetes.configuration")
+		new OptionsClassLocation("flink-kubernetes", "org.apache.flink.kubernetes.configuration"),
+		new OptionsClassLocation("flink-clients", "org.apache.flink.client.cli")
 	};
 
 	static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index f046a19..ea3b4bf 100755
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -793,10 +793,6 @@ object AkkaUtils {
     TimeUtils.parseDuration(config.getString(AkkaOptions.LOOKUP_TIMEOUT))
   }
 
-  def getClientTimeout(config: Configuration): time.Duration = {
-    TimeUtils.parseDuration(config.getString(AkkaOptions.CLIENT_TIMEOUT))
-  }
-
   /** Returns the address of the given [[ActorSystem]]. The [[Address]] object contains
     * the port and the host under which the actor system is reachable
     *
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 4cb7557..0aa6a5e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -105,7 +105,7 @@ class AkkaUtilsTest
     val IPv4AddressString = "192.168.0.1"
     val port = 1234
     val address = new InetSocketAddress(IPv4AddressString, port)
-    
+
     val url = s"akka://flink@$IPv4AddressString:$port/user/jobmanager"
 
     val result = AkkaUtils.getInetSocketAddressFromAkkaURL(url)