You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/05/27 23:05:41 UTC
[2/4] flink git commit: [FLINK-1636] [runtime] Add partition request
backoff logic to LocalInputChannel
[FLINK-1636] [runtime] Add partition request backoff logic to LocalInputChannel
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceb890f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceb890f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceb890f1
Branch: refs/heads/master
Commit: ceb890f1d54acbc62eeeb308be386dea3b2c457d
Parents: 0ef2159
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue May 26 15:37:35 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed May 27 23:04:58 2015 +0200
----------------------------------------------------------------------
.../partition/consumer/InputChannel.java | 112 +++++++++++++-
.../partition/consumer/LocalInputChannel.java | 75 ++++++++--
.../partition/consumer/RemoteInputChannel.java | 58 +------
.../partition/consumer/SingleInputGate.java | 34 ++++-
.../partition/consumer/UnknownInputChannel.java | 4 +-
.../NetworkEnvironmentConfiguration.scala | 2 +-
.../partition/consumer/InputChannelTest.java | 150 +++++++++++++++++++
.../consumer/LocalInputChannelTest.java | 88 +++++++++++
.../runtime/taskmanager/TaskManagerTest.java | 85 ++++++++++-
9 files changed, 530 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 0805066..f828e2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -22,8 +22,13 @@ import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import scala.Tuple2;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* An input channel consumes a single {@link ResultSubpartitionView}.
@@ -43,10 +48,41 @@ public abstract class InputChannel {
protected final SingleInputGate inputGate;
- protected InputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId) {
- this.inputGate = inputGate;
+ // - Asynchronous error notification --------------------------------------
+
+ private final AtomicReference<Throwable> cause = new AtomicReference<Throwable>();
+
+ // - Partition request backoff --------------------------------------------
+
+ /** The initial backoff (in ms). */
+ private final int initialBackoff;
+
+ /** The maximum backoff (in ms). */
+ private final int maxBackoff;
+
+ /** The current backoff (in ms). */
+ private int currentBackoff;
+
+ protected InputChannel(
+ SingleInputGate inputGate,
+ int channelIndex,
+ ResultPartitionID partitionId,
+ Tuple2<Integer, Integer> initialAndMaxBackoff) {
+
+ checkArgument(channelIndex >= 0);
+
+ int initial = initialAndMaxBackoff._1();
+ int max = initialAndMaxBackoff._2();
+
+ checkArgument(initial >= 0 && initial <= max);
+
+ this.inputGate = checkNotNull(inputGate);
this.channelIndex = channelIndex;
- this.partitionId = partitionId;
+ this.partitionId = checkNotNull(partitionId);
+
+ this.initialBackoff = initial;
+ this.maxBackoff = max;
+ this.currentBackoff = initial == 0 ? -1 : 0;
}
// ------------------------------------------------------------------------
@@ -109,4 +145,74 @@ public abstract class InputChannel {
*/
abstract void releaseAllResources() throws IOException;
+ // ------------------------------------------------------------------------
+ // Error notification
+ // ------------------------------------------------------------------------
+
+ /**
+ * Checks for an error and rethrows it if one was reported.
+ */
+ protected void checkError() throws IOException {
+ final Throwable t = cause.get();
+
+ if (t != null) {
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ else {
+ throw new IOException(t);
+ }
+ }
+ }
+
+ /**
+ * Atomically sets an error for this channel and notifies the input gate about available data to
+ * trigger querying this channel by the task thread.
+ */
+ protected void setError(Throwable cause) {
+ if (this.cause.compareAndSet(null, checkNotNull(cause))) {
+ // Notify the input gate.
+ notifyAvailableBuffer();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Partition request exponential backoff
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the current backoff in ms.
+ */
+ protected int getCurrentBackoff() {
+ return currentBackoff <= 0 ? 0 : currentBackoff;
+ }
+
+ /**
+ * Increases the current backoff and returns whether the operation was successful.
+ *
+ * @return <code>true</code>, iff the backoff was increased. Otherwise (backoff is disabled or the maximum backoff has already been reached), <code>false</code>.
+ */
+ protected boolean increaseBackoff() {
+ // Backoff is disabled
+ if (currentBackoff < 0) {
+ return false;
+ }
+
+ // This is the first time backing off
+ if (currentBackoff == 0) {
+ currentBackoff = initialBackoff;
+
+ return true;
+ }
+
+ // Continue backing off
+ else if (currentBackoff < maxBackoff) {
+ currentBackoff = Math.min(currentBackoff * 2, maxBackoff);
+
+ return true;
+ }
+
+ // Reached maximum backoff
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 7cb62f8..65f6a36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -23,14 +23,18 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Tuple2;
import java.io.IOException;
+import java.util.Timer;
+import java.util.TimerTask;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -42,6 +46,8 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);
+ private final Object requestLock = new Object();
+
/** The local partition manager. */
private final ResultPartitionManager partitionManager;
@@ -62,7 +68,19 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
ResultPartitionManager partitionManager,
TaskEventDispatcher taskEventDispatcher) {
- super(inputGate, channelIndex, partitionId);
+ this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
+ new Tuple2<Integer, Integer>(0, 0));
+ }
+
+ LocalInputChannel(
+ SingleInputGate inputGate,
+ int channelIndex,
+ ResultPartitionID partitionId,
+ ResultPartitionManager partitionManager,
+ TaskEventDispatcher taskEventDispatcher,
+ Tuple2<Integer, Integer> initialAndMaxBackoff) {
+
+ super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -74,23 +92,59 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
@Override
void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
- if (subpartitionView == null) {
- LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
- this, subpartitionIndex, partitionId);
-
- subpartitionView = partitionManager.createSubpartitionView(
- partitionId, subpartitionIndex, inputGate.getBufferProvider());
-
+ // The lock is required to request only once in the presence of retriggered requests.
+ synchronized (requestLock) {
if (subpartitionView == null) {
- throw new IOException("Error requesting subpartition.");
+ LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
+ this, subpartitionIndex, partitionId);
+
+ try {
+ subpartitionView = partitionManager.createSubpartitionView(
+ partitionId, subpartitionIndex, inputGate.getBufferProvider());
+ }
+ catch (PartitionNotFoundException notFound) {
+ if (increaseBackoff()) {
+ inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
+ return;
+ }
+ else {
+ throw notFound;
+ }
+ }
+
+ if (subpartitionView == null) {
+ throw new IOException("Error requesting subpartition.");
+ }
+
+ getNextLookAhead();
}
+ }
+ }
- getNextLookAhead();
+ /**
+ * Retriggers a subpartition request.
+ */
+ void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) throws IOException, InterruptedException {
+ synchronized (requestLock) {
+ checkState(subpartitionView == null, "Already requested partition.");
+
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ requestSubpartition(subpartitionIndex);
+ }
+ catch (Throwable t) {
+ setError(t);
+ }
+ }
+ }, getCurrentBackoff());
}
}
@Override
Buffer getNextBuffer() throws IOException, InterruptedException {
+ checkError();
checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
// After subscribe notification
@@ -119,6 +173,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
@Override
void sendTaskEvent(TaskEvent event) throws IOException {
+ checkError();
checkState(subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");
if (!taskEventDispatcher.publish(partitionId, event)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 449b1cf..090e94d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -34,9 +34,7 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -57,12 +55,6 @@ public class RemoteInputChannel extends InputChannel {
private final ConnectionManager connectionManager;
/**
- * An asynchronous error notification. Set by either the network I/O thread or the thread
- * failing a partition request.
- */
- private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
- /**
* The received buffers. Received buffers are enqueued by the network I/O thread and the queue
* is consumed by the receiving task thread.
*/
@@ -83,12 +75,6 @@ public class RemoteInputChannel extends InputChannel {
*/
private int expectedSequenceNumber = 0;
- /** The current backoff time (in ms) for partition requests. */
- private int nextRequestBackoffMs;
-
- /** The maximum backoff time (in ms) after which a request fails */
- private final int maxRequestBackoffMs;
-
RemoteInputChannel(
SingleInputGate inputGate,
int channelIndex,
@@ -108,15 +94,10 @@ public class RemoteInputChannel extends InputChannel {
ConnectionManager connectionManager,
Tuple2<Integer, Integer> initialAndMaxBackoff) {
- super(inputGate, channelIndex, partitionId);
+ super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
this.connectionId = checkNotNull(connectionId);
this.connectionManager = checkNotNull(connectionManager);
-
- checkArgument(initialAndMaxBackoff._1() <= initialAndMaxBackoff._2());
-
- this.nextRequestBackoffMs = initialAndMaxBackoff._1();
- this.maxRequestBackoffMs = initialAndMaxBackoff._2();
}
// ------------------------------------------------------------------------
@@ -143,17 +124,9 @@ public class RemoteInputChannel extends InputChannel {
void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, InterruptedException {
checkState(partitionRequestClient != null, "Missing initial subpartition request.");
- // Disabled
- if (nextRequestBackoffMs == 0) {
- failPartitionRequest();
- }
- else if (nextRequestBackoffMs <= maxRequestBackoffMs) {
- partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, nextRequestBackoffMs);
-
- // Exponential backoff
- nextRequestBackoffMs = nextRequestBackoffMs < maxRequestBackoffMs
- ? Math.min(nextRequestBackoffMs * 2, maxRequestBackoffMs)
- : maxRequestBackoffMs + 1; // Fail the next request
+ if (increaseBackoff()) {
+ partitionRequestClient.requestSubpartition(
+ partitionId, subpartitionIndex, this, getCurrentBackoff());
}
else {
failPartitionRequest();
@@ -230,7 +203,7 @@ public class RemoteInputChannel extends InputChannel {
}
public void failPartitionRequest() {
- onError(new PartitionNotFoundException(partitionId));
+ setError(new PartitionNotFoundException(partitionId));
}
@Override
@@ -305,26 +278,7 @@ public class RemoteInputChannel extends InputChannel {
}
public void onError(Throwable cause) {
- if (error.compareAndSet(null, cause)) {
- // Notify the input gate to trigger querying of this channel
- notifyAvailableBuffer();
- }
- }
-
- /**
- * Checks whether this channel got notified about an error.
- */
- private void checkError() throws IOException {
- final Throwable t = error.get();
-
- if (t != null) {
- if (t instanceof IOException) {
- throw (IOException) t;
- }
- else {
- throw new IOException(t);
- }
- }
+ setError(cause);
}
public static class BufferReorderingException extends IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b4a0845..78aa6f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -47,6 +47,7 @@ import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
@@ -163,6 +164,9 @@ public class SingleInputGate implements InputGate {
private int numberOfUninitializedChannels;
+ /** A timer to retrigger local partition requests. Only initialized if actually needed. */
+ private Timer retriggerLocalRequestTimer;
+
public SingleInputGate(
String owningTaskName,
JobID jobId,
@@ -290,16 +294,25 @@ public class SingleInputGate implements InputGate {
checkNotNull(ch, "Unknown input channel with ID " + partitionId);
- if (ch.getClass() != RemoteInputChannel.class) {
- throw new IllegalArgumentException("Channel identified by " + partitionId
- + " is not a remote channel.");
- }
+ LOG.debug("Retriggering partition request {}:{}.", ch.partitionId, consumedSubpartitionIndex);
- final RemoteInputChannel rch = (RemoteInputChannel) ch;
+ if (ch.getClass() == RemoteInputChannel.class) {
+ final RemoteInputChannel rch = (RemoteInputChannel) ch;
+ rch.retriggerSubpartitionRequest(consumedSubpartitionIndex);
+ }
+ else if (ch.getClass() == LocalInputChannel.class) {
+ final LocalInputChannel ich = (LocalInputChannel) ch;
- LOG.debug("Retriggering partition request {}:{}.", ch.partitionId, consumedSubpartitionIndex);
+ if (retriggerLocalRequestTimer == null) {
+ retriggerLocalRequestTimer = new Timer(true);
+ }
- rch.retriggerSubpartitionRequest(consumedSubpartitionIndex);
+ ich.retriggerSubpartitionRequest(retriggerLocalRequestTimer, consumedSubpartitionIndex);
+ }
+ else {
+ throw new IllegalStateException(
+ "Unexpected type of channel to retrigger partition: " + ch.getClass());
+ }
}
}
}
@@ -310,6 +323,10 @@ public class SingleInputGate implements InputGate {
try {
LOG.debug("{}: Releasing {}.", owningTaskName, this);
+ if (retriggerLocalRequestTimer != null) {
+ retriggerLocalRequestTimer.cancel();
+ }
+
for (InputChannel inputChannel : inputChannels.values()) {
try {
inputChannel.releaseAllResources();
@@ -488,7 +505,8 @@ public class SingleInputGate implements InputGate {
if (partitionLocation.isLocal()) {
inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
networkEnvironment.getPartitionManager(),
- networkEnvironment.getTaskEventDispatcher());
+ networkEnvironment.getTaskEventDispatcher(),
+ networkEnvironment.getPartitionRequestInitialAndMaxBackoff());
}
else if (partitionLocation.isRemote()) {
inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 0aa7ea3..e4b9e57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -56,7 +56,7 @@ public class UnknownInputChannel extends InputChannel {
ConnectionManager connectionManager,
Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff) {
- super(gate, channelIndex, partitionId);
+ super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff);
this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -116,6 +116,6 @@ public class UnknownInputChannel extends InputChannel {
}
public LocalInputChannel toLocalInputChannel() {
- return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher);
+ return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 51ca90d..5bcda60 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -26,4 +26,4 @@ case class NetworkEnvironmentConfiguration(
networkBufferSize: Int,
ioMode: IOMode,
nettyConfig: Option[NettyConfig] = None,
- partitionRequestInitialAndMaxBackoff: Tuple2[Integer, Integer] = (50, 3000))
+ partitionRequestInitialAndMaxBackoff: Tuple2[Integer, Integer] = (500, 3000))
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
new file mode 100644
index 0000000..e95c774
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class InputChannelTest {
+
+ @Test
+ public void testExponentialBackoff() throws Exception {
+ InputChannel ch = createInputChannel(500, 4000);
+
+ assertEquals(0, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(500, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(1000, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(2000, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(4000, ch.getCurrentBackoff());
+
+ assertFalse(ch.increaseBackoff());
+ assertEquals(4000, ch.getCurrentBackoff());
+ }
+
+ @Test
+ public void testExponentialBackoffCappedAtMax() throws Exception {
+ InputChannel ch = createInputChannel(500, 3000);
+
+ assertEquals(0, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(500, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(1000, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(2000, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(3000, ch.getCurrentBackoff());
+
+ assertFalse(ch.increaseBackoff());
+ assertEquals(3000, ch.getCurrentBackoff());
+ }
+
+ @Test
+ public void testExponentialBackoffSingle() throws Exception {
+ InputChannel ch = createInputChannel(500, 500);
+
+ assertEquals(0, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(500, ch.getCurrentBackoff());
+
+ assertFalse(ch.increaseBackoff());
+ assertEquals(500, ch.getCurrentBackoff());
+ }
+
+ @Test
+ public void testExponentialNoBackoff() throws Exception {
+ InputChannel ch = createInputChannel(0, 0);
+
+ assertEquals(0, ch.getCurrentBackoff());
+
+ assertFalse(ch.increaseBackoff());
+ assertEquals(0, ch.getCurrentBackoff());
+ }
+
+ private InputChannel createInputChannel(int initialBackoff, int maxBackoff) {
+ return new MockInputChannel(
+ mock(SingleInputGate.class),
+ 0,
+ new ResultPartitionID(),
+ new Tuple2<Integer, Integer>(initialBackoff, maxBackoff));
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ private static class MockInputChannel extends InputChannel {
+
+ private MockInputChannel(
+ SingleInputGate inputGate,
+ int channelIndex,
+ ResultPartitionID partitionId,
+ Tuple2<Integer, Integer> initialAndMaxBackoff) {
+
+ super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
+ }
+
+ @Override
+ void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+ }
+
+ @Override
+ Buffer getNextBuffer() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ void sendTaskEvent(TaskEvent event) throws IOException {
+ }
+
+ @Override
+ boolean isReleased() {
+ return false;
+ }
+
+ @Override
+ void notifySubpartitionConsumed() throws IOException {
+ }
+
+ @Override
+ void releaseAllResources() throws IOException {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 9bef886..c25e1d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -23,11 +23,15 @@ import com.google.common.collect.Lists;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -40,9 +44,15 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Tuple2;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -50,7 +60,14 @@ import java.util.concurrent.Future;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class LocalInputChannelTest {
@@ -159,6 +176,77 @@ public class LocalInputChannelTest {
}
}
+ @Test
+ public void testPartitionRequestExponentialBackoff() throws Exception {
+ // Config
+ Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
+
+ // Start with initial backoff, then keep doubling, and cap at max.
+ int[] expectedDelays = {backoff._1(), 1000, 2000, backoff._2()};
+
+ // Setup
+ SingleInputGate inputGate = mock(SingleInputGate.class);
+
+ BufferProvider bufferProvider = mock(BufferProvider.class);
+ when(inputGate.getBufferProvider()).thenReturn(bufferProvider);
+
+ ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+
+ LocalInputChannel ch = createLocalInputChannel(inputGate, partitionManager, backoff);
+
+ when(partitionManager
+ .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider)))
+ .thenThrow(new PartitionNotFoundException(ch.partitionId));
+
+ Timer timer = mock(Timer.class);
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ ((TimerTask) invocation.getArguments()[0]).run();
+ return null;
+ }
+ }).when(timer).schedule(any(TimerTask.class), anyLong());
+
+ // Initial request
+ ch.requestSubpartition(0);
+ verify(partitionManager)
+ .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider));
+
+ // Request subpartition and verify that the actual requests are delayed.
+ for (long expected : expectedDelays) {
+ ch.retriggerSubpartitionRequest(timer, 0);
+
+ verify(timer).schedule(any(TimerTask.class), eq(expected));
+ }
+
+ // Exception after backoff is greater than the maximum backoff.
+ try {
+ ch.retriggerSubpartitionRequest(timer, 0);
+ ch.getNextBuffer();
+ fail("Did not throw expected exception.");
+ }
+ catch (Exception expected) {
+ }
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ private LocalInputChannel createLocalInputChannel(
+ SingleInputGate inputGate,
+ ResultPartitionManager partitionManager,
+ Tuple2<Integer, Integer> initialAndMaxRequestBackoff)
+ throws IOException, InterruptedException {
+
+ return new LocalInputChannel(
+ inputGate,
+ 0,
+ new ResultPartitionID(),
+ partitionManager,
+ mock(TaskEventDispatcher.class),
+ initialAndMaxRequestBackoff);
+ }
+
/**
* Returns the configured number of buffers for each channel in a random order.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d33dcd7..c25b9a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -692,10 +692,10 @@ public class TaskManagerTest {
}
/**
- * Tests that repeated {@link PartitionNotFoundException}s fail the receiver.
+ * Tests that repeated remote {@link PartitionNotFoundException}s ultimately fail the receiver.
*/
@Test
- public void testPartitionNotFound() throws Exception {
+ public void testRemotePartitionNotFound() throws Exception {
new JavaTestKit(system){{
@@ -775,6 +775,87 @@ public class TaskManagerTest {
}};
}
+ /**
+ * Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
+ */
+ @Test
+ public void testLocalPartitionNotFound() throws Exception {
+
+ new JavaTestKit(system){{
+
+ ActorRef jobManager = null;
+ ActorRef taskManager = null;
+
+ try {
+ final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+ // Create the JM
+ jobManager = system.actorOf(Props.create(
+ new SimplePartitionStateLookupJobManagerCreator(resultId, getTestActor())));
+
+ final int dataPort = NetUtils.getAvailablePort();
+ taskManager = createTaskManager(jobManager, true, true, dataPort);
+
+ // ---------------------------------------------------------------------------------
+
+ final ActorRef tm = taskManager;
+
+ final JobID jid = new JobID();
+ final JobVertexID vid = new JobVertexID();
+ final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+ final ResultPartitionID partitionId = new ResultPartitionID();
+
+ // Local location (i.e. on the same TM) for the partition
+ final ResultPartitionLocation loc = ResultPartitionLocation.createLocal();
+
+ final InputChannelDeploymentDescriptor[] icdd =
+ new InputChannelDeploymentDescriptor[] {
+ new InputChannelDeploymentDescriptor(partitionId, loc)};
+
+ final InputGateDeploymentDescriptor igdd =
+ new InputGateDeploymentDescriptor(resultId, 0, icdd);
+
+ final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+ jid, vid, eid, "Receiver", 0, 1,
+ new Configuration(), new Configuration(),
+ Tasks.AgnosticReceiver.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.singletonList(igdd),
+ Collections.<BlobKey>emptyList(), 0);
+
+ new Within(d) {
+ @Override
+ protected void run() {
+ // Submit the task
+ tm.tell(new SubmitTask(tdd), getTestActor());
+ expectMsgClass(Messages.getAcknowledge().getClass());
+
+ // Wait to be notified about the final execution state by the mock JM
+ TaskExecutionState msg = expectMsgClass(TaskExecutionState.class);
+
+ // The task should fail after repeated requests
+ assertEquals(msg.getExecutionState(), ExecutionState.FAILED);
+ assertEquals(msg.getError(ClassLoader.getSystemClassLoader()).getClass(),
+ PartitionNotFoundException.class);
+ }
+ };
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ if (taskManager != null) {
+ taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+
+ if (jobManager != null) {
+ jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+ }
+ }
+ }};
+ }
// --------------------------------------------------------------------------------------------