You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/22 19:40:28 UTC
[1/8] flink git commit: Rebuild webUI
Repository: flink
Updated Branches:
refs/heads/master 9a9e193a1 -> 7efa8ad34
Rebuild webUI
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7efa8ad3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7efa8ad3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7efa8ad3
Branch: refs/heads/master
Commit: 7efa8ad3427011ed87d3570ba0647b881936a1d0
Parents: b3e1642
Author: zentol <ch...@apache.org>
Authored: Fri May 19 12:10:07 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon May 22 20:28:22 2017 +0200
----------------------------------------------------------------------
flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html | 4 ++--
.../web-dashboard/web/partials/taskmanager/index.html | 2 +-
.../web/partials/taskmanager/taskmanager.metrics.html | 2 +-
3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7efa8ad3/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
index c52d67f..79ff0d3 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
@@ -28,8 +28,8 @@ limitations under the License.
<nav class="navbar navbar-default navbar-secondary-additional">
<ul class="nav nav-tabs">
<li ui-sref-active="active"><a ui-sref=".subtasks({nodeid: nodeid})">Subtasks</a></li>
- <li ui-sref-active="active"><a ui-sref=".taskmanagers({nodeid: nodeid})">TaskManagers</a></li>
- <li ui-sref-active="active"><a ui-sref=".metrics({nodeid: nodeid})">Metrics</a></li>
+ <li ui-sref-active="active"><a ui-sref=".taskmanagers({nodeid: nodeid})">Subtasks by TaskManager</a></li>
+ <li ui-sref-active="active"><a ui-sref=".metrics({nodeid: nodeid})">Task Metrics</a></li>
<li ui-sref-active="active"><a ui-sref=".watermarks({nodeid: nodeid})">Watermarks</a></li>
<li ui-sref-active="active"><a ui-sref=".accumulators({nodeid: nodeid})">Accumulators</a></li>
<li ui-sref-active="active"><a ui-sref=".checkpoints({nodeid: nodeid})">Checkpoints</a></li>
http://git-wip-us.apache.org/repos/asf/flink/blob/7efa8ad3/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
index f56dcbe..94614d8 100644
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
+++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
@@ -32,7 +32,7 @@ limitations under the License.
<th>Free Slots</th>
<th>CPU Cores</th>
<th>Physical Memory</th>
- <th>Free Memory</th>
+ <th>JVM Heap Size</th>
<th>Flink Managed Memory</th>
</tr>
</thead>
http://git-wip-us.apache.org/repos/asf/flink/blob/7efa8ad3/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
index 3372db8..9be77e9 100644
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
+++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
@@ -27,7 +27,7 @@ limitations under the License.
<th>Free Slots</th>
<th>CPU Cores</th>
<th>Physical Memory</th>
- <th>Free Memory</th>
+ <th>JVM Heap Size</th>
<th>Flink Managed Memory</th>
</tr>
</thead>
[8/8] flink git commit: [FLINK-6450][web] Rename "TaskManagers" tab
to "Subtasks by TaskManager"
Posted by ch...@apache.org.
[FLINK-6450][web] Rename "TaskManagers" tab to "Subtasks by TaskManager"
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec21ce67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec21ce67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec21ce67
Branch: refs/heads/master
Commit: ec21ce6726be0c7bca50f744da75ec20d1e1fc46
Parents: b2db8e8
Author: zentol <ch...@apache.org>
Authored: Fri May 19 12:03:45 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon May 22 20:28:22 2017 +0200
----------------------------------------------------------------------
flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec21ce67/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
index 6c4cf0b..00a5889 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
@@ -28,7 +28,7 @@ split
a(ui-sref=".subtasks({nodeid: nodeid})") Subtasks
li(ui-sref-active='active')
- a(ui-sref=".taskmanagers({nodeid: nodeid})") TaskManagers
+ a(ui-sref=".taskmanagers({nodeid: nodeid})") Subtasks by TaskManager
li(ui-sref-active='active')
a(ui-sref=".metrics({nodeid: nodeid})") Metrics
[6/8] flink git commit: [hotfix] Fix @deprecated javadoc in
ConfigConstants
Posted by ch...@apache.org.
[hotfix] Fix @deprecated javadoc in ConfigConstants
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2db8e8e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2db8e8e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2db8e8e
Branch: refs/heads/master
Commit: b2db8e8e053006daeb290eaaaeb8466410af8333
Parents: 60fa0d4
Author: zentol <ch...@apache.org>
Authored: Mon May 22 16:40:07 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon May 22 20:28:22 2017 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/configuration/ConfigConstants.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b2db8e8e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 65e6c76..476797e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1442,7 +1442,7 @@ public final class ConfigConstants {
/**
* Time after which cached stats are cleaned up.
*
- * @@deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead
+ * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead
*/
@Deprecated
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000;
[2/8] flink git commit: [FLINK-6451][web] Rename "Metrics" tab to
"Task Metrics"
Posted by ch...@apache.org.
[FLINK-6451][web] Rename "Metrics" tab to "Task Metrics"
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b89e2fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b89e2fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b89e2fd
Branch: refs/heads/master
Commit: 8b89e2fdb855d78cea8b08c060f884a4660187fe
Parents: ec21ce6
Author: zentol <ch...@apache.org>
Authored: Fri May 19 12:04:21 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon May 22 20:28:22 2017 +0200
----------------------------------------------------------------------
flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8b89e2fd/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
index 00a5889..c7d3142 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
@@ -31,7 +31,7 @@ split
a(ui-sref=".taskmanagers({nodeid: nodeid})") Subtasks by TaskManager
li(ui-sref-active='active')
- a(ui-sref=".metrics({nodeid: nodeid})") Metrics
+ a(ui-sref=".metrics({nodeid: nodeid})") Task Metrics
li(ui-sref-active='active')
a(ui-sref=".watermarks({nodeid: nodeid})") Watermarks
[3/8] flink git commit: [FLINK-6610][web] Allow uploadDir to be null
in WebFrontendBootstrap
Posted by ch...@apache.org.
[FLINK-6610][web] Allow uploadDir to be null in WebFrontendBootstrap
This closes #3947.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60fa0d4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60fa0d4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60fa0d4d
Branch: refs/heads/master
Commit: 60fa0d4da38eb0e724304903fe42471264ff7e2d
Parents: 302c674
Author: zentol <ch...@apache.org>
Authored: Fri May 19 14:24:09 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon May 22 20:28:22 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/webmonitor/HttpRequestHandler.java | 2 +-
.../flink/runtime/webmonitor/utils/WebFrontendBootstrap.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/60fa0d4d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
index d14b7a2..bde9976 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
@@ -126,7 +126,7 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject>
// IF SOMETHING EVER NEEDS POST PARAMETERS, THIS WILL BE THE PLACE TO HANDLE IT
// all fields values will be passed with type Attribute.
- if (data.getHttpDataType() == HttpDataType.FileUpload) {
+ if (data.getHttpDataType() == HttpDataType.FileUpload && tmpDir != null) {
DiskFileUpload file = (DiskFileUpload) data;
if (file.isCompleted()) {
String name = file.getFilename();
http://git-wip-us.apache.org/repos/asf/flink/blob/60fa0d4d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 19ec08a..3b3d6cb 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -62,7 +62,7 @@ public class WebFrontendBootstrap {
final Configuration config) throws InterruptedException {
this.router = Preconditions.checkNotNull(router);
this.log = Preconditions.checkNotNull(log);
- this.uploadDir = Preconditions.checkNotNull(directory);
+ this.uploadDir = directory;
this.serverSSLContext = sslContext;
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
[5/8] flink git commit: [FLINK-6495] Migrate Akka configuration
options
Posted by ch...@apache.org.
[FLINK-6495] Migrate Akka configuration options
This closes #3935.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/302c6741
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/302c6741
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/302c6741
Branch: refs/heads/master
Commit: 302c6741f862f13c6ea3d5490a31fadc20e976c8
Parents: 9a9e193
Author: zjureel <zj...@gmail.com>
Authored: Thu May 18 12:34:40 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon May 22 20:28:22 2017 +0200
----------------------------------------------------------------------
.../client/program/ClientConnectionTest.java | 5 +-
.../apache/flink/client/program/ClientTest.java | 3 +-
.../org/apache/flink/storm/api/FlinkClient.java | 5 +-
.../apache/flink/configuration/AkkaOptions.java | 102 +++++++++++++++++--
.../flink/configuration/ConfigConstants.java | 92 +++++++++++++++++
.../MesosTaskManagerRunner.java | 3 +-
.../webmonitor/metrics/MetricFetcher.java | 4 +-
.../client/JobAttachmentClientActor.java | 4 +-
.../client/JobSubmissionClientActor.java | 4 +-
.../clusterframework/FlinkResourceManager.java | 4 +-
.../restart/FailureRateRestartStrategy.java | 3 +-
.../restart/FixedDelayRestartStrategy.java | 6 +-
.../restart/RestartStrategyFactory.java | 6 +-
.../runtime/query/QueryableStateClient.java | 8 +-
.../ResourceManagerConfiguration.java | 8 +-
.../slotmanager/SlotManagerConfiguration.java | 12 +--
.../runtime/rpc/akka/AkkaRpcServiceUtils.java | 6 +-
.../taskexecutor/TaskManagerConfiguration.java | 3 +-
.../apache/flink/runtime/akka/AkkaUtils.scala | 65 ++++--------
.../runtime/minicluster/FlinkMiniCluster.scala | 8 +-
.../clusterframework/ResourceManagerTest.java | 6 +-
.../PartialConsumePipelinedResultTest.java | 3 +-
.../runtime/jobmanager/JobManagerTest.java | 3 +-
...askManagerComponentsStartupShutdownTest.java | 7 +-
.../TaskManagerRegistrationTest.java | 9 +-
.../runtime/testutils/ZooKeeperTestUtils.java | 9 +-
.../flink/runtime/akka/AkkaSslITCase.scala | 6 +-
.../jobmanager/JobManagerConnectionTest.scala | 4 +-
.../runtime/jobmanager/RecoveryITCase.scala | 4 +-
.../runtime/testingUtils/TestingUtils.scala | 4 +-
.../apache/flink/test/util/TestBaseUtils.java | 11 +-
.../accumulators/AccumulatorLiveITCase.java | 3 +-
.../test/cancelling/CancelingTestBase.java | 3 +-
.../EventTimeAllWindowCheckpointingITCase.java | 5 +-
.../jar/StreamingCustomInputSplitProgram.java | 4 +-
.../RemoteEnvironmentITCase.java | 5 +-
...ctTaskManagerProcessFailureRecoveryTest.java | 11 +-
.../flink/test/recovery/ChaosMonkeyITCase.java | 7 +-
.../recovery/ProcessFailureCancelingITCase.java | 10 +-
.../TaskManagerFailureRecoveryITCase.java | 7 +-
.../ZooKeeperLeaderElectionITCase.java | 3 +-
.../flink/yarn/YarnTaskExecutorRunner.java | 3 +-
.../flink/yarn/YarnTaskManagerRunner.java | 3 +-
43 files changed, 324 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 246a75c..eb9f3c5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
@@ -92,8 +93,8 @@ public class ClientConnectionTest extends TestLogger {
private static void testFailureBehavior(final InetSocketAddress unreachableEndpoint) throws Exception {
final Configuration config = new Configuration();
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT) + " ms");
- config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT) + " ms");
+ config.setString(AkkaOptions.ASK_TIMEOUT, ASK_STARTUP_TIMEOUT + " ms");
+ config.setString(AkkaOptions.LOOKUP_TIMEOUT, CONNECT_TIMEOUT + " ms");
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort());
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index b7ade2a..13a2564 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
@@ -97,7 +98,7 @@ public class ClientTest extends TestLogger {
config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort);
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+ config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
try {
scala.Tuple2<String, Object> address = new scala.Tuple2<String, Object>("localhost", freePort);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 21794f9..626335d 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -39,6 +39,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -269,7 +270,7 @@ public class FlinkClient {
JobID getTopologyJobId(final String id) {
final Configuration configuration = GlobalConfiguration.loadConfiguration();
if (this.timeout != null) {
- configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout);
}
try {
@@ -309,7 +310,7 @@ public class FlinkClient {
private FiniteDuration getTimeout() {
final Configuration configuration = GlobalConfiguration.loadConfiguration();
if (this.timeout != null) {
- configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, this.timeout);
}
return AkkaUtils.getClientTimeout(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 97b209e..9bfc237 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -29,30 +29,114 @@ import org.apache.flink.annotation.PublicEvolving;
public class AkkaOptions {
/**
- * Timeout for akka ask calls
+ * Timeout for akka ask calls.
*/
- public static final ConfigOption<String> AKKA_ASK_TIMEOUT = ConfigOptions
+ public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
.key("akka.ask.timeout")
.defaultValue("10 s");
/**
+ * The Akka death watch heartbeat interval.
+ */
+ public static final ConfigOption<String> WATCH_HEARTBEAT_INTERVAL = ConfigOptions
+ .key("akka.watch.heartbeat.interval")
+ .defaultValue(ASK_TIMEOUT.defaultValue());
+
+ /**
+ * The maximum acceptable Akka death watch heartbeat pause.
+ */
+ public static final ConfigOption<String> WATCH_HEARTBEAT_PAUSE = ConfigOptions
+ .key("akka.watch.heartbeat.pause")
+ .defaultValue(ASK_TIMEOUT.defaultValue());
+
+ /**
* The Akka tcp connection timeout.
*/
- public static final ConfigOption<String> AKKA_TCP_TIMEOUT = ConfigOptions
+ public static final ConfigOption<String> TCP_TIMEOUT = ConfigOptions
.key("akka.tcp.timeout")
.defaultValue("20 s");
/**
- * The Akka death watch heartbeat interval.
+ * Timeout for the startup of the actor system.
*/
- public static final ConfigOption<String> AKKA_WATCH_HEARTBEAT_INTERVAL = ConfigOptions
- .key("akka.watch.heartbeat.interval")
+ public static final ConfigOption<String> STARTUP_TIMEOUT = ConfigOptions
+ .key("akka.startup-timeout")
+ .noDefaultValue();
+
+ /**
+ * Heartbeat interval of the transport failure detector.
+ */
+ public static final ConfigOption<String> TRANSPORT_HEARTBEAT_INTERVAL = ConfigOptions
+ .key("akka.transport.heartbeat.interval")
+ .defaultValue("1000 s");
+
+ /**
+ * Allowed heartbeat pause for the transport failure detector.
+ */
+ public static final ConfigOption<String> TRANSPORT_HEARTBEAT_PAUSE = ConfigOptions
+ .key("akka.transport.heartbeat.pause")
+ .defaultValue("6000 s");
+
+ /**
+ * Detection threshold of transport failure detector.
+ */
+ public static final ConfigOption<Double> TRANSPORT_THRESHOLD = ConfigOptions
+ .key("akka.transport.threshold")
+ .defaultValue(300.0);
+
+ /**
+ * Detection threshold for the phi accrual watch failure detector.
+ */
+ public static final ConfigOption<Integer> WATCH_THRESHOLD = ConfigOptions
+ .key("akka.watch.threshold")
+ .defaultValue(12);
+
+ /**
+ * Override SSL support for the Akka transport.
+ */
+ public static final ConfigOption<Boolean> SSL_ENABLED = ConfigOptions
+ .key("akka.ssl.enabled")
+ .defaultValue(true);
+
+ /**
+ * Maximum framesize of akka messages.
+ */
+ public static final ConfigOption<String> FRAMESIZE = ConfigOptions
+ .key("akka.framesize")
+ .defaultValue("10485760b");
+
+ /**
+ * Maximum number of messages until another actor is executed by the same thread.
+ */
+ public static final ConfigOption<Integer> DISPATCHER_THROUGHPUT = ConfigOptions
+ .key("akka.throughput")
+ .defaultValue(15);
+
+ /**
+ * Log lifecycle events.
+ */
+ public static final ConfigOption<Boolean> LOG_LIFECYCLE_EVENTS = ConfigOptions
+ .key("akka.log.lifecycle.events")
+ .defaultValue(false);
+
+ /**
+ * Timeout for all blocking calls that look up remote actors.
+ */
+ public static final ConfigOption<String> LOOKUP_TIMEOUT = ConfigOptions
+ .key("akka.lookup.timeout")
.defaultValue("10 s");
/**
- * The maximum acceptable Akka death watch heartbeat pause.
+ * Timeout for all blocking calls on the client side.
*/
- public static final ConfigOption<String> AKKA_WATCH_HEARTBEAT_PAUSE = ConfigOptions
- .key("akka.watch.heartbeat.pause")
+ public static final ConfigOption<String> CLIENT_TIMEOUT = ConfigOptions
+ .key("akka.client.timeout")
.defaultValue("60 s");
+
+ /**
+ * Exit JVM on fatal Akka errors.
+ */
+ public static final ConfigOption<Boolean> JVM_EXIT_ON_FATAL_ERROR = ConfigOptions
+ .key("akka.jvm-exit-on-fatal-error")
+ .defaultValue(true);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b5b5486..65e6c76 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -708,82 +708,130 @@ public final class ConfigConstants {
/**
* Timeout for the startup of the actor system
+ *
+ * @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead.
*/
+ @Deprecated
public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout";
/**
* Heartbeat interval of the transport failure detector
+ *
+ * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
*/
+ @Deprecated
public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "akka.transport.heartbeat.interval";
/**
* Allowed heartbeat pause for the transport failure detector
+ *
+ * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
*/
+ @Deprecated
public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE = "akka.transport.heartbeat.pause";
/**
* Detection threshold of transport failure detector
+ *
+ * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
*/
+ @Deprecated
public static final String AKKA_TRANSPORT_THRESHOLD = "akka.transport.threshold";
/**
* Heartbeat interval of watch failure detector
+ *
+ * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_INTERVAL} instead.
*/
+ @Deprecated
public static final String AKKA_WATCH_HEARTBEAT_INTERVAL = "akka.watch.heartbeat.interval";
/**
* Allowed heartbeat pause for the watch failure detector
+ *
+ * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_PAUSE} instead.
*/
+ @Deprecated
public static final String AKKA_WATCH_HEARTBEAT_PAUSE = "akka.watch.heartbeat.pause";
/**
* Detection threshold for the phi accrual watch failure detector
+ *
+ * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
*/
+ @Deprecated
public static final String AKKA_WATCH_THRESHOLD = "akka.watch.threshold";
/**
* Akka TCP timeout
+ *
+ * @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead.
*/
+ @Deprecated
public static final String AKKA_TCP_TIMEOUT = "akka.tcp.timeout";
/**
* Override SSL support for the Akka transport
+ *
+ * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
*/
+ @Deprecated
public static final String AKKA_SSL_ENABLED = "akka.ssl.enabled";
/**
* Maximum framesize of akka messages
+ *
+ * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
*/
+ @Deprecated
public static final String AKKA_FRAMESIZE = "akka.framesize";
/**
* Maximum number of messages until another actor is executed by the same thread
+ *
+ * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
*/
+ @Deprecated
public static final String AKKA_DISPATCHER_THROUGHPUT = "akka.throughput";
/**
* Log lifecycle events
+ *
+ * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
*/
+ @Deprecated
public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events";
/**
* Timeout for all blocking calls on the cluster side
+ *
+ * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
*/
+ @Deprecated
public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
/**
* Timeout for all blocking calls that look up remote actors
+ *
+ * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
*/
+ @Deprecated
public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
/**
* Timeout for all blocking calls on the client side
+ *
+ * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
*/
+ @Deprecated
public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout";
/**
* Exit JVM on fatal Akka errors
+ *
+ * @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
*/
+ @Deprecated
public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error";
// ----------------------------- Transport SSL Settings--------------------
@@ -1425,26 +1473,70 @@ public final class ConfigConstants {
// ------------------------------ Akka Values ------------------------------
+ /**
+ * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} instead.
+ */
+ @Deprecated
public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
+ /**
+ * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} instead.
+ */
+ @Deprecated
public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
+ /**
+ * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
+ */
+ @Deprecated
public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
+ /**
+ * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
+ */
+ @Deprecated
public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
+ /**
+ * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
+ */
+ @Deprecated
public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;
+ /**
+ * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
+ */
+ @Deprecated
public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;
+ /**
+ * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
+ */
+ @Deprecated
public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
+ /**
+ * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
+ */
+ @Deprecated
public static String DEFAULT_AKKA_ASK_TIMEOUT = "10 s";
+ /**
+ * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
+ */
+ @Deprecated
public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
+ /**
+ * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
+ */
+ @Deprecated
public static String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";
+ /**
+ * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
+ */
+ @Deprecated
public static boolean DEFAULT_AKKA_SSL_ENABLED = true;
// ----------------------------- SSL Values --------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 206c71b..625880b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -108,7 +109,7 @@ public class MesosTaskManagerRunner {
}
// tell akka to die in case of an error
- configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+ configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
// Infer the resource identifier from the environment variable
String containerID = Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID));
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 4f92148..c0dcc99 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -23,7 +23,7 @@ import akka.dispatch.OnFailure;
import akka.dispatch.OnSuccess;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -61,7 +61,7 @@ public class MetricFetcher {
private final ActorSystem actorSystem;
private final JobManagerRetriever retriever;
private final ExecutionContext ctx;
- private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
+ private final FiniteDuration timeout = new FiniteDuration(Duration.create(AkkaOptions.ASK_TIMEOUT.defaultValue()).toMillis(), TimeUnit.MILLISECONDS);
private MetricStore metrics = new MetricStore();
private MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
index ffab9cc..9451e20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
@@ -23,7 +23,7 @@ import akka.actor.Props;
import akka.actor.Status;
import akka.dispatch.Futures;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobClientMessages;
@@ -114,7 +114,7 @@ public class JobAttachmentClientActor extends JobClientActor {
client.tell(
decorateMessage(new Status.Failure(
new JobClientActorRegistrationTimeoutException("Registration for Job at the JobManager " +
- "timed out. " + "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT +
+ "timed out. " + "You may increase '" + AkkaOptions.CLIENT_TIMEOUT.key() +
"' in case the JobManager needs more time to confirm the job client registration."))),
getSelf());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index a3fee21..babb0f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -22,7 +22,7 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.dispatch.Futures;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -119,7 +119,7 @@ public class JobSubmissionClientActor extends JobClientActor {
client.tell(
decorateMessage(new Status.Failure(
new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " +
- "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " +
+ "You may increase '" + AkkaOptions.CLIENT_TIMEOUT.key() + "' in case the JobManager " +
"needs more time to configure and confirm the job submission."))),
getSelf());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 77dbad4..f9c39c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -26,7 +26,7 @@ import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
@@ -148,7 +148,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
}
catch (Exception e) {
lt = new FiniteDuration(
- Duration.apply(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT).toMillis(),
+ Duration.apply(AkkaOptions.LOOKUP_TIMEOUT.defaultValue()).toMillis(),
TimeUnit.MILLISECONDS);
}
this.messageTimeout = lt;
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index d95e1c3..36d81e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
@@ -92,7 +93,7 @@ public class FailureRateRestartStrategy implements RestartStrategy {
String failuresIntervalString = configuration.getString(
ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString()
);
- String timeoutString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+ String timeoutString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString);
Duration failuresInterval = Duration.apply(failuresIntervalString);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index f51ea7c..2b62c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph.restart;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
@@ -72,8 +73,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
String timeoutString = configuration.getString(
- ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL,
- ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+ AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
String delayString = configuration.getString(
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
@@ -87,7 +87,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
} catch (NumberFormatException nfe) {
if (delayString.equals(timeoutString)) {
throw new Exception("Invalid config value for " +
- ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString +
+ AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + timeoutString +
". Value must be a valid duration (such as '10 s' or '1 min')");
} else {
throw new Exception("Invalid config value for " +
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index 27ee9b6..d1f547f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
@@ -88,8 +89,7 @@ public abstract class RestartStrategyFactory implements Serializable {
// support deprecated ConfigConstants values
final int numberExecutionRetries = configuration.getInteger(ConfigConstants.EXECUTION_RETRIES_KEY,
ConfigConstants.DEFAULT_EXECUTION_RETRIES);
- String pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
- ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+ String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
String delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY,
pauseString);
@@ -100,7 +100,7 @@ public abstract class RestartStrategyFactory implements Serializable {
} catch (NumberFormatException nfe) {
if (delayString.equals(pauseString)) {
throw new Exception("Invalid config value for " +
- ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + pauseString +
+ AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
". Value must be a valid duration (such as '10 s' or '1 min')");
} else {
throw new Exception("Invalid config value for " +
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 9b05273..003d803 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -24,7 +24,7 @@ import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.QueryableStateOptions;
@@ -114,13 +114,11 @@ public class QueryableStateClient {
LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
// Get the ask timeout
- String askTimeoutString = config.getString(
- ConfigConstants.AKKA_ASK_TIMEOUT,
- ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+ String askTimeoutString = config.getString(AkkaOptions.ASK_TIMEOUT);
Duration timeout = FiniteDuration.apply(askTimeoutString);
if (!timeout.isFinite()) {
- throw new IllegalConfigurationException(ConfigConstants.AKKA_ASK_TIMEOUT
+ throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key()
+ " is not a finite timeout ('" + askTimeoutString + "')");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
index 2c64f08..0216789 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -53,24 +53,24 @@ public class ResourceManagerConfiguration {
// --------------------------------------------------------------------------
public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
- final String strTimeout = configuration.getString(AkkaOptions.AKKA_ASK_TIMEOUT);
+ final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
final Time timeout;
try {
timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
} catch (NumberFormatException e) {
throw new ConfigurationException("Could not parse the resource manager's timeout " +
- "value " + AkkaOptions.AKKA_ASK_TIMEOUT + '.', e);
+ "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
}
- final String strHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL);
+ final String strHeartbeatInterval = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
final Time heartbeatInterval;
try {
heartbeatInterval = Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis());
} catch (NumberFormatException e) {
throw new ConfigurationException("Could not parse the resource manager's heartbeat interval " +
- "value " + AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e);
+ "value " + AkkaOptions.WATCH_HEARTBEAT_INTERVAL + '.', e);
}
return new ResourceManagerConfiguration(timeout, heartbeatInterval);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index a651168..75cad07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -19,9 +19,7 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.Preconditions;
@@ -55,18 +53,14 @@ public class SlotManagerConfiguration {
}
public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
- ConfigOption<String> timeoutOption = ConfigOptions
- .key(ConfigConstants.AKKA_ASK_TIMEOUT)
- .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
-
- final String strTimeout = configuration.getString(timeoutOption);
+ final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
final Time timeout;
try {
timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
} catch (NumberFormatException e) {
throw new ConfigurationException("Could not parse the resource manager's timeout " +
- "value " + timeoutOption + '.', e);
+ "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
}
return new SlotManagerConfiguration(timeout, timeout, timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 8789eed..810efff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -130,9 +130,7 @@ public class AkkaRpcServiceUtils {
checkNotNull(config, "config is null");
- final boolean sslEnabled = config.getBoolean(
- ConfigConstants.AKKA_SSL_ENABLED,
- ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+ final boolean sslEnabled = config.getBoolean(AkkaOptions.SSL_ENABLED) &&
SSLUtils.getSSLEnabled(config);
return getRpcUrl(
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index a6e4748..ea9f576 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -146,7 +147,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
} catch (Exception e) {
throw new IllegalArgumentException(
- "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT +
+ "Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() +
"'.Use formats like '50 s' or '1 min' to specify the timeout.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 62fa73d..60a33ba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -184,13 +184,11 @@ object AkkaUtils {
* @return Flink's basic Akka config
*/
private def getBasicAkkaConfig(configuration: Configuration): Config = {
- val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT,
- ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT)
- val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
- ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
+ val akkaThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT)
+ val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)
val jvmExitOnFatalError = if (
- configuration.getBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true)){
+ configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)){
"on"
} else {
"off"
@@ -269,48 +267,36 @@ object AkkaUtils {
bindAddress: String, port: Int,
externalHostname: String, externalPort: Int): Config = {
- val akkaAskTimeout = Duration(configuration.getString(
- ConfigConstants.AKKA_ASK_TIMEOUT,
- ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
+ val akkaAskTimeout = Duration(configuration.getString(AkkaOptions.ASK_TIMEOUT))
val startupTimeout = configuration.getString(
- ConfigConstants.AKKA_STARTUP_TIMEOUT,
+ AkkaOptions.STARTUP_TIMEOUT,
(akkaAskTimeout * 10).toString)
val transportHeartbeatInterval = configuration.getString(
- ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_INTERVAL,
- ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL)
+ AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL)
val transportHeartbeatPause = configuration.getString(
- ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_PAUSE,
- ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE)
+ AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE)
- val transportThreshold = configuration.getDouble(
- ConfigConstants.AKKA_TRANSPORT_THRESHOLD,
- ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD)
+ val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD)
- val watchHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL);
+ val watchHeartbeatInterval = configuration.getString(
+ AkkaOptions.WATCH_HEARTBEAT_INTERVAL)
- val watchHeartbeatPause = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_PAUSE);
+ val watchHeartbeatPause = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE)
- val watchThreshold = configuration.getDouble(
- ConfigConstants.AKKA_WATCH_THRESHOLD,
- ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD)
+ val watchThreshold = configuration.getInteger(AkkaOptions.WATCH_THRESHOLD)
- val akkaTCPTimeout = configuration.getString(AkkaOptions.AKKA_TCP_TIMEOUT);
+ val akkaTCPTimeout = configuration.getString(AkkaOptions.TCP_TIMEOUT)
- val akkaFramesize = configuration.getString(
- ConfigConstants.AKKA_FRAMESIZE,
- ConfigConstants.DEFAULT_AKKA_FRAMESIZE)
+ val akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE)
- val lifecycleEvents = configuration.getBoolean(
- ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
- ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
+ val lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)
val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
- val akkaEnableSSLConfig = configuration.getBoolean(ConfigConstants.AKKA_SSL_ENABLED,
- ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+ val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) &&
SSLUtils.getSSLEnabled(configuration)
val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off"
@@ -588,14 +574,13 @@ object AkkaUtils {
}
def getTimeout(config: Configuration): FiniteDuration = {
- val duration = Duration(config.getString(ConfigConstants.AKKA_ASK_TIMEOUT,
- ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
+ val duration = Duration(config.getString(AkkaOptions.ASK_TIMEOUT))
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
def getDefaultTimeout: Time = {
- val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
+ val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue())
Time.milliseconds(duration.toMillis)
}
@@ -607,30 +592,24 @@ object AkkaUtils {
}
def getLookupTimeout(config: Configuration): FiniteDuration = {
- val duration = Duration(config.getString(
- ConfigConstants.AKKA_LOOKUP_TIMEOUT,
- ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT))
+ val duration = Duration(config.getString(AkkaOptions.LOOKUP_TIMEOUT))
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
def getDefaultLookupTimeout: FiniteDuration = {
- val duration = Duration(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT)
+ val duration = Duration(AkkaOptions.LOOKUP_TIMEOUT.defaultValue())
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
def getClientTimeout(config: Configuration): FiniteDuration = {
- val duration = Duration(
- config.getString(
- ConfigConstants.AKKA_CLIENT_TIMEOUT,
- ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT
- ))
+ val duration = Duration(config.getString(AkkaOptions.CLIENT_TIMEOUT))
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
def getDefaultClientTimeout: FiniteDuration = {
- val duration = Duration(ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT)
+ val duration = Duration(AkkaOptions.CLIENT_TIMEOUT.defaultValue())
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 2ace8db..abc8946 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config
import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -265,9 +265,9 @@ abstract class FlinkMiniCluster(
// https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables
if (sys.env.contains("CI")) {
// Only set if nothing specified in config
- if (config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, null) == null) {
- val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) * 10
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, s"${duration.toSeconds}s")
+ if (!config.contains(AkkaOptions.ASK_TIMEOUT)) {
+ val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue()) * 10
+ config.setString(AkkaOptions.ASK_TIMEOUT, s"${duration.toSeconds}s")
LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 5aa31ff..f1bc43b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -22,7 +22,7 @@ import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
@@ -192,7 +192,7 @@ public class ResourceManagerTest extends TestLogger {
// set a short timeout for lookups
Configuration shortTimeoutConfig = config.clone();
- shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s");
+ shortTimeoutConfig.setString(AkkaOptions.LOOKUP_TIMEOUT, "1 s");
fakeJobManager = TestingUtils.createForwardingActor(
system,
@@ -234,7 +234,7 @@ public class ResourceManagerTest extends TestLogger {
// set a long timeout for lookups such that the test fails in case of timeouts
Configuration shortTimeoutConfig = config.clone();
- shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s");
+ shortTimeoutConfig.setString(AkkaOptions.LOOKUP_TIMEOUT, "99999 s");
fakeJobManager = TestingUtils.createForwardingActor(
system,
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index f19ca4e..0346e48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -51,7 +52,7 @@ public class PartialConsumePipelinedResultTest {
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
flink = new TestingCluster(config, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index c8459e7..1a4396e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import com.typesafe.config.Config;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
@@ -601,7 +602,7 @@ public class JobManagerTest extends TestLogger {
Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
Configuration config = new Configuration();
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms");
+ config.setString(AkkaOptions.ASK_TIMEOUT, "100ms");
ActorRef jobManagerActor = JobManager.startJobManagerActors(
config,
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 9dcfc70..7234fea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -27,6 +27,7 @@ import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
@@ -77,9 +78,9 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
final int BUFFER_SIZE = 32 * 1024;
Configuration config = new Configuration();
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "1 s");
- config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 1);
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms");
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "1 s");
+ config.setInteger(AkkaOptions.WATCH_THRESHOLD, 1);
ActorSystem actorSystem = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 0844aad..3953072 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
import akka.actor.InvalidActorNameException;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -90,10 +91,10 @@ public class TaskManagerRegistrationTest extends TestLogger {
@BeforeClass
public static void startActorSystem() {
config = new Configuration();
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
- config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
+ config.setString(AkkaOptions.ASK_TIMEOUT, "5 s");
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms");
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2 s");
+ config.setInteger(AkkaOptions.WATCH_THRESHOLD, 2);
actorSystem = AkkaUtils.createLocalActorSystem(config);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 42338cd..48eb392 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.testutils;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -86,10 +87,10 @@ public class ZooKeeperTestUtils {
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");
// Akka failure detection and execution retries
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
- config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
+ config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
+ config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s");
return config;
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 9f8e3e1..daf0f47 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils, ScalaTestingUtils}
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -119,7 +119,7 @@ class AkkaSslITCase(_system: ActorSystem)
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s")
+ config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "invalid.keystore")
@@ -141,7 +141,7 @@ class AkkaSslITCase(_system: ActorSystem)
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s")
+ config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 97a001d..6d7d87c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -23,7 +23,7 @@ import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.util.NetUtils
import org.junit.Assert._
@@ -122,7 +122,7 @@ class JobManagerConnectionTest {
private def createConfigWithLowTimeout() : Configuration = {
val config = new Configuration()
- config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT,
+ config.setString(AkkaOptions.LOOKUP_TIMEOUT,
Duration(timeout, TimeUnit.MILLISECONDS).toSeconds + " s")
config
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index f3ab409..4fc4042 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
@@ -60,7 +60,7 @@ class RecoveryITCase(_system: ActorSystem)
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
config.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay")
config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1)
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, heartbeatTimeout)
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index c8977f0..858bbbb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors
import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logger
import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions}
+import org.apache.flink.configuration._
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -89,7 +89,7 @@ object TestingUtils {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs)
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
+ config.setString(AkkaOptions.ASK_TIMEOUT, timeout)
val cluster = new TestingCluster(config)
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 437dd5f..5f6f5c4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -146,8 +147,8 @@ public class TestBaseUtils extends TestLogger {
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE);
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
- config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
+ config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
+ config.setString(AkkaOptions.STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
config.setInteger(JobManagerOptions.WEB_PORT, 8081);
config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
@@ -287,7 +288,7 @@ public class TestBaseUtils extends TestLogger {
String resultPath,
String[] excludePrefixes,
boolean inOrderOfFiles) throws IOException {
-
+
checkArgument(resultPath != null, "resultPath cannot be be null");
final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles);
@@ -328,8 +329,8 @@ public class TestBaseUtils extends TestLogger {
String msg = String.format(
"Different elements in arrays: expected %d elements and received %d\n" +
"files: %s\n expected: %s\n received: %s",
- expected.length, result.length,
- Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)),
+ expected.length, result.length,
+ Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)),
Arrays.toString(expected), Arrays.toString(result));
fail(msg);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 49ff744..92e5768 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
@@ -120,7 +121,7 @@ public class AccumulatorLiveITCase extends TestLogger {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
TestingCluster testingCluster = new TestingCluster(config, false, true);
testingCluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 06233d6..2767312 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -20,6 +20,7 @@
package org.apache.flink.test.cancelling;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -88,7 +89,7 @@ public abstract class CancelingTestBase extends TestLogger {
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+ config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index a573be6..bda1679 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -74,8 +75,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
- config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s");
- config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
+ config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
+ config.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index e7bd522..4905d43 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
@@ -44,7 +44,7 @@ public class StreamingCustomInputSplitProgram {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
- config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
+ config.setString(AkkaOptions.ASK_TIMEOUT, "5 s");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 7c6f73a..85961db 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
@@ -78,7 +79,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
@Test(expected=FlinkException.class)
public void testInvalidAkkaConfiguration() throws Throwable {
Configuration config = new Configuration();
- config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
+ config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
cluster.getHostname(),
@@ -103,7 +104,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
@Test
public void testUserSpecificParallelism() throws Exception {
Configuration config = new Configuration();
- config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
+ config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
cluster.getHostname(),
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index c7c07ce..5c65a7f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,6 +24,7 @@ import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
@@ -127,11 +128,11 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
Configuration jmConfig = new Configuration();
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
- jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+ jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+ jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
+ jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
- jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+ jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
@@ -409,7 +410,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
- cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+ cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
ResourceID.generate(), TaskManager.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index 6d53b9f..c8c8d2a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -156,9 +157,9 @@ public class ChaosMonkeyITCase extends TestLogger {
ZooKeeper.getConnectString(), FileStateBackendBasePath.toURI().toString());
// Akka and restart timeouts
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
- config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
+ config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
if (checkpointingIntervalMs >= killEvery.toMillis()) {
throw new IllegalArgumentException("Relax! You want to kill processes every " +
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 9d2806c..59d5a51 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -101,10 +101,10 @@ public class ProcessFailureCancelingITCase extends TestLogger {
Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
Configuration jmConfig = new Configuration();
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "5 s");
- jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2000 s");
- jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 10);
- jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+ jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s");
+ jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s");
+ jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10);
+ jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index bafdd9f..93d369a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -77,9 +78,9 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
- config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
- config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms");
+ config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "20 s");
+ config.setInteger(AkkaOptions.WATCH_THRESHOLD, 20);
cluster = new LocalFlinkMiniCluster(config, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 40a8f09..37e89e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -23,6 +23,7 @@ import akka.actor.Kill;
import akka.actor.PoisonPill;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -151,7 +152,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
- configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString());
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString());
Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
[4/8] flink git commit: [FLINK-6495] Migrate Akka configuration options
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 1da52d4..398a5eb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -127,7 +128,7 @@ public class YarnTaskExecutorRunner {
}
// tell akka to die in case of an error
- configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+ configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
String keytabPath = null;
if(remoteKeytabPath != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 849a8a6..047a1fa 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
@@ -91,7 +92,7 @@ public class YarnTaskManagerRunner {
}
// tell akka to die in case of an error
- configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+ configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
String localKeytabPath = null;
if(remoteKeytabPath != null) {
[7/8] flink git commit: [FLINK-6448][web] Rename "Free Memory" field to "JVM Heap Size"
Posted by ch...@apache.org.
[FLINK-6448][web] Rename "Free Memory" field to "JVM Heap Size"
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3e1642f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3e1642f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3e1642f
Branch: refs/heads/master
Commit: b3e1642f33e9a6e3df523aae5c4b04205f1151d8
Parents: 8b89e2f
Author: zentol <ch...@apache.org>
Authored: Fri May 19 12:05:10 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon May 22 20:28:22 2017 +0200
----------------------------------------------------------------------
.../web-dashboard/app/partials/taskmanager/index.jade | 2 +-
.../app/partials/taskmanager/taskmanager.metrics.jade | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b3e1642f/flink-runtime-web/web-dashboard/app/partials/taskmanager/index.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/taskmanager/index.jade b/flink-runtime-web/web-dashboard/app/partials/taskmanager/index.jade
index a2b715d..3251fb4 100644
--- a/flink-runtime-web/web-dashboard/app/partials/taskmanager/index.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/taskmanager/index.jade
@@ -33,7 +33,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main
th Free Slots
th CPU Cores
th Physical Memory
- th Free Memory
+ th JVM Heap Size
th Flink Managed Memory
tbody
http://git-wip-us.apache.org/repos/asf/flink/blob/b3e1642f/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
index e348a5b..fbf170d 100644
--- a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
@@ -25,7 +25,7 @@ div(ng-if="metrics.id")
th Free Slots
th CPU Cores
th Physical Memory
- th Free Memory
+ th JVM Heap Size
th Flink Managed Memory
tbody
tr