You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/06 13:24:17 UTC
[3/7] flink git commit: [FLINK-8835] [taskmanager] Cleanup TaskManager config keys
[FLINK-8835] [taskmanager] Cleanup TaskManager config keys
This closes #5808.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e2581e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e2581e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e2581e0
Branch: refs/heads/release-1.5
Commit: 9e2581e0443ff47124de41a8cdcd9c18e64b0fab
Parents: c8e0a31
Author: zhangminglei <zm...@163.com>
Authored: Wed Apr 4 17:05:22 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:24:03 2018 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 20 +++----
.../flink/configuration/TaskManagerOptions.java | 55 +++++++++++---------
.../clusterframework/BootstrapTools.java | 2 +-
.../runtime/io/network/netty/NettyConfig.java | 2 +-
.../taskexecutor/TaskManagerConfiguration.java | 16 +++---
.../flink/runtime/taskmanager/TaskManager.scala | 2 +-
.../TaskManagerRegistrationTest.java | 8 +--
.../runtime/io/InputProcessorUtil.java | 2 +-
.../jobmanager/JobManagerFailsITCase.scala | 4 +-
9 files changed, 59 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b716d9e..f148360 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -299,7 +299,7 @@ public final class ConfigConstants {
/**
* Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
*
- * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+ * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
*/
@Deprecated
public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = "taskmanager.debug.memory.startLogThread";
@@ -316,7 +316,7 @@ public final class ConfigConstants {
* Defines the maximum time it can take for the TaskManager registration. If the duration is
* exceeded without a successful registration, then the TaskManager terminates.
*
- * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead
+ * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} instead
*/
@Deprecated
public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
@@ -325,7 +325,7 @@ public final class ConfigConstants {
* The initial registration pause between two consecutive registration attempts. The pause
* is doubled for each new registration attempt until it reaches the maximum registration pause.
*
- * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+ * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
*/
@Deprecated
public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "taskmanager.initial-registration-pause";
@@ -333,7 +333,7 @@ public final class ConfigConstants {
/**
* The maximum registration pause between two consecutive registration attempts.
*
- * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead
+ * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} instead
*/
@Deprecated
public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = "taskmanager.max-registration-pause";
@@ -341,7 +341,7 @@ public final class ConfigConstants {
/**
* The pause after a registration has been refused by the job manager before retrying to connect.
*
- * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+ * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
*/
@Deprecated
public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause";
@@ -1441,7 +1441,7 @@ public final class ConfigConstants {
/**
* Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
*
- * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+ * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
*/
@Deprecated
public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;
@@ -1457,7 +1457,7 @@ public final class ConfigConstants {
/**
* The default task manager's maximum registration duration.
*
- * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead
+ * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} instead
*/
@Deprecated
public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
@@ -1465,7 +1465,7 @@ public final class ConfigConstants {
/**
* The default task manager's initial registration pause.
*
- * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+ * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
*/
@Deprecated
public static final String DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms";
@@ -1473,7 +1473,7 @@ public final class ConfigConstants {
/**
* The default task manager's maximum registration pause.
*
- * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead
+ * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} instead
*/
@Deprecated
public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE = "30 s";
@@ -1481,7 +1481,7 @@ public final class ConfigConstants {
/**
* The default task manager's refused registration pause.
*
- * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+ * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
*/
@Deprecated
public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";
http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index c7b0782..2bd3091 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -104,40 +104,44 @@ public class TaskManagerOptions {
" global ssl flag " + SecurityOptions.SSL_ENABLED.key() + " is set to true");
/**
- * The initial registration pause between two consecutive registration attempts. The pause
- * is doubled for each new registration attempt until it reaches the maximum registration pause.
+ * The initial registration backoff between two consecutive registration attempts. The backoff
+ * is doubled for each new registration attempt until it reaches the maximum registration backoff.
*/
- public static final ConfigOption<String> INITIAL_REGISTRATION_PAUSE =
- key("taskmanager.initial-registration-pause")
+ public static final ConfigOption<String> INITIAL_REGISTRATION_BACKOFF =
+ key("taskmanager.registration.initial-backoff")
.defaultValue("500 ms")
- .withDescription("The initial registration pause between two consecutive registration attempts. The pause" +
- " is doubled for each new registration attempt until it reaches the maximum registration pause.");
+ .withDeprecatedKeys("taskmanager.initial-registration-pause")
+ .withDescription("The initial registration backoff between two consecutive registration attempts. The backoff" +
+ " is doubled for each new registration attempt until it reaches the maximum registration backoff.");
/**
- * The maximum registration pause between two consecutive registration attempts.
+ * The maximum registration backoff between two consecutive registration attempts.
*/
- public static final ConfigOption<String> MAX_REGISTRATION_PAUSE =
- key("taskmanager.max-registration-pause")
+ public static final ConfigOption<String> REGISTRATION_MAX_BACKOFF =
+ key("taskmanager.registration.max-backoff")
.defaultValue("30 s")
- .withDescription("The maximum registration pause between two consecutive registration attempts. The max" +
- " registration pause requires a time unit specifier (ms/s/min/h/d).");
+ .withDeprecatedKeys("taskmanager.max-registration-pause")
+ .withDescription("The maximum registration backoff between two consecutive registration attempts. The max" +
+ " registration backoff requires a time unit specifier (ms/s/min/h/d).");
/**
- * The pause after a registration has been refused by the job manager before retrying to connect.
+ * The backoff after a registration has been refused by the job manager before retrying to connect.
*/
- public static final ConfigOption<String> REFUSED_REGISTRATION_PAUSE =
- key("taskmanager.refused-registration-pause")
+ public static final ConfigOption<String> REFUSED_REGISTRATION_BACKOFF =
+ key("taskmanager.registration.refused-backoff")
.defaultValue("10 s")
- .withDescription("The pause after a registration has been refused by the job manager before retrying to connect.");
+ .withDeprecatedKeys("taskmanager.refused-registration-pause")
+ .withDescription("The backoff after a registration has been refused by the job manager before retrying to connect.");
/**
- * Defines the maximum time it can take for the TaskManager registration. If the duration is
+ * Defines the timeout for the TaskManager registration. If the duration is
* exceeded without a successful registration, then the TaskManager terminates.
*/
- public static final ConfigOption<String> MAX_REGISTRATION_DURATION =
- key("taskmanager.maxRegistrationDuration")
+ public static final ConfigOption<String> REGISTRATION_TIMEOUT =
+ key("taskmanager.registration.timeout")
.defaultValue("Inf")
- .withDescription("Defines the maximum time it can take for the TaskManager registration. If the duration is" +
+ .withDeprecatedKeys("taskmanager.maxRegistrationDuration")
+ .withDescription("Defines the timeout for the TaskManager registration. If the duration is" +
" exceeded without a successful registration, then the TaskManager terminates.");
/**
@@ -153,14 +157,16 @@ public class TaskManagerOptions {
" is typically proportional to the number of physical CPU cores that the TaskManager's machine has" +
" (e.g., equal to the number of cores, or half the number of cores).");
- public static final ConfigOption<Boolean> DEBUG_MEMORY_USAGE_START_LOG_THREAD =
- key("taskmanager.debug.memory.startLogThread")
+ public static final ConfigOption<Boolean> DEBUG_MEMORY_LOG =
+ key("taskmanager.debug.memory.log")
.defaultValue(false)
+ .withDeprecatedKeys("taskmanager.debug.memory.startLogThread")
.withDescription("Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.");
public static final ConfigOption<Long> DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS =
- key("taskmanager.debug.memory.logIntervalMs")
+ key("taskmanager.debug.memory.log-interval")
.defaultValue(5000L)
+ .withDeprecatedKeys("taskmanager.debug.memory.logIntervalMs")
.withDescription("The interval (in ms) for the log thread to log the current memory usage.");
// ------------------------------------------------------------------------
@@ -321,9 +327,10 @@ public class TaskManagerOptions {
* credit-based flow control.
*/
@Deprecated
- public static final ConfigOption<Boolean> NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED =
- key("taskmanager.network.credit-based-flow-control.enabled")
+ public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
+ key("taskmanager.network.credit-model")
.defaultValue(true)
+ .withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
.withDescription("Boolean flag to enable/disable network credit-based flow control.");
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index eab7382..102274d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -245,7 +245,7 @@ public class BootstrapTools {
cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
}
- cfg.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, registrationTimeout.toString());
+ cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
if (numSlots != -1){
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 8572361..18527c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -225,7 +225,7 @@ public class NettyConfig {
}
public boolean isCreditBasedEnabled() {
- return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED);
+ return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index cb6fe51..1bf42ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -185,7 +185,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
final Time finiteRegistrationDuration;
try {
- Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.MAX_REGISTRATION_DURATION));
+ Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
if (maxRegistrationDuration.isFinite()) {
finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
} else {
@@ -193,12 +193,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
- TaskManagerOptions.MAX_REGISTRATION_DURATION.key(), e);
+ TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
}
final Time initialRegistrationPause;
try {
- Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE));
+ Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
if (pause.isFinite()) {
initialRegistrationPause = Time.milliseconds(pause.toMillis());
} else {
@@ -206,13 +206,13 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
- TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+ TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
}
final Time maxRegistrationPause;
try {
Duration pause = Duration.create(configuration.getString(
- TaskManagerOptions.MAX_REGISTRATION_PAUSE));
+ TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
if (pause.isFinite()) {
maxRegistrationPause = Time.milliseconds(pause.toMillis());
} else {
@@ -220,12 +220,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
- TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+ TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
}
final Time refusedRegistrationPause;
try {
- Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE));
+ Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
if (pause.isFinite()) {
refusedRegistrationPause = Time.milliseconds(pause.toMillis());
} else {
@@ -233,7 +233,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
- TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+ TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
}
final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0aaeae3..071a333 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1884,7 +1884,7 @@ object TaskManager {
// if desired, start the logging daemon that periodically logs the
// memory usage information
if (LOG.isInfoEnabled && configuration.getBoolean(
- TaskManagerOptions.DEBUG_MEMORY_USAGE_START_LOG_THREAD))
+ TaskManagerOptions.DEBUG_MEMORY_LOG))
{
LOG.info("Starting periodic memory usage logger")
http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 6b65095..ad32a4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -267,7 +267,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
try {
// registration timeout of 1 second
Configuration tmConfig = new Configuration();
- tmConfig.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, "500 ms");
+ tmConfig.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "500 ms");
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -325,7 +325,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS);
Configuration tmConfig = new Configuration(config);
- tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString());
+ tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause.toString());
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -407,8 +407,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
long maxDelay = 30000;
Configuration tmConfig = new Configuration(config);
- tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause + " ms");
- tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, initialRegistrationPause + " ms");
+ tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause + " ms");
+ tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, initialRegistrationPause + " ms");
// we make the test actor (the test kit) the JobManager to intercept
// the messages
http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 1ae34b3..d1c5b72 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -51,7 +51,7 @@ public class InputProcessorUtil {
+ " must be positive or -1 (infinite)");
}
- if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED)) {
+ if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
} else {
barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 1a3419b..5fe7b1d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -136,8 +136,8 @@ class JobManagerFailsITCase(_system: ActorSystem)
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
config.setInteger(JobManagerOptions.PORT, 0)
- config.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, "50 ms")
- config.setString(TaskManagerOptions.MAX_REGISTRATION_PAUSE, "100 ms")
+ config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")
+ config.setString(TaskManagerOptions.REGISTRATION_MAX_BACKOFF, "100 ms")
val cluster = new TestingCluster(config, singleActorSystem = false)