You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/10 20:53:42 UTC

[3/3] flink git commit: [FLINK-5040] [taskmanager] Adjust partition request backoffs

[FLINK-5040] [taskmanager] Adjust partition request backoffs

The back offs were hard coded before, which would have made it
impossible to react to any potential problems with them.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bd8e027
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bd8e027
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bd8e027

Branch: refs/heads/release-1.1
Commit: 0bd8e027934fc34302c5ddb48f9e9aa448a58721
Parents: 55c506f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Nov 10 11:15:47 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 10 21:53:31 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 11 +++
 .../ResultPartitionDeploymentDescriptor.java    | 10 +--
 .../runtime/io/network/NetworkEnvironment.java  |  2 +-
 .../partition/consumer/SingleInputGate.java     |  8 ++
 .../apache/flink/runtime/taskmanager/Task.java  |  2 +-
 .../NetworkEnvironmentConfiguration.scala       | 12 +--
 .../flink/runtime/taskmanager/TaskManager.scala |  8 ++
 ...ResultPartitionDeploymentDescriptorTest.java |  2 +-
 .../ExecutionVertexDeploymentTest.java          |  2 +-
 .../io/network/NetworkEnvironmentTest.java      |  3 +-
 .../partition/consumer/SingleInputGateTest.java | 83 ++++++++++++++++++++
 ...askManagerComponentsStartupShutdownTest.java | 10 +--
 .../runtime/taskmanager/TaskManagerTest.java    |  4 +
 13 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d9ccb35..1431eae 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -216,6 +216,11 @@ public final class ConfigConstants {
 	 */
 	public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
 
+	/** Minimum backoff for partition requests of input channels. */
+	public static final String NETWORK_REQUEST_BACKOFF_INITIAL_KEY = "taskmanager.net.request-backoff.initial";
+
+	public static final String NETWORK_REQUEST_BACKOFF_MAX_KEY = "taskmanager.net.request-backoff.max";
+
 	/**
 	 * Config parameter defining the size of memory buffers used by the network stack and the memory manager.
 	 */
@@ -823,6 +828,12 @@ public final class ConfigConstants {
 	 */
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
 
+	/** Initial backoff for partition requests of input channels. */
+	public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL = 100;
+
+	/** Maximum backoff for partition requests of input channels. */
+	public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_MAX = 10000;
+
 	/**
 	 * Default size of memory segments in the network stack and the memory manager.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2ecde80..3edd279 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -49,14 +49,14 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 	private final int numberOfSubpartitions;
 	
 	/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
-	private final boolean lazyScheduling;
+	private final boolean sendScheduleOrUpdateConsumersMessage;
 
 	public ResultPartitionDeploymentDescriptor(
 			IntermediateDataSetID resultId,
 			IntermediateResultPartitionID partitionId,
 			ResultPartitionType partitionType,
 			int numberOfSubpartitions,
-			boolean lazyScheduling) {
+			boolean sendScheduleOrUpdateConsumersMessage) {
 
 		this.resultId = checkNotNull(resultId);
 		this.partitionId = checkNotNull(partitionId);
@@ -64,7 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
 		checkArgument(numberOfSubpartitions >= 1);
 		this.numberOfSubpartitions = numberOfSubpartitions;
-		this.lazyScheduling = lazyScheduling;
+		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
 	}
 
 	public IntermediateDataSetID getResultId() {
@@ -83,8 +83,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		return numberOfSubpartitions;
 	}
 
-	public boolean allowLazyScheduling() {
-		return lazyScheduling;
+	public boolean sendScheduleOrUpdateConsumersMessage() {
+		return sendScheduleOrUpdateConsumersMessage;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 11661cc..d3715ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -148,7 +148,7 @@ public class NetworkEnvironment {
 	}
 
 	public Tuple2<Integer, Integer> getPartitionRequestInitialAndMaxBackoff() {
-		return configuration.partitionRequestInitialAndMaxBackoff();
+		return configuration.partitionRequestInitialMaxBackoff();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 351181a..212aade 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
@@ -496,6 +497,13 @@ public class SingleInputGate implements InputGate {
 
 	// ------------------------------------------------------------------------
 
+	@VisibleForTesting
+	Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+		return inputChannels;
+	}
+
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Creates an input gate and all of its input channels.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 2179fc1..56aea1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -337,7 +337,7 @@ public class Task implements Runnable {
 					networkEnvironment.getPartitionConsumableNotifier(),
 					ioManager,
 					networkEnvironment.getDefaultIOMode(),
-					desc.allowLazyScheduling());
+					desc.sendScheduleOrUpdateConsumersMessage());
 
 			writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 619da96..b7fa140 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -24,9 +24,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(
-  numNetworkBuffers: Int,
-  networkBufferSize: Int,
-  memoryType: MemoryType,
-  ioMode: IOMode,
-  nettyConfig: Option[NettyConfig] = None,
-  partitionRequestInitialAndMaxBackoff: (Integer, Integer) = (500, 3000))
+    numNetworkBuffers: Int,
+    networkBufferSize: Int,
+    memoryType: MemoryType,
+    ioMode: IOMode,
+    partitionRequestInitialMaxBackoff : (Integer, Integer),
+    nettyConfig: Option[NettyConfig] = None)

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c6759c1..40ae234 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2137,11 +2137,19 @@ object TaskManager {
 
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
 
+    val initialRequestBackoff = configuration.getInteger(
+      ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY,
+      ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL)
+    val maxRequestBackoff = configuration.getInteger(
+      ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY,
+      ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_MAX)
+
     val networkConfig = NetworkEnvironmentConfiguration(
       numNetworkBuffers,
       pageSize,
       memType,
       ioMode,
+      (initialRequestBackoff, maxRequestBackoff),
       nettyConfig)
 
     // ----> timeouts, library caching, profiling

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 4223b49..3ed8236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -55,6 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
 		assertEquals(partitionId, copy.getPartitionId());
 		assertEquals(partitionType, copy.getPartitionType());
 		assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
-		assertTrue(copy.allowLazyScheduling());
+		assertTrue(copy.sendScheduleOrUpdateConsumersMessage());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 1f5c915..b3e6b63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -374,7 +374,7 @@ public class ExecutionVertexDeploymentTest {
 
 			assertEquals(1, producedPartitions.size());
 			ResultPartitionDeploymentDescriptor desc = producedPartitions.get(0);
-			assertEquals(mode.allowLazyDeployment(), desc.allowLazyScheduling());
+			assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index a659be3..ca4b7fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -62,8 +62,7 @@ public class NetworkEnvironmentTest {
 			NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new Configuration());
 			NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
 					NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP,
-					IOManager.IOMode.SYNC, new Some<>(nettyConf),
-					new Tuple2<>(0, 0));
+					IOManager.IOMode.SYNC, new Tuple2<>(0, 0), new Some<>(nettyConf));
 
 			NetworkEnvironment env = new NetworkEnvironment(
 				TestingUtils.defaultExecutionContext(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 05427a1..9c8be81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -21,11 +21,14 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -43,9 +46,12 @@ import org.junit.Test;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -269,6 +275,83 @@ public class SingleInputGateTest {
 	}
 
 	/**
+	 * Tests request back off configuration is correctly forwarded to the channels.
+	 */
+	@Test
+	public void testRequestBackoffConfiguration() throws Exception {
+		ResultPartitionID[] partitionIds = new ResultPartitionID[] {
+			new ResultPartitionID(),
+			new ResultPartitionID(),
+			new ResultPartitionID()
+		};
+
+		InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
+			// Local
+			new InputChannelDeploymentDescriptor(
+				partitionIds[0],
+				ResultPartitionLocation.createLocal()),
+			// Remote
+			new InputChannelDeploymentDescriptor(
+				partitionIds[1],
+				ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
+			// Unknown
+			new InputChannelDeploymentDescriptor(
+				partitionIds[2],
+				ResultPartitionLocation.createUnknown())};
+
+		InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);
+
+		int initialBackoff = 137;
+		int maxBackoff = 1001;
+
+		NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
+		when(netEnv.getPartitionManager()).thenReturn(new ResultPartitionManager());
+		when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
+		when(netEnv.getPartitionStateChecker()).thenReturn(mock(PartitionStateChecker.class));
+		when(netEnv.getPartitionRequestInitialAndMaxBackoff()).thenReturn(new Tuple2<>(initialBackoff, maxBackoff));
+		when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager());
+
+		SingleInputGate gate = SingleInputGate.create(
+			"TestTask",
+			new JobID(),
+			new ExecutionAttemptID(),
+			gateDesc,
+			netEnv,
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+		Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
+
+		assertEquals(3, channelMap.size());
+		InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
+		assertEquals(LocalInputChannel.class, localChannel.getClass());
+
+		InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
+		assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
+
+		InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
+		assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
+
+		InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel};
+		for (InputChannel ch : channels) {
+			assertEquals(0, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(initialBackoff, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(initialBackoff * 2, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(maxBackoff, ch.getCurrentBackoff());
+
+			assertFalse(ch.increaseBackoff());
+		}
+	}
+
+	/**
 	 * Returns whether the stack trace represents a Thread in a blocking queue
 	 * poll call.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 60bf8e7..b4c456c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import static org.junit.Assert.*;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
@@ -42,7 +40,6 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
-
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -54,6 +51,10 @@ import scala.concurrent.duration.FiniteDuration;
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class TaskManagerComponentsStartupShutdownTest {
 
 	/**
@@ -98,8 +99,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 					config);
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(),
-					new Tuple2<Integer, Integer>(0, 0));
+					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, new Tuple2<>(0, 0), Option.<NettyConfig>empty());
 
 			final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 431cbb8..f2fd859 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -903,6 +903,8 @@ public class TaskManagerTest extends TestLogger {
 				final int dataPort = NetUtils.getAvailablePort();
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100);
+				config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200);
 
 				taskManager = TestingUtils.createTaskManager(
 						system,
@@ -999,6 +1001,8 @@ public class TaskManagerTest extends TestLogger {
 
 				final int dataPort = NetUtils.getAvailablePort();
 				final Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100);
+				config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200);
 
 				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);