You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/06 13:23:50 UTC

[5/7] flink git commit: [FLINK-8835] [taskmanager] Cleanup TaskManager config keys

[FLINK-8835] [taskmanager] Cleanup TaskManager config keys

This closes #5808.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b0f590c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b0f590c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b0f590c

Branch: refs/heads/master
Commit: 8b0f590c52d698b3439a2c3524889802c893e985
Parents: cdd2022
Author: zhangminglei <zm...@163.com>
Authored: Wed Apr 4 17:05:22 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 6 15:23:02 2018 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 20 +++----
 .../flink/configuration/TaskManagerOptions.java | 55 +++++++++++---------
 .../clusterframework/BootstrapTools.java        |  2 +-
 .../runtime/io/network/netty/NettyConfig.java   |  2 +-
 .../taskexecutor/TaskManagerConfiguration.java  | 16 +++---
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../TaskManagerRegistrationTest.java            |  8 +--
 .../runtime/io/InputProcessorUtil.java          |  2 +-
 .../jobmanager/JobManagerFailsITCase.scala      |  4 +-
 9 files changed, 59 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b716d9e..f148360 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -299,7 +299,7 @@ public final class ConfigConstants {
 	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = "taskmanager.debug.memory.startLogThread";
@@ -316,7 +316,7 @@ public final class ConfigConstants {
 	 * Defines the maximum time it can take for the TaskManager registration. If the duration is
 	 * exceeded without a successful registration, then the TaskManager terminates.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
@@ -325,7 +325,7 @@ public final class ConfigConstants {
 	 * The initial registration pause between two consecutive registration attempts. The pause
 	 * is doubled for each new registration attempt until it reaches the maximum registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "taskmanager.initial-registration-pause";
@@ -333,7 +333,7 @@ public final class ConfigConstants {
 	/**
 	 * The maximum registration pause between two consecutive registration attempts.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = "taskmanager.max-registration-pause";
@@ -341,7 +341,7 @@ public final class ConfigConstants {
 	/**
 	 * The pause after a registration has been refused by the job manager before retrying to connect.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause";
@@ -1441,7 +1441,7 @@ public final class ConfigConstants {
 	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+	 * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
 	 */
 	@Deprecated
 	public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;
@@ -1457,7 +1457,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's maximum registration duration.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
@@ -1465,7 +1465,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's initial registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms";
@@ -1473,7 +1473,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's maximum registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE = "30 s";
@@ -1481,7 +1481,7 @@ public final class ConfigConstants {
 	/**
 	 * The default task manager's refused registration pause.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+	 * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
 	 */
 	@Deprecated
 	public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index c7b0782..2bd3091 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -104,40 +104,44 @@ public class TaskManagerOptions {
 				" global ssl flag " + SecurityOptions.SSL_ENABLED.key() + " is set to true");
 
 	/**
-	 * The initial registration pause between two consecutive registration attempts. The pause
-	 * is doubled for each new registration attempt until it reaches the maximum registration pause.
+	 * The initial registration backoff between two consecutive registration attempts. The backoff
+	 * is doubled for each new registration attempt until it reaches the maximum registration backoff.
 	 */
-	public static final ConfigOption<String> INITIAL_REGISTRATION_PAUSE =
-		key("taskmanager.initial-registration-pause")
+	public static final ConfigOption<String> INITIAL_REGISTRATION_BACKOFF =
+		key("taskmanager.registration.initial-backoff")
 			.defaultValue("500 ms")
-			.withDescription("The initial registration pause between two consecutive registration attempts. The pause" +
-				" is doubled for each new registration attempt until it reaches the maximum registration pause.");
+			.withDeprecatedKeys("taskmanager.initial-registration-pause")
+			.withDescription("The initial registration backoff between two consecutive registration attempts. The backoff" +
+				" is doubled for each new registration attempt until it reaches the maximum registration backoff.");
 
 	/**
-	 * The maximum registration pause between two consecutive registration attempts.
+	 * The maximum registration backoff between two consecutive registration attempts.
 	 */
-	public static final ConfigOption<String> MAX_REGISTRATION_PAUSE =
-		key("taskmanager.max-registration-pause")
+	public static final ConfigOption<String> REGISTRATION_MAX_BACKOFF =
+		key("taskmanager.registration.max-backoff")
 			.defaultValue("30 s")
-			.withDescription("The maximum registration pause between two consecutive registration attempts. The max" +
-				" registration pause requires a time unit specifier (ms/s/min/h/d).");
+			.withDeprecatedKeys("taskmanager.max-registration-pause")
+			.withDescription("The maximum registration backoff between two consecutive registration attempts. The max" +
+				" registration backoff requires a time unit specifier (ms/s/min/h/d).");
 
 	/**
-	 * The pause after a registration has been refused by the job manager before retrying to connect.
+	 * The backoff after a registration has been refused by the job manager before retrying to connect.
 	 */
-	public static final ConfigOption<String> REFUSED_REGISTRATION_PAUSE =
-		key("taskmanager.refused-registration-pause")
+	public static final ConfigOption<String> REFUSED_REGISTRATION_BACKOFF =
+		key("taskmanager.registration.refused-backoff")
 			.defaultValue("10 s")
-			.withDescription("The pause after a registration has been refused by the job manager before retrying to connect.");
+			.withDeprecatedKeys("taskmanager.refused-registration-pause")
+			.withDescription("The backoff after a registration has been refused by the job manager before retrying to connect.");
 
 	/**
-	 * Defines the maximum time it can take for the TaskManager registration. If the duration is
+	 * Defines the timeout for the TaskManager registration. If the duration is
 	 * exceeded without a successful registration, then the TaskManager terminates.
 	 */
-	public static final ConfigOption<String> MAX_REGISTRATION_DURATION =
-		key("taskmanager.maxRegistrationDuration")
+	public static final ConfigOption<String> REGISTRATION_TIMEOUT =
+		key("taskmanager.registration.timeout")
 			.defaultValue("Inf")
-			.withDescription("Defines the maximum time it can take for the TaskManager registration. If the duration is" +
+			.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
+			.withDescription("Defines the timeout for the TaskManager registration. If the duration is" +
 				" exceeded without a successful registration, then the TaskManager terminates.");
 
 	/**
@@ -153,14 +157,16 @@ public class TaskManagerOptions {
 				" is typically proportional to the number of physical CPU cores that the TaskManager's machine has" +
 				" (e.g., equal to the number of cores, or half the number of cores).");
 
-	public static final ConfigOption<Boolean> DEBUG_MEMORY_USAGE_START_LOG_THREAD =
-		key("taskmanager.debug.memory.startLogThread")
+	public static final ConfigOption<Boolean> DEBUG_MEMORY_LOG =
+		key("taskmanager.debug.memory.log")
 			.defaultValue(false)
+			.withDeprecatedKeys("taskmanager.debug.memory.startLogThread")
 			.withDescription("Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.");
 
 	public static final ConfigOption<Long> DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS =
-		key("taskmanager.debug.memory.logIntervalMs")
+		key("taskmanager.debug.memory.log-interval")
 			.defaultValue(5000L)
+			.withDeprecatedKeys("taskmanager.debug.memory.logIntervalMs")
 			.withDescription("The interval (in ms) for the log thread to log the current memory usage.");
 
 	// ------------------------------------------------------------------------
@@ -321,9 +327,10 @@ public class TaskManagerOptions {
 	 * credit-based flow control.
 	 */
 	@Deprecated
-	public static final ConfigOption<Boolean> NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED =
-			key("taskmanager.network.credit-based-flow-control.enabled")
+	public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
+			key("taskmanager.network.credit-model")
 			.defaultValue(true)
+			.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
 			.withDescription("Boolean flag to enable/disable network credit-based flow control.");
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index eab7382..102274d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -245,7 +245,7 @@ public class BootstrapTools {
 			cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
 		}
 
-		cfg.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, registrationTimeout.toString());
+		cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
 		if (numSlots != -1){
 			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 8572361..18527c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -225,7 +225,7 @@ public class NettyConfig {
 	}
 
 	public boolean isCreditBasedEnabled() {
-		return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED);
+		return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index cb6fe51..1bf42ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -185,7 +185,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		final Time finiteRegistrationDuration;
 
 		try {
-			Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.MAX_REGISTRATION_DURATION));
+			Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
 			if (maxRegistrationDuration.isFinite()) {
 				finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
 			} else {
@@ -193,12 +193,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.MAX_REGISTRATION_DURATION.key(), e);
+				TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
 		}
 
 		final Time initialRegistrationPause;
 		try {
-			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE));
+			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
 			if (pause.isFinite()) {
 				initialRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
@@ -206,13 +206,13 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+				TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
 		}
 
 		final Time maxRegistrationPause;
 		try {
 			Duration pause = Duration.create(configuration.getString(
-				TaskManagerOptions.MAX_REGISTRATION_PAUSE));
+				TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
 			if (pause.isFinite()) {
 				maxRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
@@ -220,12 +220,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+				TaskManagerOptions.REGISTRATION_MAX_BACKOFF.key(), e);
 		}
 
 		final Time refusedRegistrationPause;
 		try {
-			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE));
+			Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
 			if (pause.isFinite()) {
 				refusedRegistrationPause = Time.milliseconds(pause.toMillis());
 			} else {
@@ -233,7 +233,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 			}
 		} catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Invalid format for parameter " +
-				TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+				TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF.key(), e);
 		}
 
 		final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0aaeae3..071a333 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1884,7 +1884,7 @@ object TaskManager {
       // if desired, start the logging daemon that periodically logs the
       // memory usage information
       if (LOG.isInfoEnabled && configuration.getBoolean(
-        TaskManagerOptions.DEBUG_MEMORY_USAGE_START_LOG_THREAD))
+        TaskManagerOptions.DEBUG_MEMORY_LOG))
       {
         LOG.info("Starting periodic memory usage logger")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 6b65095..ad32a4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -267,7 +267,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			try {
 				// registration timeout of 1 second
 				Configuration tmConfig = new Configuration();
-				tmConfig.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, "500 ms");
+				tmConfig.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "500 ms");
 
 				highAvailabilityServices.setJobMasterLeaderRetriever(
 					HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -325,7 +325,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 				FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS);
 				Configuration tmConfig = new Configuration(config);
-				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString());
+				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause.toString());
 
 				highAvailabilityServices.setJobMasterLeaderRetriever(
 					HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -407,8 +407,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				long maxDelay = 30000;
 
 				Configuration tmConfig = new Configuration(config);
-				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause + " ms");
-				tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, initialRegistrationPause + " ms");
+				tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause + " ms");
+				tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, initialRegistrationPause + " ms");
 
 				// we make the test actor (the test kit) the JobManager to intercept
 				// the messages

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 1ae34b3..d1c5b72 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -51,7 +51,7 @@ public class InputProcessorUtil {
 					+ " must be positive or -1 (infinite)");
 			}
 
-			if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED)) {
+			if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
 				barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
 			} else {
 				barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b0f590c/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 1a3419b..5fe7b1d 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -136,8 +136,8 @@ class JobManagerFailsITCase(_system: ActorSystem)
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setInteger(JobManagerOptions.PORT, 0)
-    config.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, "50 ms")
-    config.setString(TaskManagerOptions.MAX_REGISTRATION_PAUSE, "100 ms")
+    config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")
+    config.setString(TaskManagerOptions.REGISTRATION_MAX_BACKOFF, "100 ms")
 
     val cluster = new TestingCluster(config, singleActorSystem = false)