You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/05/29 11:42:45 UTC
[flink] branch release-1.11 updated: [FLINK-16144] get
client.timeout for the client, with a fallback to the akka.client.timeout.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 8b03294 [FLINK-16144] get client.timeout for the client, with a fallback to the akka.client.timeout.
8b03294 is described below
commit 8b032941888079d5015fb40ad755429aa5297d90
Author: wangtong <wt...@163.com>
AuthorDate: Fri May 15 21:56:42 2020 +0800
[FLINK-16144] get client.timeout for the client, with a fallback to the akka.client.timeout.
This closes #12179.
---
docs/_includes/generated/akka_configuration.html | 6 ---
docs/_includes/generated/client_configuration.html | 24 ++++++++++
.../generated/execution_configuration.html | 12 -----
docs/ops/config.md | 4 ++
docs/ops/config.zh.md | 4 ++
.../org/apache/flink/client/cli/CliFrontend.java | 3 +-
.../org/apache/flink/client/cli/ClientOptions.java | 45 ++++++++++++++++++
.../ApplicationDispatcherBootstrap.java | 6 +--
.../application/executors/EmbeddedExecutor.java | 4 +-
.../executors/EmbeddedExecutorFactory.java | 4 +-
.../apache/flink/client/cli/ClientOptionsTest.java | 53 ++++++++++++++++++++++
.../apache/flink/configuration/AkkaOptions.java | 6 ++-
.../flink/configuration/ConfigConstants.java | 4 +-
.../flink/configuration/ExecutionOptions.java | 15 ------
.../configuration/ConfigOptionsDocGenerator.java | 3 +-
.../org/apache/flink/runtime/akka/AkkaUtils.scala | 4 --
.../apache/flink/runtime/akka/AkkaUtilsTest.scala | 2 +-
17 files changed, 148 insertions(+), 51 deletions(-)
diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html
index 2baeb6f..3e59c13 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -39,12 +39,6 @@
<td>Min number of threads to cap factor-based number to.</td>
</tr>
<tr>
- <td><h5>akka.client.timeout</h5></td>
- <td style="word-wrap: break-word;">"60 s"</td>
- <td>String</td>
- <td>Timeout for all blocking calls on the client side.</td>
- </tr>
- <tr>
<td><h5>akka.fork-join-executor.parallelism-factor</h5></td>
<td style="word-wrap: break-word;">2.0</td>
<td>Double</td>
diff --git a/docs/_includes/generated/client_configuration.html b/docs/_includes/generated/client_configuration.html
new file mode 100644
index 0000000..c623dde
--- /dev/null
+++ b/docs/_includes/generated/client_configuration.html
@@ -0,0 +1,24 @@
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>client.retry-period</h5></td>
+ <td style="word-wrap: break-word;">2 s</td>
+ <td>Duration</td>
+ <td>The interval (in ms) between consecutive retries of failed attempts to execute commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).</td>
+ </tr>
+ <tr>
+ <td><h5>client.timeout</h5></td>
+ <td style="word-wrap: break-word;">1 min</td>
+ <td>Duration</td>
+ <td>Timeout on the client side.</td>
+ </tr>
+ </tbody>
+</table>
diff --git a/docs/_includes/generated/execution_configuration.html b/docs/_includes/generated/execution_configuration.html
index f893a75..037960c 100644
--- a/docs/_includes/generated/execution_configuration.html
+++ b/docs/_includes/generated/execution_configuration.html
@@ -20,17 +20,5 @@
<td>Boolean</td>
<td>Tells if we should use compression for the state snapshot data or not</td>
</tr>
- <tr>
- <td><h5>execution.embedded-rpc-retry-period</h5></td>
- <td style="word-wrap: break-word;">2 s</td>
- <td>Duration</td>
- <td>The retry period (in ms) between consecutive attempts to get the job status when executing applications in "Application Mode".</td>
- </tr>
- <tr>
- <td><h5>execution.embedded-rpc-timeout</h5></td>
- <td style="word-wrap: break-word;">1 h</td>
- <td>Duration</td>
- <td>The rpc timeout (in ms) when executing applications in "Application Mode". This affects all rpc's available through the Job Client and job submission.</td>
- </tr>
</tbody>
</table>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index f33d03f..9432f5d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -423,6 +423,10 @@ These options may be removed in a future release.
# Backup
+#### Client
+
+{% include generated/client_configuration.html %}
+
#### Execution
{% include generated/deployment_configuration.html %}
diff --git a/docs/ops/config.zh.md b/docs/ops/config.zh.md
index 80244fd..a4dd3d7 100644
--- a/docs/ops/config.zh.md
+++ b/docs/ops/config.zh.md
@@ -423,6 +423,10 @@ These options may be removed in a future release.
# Backup
+#### Client
+
+{% include generated/client_configuration.html %}
+
#### Execution
{% include generated/deployment_configuration.html %}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 7218977..1e2bae0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -46,7 +46,6 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -141,7 +140,7 @@ public class CliFrontend {
customCommandLine.addRunOptions(customCommandLineOptions);
}
- this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
+ this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
new file mode 100644
index 0000000..063421e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/**
+ * Describes a client configuration parameter.
+ */
+@PublicEvolving
+public class ClientOptions {
+
+ public static final ConfigOption<Duration> CLIENT_TIMEOUT =
+ ConfigOptions.key("client.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
+ .withDeprecatedKeys("akka.client.timeout") // the deprecated AkkaOptions.CLIENT_TIMEOUT
+ .withDescription("Timeout on the client side.");
+
+ public static final ConfigOption<Duration> CLIENT_RETRY_PERIOD =
+ ConfigOptions.key("client.retry-period")
+ .durationType()
+ .defaultValue(Duration.ofMillis(2000))
+ .withDescription("The interval (in ms) between consecutive retries of failed attempts to execute " +
+ "commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).");
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
index 9ff1f54..2b4ddf3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
@@ -23,11 +23,11 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutorServiceLoader;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobCancellationException;
@@ -263,8 +263,8 @@ public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap
final JobID jobId,
final ScheduledExecutor scheduledExecutor) {
- final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
- final Time retryPeriod = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_RETRY_PERIOD).toMillis());
+ final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
+ final Time retryPeriod = Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
return JobStatusPollingUtils.getJobResult(
dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index febf3ae..aa2db40 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -22,9 +22,9 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
@@ -103,7 +103,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
}
private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
- final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
+ final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
final JobID actualJobId = jobGraph.getJobID();
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
index 4c11786..51d8bbc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
@@ -21,9 +21,9 @@ package org.apache.flink.client.deployment.application.executors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -80,7 +80,7 @@ public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
submittedJobIds,
dispatcherGateway,
jobId -> {
- final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
+ final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
return new EmbeddedJobClient(jobId, dispatcherGateway, retryExecutor, timeout);
});
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/ClientOptionsTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/ClientOptionsTest.java
new file mode 100644
index 0000000..9db564f
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/ClientOptionsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * unit test for ClientOptions.
+ */
+@RunWith(JUnit4.class)
+public class ClientOptionsTest {
+
+ @Test
+ public void testGetClientTimeout() {
+ Configuration configuration = new Configuration();
+ configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(10));
+
+ assertEquals(configuration.get(ClientOptions.CLIENT_TIMEOUT), Duration.ofSeconds(10));
+
+ configuration = new Configuration();
+ configuration.set(AkkaOptions.CLIENT_TIMEOUT, "20 s");
+ assertEquals(configuration.get(ClientOptions.CLIENT_TIMEOUT), Duration.ofSeconds(20));
+
+ configuration = new Configuration();
+ assertEquals(configuration.get(ClientOptions.CLIENT_TIMEOUT), ClientOptions.CLIENT_TIMEOUT.defaultValue());
+ }
+
+}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index e9b3002..c553e0b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -156,12 +156,16 @@ public class AkkaOptions {
/**
* Timeout for all blocking calls on the client side.
+ *
+ * @deprecated Use the {@code ClientOptions.CLIENT_TIMEOUT} instead.
*/
+ @Deprecated
public static final ConfigOption<String> CLIENT_TIMEOUT = ConfigOptions
.key("akka.client.timeout")
.stringType()
.defaultValue("60 s")
- .withDescription("Timeout for all blocking calls on the client side.");
+ .withDescription("DEPRECATED: Use the \"client.timeout\" instead." +
+ " Timeout for all blocking calls on the client side.");
/**
* Exit JVM on fatal Akka errors.
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f1e2de8..4566211 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -939,7 +939,7 @@ public final class ConfigConstants {
/**
* Timeout for all blocking calls on the client side.
*
- * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
+ * @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead.
*/
@Deprecated
public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout";
@@ -1787,7 +1787,7 @@ public final class ConfigConstants {
public static final String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
/**
- * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
+ * @deprecated Use {@code ClientOptions#CLIENT_TIMEOUT} instead.
*/
@Deprecated
public static final String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
index cb34583..60f6cd8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
@@ -55,19 +55,4 @@ public class ExecutionOptions {
"throughput")
)
.build());
-
- public static final ConfigOption<Duration> EMBEDDED_RPC_TIMEOUT =
- ConfigOptions.key("execution.embedded-rpc-timeout")
- .durationType()
- .defaultValue(Duration.ofMillis(60 * 60 * 1000))
- .withDescription("The rpc timeout (in ms) when executing applications in \"Application Mode\". " +
- "This affects all rpc's available through the Job Client and job submission.");
-
- public static final ConfigOption<Duration> EMBEDDED_RPC_RETRY_PERIOD =
- ConfigOptions.key("execution.embedded-rpc-retry-period")
- .durationType()
- .defaultValue(Duration.ofMillis(2000))
- .withDescription("The retry period (in ms) between consecutive attempts to get the job status " +
- "when executing applications in \"Application Mode\".");
-
}
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index b98b848..ac6b933 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -77,7 +77,8 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state"),
new OptionsClassLocation("flink-table/flink-table-api-java", "org.apache.flink.table.api.config"),
new OptionsClassLocation("flink-python", "org.apache.flink.python"),
- new OptionsClassLocation("flink-kubernetes", "org.apache.flink.kubernetes.configuration")
+ new OptionsClassLocation("flink-kubernetes", "org.apache.flink.kubernetes.configuration"),
+ new OptionsClassLocation("flink-clients", "org.apache.flink.client.cli")
};
static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index f046a19..ea3b4bf 100755
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -793,10 +793,6 @@ object AkkaUtils {
TimeUtils.parseDuration(config.getString(AkkaOptions.LOOKUP_TIMEOUT))
}
- def getClientTimeout(config: Configuration): time.Duration = {
- TimeUtils.parseDuration(config.getString(AkkaOptions.CLIENT_TIMEOUT))
- }
-
/** Returns the address of the given [[ActorSystem]]. The [[Address]] object contains
* the port and the host under which the actor system is reachable
*
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 4cb7557..0aa6a5e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -105,7 +105,7 @@ class AkkaUtilsTest
val IPv4AddressString = "192.168.0.1"
val port = 1234
val address = new InetSocketAddress(IPv4AddressString, port)
-
+
val url = s"akka://flink@$IPv4AddressString:$port/user/jobmanager"
val result = AkkaUtils.getInetSocketAddressFromAkkaURL(url)