You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:42:39 UTC

[5/6] flink git commit: [FLINK-5169] [network] Make consumption of InputChannels fair

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
deleted file mode 100644
index c86697f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * View over a spilled subpartition.
- *
- * <p> Reads are done synchronously.
- */
-class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
-
-	/** The subpartition this view belongs to. */
-	private final ResultSubpartition parent;
-
-	/** The synchronous file reader to do the actual I/O. */
-	private final BufferFileReader fileReader;
-
-	/** The buffer pool to read data into. */
-	private final SpillReadBufferPool bufferPool;
-
-	/** Flag indicating whether all resources have been released. */
-	private AtomicBoolean isReleased = new AtomicBoolean();
-
-	/** Spilled file size */
-	private final long fileSize;
-
-	SpilledSubpartitionViewSyncIO(
-			ResultSubpartition parent,
-			int memorySegmentSize,
-			FileIOChannel.ID channelId,
-			long initialSeekPosition) throws IOException {
-
-		checkArgument(initialSeekPosition >= 0, "Initial seek position is < 0.");
-
-		this.parent = checkNotNull(parent);
-
-		this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
-
-		this.fileReader = new SynchronousBufferFileReader(channelId, false);
-
-		if (initialSeekPosition > 0) {
-			fileReader.seekToPosition(initialSeekPosition);
-		}
-
-		this.fileSize = fileReader.getSize();
-	}
-
-	@Override
-	public Buffer getNextBuffer() throws IOException, InterruptedException {
-
-		if (fileReader.hasReachedEndOfFile()) {
-			return null;
-		}
-
-		// It's OK to request the buffer in a blocking fashion as the buffer pool is NOT shared
-		// among all consumed subpartitions.
-		final Buffer buffer = bufferPool.requestBufferBlocking();
-
-		fileReader.readInto(buffer);
-
-		return buffer;
-	}
-
-	@Override
-	public boolean registerListener(NotificationListener listener) throws IOException {
-		return false;
-	}
-
-	@Override
-	public void notifySubpartitionConsumed() throws IOException {
-		parent.onConsumedSubpartition();
-	}
-
-	@Override
-	public void releaseAllResources() throws IOException {
-		if (isReleased.compareAndSet(false, true)) {
-			fileReader.close();
-			bufferPool.destroy();
-		}
-	}
-
-	@Override
-	public boolean isReleased() {
-		return parent.isReleased() || isReleased.get();
-	}
-
-	@Override
-	public Throwable getFailureCause() {
-		return parent.getFailureCause();
-	}
-
-	@Override
-	public String toString() {
-		return String.format("SpilledSubpartitionView[sync](index: %d, file size: %d bytes) of ResultPartition %s",
-				parent.index,
-				fileSize,
-				parent.parent.getPartitionId());
-	}
-
-	/**
-	 * A buffer pool to provide buffer to read the file into.
-	 *
-	 * <p> This pool ensures that a consuming input gate makes progress in all cases, even when all
-	 * buffers of the input gate buffer pool have been requested by remote input channels.
-	 *
-	 * TODO Replace with asynchronous buffer pool request as this introduces extra buffers per
-	 * consumed subpartition.
-	 */
-	private static class SpillReadBufferPool implements BufferRecycler {
-
-		private final Queue<Buffer> buffers;
-
-		private boolean isDestroyed;
-
-		public SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
-			this.buffers = new ArrayDeque<Buffer>(numberOfBuffers);
-
-			synchronized (buffers) {
-				for (int i = 0; i < numberOfBuffers; i++) {
-					buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
-				}
-			}
-		}
-
-		@Override
-		public void recycle(MemorySegment memorySegment) {
-			synchronized (buffers) {
-				if (isDestroyed) {
-					memorySegment.free();
-				}
-				else {
-					buffers.add(new Buffer(memorySegment, this));
-					buffers.notifyAll();
-				}
-			}
-		}
-
-		private Buffer requestBufferBlocking() throws InterruptedException {
-			synchronized (buffers) {
-				while (true) {
-					if (isDestroyed) {
-						return null;
-					}
-
-					Buffer buffer = buffers.poll();
-
-					if (buffer != null) {
-						return buffer;
-					}
-					// Else: wait for a buffer
-					buffers.wait();
-				}
-			}
-		}
-
-		private void destroy() {
-			synchronized (buffers) {
-				isDestroyed = true;
-				buffers.notifyAll();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
index 885e738..3e93ae6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
@@ -34,18 +34,35 @@ public class BufferOrEvent {
 
 	private final AbstractEvent event;
 
+	/**
+	 * Indicate availability of further instances for the union input gate.
+	 * This is not needed outside of the input gate unioning logic and cannot
+	 * be set outside of the consumer package.
+	 */
+	private final boolean moreAvailable;
+
 	private int channelIndex;
 
-	public BufferOrEvent(Buffer buffer, int channelIndex) {
+	BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) {
 		this.buffer = checkNotNull(buffer);
 		this.event = null;
 		this.channelIndex = channelIndex;
+		this.moreAvailable = moreAvailable;
 	}
 
-	public BufferOrEvent(AbstractEvent event, int channelIndex) {
+	BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable) {
 		this.buffer = null;
 		this.event = checkNotNull(event);
 		this.channelIndex = channelIndex;
+		this.moreAvailable = moreAvailable;
+	}
+
+	public BufferOrEvent(Buffer buffer, int channelIndex) {
+		this(buffer, channelIndex, true);
+	}
+
+	public BufferOrEvent(AbstractEvent event, int channelIndex) {
+		this(event, channelIndex, true);
 	}
 
 	public boolean isBuffer() {
@@ -73,6 +90,10 @@ public class BufferOrEvent {
 		this.channelIndex = channelIndex;
 	}
 
+	boolean moreAvailable() {
+		return moreAvailable;
+	}
+
 	@Override
 	public String toString() {
 		return String.format("BufferOrEvent [%s, channelIndex = %d]",

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 35094e2..f46abfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -101,10 +101,19 @@ public abstract class InputChannel {
 	}
 
 	/**
-	 * Notifies the owning {@link SingleInputGate} about an available {@link Buffer} instance.
+	 * Notifies the owning {@link SingleInputGate} that this channel became non-empty.
+	 * 
+	 * <p>This is guaranteed to be called only when a Buffer was added to a previously
+	 * empty input channel. The notion of empty is atomically consistent with the flag
+	 * {@link BufferAndAvailability#moreAvailable()} when polling the next buffer
+	 * from this channel.
+	 * 
+	 * <p><b>Note:</b> When the input channel observes an exception, this
+	 * method is called regardless of whether the channel was empty before. That ensures
+	 * that the parent InputGate will always be notified about the exception.
 	 */
-	protected void notifyAvailableBuffer() {
-		inputGate.onAvailableBuffer(this);
+	protected void notifyChannelNonEmpty() {
+		inputGate.notifyChannelNonEmpty(this);
 	}
 
 	// ------------------------------------------------------------------------
@@ -123,7 +132,7 @@ public abstract class InputChannel {
 	/**
 	 * Returns the next buffer from the consumed subpartition.
 	 */
-	abstract Buffer getNextBuffer() throws IOException, InterruptedException;
+	abstract BufferAndAvailability getNextBuffer() throws IOException, InterruptedException;
 
 	// ------------------------------------------------------------------------
 	// Task events
@@ -182,7 +191,7 @@ public abstract class InputChannel {
 	protected void setError(Throwable cause) {
 		if (this.cause.compareAndSet(null, checkNotNull(cause))) {
 			// Notify the input gate.
-			notifyAvailableBuffer();
+			notifyChannelNonEmpty();
 		}
 	}
 
@@ -225,4 +234,28 @@ public abstract class InputChannel {
 		// Reached maximum backoff
 		return false;
 	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A combination of a {@link Buffer} and a flag indicating availability of further buffers.
+	 */
+	public static final class BufferAndAvailability {
+
+		private final Buffer buffer;
+		private final boolean moreAvailable;
+
+		public BufferAndAvailability(Buffer buffer, boolean moreAvailable) {
+			this.buffer = checkNotNull(buffer);
+			this.moreAvailable = moreAvailable;
+		}
+
+		public Buffer buffer() {
+			return buffer;
+		}
+
+		public boolean moreAvailable() {
+			return moreAvailable;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 1cd5fc5..1f2182e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 
@@ -77,7 +76,7 @@ public interface InputGate {
 
 	void sendTaskEvent(TaskEvent event) throws IOException;
 
-	void registerListener(EventListener<InputGate> listener);
+	void registerListener(InputGateListener listener);
 
 	int getPageSize();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
new file mode 100644
index 0000000..00fa782
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+/**
+ * Listener interface implemented by consumers of {@link InputGate} instances
+ * that want to be notified of availability of buffer or event instances.
+ */
+public interface InputGateListener {
+
+	/**
+	 * Notification callback if the input gate moves from zero to non-zero
+	 * available input channels with data.
+	 *
+	 * @param inputGate Input Gate that became available.
+	 */
+	void notifyInputGateNonEmpty(InputGate inputGate);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 55ff539..d5308a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,24 +18,23 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
-import org.apache.flink.runtime.util.event.NotificationListener;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -43,11 +42,13 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * An input channel, which requests a local subpartition.
  */
-public class LocalInputChannel extends InputChannel implements NotificationListener {
+public class LocalInputChannel extends InputChannel implements BufferAvailabilityListener {
 
 	private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);
 
-	private final Object requestLock = new Object();
+	// ------------------------------------------------------------------------
+
+	private final Object requestReleaseLock = new Object();
 
 	/** The local partition manager. */
 	private final ResultPartitionManager partitionManager;
@@ -55,39 +56,41 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	/** Task event dispatcher for backwards events. */
 	private final TaskEventDispatcher taskEventDispatcher;
 
+	/** Number of available buffers used to keep track of non-empty gate notifications. */
+	private final AtomicLong numBuffersAvailable;
+
 	/** The consumed subpartition */
 	private volatile ResultSubpartitionView subpartitionView;
 
 	private volatile boolean isReleased;
 
-	private volatile Buffer lookAhead;
-
 	LocalInputChannel(
-			SingleInputGate inputGate,
-			int channelIndex,
-			ResultPartitionID partitionId,
-			ResultPartitionManager partitionManager,
-			TaskEventDispatcher taskEventDispatcher,
-			TaskIOMetricGroup metrics) {
+		SingleInputGate inputGate,
+		int channelIndex,
+		ResultPartitionID partitionId,
+		ResultPartitionManager partitionManager,
+		TaskEventDispatcher taskEventDispatcher,
+		TaskIOMetricGroup metrics) {
 
 		this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
-				0, 0, metrics);
+			0, 0, metrics);
 	}
 
 	LocalInputChannel(
-			SingleInputGate inputGate,
-			int channelIndex,
-			ResultPartitionID partitionId,
-			ResultPartitionManager partitionManager,
-			TaskEventDispatcher taskEventDispatcher,
-			int initialBackoff,
-			int maxBackoff,
-			TaskIOMetricGroup metrics) {
+		SingleInputGate inputGate,
+		int channelIndex,
+		ResultPartitionID partitionId,
+		ResultPartitionManager partitionManager,
+		TaskEventDispatcher taskEventDispatcher,
+		int initialBackoff,
+		int maxBackoff,
+		TaskIOMetricGroup metrics) {
 
 		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
 
 		this.partitionManager = checkNotNull(partitionManager);
 		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
+		this.numBuffersAvailable = new AtomicLong();
 	}
 
 	// ------------------------------------------------------------------------
@@ -97,30 +100,36 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	@Override
 	void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
 		// The lock is required to request only once in the presence of retriggered requests.
-		synchronized (requestLock) {
+		synchronized (requestReleaseLock) {
+			checkState(!isReleased, "released");
+
 			if (subpartitionView == null) {
 				LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
-						this, subpartitionIndex, partitionId);
+					this, subpartitionIndex, partitionId);
 
 				try {
-					subpartitionView = partitionManager.createSubpartitionView(
-							partitionId, subpartitionIndex, inputGate.getBufferProvider());
-				}
-				catch (PartitionNotFoundException notFound) {
+					ResultSubpartitionView subpartitionView = partitionManager.createSubpartitionView(
+						partitionId, subpartitionIndex, inputGate.getBufferProvider(), this);
+
+					if (subpartitionView == null) {
+						throw new IOException("Error requesting subpartition.");
+					}
+
+					// make the subpartition view visible
+					this.subpartitionView = subpartitionView;
+
+					// check if the channel was released in the meantime
+					if (isReleased) {
+						subpartitionView.releaseAllResources();
+						this.subpartitionView = null;
+					}
+				} catch (PartitionNotFoundException notFound) {
 					if (increaseBackoff()) {
 						inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
-						return;
-					}
-					else {
+					} else {
 						throw notFound;
 					}
 				}
-
-				if (subpartitionView == null) {
-					throw new IOException("Error requesting subpartition.");
-				}
-
-				getNextLookAhead();
 			}
 		}
 	}
@@ -128,17 +137,16 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	/**
 	 * Retriggers a subpartition request.
 	 */
-	void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) throws IOException, InterruptedException {
-		synchronized (requestLock) {
-			checkState(subpartitionView == null, "Already requested partition.");
+	void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) {
+		synchronized (requestReleaseLock) {
+			checkState(subpartitionView == null, "already requested partition");
 
 			timer.schedule(new TimerTask() {
 				@Override
 				public void run() {
 					try {
 						requestSubpartition(subpartitionIndex);
-					}
-					catch (Throwable t) {
+					} catch (Throwable t) {
 						setError(t);
 					}
 				}
@@ -147,29 +155,49 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	}
 
 	@Override
-	Buffer getNextBuffer() throws IOException, InterruptedException {
+	BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
 		checkError();
-		checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
 
-		// After subscribe notification
-		if (lookAhead == null) {
-			lookAhead = subpartitionView.getNextBuffer();
+		ResultSubpartitionView subpartitionView = this.subpartitionView;
+		if (subpartitionView == null) {
+			// this can happen if the request for the partition was triggered asynchronously
+			// by the time trigger
+			// would be good to avoid that, by guaranteeing that the requestPartition() and
+			// getNextBuffer() always come from the same thread
+			// we could do that by letting the timer insert a special "requesting channel" into the input gate's queue
+			subpartitionView = checkAndWaitForSubpartitionView();
 		}
 
-		Buffer next = lookAhead;
-		lookAhead = null;
+		Buffer next = subpartitionView.getNextBuffer();
+		long remaining = numBuffersAvailable.decrementAndGet();
 
-		if (!next.isBuffer() && EventSerializer
-				.fromBuffer(next, getClass().getClassLoader())
-				.getClass() == EndOfPartitionEvent.class) {
-
-				return next;
+		if (remaining >= 0) {
+			numBytesIn.inc(next.getSize());
+			return new BufferAndAvailability(next, remaining > 0);
+		} else if (subpartitionView.isReleased()) {
+			throw new ProducerFailedException(subpartitionView.getFailureCause());
+		} else {
+			throw new IllegalStateException("No buffer available and producer partition not released.");
 		}
+	}
 
-		getNextLookAhead();
+	@Override
+	public void notifyBuffersAvailable(long numBuffers) {
+		// if this request made the channel non-empty, notify the input gate
+		if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) {
+			notifyChannelNonEmpty();
+		}
+	}
 
-		numBytesIn.inc(next.getSize());
-		return next;
+	private ResultSubpartitionView checkAndWaitForSubpartitionView() {
+		// synchronizing on the request lock means this blocks until the asynchronous request
+		// for the partition view has been completed
+		// by then the subpartition view is visible or the channel is released
+		synchronized (requestReleaseLock) {
+			checkState(!isReleased, "released");
+			checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
+			return subpartitionView;
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -208,18 +236,15 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	 */
 	@Override
 	void releaseAllResources() throws IOException {
-		if (!isReleased) {
-			if (lookAhead != null) {
-				lookAhead.recycle();
-				lookAhead = null;
-			}
+		synchronized (requestReleaseLock) {
+			if (!isReleased) {
+				isReleased = true;
 
-			if (subpartitionView != null) {
-				subpartitionView.releaseAllResources();
-				subpartitionView = null;
+				if (subpartitionView != null) {
+					subpartitionView.releaseAllResources();
+					subpartitionView = null;
+				}
 			}
-
-			isReleased = true;
 		}
 	}
 
@@ -227,55 +252,4 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	public String toString() {
 		return "LocalInputChannel [" + partitionId + "]";
 	}
-
-	// ------------------------------------------------------------------------
-	// Queue iterator listener (called by producing or disk I/O thread)
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void onNotification() {
-		if (isReleased) {
-			return;
-		}
-
-		try {
-			getNextLookAhead();
-		}
-		catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private void getNextLookAhead() throws IOException, InterruptedException {
-
-		final ResultSubpartitionView view = subpartitionView;
-
-		if (view == null) {
-			return;
-		}
-
-		while (true) {
-			lookAhead = view.getNextBuffer();
-
-			if (lookAhead != null) {
-				notifyAvailableBuffer();
-				break;
-			}
-
-			if (view.registerListener(this)) {
-				return;
-			}
-			else if (view.isReleased()) {
-				Throwable cause = view.getFailureCause();
-
-				if (cause != null) {
-					setError(new ProducerFailedException(cause));
-				}
-
-				return;
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 13a71a9..ed3122e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -27,8 +26,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -43,8 +41,6 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class RemoteInputChannel extends InputChannel {
 
-	private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class);
-
 	/** ID to distinguish this channel from other channels sharing the same TCP connection. */
 	private final InputChannelID id = new InputChannelID();
 
@@ -58,7 +54,7 @@ public class RemoteInputChannel extends InputChannel {
 	 * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
 	 * is consumed by the receiving task thread.
 	 */
-	private final Queue<Buffer> receivedBuffers = new ArrayDeque<Buffer>();
+	private final Queue<Buffer> receivedBuffers = new ArrayDeque<>();
 
 	/**
 	 * Flag indicating whether this channel has been released. Either called by the receiving task
@@ -76,28 +72,27 @@ public class RemoteInputChannel extends InputChannel {
 	private int expectedSequenceNumber = 0;
 
 	public RemoteInputChannel(
-			SingleInputGate inputGate,
-			int channelIndex,
-			ResultPartitionID partitionId,
-			ConnectionID connectionId,
-			ConnectionManager connectionManager,
-			TaskIOMetricGroup metrics) {
-
-		this(inputGate, channelIndex, partitionId, connectionId, connectionManager,
-				0, 0, metrics);
+		SingleInputGate inputGate,
+		int channelIndex,
+		ResultPartitionID partitionId,
+		ConnectionID connectionId,
+		ConnectionManager connectionManager,
+		TaskIOMetricGroup metrics) {
+
+		this(inputGate, channelIndex, partitionId, connectionId, connectionManager, 0, 0, metrics);
 	}
 
 	public RemoteInputChannel(
-			SingleInputGate inputGate,
-			int channelIndex,
-			ResultPartitionID partitionId,
-			ConnectionID connectionId,
-			ConnectionManager connectionManager,
-			int initialBackoff,
-			int maxBackoff,
-			TaskIOMetricGroup metrics) {
+		SingleInputGate inputGate,
+		int channelIndex,
+		ResultPartitionID partitionId,
+		ConnectionID connectionId,
+		ConnectionManager connectionManager,
+		int initialBackOff,
+		int maxBackoff,
+		TaskIOMetricGroup metrics) {
 
-		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInRemoteCounter());
+		super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter());
 
 		this.connectionId = checkNotNull(connectionId);
 		this.connectionManager = checkNotNull(connectionManager);
@@ -115,7 +110,7 @@ public class RemoteInputChannel extends InputChannel {
 		if (partitionRequestClient == null) {
 			// Create a client and request the partition
 			partitionRequestClient = connectionManager
-					.createPartitionRequestClient(connectionId);
+				.createPartitionRequestClient(connectionId);
 
 			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
 		}
@@ -129,31 +124,29 @@ public class RemoteInputChannel extends InputChannel {
 
 		if (increaseBackoff()) {
 			partitionRequestClient.requestSubpartition(
-					partitionId, subpartitionIndex, this, getCurrentBackoff());
-		}
-		else {
+				partitionId, subpartitionIndex, this, getCurrentBackoff());
+		} else {
 			failPartitionRequest();
 		}
 	}
 
 	@Override
-	Buffer getNextBuffer() throws IOException {
+	BufferAndAvailability getNextBuffer() throws IOException {
 		checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
 		checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
 
 		checkError();
 
-		synchronized (receivedBuffers) {
-			Buffer buffer = receivedBuffers.poll();
-
-			// Sanity check that channel is only queried after a notification
-			if (buffer == null) {
-				throw new IOException("Queried input channel for data although non is available.");
-			}
+		final Buffer next;
+		final int remaining;
 
-			numBytesIn.inc(buffer.getSize());
-			return buffer;
+		synchronized (receivedBuffers) {
+			next = receivedBuffers.poll();
+			remaining = receivedBuffers.size();
 		}
+
+		numBytesIn.inc(next.getSize());
+		return new BufferAndAvailability(next, remaining > 0);
 	}
 
 	// ------------------------------------------------------------------------
@@ -201,14 +194,13 @@ public class RemoteInputChannel extends InputChannel {
 			// buffers received concurrently with closing are properly recycled.
 			if (partitionRequestClient != null) {
 				partitionRequestClient.close(this);
-			}
-			else {
+			} else {
 				connectionManager.closeOpenChannelConnections(connectionId);
 			}
 		}
 	}
 
-	public void failPartitionRequest() {
+	private void failPartitionRequest() {
 		setError(new PartitionNotFoundException(partitionId));
 	}
 
@@ -246,20 +238,22 @@ public class RemoteInputChannel extends InputChannel {
 			synchronized (receivedBuffers) {
 				if (!isReleased.get()) {
 					if (expectedSequenceNumber == sequenceNumber) {
+						int available = receivedBuffers.size();
+
 						receivedBuffers.add(buffer);
 						expectedSequenceNumber++;
 
-						notifyAvailableBuffer();
+						if (available == 0) {
+							notifyChannelNonEmpty();
+						}
 
 						success = true;
-					}
-					else {
+					} else {
 						onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
 					}
 				}
 			}
-		}
-		finally {
+		} finally {
 			if (!success) {
 				buffer.recycle();
 			}
@@ -271,8 +265,7 @@ public class RemoteInputChannel extends InputChannel {
 			if (!isReleased.get()) {
 				if (expectedSequenceNumber == sequenceNumber) {
 					expectedSequenceNumber++;
-				}
-				else {
+				} else {
 					onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
 				}
 			}
@@ -287,7 +280,7 @@ public class RemoteInputChannel extends InputChannel {
 		setError(cause);
 	}
 
-	public static class BufferReorderingException extends IOException {
+	private static class BufferReorderingException extends IOException {
 
 		private static final long serialVersionUID = -888282210356266816L;
 
@@ -295,7 +288,7 @@ public class RemoteInputChannel extends InputChannel {
 
 		private final int actualSequenceNumber;
 
-		public BufferReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
+		BufferReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
 			this.expectedSequenceNumber = expectedSequenceNumber;
 			this.actualSequenceNumber = actualSequenceNumber;
 		}
@@ -303,7 +296,7 @@ public class RemoteInputChannel extends InputChannel {
 		@Override
 		public String getMessage() {
 			return String.format("Buffer re-ordering: expected buffer with sequence number %d, but received %d.",
-					expectedSequenceNumber, actualSequenceNumber);
+				expectedSequenceNumber, actualSequenceNumber);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index d7ed33c..bcbb2c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import com.google.common.collect.Maps;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -36,22 +34,22 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -136,7 +134,7 @@ public class SingleInputGate implements InputGate {
 	private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
 
 	/** Channels, which notified this input gate about available data. */
-	private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>();
+	private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque<>();
 
 	private final BitSet channelsWithEndOfPartitionEvents;
 
@@ -158,9 +156,9 @@ public class SingleInputGate implements InputGate {
 	private volatile boolean isReleased;
 
 	/** Registered listener to forward buffer notifications to. */
-	private volatile EventListener<InputGate> registeredListener;
+	private volatile InputGateListener inputGateListener;
 
-	private final List<TaskEvent> pendingEvents = new ArrayList<TaskEvent>();
+	private final List<TaskEvent> pendingEvents = new ArrayList<>();
 
 	private int numberOfUninitializedChannels;
 
@@ -168,14 +166,14 @@ public class SingleInputGate implements InputGate {
 	private Timer retriggerLocalRequestTimer;
 
 	public SingleInputGate(
-			String owningTaskName,
-			JobID jobId,
-			ExecutionAttemptID executionId,
-			IntermediateDataSetID consumedResultId,
-			int consumedSubpartitionIndex,
-			int numberOfInputChannels,
-			TaskActions taskActions,
-			TaskIOMetricGroup metrics) {
+		String owningTaskName,
+		JobID jobId,
+		ExecutionAttemptID executionId,
+		IntermediateDataSetID consumedResultId,
+		int consumedSubpartitionIndex,
+		int numberOfInputChannels,
+		TaskActions taskActions,
+		TaskIOMetricGroup metrics) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
 		this.jobId = checkNotNull(jobId);
@@ -263,7 +261,7 @@ public class SingleInputGate implements InputGate {
 		this.bufferPool = checkNotNull(bufferPool);
 	}
 
-	public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
+	void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
 		synchronized (requestLock) {
 			if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
 					&& inputChannel.getClass() == UnknownInputChannel.class) {
@@ -355,6 +353,7 @@ public class SingleInputGate implements InputGate {
 	}
 
 	public void releaseAllResources() throws IOException {
+		boolean released = false;
 		synchronized (requestLock) {
 			if (!isReleased) {
 				try {
@@ -381,9 +380,16 @@ public class SingleInputGate implements InputGate {
 				}
 				finally {
 					isReleased = true;
+					released = true;
 				}
 			}
 		}
+
+		if (released) {
+			synchronized (inputChannelsWithData) {
+				inputChannelsWithData.notifyAll();
+			}
+		}
 	}
 
 	@Override
@@ -429,32 +435,50 @@ public class SingleInputGate implements InputGate {
 
 	@Override
 	public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-
 		if (hasReceivedAllEndOfPartitionEvents) {
 			return null;
 		}
 
+		if (isReleased) {
+			throw new IllegalStateException("Released");
+		}
+
 		requestPartitions();
 
-		InputChannel currentChannel = null;
-		while (currentChannel == null) {
-			if (isReleased) {
-				throw new IllegalStateException("Released");
+		InputChannel currentChannel;
+		boolean moreAvailable;
+
+		synchronized (inputChannelsWithData) {
+			while (inputChannelsWithData.size() == 0) {
+				if (isReleased) {
+					throw new IllegalStateException("Released");
+				}
+
+				inputChannelsWithData.wait();
 			}
 
-			currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
+			currentChannel = inputChannelsWithData.remove();
+			moreAvailable = inputChannelsWithData.size() > 0;
 		}
 
-		final Buffer buffer = currentChannel.getNextBuffer();
+		final BufferAndAvailability result = currentChannel.getNextBuffer();
 
 		// Sanity check that notifications only happen when data is available
-		if (buffer == null) {
+		if (result == null) {
 			throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
 					"notified by channel about available data, but none was available.");
 		}
 
+		// this channel was now removed from the non-empty channels queue
+		// we re-add it in case it has more data, because in that case no "non-empty" notification
+		// will come for that channel
+		if (result.moreAvailable()) {
+			queueChannel(currentChannel);
+		}
+
+		final Buffer buffer = result.buffer();
 		if (buffer.isBuffer()) {
-			return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
+			return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
 		}
 		else {
 			final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
@@ -471,7 +495,7 @@ public class SingleInputGate implements InputGate {
 				currentChannel.releaseAllResources();
 			}
 
-			return new BufferOrEvent(event, currentChannel.getChannelIndex());
+			return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable);
 		}
 	}
 
@@ -493,29 +517,45 @@ public class SingleInputGate implements InputGate {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void registerListener(EventListener<InputGate> listener) {
-		if (registeredListener == null) {
-			registeredListener = listener;
-		}
-		else {
+	public void registerListener(InputGateListener inputGateListener) {
+		if (this.inputGateListener == null) {
+			this.inputGateListener = inputGateListener;
+		} else {
 			throw new IllegalStateException("Multiple listeners");
 		}
 	}
 
-	public void onAvailableBuffer(InputChannel channel) {
-		inputChannelsWithData.add(channel);
-		EventListener<InputGate> listener = registeredListener;
-		if (listener != null) {
-			listener.onEvent(this);
-		}
+	void notifyChannelNonEmpty(InputChannel channel) {
+		queueChannel(checkNotNull(channel));
 	}
 
 	void triggerPartitionStateCheck(ResultPartitionID partitionId) {
 		taskActions.triggerPartitionStateCheck(
-				jobId,
-				executionId,
-				consumedResultId,
-				partitionId);
+			jobId,
+			executionId,
+			consumedResultId,
+			partitionId);
+	}
+
+	private void queueChannel(InputChannel channel) {
+		int availableChannels;
+
+		synchronized (inputChannelsWithData) {
+			availableChannels = inputChannelsWithData.size();
+
+			inputChannelsWithData.add(channel);
+
+			if (availableChannels == 0) {
+				inputChannelsWithData.notify();
+			}
+		}
+
+		if (availableChannels == 0) {
+			InputGateListener listener = inputGateListener;
+			if (listener != null) {
+				listener.notifyInputGateNonEmpty(this);
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -531,13 +571,13 @@ public class SingleInputGate implements InputGate {
 	 * Creates an input gate and all of its input channels.
 	 */
 	public static SingleInputGate create(
-			String owningTaskName,
-			JobID jobId,
-			ExecutionAttemptID executionId,
-			InputGateDeploymentDescriptor igdd,
-			NetworkEnvironment networkEnvironment,
-			TaskActions taskActions,
-			TaskIOMetricGroup metrics) {
+		String owningTaskName,
+		JobID jobId,
+		ExecutionAttemptID executionId,
+		InputGateDeploymentDescriptor igdd,
+		NetworkEnvironment networkEnvironment,
+		TaskActions taskActions,
+		TaskIOMetricGroup metrics) {
 
 		final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
 
@@ -547,8 +587,8 @@ public class SingleInputGate implements InputGate {
 		final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
 		final SingleInputGate inputGate = new SingleInputGate(
-				owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
-				icdd.length, taskActions, metrics);
+			owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
+			icdd.length, taskActions, metrics);
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index b1b8911..e8ccbb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -22,15 +22,11 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.ArrayDeque;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,19 +59,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It is possible to recursively union union input gates.
  */
-public class UnionInputGate implements InputGate {
+public class UnionInputGate implements InputGate, InputGateListener {
 
 	/** The input gates to union. */
 	private final InputGate[] inputGates;
 
 	private final Set<InputGate> inputGatesWithRemainingData;
 
-	/** Data availability listener across all unioned input gates. */
-	private final InputGateListener inputGateListener;
+	/** Gates, which notified this input gate about available data. */
+	private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>();
 
 	/** The total number of input channels across all unioned input gates. */
 	private final int totalNumberOfInputChannels;
 
+	/** Registered listener to forward input gate notifications to. */
+	private volatile InputGateListener inputGateListener;
+
 	/**
 	 * A mapping from input gate to (logical) channel index offset. Valid channel indexes go from 0
 	 * (inclusive) to the total number of input channels (exclusive).
@@ -100,11 +99,12 @@ public class UnionInputGate implements InputGate {
 			inputGatesWithRemainingData.add(inputGate);
 
 			currentNumberOfInputChannels += inputGate.getNumberOfInputChannels();
+
+			// Register the union gate as a listener for all input gates
+			inputGate.registerListener(this);
 		}
 
 		this.totalNumberOfInputChannels = currentNumberOfInputChannels;
-
-		this.inputGateListener = new InputGateListener(inputGates, this);
 	}
 
 	/**
@@ -139,7 +139,6 @@ public class UnionInputGate implements InputGate {
 
 	@Override
 	public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-
 		if (inputGatesWithRemainingData.isEmpty()) {
 			return null;
 		}
@@ -147,17 +146,31 @@ public class UnionInputGate implements InputGate {
 		// Make sure to request the partitions, if they have not been requested before.
 		requestPartitions();
 
-		final InputGate inputGate = inputGateListener.getNextInputGateToReadFrom();
+		final InputGate inputGate;
+		synchronized (inputGatesWithData) {
+			while (inputGatesWithData.size() == 0) {
+				inputGatesWithData.wait();
+			}
+
+			inputGate = inputGatesWithData.remove();
+		}
 
 		final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
 
+		if (bufferOrEvent.moreAvailable()) {
+			// this buffer or event was now removed from the non-empty gates queue
+			// we re-add it in case it has more data, because in that case no "non-empty" notification
+			// will come for that gate
+			queueInputGate(inputGate);
+		}
+
 		if (bufferOrEvent.isEvent()
-				&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
-				&& inputGate.isFinished()) {
+			&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
+			&& inputGate.isFinished()) {
 
 			if (!inputGatesWithRemainingData.remove(inputGate)) {
 				throw new IllegalStateException("Couldn't find input gate in set of remaining " +
-						"input gates.");
+					"input gates.");
 			}
 		}
 
@@ -177,9 +190,12 @@ public class UnionInputGate implements InputGate {
 	}
 
 	@Override
-	public void registerListener(EventListener<InputGate> listener) {
-		// This method is called from the consuming task thread.
-		inputGateListener.registerListener(listener);
+	public void registerListener(InputGateListener listener) {
+		if (this.inputGateListener == null) {
+			this.inputGateListener = listener;
+		} else {
+			throw new IllegalStateException("Multiple listeners");
+		}
 	}
 
 	@Override
@@ -195,45 +211,29 @@ public class UnionInputGate implements InputGate {
 		return pageSize;
 	}
 
-	/**
-	 * Data availability listener at all unioned input gates.
-	 *
-	 * <p> The listener registers itself at each input gate and is notified for *each incoming
-	 * buffer* at one of the unioned input gates.
-	 */
-	private static class InputGateListener implements EventListener<InputGate> {
-
-		private final UnionInputGate unionInputGate;
-
-		private final BlockingQueue<InputGate> inputGatesWithData = new LinkedBlockingQueue<InputGate>();
+	@Override
+	public void notifyInputGateNonEmpty(InputGate inputGate) {
+		queueInputGate(checkNotNull(inputGate));
+	}
 
-		private final List<EventListener<InputGate>> registeredListeners = new CopyOnWriteArrayList<EventListener<InputGate>>();
+	private void queueInputGate(InputGate inputGate) {
+		int availableInputGates;
 
-		public InputGateListener(InputGate[] inputGates, UnionInputGate unionInputGate) {
-			for (InputGate inputGate : inputGates) {
-				inputGate.registerListener(this);
-			}
+		synchronized (inputGatesWithData) {
+			availableInputGates = inputGatesWithData.size();
 
-			this.unionInputGate = unionInputGate;
-		}
-
-		@Override
-		public void onEvent(InputGate inputGate) {
-			// This method is called from the input channel thread, which can be either the same
-			// thread as the consuming task thread or a different one.
 			inputGatesWithData.add(inputGate);
 
-			for (int i = 0; i < registeredListeners.size(); i++) {
-				registeredListeners.get(i).onEvent(unionInputGate);
+			if (availableInputGates == 0) {
+				inputGatesWithData.notify();
 			}
 		}
 
-		InputGate getNextInputGateToReadFrom() throws InterruptedException {
-			return inputGatesWithData.take();
-		}
-
-		public void registerListener(EventListener<InputGate> listener) {
-			registeredListeners.add(checkNotNull(listener));
+		if (availableInputGates == 0) {
+			InputGateListener listener = inputGateListener;
+			if (listener != null) {
+				listener.notifyInputGateNonEmpty(this);
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 08b5044..d887ab6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -23,8 +23,6 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.reader.BufferReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 
@@ -36,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * An input channel place holder to be replaced by either a {@link RemoteInputChannel}
  * or {@link LocalInputChannel} at runtime.
  */
-public class UnknownInputChannel extends InputChannel {
+class UnknownInputChannel extends InputChannel {
 
 	private final ResultPartitionManager partitionManager;
 
@@ -78,9 +76,9 @@ public class UnknownInputChannel extends InputChannel {
 	}
 
 	@Override
-	public Buffer getNextBuffer() throws IOException {
+	public BufferAndAvailability getNextBuffer() throws IOException {
 		// Nothing to do here
-		return null;
+		throw new UnsupportedOperationException("Cannot retrieve a buffer from an UnknownInputChannel");
 	}
 
 	@Override
@@ -93,8 +91,7 @@ public class UnknownInputChannel extends InputChannel {
 	 * <p>
 	 * <strong>Important</strong>: It is important that the method correctly
 	 * always <code>false</code> for unknown input channels in order to not
-	 * finish the consumption of an intermediate result partition early in
-	 * {@link BufferReader}.
+	 * finish the consumption of an intermediate result partition early.
 	 */
 	@Override
 	public boolean isReleased() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bd8c196..14ef1bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -347,7 +347,6 @@ public class Task implements Runnable, TaskActions {
 				networkEnvironment.getResultPartitionManager(),
 				resultPartitionConsumableNotifier,
 				ioManager,
-				networkEnvironment.getDefaultIOMode(),
 				desc.sendScheduleOrUpdateConsumersMessage());
 
 			writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);