You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/10 20:53:42 UTC
[3/3] flink git commit: [FLINK-5040] [taskmanager] Adjust partition
request backoffs
[FLINK-5040] [taskmanager] Adjust partition request backoffs
The backoffs were previously hard-coded, which made it impossible
to adjust them in reaction to any potential problems.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bd8e027
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bd8e027
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bd8e027
Branch: refs/heads/release-1.1
Commit: 0bd8e027934fc34302c5ddb48f9e9aa448a58721
Parents: 55c506f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Nov 10 11:15:47 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 10 21:53:31 2016 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 11 +++
.../ResultPartitionDeploymentDescriptor.java | 10 +--
.../runtime/io/network/NetworkEnvironment.java | 2 +-
.../partition/consumer/SingleInputGate.java | 8 ++
.../apache/flink/runtime/taskmanager/Task.java | 2 +-
.../NetworkEnvironmentConfiguration.scala | 12 +--
.../flink/runtime/taskmanager/TaskManager.scala | 8 ++
...ResultPartitionDeploymentDescriptorTest.java | 2 +-
.../ExecutionVertexDeploymentTest.java | 2 +-
.../io/network/NetworkEnvironmentTest.java | 3 +-
.../partition/consumer/SingleInputGateTest.java | 83 ++++++++++++++++++++
...askManagerComponentsStartupShutdownTest.java | 10 +--
.../runtime/taskmanager/TaskManagerTest.java | 4 +
13 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d9ccb35..1431eae 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -216,6 +216,11 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
+ /** Initial backoff for partition requests of input channels. */
+ public static final String NETWORK_REQUEST_BACKOFF_INITIAL_KEY = "taskmanager.net.request-backoff.initial";
+
+ public static final String NETWORK_REQUEST_BACKOFF_MAX_KEY = "taskmanager.net.request-backoff.max";
+
/**
* Config parameter defining the size of memory buffers used by the network stack and the memory manager.
*/
@@ -823,6 +828,12 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
+ /** Initial backoff for partition requests of input channels. */
+ public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL = 100;
+
+ /** Maximum backoff for partition requests of input channels. */
+ public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_MAX = 10000;
+
/**
* Default size of memory segments in the network stack and the memory manager.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2ecde80..3edd279 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -49,14 +49,14 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
private final int numberOfSubpartitions;
/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
- private final boolean lazyScheduling;
+ private final boolean sendScheduleOrUpdateConsumersMessage;
public ResultPartitionDeploymentDescriptor(
IntermediateDataSetID resultId,
IntermediateResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
- boolean lazyScheduling) {
+ boolean sendScheduleOrUpdateConsumersMessage) {
this.resultId = checkNotNull(resultId);
this.partitionId = checkNotNull(partitionId);
@@ -64,7 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
checkArgument(numberOfSubpartitions >= 1);
this.numberOfSubpartitions = numberOfSubpartitions;
- this.lazyScheduling = lazyScheduling;
+ this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
}
public IntermediateDataSetID getResultId() {
@@ -83,8 +83,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
return numberOfSubpartitions;
}
- public boolean allowLazyScheduling() {
- return lazyScheduling;
+ public boolean sendScheduleOrUpdateConsumersMessage() {
+ return sendScheduleOrUpdateConsumersMessage;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 11661cc..d3715ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -148,7 +148,7 @@ public class NetworkEnvironment {
}
public Tuple2<Integer, Integer> getPartitionRequestInitialAndMaxBackoff() {
- return configuration.partitionRequestInitialAndMaxBackoff();
+ return configuration.partitionRequestInitialMaxBackoff();
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 351181a..212aade 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
@@ -496,6 +497,13 @@ public class SingleInputGate implements InputGate {
// ------------------------------------------------------------------------
+ @VisibleForTesting
+ Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+ return inputChannels;
+ }
+
+ // ------------------------------------------------------------------------
+
/**
* Creates an input gate and all of its input channels.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 2179fc1..56aea1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -337,7 +337,7 @@ public class Task implements Runnable {
networkEnvironment.getPartitionConsumableNotifier(),
ioManager,
networkEnvironment.getDefaultIOMode(),
- desc.allowLazyScheduling());
+ desc.sendScheduleOrUpdateConsumersMessage());
writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 619da96..b7fa140 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -24,9 +24,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
import org.apache.flink.runtime.io.network.netty.NettyConfig
case class NetworkEnvironmentConfiguration(
- numNetworkBuffers: Int,
- networkBufferSize: Int,
- memoryType: MemoryType,
- ioMode: IOMode,
- nettyConfig: Option[NettyConfig] = None,
- partitionRequestInitialAndMaxBackoff: (Integer, Integer) = (500, 3000))
+ numNetworkBuffers: Int,
+ networkBufferSize: Int,
+ memoryType: MemoryType,
+ ioMode: IOMode,
+ partitionRequestInitialMaxBackoff : (Integer, Integer),
+ nettyConfig: Option[NettyConfig] = None)
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c6759c1..40ae234 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2137,11 +2137,19 @@ object TaskManager {
val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
+ val initialRequestBackoff = configuration.getInteger(
+ ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY,
+ ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL)
+ val maxRequestBackoff = configuration.getInteger(
+ ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY,
+ ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_MAX)
+
val networkConfig = NetworkEnvironmentConfiguration(
numNetworkBuffers,
pageSize,
memType,
ioMode,
+ (initialRequestBackoff, maxRequestBackoff),
nettyConfig)
// ----> timeouts, library caching, profiling
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 4223b49..3ed8236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -55,6 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
assertEquals(partitionId, copy.getPartitionId());
assertEquals(partitionType, copy.getPartitionType());
assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
- assertTrue(copy.allowLazyScheduling());
+ assertTrue(copy.sendScheduleOrUpdateConsumersMessage());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 1f5c915..b3e6b63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -374,7 +374,7 @@ public class ExecutionVertexDeploymentTest {
assertEquals(1, producedPartitions.size());
ResultPartitionDeploymentDescriptor desc = producedPartitions.get(0);
- assertEquals(mode.allowLazyDeployment(), desc.allowLazyScheduling());
+ assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index a659be3..ca4b7fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -62,8 +62,7 @@ public class NetworkEnvironmentTest {
NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new Configuration());
NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP,
- IOManager.IOMode.SYNC, new Some<>(nettyConf),
- new Tuple2<>(0, 0));
+ IOManager.IOMode.SYNC, new Tuple2<>(0, 0), new Some<>(nettyConf));
NetworkEnvironment env = new NetworkEnvironment(
TestingUtils.defaultExecutionContext(),
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 05427a1..9c8be81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -21,11 +21,14 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -43,9 +46,12 @@ import org.junit.Test;
import scala.Tuple2;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -269,6 +275,83 @@ public class SingleInputGateTest {
}
/**
+ * Tests that the request backoff configuration is correctly forwarded to the channels.
+ */
+ @Test
+ public void testRequestBackoffConfiguration() throws Exception {
+ ResultPartitionID[] partitionIds = new ResultPartitionID[] {
+ new ResultPartitionID(),
+ new ResultPartitionID(),
+ new ResultPartitionID()
+ };
+
+ InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
+ // Local
+ new InputChannelDeploymentDescriptor(
+ partitionIds[0],
+ ResultPartitionLocation.createLocal()),
+ // Remote
+ new InputChannelDeploymentDescriptor(
+ partitionIds[1],
+ ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
+ // Unknown
+ new InputChannelDeploymentDescriptor(
+ partitionIds[2],
+ ResultPartitionLocation.createUnknown())};
+
+ InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);
+
+ int initialBackoff = 137;
+ int maxBackoff = 1001;
+
+ NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
+ when(netEnv.getPartitionManager()).thenReturn(new ResultPartitionManager());
+ when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
+ when(netEnv.getPartitionStateChecker()).thenReturn(mock(PartitionStateChecker.class));
+ when(netEnv.getPartitionRequestInitialAndMaxBackoff()).thenReturn(new Tuple2<>(initialBackoff, maxBackoff));
+ when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager());
+
+ SingleInputGate gate = SingleInputGate.create(
+ "TestTask",
+ new JobID(),
+ new ExecutionAttemptID(),
+ gateDesc,
+ netEnv,
+ new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+ Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
+
+ assertEquals(3, channelMap.size());
+ InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
+ assertEquals(LocalInputChannel.class, localChannel.getClass());
+
+ InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
+ assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
+
+ InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
+ assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
+
+ InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel};
+ for (InputChannel ch : channels) {
+ assertEquals(0, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(initialBackoff, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(initialBackoff * 2, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(maxBackoff, ch.getCurrentBackoff());
+
+ assertFalse(ch.increaseBackoff());
+ }
+ }
+
+ /**
* Returns whether the stack trace represents a Thread in a blocking queue
* poll call.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 60bf8e7..b4c456c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.taskmanager;
-import static org.junit.Assert.*;
-
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
@@ -42,7 +40,6 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
-
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -54,6 +51,10 @@ import scala.concurrent.duration.FiniteDuration;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class TaskManagerComponentsStartupShutdownTest {
/**
@@ -98,8 +99,7 @@ public class TaskManagerComponentsStartupShutdownTest {
config);
final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
- 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(),
- new Tuple2<Integer, Integer>(0, 0));
+ 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, new Tuple2<>(0, 0), Option.<NettyConfig>empty());
final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 431cbb8..f2fd859 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -903,6 +903,8 @@ public class TaskManagerTest extends TestLogger {
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+ config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100);
+ config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200);
taskManager = TestingUtils.createTaskManager(
system,
@@ -999,6 +1001,8 @@ public class TaskManagerTest extends TestLogger {
final int dataPort = NetUtils.getAvailablePort();
final Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100);
+ config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200);
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);