You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/05/27 23:05:41 UTC

[2/4] flink git commit: [FLINK-1636] [runtime] Add partition request backoff logic to LocalInputChannel

[FLINK-1636] [runtime] Add partition request backoff logic to LocalInputChannel


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceb890f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceb890f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceb890f1

Branch: refs/heads/master
Commit: ceb890f1d54acbc62eeeb308be386dea3b2c457d
Parents: 0ef2159
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue May 26 15:37:35 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed May 27 23:04:58 2015 +0200

----------------------------------------------------------------------
 .../partition/consumer/InputChannel.java        | 112 +++++++++++++-
 .../partition/consumer/LocalInputChannel.java   |  75 ++++++++--
 .../partition/consumer/RemoteInputChannel.java  |  58 +------
 .../partition/consumer/SingleInputGate.java     |  34 ++++-
 .../partition/consumer/UnknownInputChannel.java |   4 +-
 .../NetworkEnvironmentConfiguration.scala       |   2 +-
 .../partition/consumer/InputChannelTest.java    | 150 +++++++++++++++++++
 .../consumer/LocalInputChannelTest.java         |  88 +++++++++++
 .../runtime/taskmanager/TaskManagerTest.java    |  85 ++++++++++-
 9 files changed, 530 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 0805066..f828e2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -22,8 +22,13 @@ import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import scala.Tuple2;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * An input channel consumes a single {@link ResultSubpartitionView}.
@@ -43,10 +48,41 @@ public abstract class InputChannel {
 
 	protected final SingleInputGate inputGate;
 
-	protected InputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId) {
-		this.inputGate = inputGate;
+	// - Asynchronous error notification --------------------------------------
+
+	private final AtomicReference<Throwable> cause = new AtomicReference<Throwable>();
+
+	// - Partition request backoff --------------------------------------------
+
+	/** The initial backoff (in ms). */
+	private final int initialBackoff;
+
+	/** The maximum backoff (in ms). */
+	private final int maxBackoff;
+
+	/** The current backoff (in ms). */
+	private int currentBackoff;
+
+	protected InputChannel(
+			SingleInputGate inputGate,
+			int channelIndex,
+			ResultPartitionID partitionId,
+			Tuple2<Integer, Integer> initialAndMaxBackoff) {
+
+		checkArgument(channelIndex >= 0);
+
+		int initial = initialAndMaxBackoff._1();
+		int max = initialAndMaxBackoff._2();
+
+		checkArgument(initial >= 0 && initial <= max);
+
+		this.inputGate = checkNotNull(inputGate);
 		this.channelIndex = channelIndex;
-		this.partitionId = partitionId;
+		this.partitionId = checkNotNull(partitionId);
+
+		this.initialBackoff = initial;
+		this.maxBackoff = max;
+		this.currentBackoff = initial == 0 ? -1 : 0;
 	}
 
 	// ------------------------------------------------------------------------
@@ -109,4 +145,74 @@ public abstract class InputChannel {
 	 */
 	abstract void releaseAllResources() throws IOException;
 
+	// ------------------------------------------------------------------------
+	// Error notification
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks for a reported error and rethrows it, wrapped in an IOException if it is not one.
+	 */
+	protected void checkError() throws IOException {
+		final Throwable t = cause.get();
+
+		if (t != null) {
+			if (t instanceof IOException) {
+				throw (IOException) t;
+			}
+			else {
+				throw new IOException(t);
+			}
+		}
+	}
+
+	/**
+	 * Atomically sets an error for this channel and notifies the input gate about available data to
+	 * trigger querying this channel by the task thread.
+	 */
+	protected void setError(Throwable cause) {
+		if (this.cause.compareAndSet(null, checkNotNull(cause))) {
+			// Notify the input gate.
+			notifyAvailableBuffer();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Partition request exponential backoff
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the current backoff in ms.
+	 */
+	protected int getCurrentBackoff() {
+		return currentBackoff <= 0 ? 0 : currentBackoff;
+	}
+
+	/**
+	 * Increases the current backoff, unless backoff is disabled or the maximum has been reached.
+	 *
+	 * @return <code>true</code>, iff the backoff was increased. Otherwise, <code>false</code>.
+	 */
+	protected boolean increaseBackoff() {
+		// Backoff is disabled
+		if (currentBackoff < 0) {
+			return false;
+		}
+
+		// This is the first time backing off
+		if (currentBackoff == 0) {
+			currentBackoff = initialBackoff;
+
+			return true;
+		}
+
+		// Continue backing off
+		else if (currentBackoff < maxBackoff) {
+			currentBackoff = Math.min(currentBackoff * 2, maxBackoff);
+
+			return true;
+		}
+
+		// Reached maximum backoff
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 7cb62f8..65f6a36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -23,14 +23,18 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.io.IOException;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
@@ -42,6 +46,8 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);
 
+	private final Object requestLock = new Object();
+
 	/** The local partition manager. */
 	private final ResultPartitionManager partitionManager;
 
@@ -62,7 +68,19 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher) {
 
-		super(inputGate, channelIndex, partitionId);
+		this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
+				new Tuple2<Integer, Integer>(0, 0));
+	}
+
+	LocalInputChannel(
+			SingleInputGate inputGate,
+			int channelIndex,
+			ResultPartitionID partitionId,
+			ResultPartitionManager partitionManager,
+			TaskEventDispatcher taskEventDispatcher,
+			Tuple2<Integer, Integer> initialAndMaxBackoff) {
+
+		super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -74,23 +92,59 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	@Override
 	void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
-		if (subpartitionView == null) {
-			LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
-					this, subpartitionIndex, partitionId);
-
-			subpartitionView = partitionManager.createSubpartitionView(
-					partitionId, subpartitionIndex, inputGate.getBufferProvider());
-
+		// The lock is required to request only once in the presence of retriggered requests.
+		synchronized (requestLock) {
 			if (subpartitionView == null) {
-				throw new IOException("Error requesting subpartition.");
+				LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
+						this, subpartitionIndex, partitionId);
+
+				try {
+					subpartitionView = partitionManager.createSubpartitionView(
+							partitionId, subpartitionIndex, inputGate.getBufferProvider());
+				}
+				catch (PartitionNotFoundException notFound) {
+					if (increaseBackoff()) {
+						inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
+						return;
+					}
+					else {
+						throw notFound;
+					}
+				}
+
+				if (subpartitionView == null) {
+					throw new IOException("Error requesting subpartition.");
+				}
+
+				getNextLookAhead();
 			}
+		}
+	}
 
-			getNextLookAhead();
+	/**
+	 * Retriggers a subpartition request.
+	 */
+	void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) throws IOException, InterruptedException {
+		synchronized (requestLock) {
+			checkState(subpartitionView == null, "Already requested partition.");
+
+			timer.schedule(new TimerTask() {
+				@Override
+				public void run() {
+					try {
+						requestSubpartition(subpartitionIndex);
+					}
+					catch (Throwable t) {
+						setError(t);
+					}
+				}
+			}, getCurrentBackoff());
 		}
 	}
 
 	@Override
 	Buffer getNextBuffer() throws IOException, InterruptedException {
+		checkError();
 		checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
 
 		// After subscribe notification
@@ -119,6 +173,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 
 	@Override
 	void sendTaskEvent(TaskEvent event) throws IOException {
+		checkError();
 		checkState(subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");
 
 		if (!taskEventDispatcher.publish(partitionId, event)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 449b1cf..090e94d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -34,9 +34,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -57,12 +55,6 @@ public class RemoteInputChannel extends InputChannel {
 	private final ConnectionManager connectionManager;
 
 	/**
-	 * An asynchronous error notification. Set by either the network I/O thread or the thread
-	 * failing a partition request.
-	 */
-	private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
-	/**
 	 * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
 	 * is consumed by the receiving task thread.
 	 */
@@ -83,12 +75,6 @@ public class RemoteInputChannel extends InputChannel {
 	 */
 	private int expectedSequenceNumber = 0;
 
-	/** The current backoff time (in ms) for partition requests. */
-	private int nextRequestBackoffMs;
-
-	/** The maximum backoff time (in ms) after which a request fails */
-	private final int maxRequestBackoffMs;
-
 	RemoteInputChannel(
 			SingleInputGate inputGate,
 			int channelIndex,
@@ -108,15 +94,10 @@ public class RemoteInputChannel extends InputChannel {
 			ConnectionManager connectionManager,
 			Tuple2<Integer, Integer> initialAndMaxBackoff) {
 
-		super(inputGate, channelIndex, partitionId);
+		super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
 
 		this.connectionId = checkNotNull(connectionId);
 		this.connectionManager = checkNotNull(connectionManager);
-
-		checkArgument(initialAndMaxBackoff._1() <= initialAndMaxBackoff._2());
-
-		this.nextRequestBackoffMs = initialAndMaxBackoff._1();
-		this.maxRequestBackoffMs = initialAndMaxBackoff._2();
 	}
 
 	// ------------------------------------------------------------------------
@@ -143,17 +124,9 @@ public class RemoteInputChannel extends InputChannel {
 	void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, InterruptedException {
 		checkState(partitionRequestClient != null, "Missing initial subpartition request.");
 
-		// Disabled
-		if (nextRequestBackoffMs == 0) {
-			failPartitionRequest();
-		}
-		else if (nextRequestBackoffMs <= maxRequestBackoffMs) {
-			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, nextRequestBackoffMs);
-
-			// Exponential backoff
-			nextRequestBackoffMs = nextRequestBackoffMs < maxRequestBackoffMs
-					? Math.min(nextRequestBackoffMs * 2, maxRequestBackoffMs)
-					: maxRequestBackoffMs + 1; // Fail the next request
+		if (increaseBackoff()) {
+			partitionRequestClient.requestSubpartition(
+					partitionId, subpartitionIndex, this, getCurrentBackoff());
 		}
 		else {
 			failPartitionRequest();
@@ -230,7 +203,7 @@ public class RemoteInputChannel extends InputChannel {
 	}
 
 	public void failPartitionRequest() {
-		onError(new PartitionNotFoundException(partitionId));
+		setError(new PartitionNotFoundException(partitionId));
 	}
 
 	@Override
@@ -305,26 +278,7 @@ public class RemoteInputChannel extends InputChannel {
 	}
 
 	public void onError(Throwable cause) {
-		if (error.compareAndSet(null, cause)) {
-			// Notify the input gate to trigger querying of this channel
-			notifyAvailableBuffer();
-		}
-	}
-
-	/**
-	 * Checks whether this channel got notified about an error.
-	 */
-	private void checkError() throws IOException {
-		final Throwable t = error.get();
-
-		if (t != null) {
-			if (t instanceof IOException) {
-				throw (IOException) t;
-			}
-			else {
-				throw new IOException(t);
-			}
-		}
+		setError(cause);
 	}
 
 	public static class BufferReorderingException extends IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b4a0845..78aa6f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -47,6 +47,7 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -163,6 +164,9 @@ public class SingleInputGate implements InputGate {
 
 	private int numberOfUninitializedChannels;
 
+	/** A timer to retrigger local partition requests. Only initialized if actually needed. */
+	private Timer retriggerLocalRequestTimer;
+
 	public SingleInputGate(
 			String owningTaskName,
 			JobID jobId,
@@ -290,16 +294,25 @@ public class SingleInputGate implements InputGate {
 
 				checkNotNull(ch, "Unknown input channel with ID " + partitionId);
 
-				if (ch.getClass() != RemoteInputChannel.class) {
-					throw new IllegalArgumentException("Channel identified by " + partitionId
-							+ " is not a remote channel.");
-				}
+				LOG.debug("Retriggering partition request {}:{}.", ch.partitionId, consumedSubpartitionIndex);
 
-				final RemoteInputChannel rch = (RemoteInputChannel) ch;
+				if (ch.getClass() == RemoteInputChannel.class) {
+					final RemoteInputChannel rch = (RemoteInputChannel) ch;
+					rch.retriggerSubpartitionRequest(consumedSubpartitionIndex);
+				}
+				else if (ch.getClass() == LocalInputChannel.class) {
+					final LocalInputChannel ich = (LocalInputChannel) ch;
 
-				LOG.debug("Retriggering partition request {}:{}.", ch.partitionId, consumedSubpartitionIndex);
+					if (retriggerLocalRequestTimer == null) {
+						retriggerLocalRequestTimer = new Timer(true);
+					}
 
-				rch.retriggerSubpartitionRequest(consumedSubpartitionIndex);
+					ich.retriggerSubpartitionRequest(retriggerLocalRequestTimer, consumedSubpartitionIndex);
+				}
+				else {
+					throw new IllegalStateException(
+							"Unexpected type of channel to retrigger partition: " + ch.getClass());
+				}
 			}
 		}
 	}
@@ -310,6 +323,10 @@ public class SingleInputGate implements InputGate {
 				try {
 					LOG.debug("{}: Releasing {}.", owningTaskName, this);
 
+					if (retriggerLocalRequestTimer != null) {
+						retriggerLocalRequestTimer.cancel();
+					}
+
 					for (InputChannel inputChannel : inputChannels.values()) {
 						try {
 							inputChannel.releaseAllResources();
@@ -488,7 +505,8 @@ public class SingleInputGate implements InputGate {
 			if (partitionLocation.isLocal()) {
 				inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
 						networkEnvironment.getPartitionManager(),
-						networkEnvironment.getTaskEventDispatcher());
+						networkEnvironment.getTaskEventDispatcher(),
+						networkEnvironment.getPartitionRequestInitialAndMaxBackoff());
 			}
 			else if (partitionLocation.isRemote()) {
 				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 0aa7ea3..e4b9e57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -56,7 +56,7 @@ public class UnknownInputChannel extends InputChannel {
 			ConnectionManager connectionManager,
 			Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff) {
 
-		super(gate, channelIndex, partitionId);
+		super(gate, channelIndex, partitionId, partitionRequestInitialAndMaxBackoff);
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
@@ -116,6 +116,6 @@ public class UnknownInputChannel extends InputChannel {
 	}
 
 	public LocalInputChannel toLocalInputChannel() {
-		return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher);
+		return new LocalInputChannel(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, partitionRequestInitialAndMaxBackoff);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 51ca90d..5bcda60 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -26,4 +26,4 @@ case class NetworkEnvironmentConfiguration(
   networkBufferSize: Int,
   ioMode: IOMode,
   nettyConfig: Option[NettyConfig] = None,
-  partitionRequestInitialAndMaxBackoff: Tuple2[Integer, Integer] = (50, 3000))
+  partitionRequestInitialAndMaxBackoff: Tuple2[Integer, Integer] = (500, 3000))

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
new file mode 100644
index 0000000..e95c774
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class InputChannelTest {
+
+	@Test
+	public void testExponentialBackoff() throws Exception {
+		InputChannel ch = createInputChannel(500, 4000);
+
+		assertEquals(0, ch.getCurrentBackoff());
+
+		assertTrue(ch.increaseBackoff());
+		assertEquals(500, ch.getCurrentBackoff());
+
+		assertTrue(ch.increaseBackoff());
+		assertEquals(1000, ch.getCurrentBackoff());
+
+		assertTrue(ch.increaseBackoff());
+		assertEquals(2000, ch.getCurrentBackoff());
+
+		assertTrue(ch.increaseBackoff());
+		assertEquals(4000, ch.getCurrentBackoff());
+
+		assertFalse(ch.increaseBackoff());
+		assertEquals(4000, ch.getCurrentBackoff());
+	}
+
+	@Test
+	public void testExponentialBackoffCappedAtMax() throws Exception {
+		InputChannel ch = createInputChannel(500, 3000);
+
+		assertEquals(0, ch.getCurrentBackoff());
+
+		assertTrue(ch.increaseBackoff());
+		assertEquals(500, ch.getCurrentBackoff());
+
+		assertTrue(ch.increaseBackoff());
+		assertEquals(1000, ch.getCurrentBackoff());
+
+		assertTrue(ch.increaseBackoff());
+		assertEquals(2000, ch.getCurrentBackoff());
+
+		assertTrue(ch.increaseBackoff());
+		assertEquals(3000, ch.getCurrentBackoff());
+
+		assertFalse(ch.increaseBackoff());
+		assertEquals(3000, ch.getCurrentBackoff());
+	}
+
+	@Test
+	public void testExponentialBackoffSingle() throws Exception {
+		InputChannel ch = createInputChannel(500, 500);
+
+		assertEquals(0, ch.getCurrentBackoff());
+
+		assertTrue(ch.increaseBackoff());
+		assertEquals(500, ch.getCurrentBackoff());
+
+		assertFalse(ch.increaseBackoff());
+		assertEquals(500, ch.getCurrentBackoff());
+	}
+
+	@Test
+	public void testExponentialNoBackoff() throws Exception {
+		InputChannel ch = createInputChannel(0, 0);
+
+		assertEquals(0, ch.getCurrentBackoff());
+
+		assertFalse(ch.increaseBackoff());
+		assertEquals(0, ch.getCurrentBackoff());
+	}
+
+	private InputChannel createInputChannel(int initialBackoff, int maxBackoff) {
+		return new MockInputChannel(
+				mock(SingleInputGate.class),
+				0,
+				new ResultPartitionID(),
+				new Tuple2<Integer, Integer>(initialBackoff, maxBackoff));
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	private static class MockInputChannel extends InputChannel {
+
+		private MockInputChannel(
+				SingleInputGate inputGate,
+				int channelIndex,
+				ResultPartitionID partitionId,
+				Tuple2<Integer, Integer> initialAndMaxBackoff) {
+
+			super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
+		}
+
+		@Override
+		void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+		}
+
+		@Override
+		Buffer getNextBuffer() throws IOException, InterruptedException {
+			return null;
+		}
+
+		@Override
+		void sendTaskEvent(TaskEvent event) throws IOException {
+		}
+
+		@Override
+		boolean isReleased() {
+			return false;
+		}
+
+		@Override
+		void notifySubpartitionConsumed() throws IOException {
+		}
+
+		@Override
+		void releaseAllResources() throws IOException {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 9bef886..c25e1d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -23,11 +23,15 @@ import com.google.common.collect.Lists;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -40,9 +44,15 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Tuple2;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -50,7 +60,14 @@ import java.util.concurrent.Future;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class LocalInputChannelTest {
 
@@ -159,6 +176,77 @@ public class LocalInputChannelTest {
 		}
 	}
 
+	@Test
+	public void testPartitionRequestExponentialBackoff() throws Exception {
+		// Config
+		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
+
+		// Start with initial backoff, then keep doubling, and cap at max.
+		int[] expectedDelays = {backoff._1(), 1000, 2000, backoff._2()};
+
+		// Setup
+		SingleInputGate inputGate = mock(SingleInputGate.class);
+
+		BufferProvider bufferProvider = mock(BufferProvider.class);
+		when(inputGate.getBufferProvider()).thenReturn(bufferProvider);
+
+		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+
+		LocalInputChannel ch = createLocalInputChannel(inputGate, partitionManager, backoff);
+
+		when(partitionManager
+				.createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider)))
+				.thenThrow(new PartitionNotFoundException(ch.partitionId));
+
+		Timer timer = mock(Timer.class);
+		doAnswer(new Answer<Void>() {
+
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				((TimerTask) invocation.getArguments()[0]).run();
+				return null;
+			}
+		}).when(timer).schedule(any(TimerTask.class), anyLong());
+
+		// Initial request
+		ch.requestSubpartition(0);
+		verify(partitionManager)
+				.createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider));
+
+		// Retrigger the request and verify that each retry is scheduled with the expected delay.
+		for (long expected : expectedDelays) {
+			ch.retriggerSubpartitionRequest(timer, 0);
+
+			verify(timer).schedule(any(TimerTask.class), eq(expected));
+		}
+
+		// Expect an exception once the maximum backoff has been reached.
+		try {
+			ch.retriggerSubpartitionRequest(timer, 0);
+			ch.getNextBuffer();
+			fail("Did not throw expected exception.");
+		}
+		catch (Exception expected) {
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	private LocalInputChannel createLocalInputChannel(
+			SingleInputGate inputGate,
+			ResultPartitionManager partitionManager,
+			Tuple2<Integer, Integer> initialAndMaxRequestBackoff)
+			throws IOException, InterruptedException {
+
+		return new LocalInputChannel(
+				inputGate,
+				0,
+				new ResultPartitionID(),
+				partitionManager,
+				mock(TaskEventDispatcher.class),
+				initialAndMaxRequestBackoff);
+	}
+
 	/**
 	 * Returns the configured number of buffers for each channel in a random order.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb890f1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d33dcd7..c25b9a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -692,10 +692,10 @@ public class TaskManagerTest {
 	}
 
 	/**
-	 * Tests that repeated {@link PartitionNotFoundException}s fail the receiver.
+	 * Tests that repeated remote {@link PartitionNotFoundException}s ultimately fail the receiver.
 	 */
 	@Test
-	public void testPartitionNotFound() throws Exception {
+	public void testRemotePartitionNotFound() throws Exception {
 
 		new JavaTestKit(system){{
 
@@ -775,6 +775,87 @@ public class TaskManagerTest {
 		}};
 	}
 
+	/**
+	 * Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
+	 */
+	@Test
+	public void testLocalPartitionNotFound() throws Exception {
+
+		new JavaTestKit(system){{
+
+			ActorRef jobManager = null;
+			ActorRef taskManager = null;
+
+			try {
+				final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+				// Create the JM; it is handed the test actor so the test can receive its messages
+				jobManager = system.actorOf(Props.create(
+						new SimplePartitionStateLookupJobManagerCreator(resultId, getTestActor())));
+
+				final int dataPort = NetUtils.getAvailablePort();
+				taskManager = createTaskManager(jobManager, true, true, dataPort);
+
+				// ---------------------------------------------------------------------------------
+
+				final ActorRef tm = taskManager;
+
+				final JobID jid = new JobID();
+				final JobVertexID vid = new JobVertexID();
+				final ExecutionAttemptID eid = new ExecutionAttemptID();
+
+				final ResultPartitionID partitionId = new ResultPartitionID();
+
+				// Local location, i.e. the partition is expected on this same TM
+				final ResultPartitionLocation loc = ResultPartitionLocation.createLocal();
+
+				final InputChannelDeploymentDescriptor[] icdd =
+						new InputChannelDeploymentDescriptor[] {
+								new InputChannelDeploymentDescriptor(partitionId, loc)};
+
+				final InputGateDeploymentDescriptor igdd =
+						new InputGateDeploymentDescriptor(resultId, 0, icdd);
+
+				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+						jid, vid, eid, "Receiver", 0, 1,
+						new Configuration(), new Configuration(),
+						Tasks.AgnosticReceiver.class.getName(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.singletonList(igdd),
+						Collections.<BlobKey>emptyList(), 0);
+
+				new Within(d) {
+					@Override
+					protected void run() {
+						// Submit the task
+						tm.tell(new SubmitTask(tdd), getTestActor());
+						expectMsgClass(Messages.getAcknowledge().getClass());
+
+						// Wait to be notified about the final execution state by the mock JM
+						TaskExecutionState msg = expectMsgClass(TaskExecutionState.class);
+
+						// The task should fail after repeated requests (expected value goes first)
+						assertEquals(ExecutionState.FAILED, msg.getExecutionState());
+						assertEquals(PartitionNotFoundException.class,
+								msg.getError(ClassLoader.getSystemClassLoader()).getClass());
+					}
+				};
+			}
+			catch(Exception e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+			finally {
+				if (taskManager != null) {
+					taskManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+
+				if (jobManager != null) {
+					jobManager.tell(Kill.getInstance(), ActorRef.noSender());
+				}
+			}
+		}};
+	}
 
 	// --------------------------------------------------------------------------------------------