You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:42:39 UTC
[5/6] flink git commit: [FLINK-5169] [network] Make consumption of InputChannels fair
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
deleted file mode 100644
index c86697f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * View over a spilled subpartition.
- *
- * <p> Reads are done synchronously.
- */
-class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
-
- /** The subpartition this view belongs to. */
- private final ResultSubpartition parent;
-
- /** The synchronous file reader to do the actual I/O. */
- private final BufferFileReader fileReader;
-
- /** The buffer pool to read data into. */
- private final SpillReadBufferPool bufferPool;
-
- /** Flag indicating whether all resources have been released. */
- private AtomicBoolean isReleased = new AtomicBoolean();
-
- /** Spilled file size */
- private final long fileSize;
-
- SpilledSubpartitionViewSyncIO(
- ResultSubpartition parent,
- int memorySegmentSize,
- FileIOChannel.ID channelId,
- long initialSeekPosition) throws IOException {
-
- checkArgument(initialSeekPosition >= 0, "Initial seek position is < 0.");
-
- this.parent = checkNotNull(parent);
-
- this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
-
- this.fileReader = new SynchronousBufferFileReader(channelId, false);
-
- if (initialSeekPosition > 0) {
- fileReader.seekToPosition(initialSeekPosition);
- }
-
- this.fileSize = fileReader.getSize();
- }
-
- @Override
- public Buffer getNextBuffer() throws IOException, InterruptedException {
-
- if (fileReader.hasReachedEndOfFile()) {
- return null;
- }
-
- // It's OK to request the buffer in a blocking fashion as the buffer pool is NOT shared
- // among all consumed subpartitions.
- final Buffer buffer = bufferPool.requestBufferBlocking();
-
- fileReader.readInto(buffer);
-
- return buffer;
- }
-
- @Override
- public boolean registerListener(NotificationListener listener) throws IOException {
- return false;
- }
-
- @Override
- public void notifySubpartitionConsumed() throws IOException {
- parent.onConsumedSubpartition();
- }
-
- @Override
- public void releaseAllResources() throws IOException {
- if (isReleased.compareAndSet(false, true)) {
- fileReader.close();
- bufferPool.destroy();
- }
- }
-
- @Override
- public boolean isReleased() {
- return parent.isReleased() || isReleased.get();
- }
-
- @Override
- public Throwable getFailureCause() {
- return parent.getFailureCause();
- }
-
- @Override
- public String toString() {
- return String.format("SpilledSubpartitionView[sync](index: %d, file size: %d bytes) of ResultPartition %s",
- parent.index,
- fileSize,
- parent.parent.getPartitionId());
- }
-
- /**
- * A buffer pool to provide buffer to read the file into.
- *
- * <p> This pool ensures that a consuming input gate makes progress in all cases, even when all
- * buffers of the input gate buffer pool have been requested by remote input channels.
- *
- * TODO Replace with asynchronous buffer pool request as this introduces extra buffers per
- * consumed subpartition.
- */
- private static class SpillReadBufferPool implements BufferRecycler {
-
- private final Queue<Buffer> buffers;
-
- private boolean isDestroyed;
-
- public SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
- this.buffers = new ArrayDeque<Buffer>(numberOfBuffers);
-
- synchronized (buffers) {
- for (int i = 0; i < numberOfBuffers; i++) {
- buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
- }
- }
- }
-
- @Override
- public void recycle(MemorySegment memorySegment) {
- synchronized (buffers) {
- if (isDestroyed) {
- memorySegment.free();
- }
- else {
- buffers.add(new Buffer(memorySegment, this));
- buffers.notifyAll();
- }
- }
- }
-
- private Buffer requestBufferBlocking() throws InterruptedException {
- synchronized (buffers) {
- while (true) {
- if (isDestroyed) {
- return null;
- }
-
- Buffer buffer = buffers.poll();
-
- if (buffer != null) {
- return buffer;
- }
- // Else: wait for a buffer
- buffers.wait();
- }
- }
- }
-
- private void destroy() {
- synchronized (buffers) {
- isDestroyed = true;
- buffers.notifyAll();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
index 885e738..3e93ae6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
@@ -34,18 +34,35 @@ public class BufferOrEvent {
private final AbstractEvent event;
+ /**
+ * Indicates whether further buffers or events are available (used by the union input gate).
+ * This is not needed outside of the input gate unioning logic and cannot
+ * be set outside of the consumer package.
+ */
+ private final boolean moreAvailable;
+
private int channelIndex;
- public BufferOrEvent(Buffer buffer, int channelIndex) {
+ BufferOrEvent(Buffer buffer, int channelIndex, boolean moreAvailable) {
this.buffer = checkNotNull(buffer);
this.event = null;
this.channelIndex = channelIndex;
+ this.moreAvailable = moreAvailable;
}
- public BufferOrEvent(AbstractEvent event, int channelIndex) {
+ BufferOrEvent(AbstractEvent event, int channelIndex, boolean moreAvailable) {
this.buffer = null;
this.event = checkNotNull(event);
this.channelIndex = channelIndex;
+ this.moreAvailable = moreAvailable;
+ }
+
+ public BufferOrEvent(Buffer buffer, int channelIndex) {
+ this(buffer, channelIndex, true);
+ }
+
+ public BufferOrEvent(AbstractEvent event, int channelIndex) {
+ this(event, channelIndex, true);
}
public boolean isBuffer() {
@@ -73,6 +90,10 @@ public class BufferOrEvent {
this.channelIndex = channelIndex;
}
+ boolean moreAvailable() {
+ return moreAvailable;
+ }
+
@Override
public String toString() {
return String.format("BufferOrEvent [%s, channelIndex = %d]",
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 35094e2..f46abfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -101,10 +101,19 @@ public abstract class InputChannel {
}
/**
- * Notifies the owning {@link SingleInputGate} about an available {@link Buffer} instance.
+ * Notifies the owning {@link SingleInputGate} that this channel became non-empty.
+ *
+ * <p>This is guaranteed to be called only when a Buffer was added to a previously
+ * empty input channel. The notion of empty is atomically consistent with the flag
+ * {@link BufferAndAvailability#moreAvailable()} when polling the next buffer
+ * from this channel.
+ *
+ * <p><b>Note:</b> When the input channel observes an exception, this
+ * method is called regardless of whether the channel was empty before. That ensures
+ * that the parent InputGate will always be notified about the exception.
*/
- protected void notifyAvailableBuffer() {
- inputGate.onAvailableBuffer(this);
+ protected void notifyChannelNonEmpty() {
+ inputGate.notifyChannelNonEmpty(this);
}
// ------------------------------------------------------------------------
@@ -123,7 +132,7 @@ public abstract class InputChannel {
/**
* Returns the next buffer from the consumed subpartition.
*/
- abstract Buffer getNextBuffer() throws IOException, InterruptedException;
+ abstract BufferAndAvailability getNextBuffer() throws IOException, InterruptedException;
// ------------------------------------------------------------------------
// Task events
@@ -182,7 +191,7 @@ public abstract class InputChannel {
protected void setError(Throwable cause) {
if (this.cause.compareAndSet(null, checkNotNull(cause))) {
// Notify the input gate.
- notifyAvailableBuffer();
+ notifyChannelNonEmpty();
}
}
@@ -225,4 +234,28 @@ public abstract class InputChannel {
// Reached maximum backoff
return false;
}
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A combination of a {@link Buffer} and a flag indicating availability of further buffers.
+ */
+ public static final class BufferAndAvailability {
+
+ private final Buffer buffer;
+ private final boolean moreAvailable;
+
+ public BufferAndAvailability(Buffer buffer, boolean moreAvailable) {
+ this.buffer = checkNotNull(buffer);
+ this.moreAvailable = moreAvailable;
+ }
+
+ public Buffer buffer() {
+ return buffer;
+ }
+
+ public boolean moreAvailable() {
+ return moreAvailable;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 1cd5fc5..1f2182e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
import java.io.IOException;
@@ -77,7 +76,7 @@ public interface InputGate {
void sendTaskEvent(TaskEvent event) throws IOException;
- void registerListener(EventListener<InputGate> listener);
+ void registerListener(InputGateListener listener);
int getPageSize();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
new file mode 100644
index 0000000..00fa782
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+/**
+ * Listener interface implemented by consumers of {@link InputGate} instances
+ * that want to be notified of availability of buffer or event instances.
+ */
+public interface InputGateListener {
+
+ /**
+ * Notification callback invoked when the input gate moves from zero to a non-zero
+ * number of input channels with available data.
+ *
+ * @param inputGate Input Gate that became available.
+ */
+ void notifyInputGateNonEmpty(InputGate inputGate);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 55ff539..d5308a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,24 +18,23 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
-import org.apache.flink.runtime.util.event.NotificationListener;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -43,11 +42,13 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* An input channel, which requests a local subpartition.
*/
-public class LocalInputChannel extends InputChannel implements NotificationListener {
+public class LocalInputChannel extends InputChannel implements BufferAvailabilityListener {
private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);
- private final Object requestLock = new Object();
+ // ------------------------------------------------------------------------
+
+ private final Object requestReleaseLock = new Object();
/** The local partition manager. */
private final ResultPartitionManager partitionManager;
@@ -55,39 +56,41 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
/** Task event dispatcher for backwards events. */
private final TaskEventDispatcher taskEventDispatcher;
+ /** Number of available buffers used to keep track of non-empty gate notifications. */
+ private final AtomicLong numBuffersAvailable;
+
/** The consumed subpartition */
private volatile ResultSubpartitionView subpartitionView;
private volatile boolean isReleased;
- private volatile Buffer lookAhead;
-
LocalInputChannel(
- SingleInputGate inputGate,
- int channelIndex,
- ResultPartitionID partitionId,
- ResultPartitionManager partitionManager,
- TaskEventDispatcher taskEventDispatcher,
- TaskIOMetricGroup metrics) {
+ SingleInputGate inputGate,
+ int channelIndex,
+ ResultPartitionID partitionId,
+ ResultPartitionManager partitionManager,
+ TaskEventDispatcher taskEventDispatcher,
+ TaskIOMetricGroup metrics) {
this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
- 0, 0, metrics);
+ 0, 0, metrics);
}
LocalInputChannel(
- SingleInputGate inputGate,
- int channelIndex,
- ResultPartitionID partitionId,
- ResultPartitionManager partitionManager,
- TaskEventDispatcher taskEventDispatcher,
- int initialBackoff,
- int maxBackoff,
- TaskIOMetricGroup metrics) {
+ SingleInputGate inputGate,
+ int channelIndex,
+ ResultPartitionID partitionId,
+ ResultPartitionManager partitionManager,
+ TaskEventDispatcher taskEventDispatcher,
+ int initialBackoff,
+ int maxBackoff,
+ TaskIOMetricGroup metrics) {
super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
+ this.numBuffersAvailable = new AtomicLong();
}
// ------------------------------------------------------------------------
@@ -97,30 +100,36 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
@Override
void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
// The lock is required to request only once in the presence of retriggered requests.
- synchronized (requestLock) {
+ synchronized (requestReleaseLock) {
+ checkState(!isReleased, "released");
+
if (subpartitionView == null) {
LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
- this, subpartitionIndex, partitionId);
+ this, subpartitionIndex, partitionId);
try {
- subpartitionView = partitionManager.createSubpartitionView(
- partitionId, subpartitionIndex, inputGate.getBufferProvider());
- }
- catch (PartitionNotFoundException notFound) {
+ ResultSubpartitionView subpartitionView = partitionManager.createSubpartitionView(
+ partitionId, subpartitionIndex, inputGate.getBufferProvider(), this);
+
+ if (subpartitionView == null) {
+ throw new IOException("Error requesting subpartition.");
+ }
+
+ // make the subpartition view visible
+ this.subpartitionView = subpartitionView;
+
+ // check if the channel was released in the meantime
+ if (isReleased) {
+ subpartitionView.releaseAllResources();
+ this.subpartitionView = null;
+ }
+ } catch (PartitionNotFoundException notFound) {
if (increaseBackoff()) {
inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
- return;
- }
- else {
+ } else {
throw notFound;
}
}
-
- if (subpartitionView == null) {
- throw new IOException("Error requesting subpartition.");
- }
-
- getNextLookAhead();
}
}
}
@@ -128,17 +137,16 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
/**
* Retriggers a subpartition request.
*/
- void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) throws IOException, InterruptedException {
- synchronized (requestLock) {
- checkState(subpartitionView == null, "Already requested partition.");
+ void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) {
+ synchronized (requestReleaseLock) {
+ checkState(subpartitionView == null, "already requested partition");
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
requestSubpartition(subpartitionIndex);
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
setError(t);
}
}
@@ -147,29 +155,49 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
}
@Override
- Buffer getNextBuffer() throws IOException, InterruptedException {
+ BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
checkError();
- checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
- // After subscribe notification
- if (lookAhead == null) {
- lookAhead = subpartitionView.getNextBuffer();
+ ResultSubpartitionView subpartitionView = this.subpartitionView;
+ if (subpartitionView == null) {
+ // this can happen if the request for the partition was triggered asynchronously
+ // by the timer (see retriggerSubpartitionRequest()).
+ // It would be good to avoid that by guaranteeing that requestSubpartition() and
+ // getNextBuffer() are always called from the same thread; we could do that by letting
+ // the timer insert a special "requesting channel" into the input gate's queue.
+ subpartitionView = checkAndWaitForSubpartitionView();
}
- Buffer next = lookAhead;
- lookAhead = null;
+ Buffer next = subpartitionView.getNextBuffer();
+ long remaining = numBuffersAvailable.decrementAndGet();
- if (!next.isBuffer() && EventSerializer
- .fromBuffer(next, getClass().getClassLoader())
- .getClass() == EndOfPartitionEvent.class) {
-
- return next;
+ if (remaining >= 0) {
+ numBytesIn.inc(next.getSize());
+ return new BufferAndAvailability(next, remaining > 0);
+ } else if (subpartitionView.isReleased()) {
+ throw new ProducerFailedException(subpartitionView.getFailureCause());
+ } else {
+ throw new IllegalStateException("No buffer available and producer partition not released.");
}
+ }
- getNextLookAhead();
+ @Override
+ public void notifyBuffersAvailable(long numBuffers) {
+ // if this notification made the channel non-empty, notify the input gate
+ if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) {
+ notifyChannelNonEmpty();
+ }
+ }
- numBytesIn.inc(next.getSize());
- return next;
+ private ResultSubpartitionView checkAndWaitForSubpartitionView() {
+ // synchronizing on the request lock means this blocks until the asynchronous request
+ // for the partition view has been completed
+ // by then the subpartition view is visible or the channel is released
+ synchronized (requestReleaseLock) {
+ checkState(!isReleased, "released");
+ checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
+ return subpartitionView;
+ }
}
// ------------------------------------------------------------------------
@@ -208,18 +236,15 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
*/
@Override
void releaseAllResources() throws IOException {
- if (!isReleased) {
- if (lookAhead != null) {
- lookAhead.recycle();
- lookAhead = null;
- }
+ synchronized (requestReleaseLock) {
+ if (!isReleased) {
+ isReleased = true;
- if (subpartitionView != null) {
- subpartitionView.releaseAllResources();
- subpartitionView = null;
+ if (subpartitionView != null) {
+ subpartitionView.releaseAllResources();
+ subpartitionView = null;
+ }
}
-
- isReleased = true;
}
}
@@ -227,55 +252,4 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
public String toString() {
return "LocalInputChannel [" + partitionId + "]";
}
-
- // ------------------------------------------------------------------------
- // Queue iterator listener (called by producing or disk I/O thread)
- // ------------------------------------------------------------------------
-
- @Override
- public void onNotification() {
- if (isReleased) {
- return;
- }
-
- try {
- getNextLookAhead();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- // ------------------------------------------------------------------------
-
- private void getNextLookAhead() throws IOException, InterruptedException {
-
- final ResultSubpartitionView view = subpartitionView;
-
- if (view == null) {
- return;
- }
-
- while (true) {
- lookAhead = view.getNextBuffer();
-
- if (lookAhead != null) {
- notifyAvailableBuffer();
- break;
- }
-
- if (view.registerListener(this)) {
- return;
- }
- else if (view.isReleased()) {
- Throwable cause = view.getFailureCause();
-
- if (cause != null) {
- setError(new ProducerFailedException(cause));
- }
-
- return;
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 13a71a9..ed3122e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -27,8 +26,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -43,8 +41,6 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
public class RemoteInputChannel extends InputChannel {
- private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class);
-
/** ID to distinguish this channel from other channels sharing the same TCP connection. */
private final InputChannelID id = new InputChannelID();
@@ -58,7 +54,7 @@ public class RemoteInputChannel extends InputChannel {
* The received buffers. Received buffers are enqueued by the network I/O thread and the queue
* is consumed by the receiving task thread.
*/
- private final Queue<Buffer> receivedBuffers = new ArrayDeque<Buffer>();
+ private final Queue<Buffer> receivedBuffers = new ArrayDeque<>();
/**
* Flag indicating whether this channel has been released. Either called by the receiving task
@@ -76,28 +72,27 @@ public class RemoteInputChannel extends InputChannel {
private int expectedSequenceNumber = 0;
public RemoteInputChannel(
- SingleInputGate inputGate,
- int channelIndex,
- ResultPartitionID partitionId,
- ConnectionID connectionId,
- ConnectionManager connectionManager,
- TaskIOMetricGroup metrics) {
-
- this(inputGate, channelIndex, partitionId, connectionId, connectionManager,
- 0, 0, metrics);
+ SingleInputGate inputGate,
+ int channelIndex,
+ ResultPartitionID partitionId,
+ ConnectionID connectionId,
+ ConnectionManager connectionManager,
+ TaskIOMetricGroup metrics) {
+
+ this(inputGate, channelIndex, partitionId, connectionId, connectionManager, 0, 0, metrics);
}
public RemoteInputChannel(
- SingleInputGate inputGate,
- int channelIndex,
- ResultPartitionID partitionId,
- ConnectionID connectionId,
- ConnectionManager connectionManager,
- int initialBackoff,
- int maxBackoff,
- TaskIOMetricGroup metrics) {
+ SingleInputGate inputGate,
+ int channelIndex,
+ ResultPartitionID partitionId,
+ ConnectionID connectionId,
+ ConnectionManager connectionManager,
+ int initialBackOff,
+ int maxBackoff,
+ TaskIOMetricGroup metrics) {
- super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInRemoteCounter());
+ super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter());
this.connectionId = checkNotNull(connectionId);
this.connectionManager = checkNotNull(connectionManager);
@@ -115,7 +110,7 @@ public class RemoteInputChannel extends InputChannel {
if (partitionRequestClient == null) {
// Create a client and request the partition
partitionRequestClient = connectionManager
- .createPartitionRequestClient(connectionId);
+ .createPartitionRequestClient(connectionId);
partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
}
@@ -129,31 +124,29 @@ public class RemoteInputChannel extends InputChannel {
if (increaseBackoff()) {
partitionRequestClient.requestSubpartition(
- partitionId, subpartitionIndex, this, getCurrentBackoff());
- }
- else {
+ partitionId, subpartitionIndex, this, getCurrentBackoff());
+ } else {
failPartitionRequest();
}
}
@Override
- Buffer getNextBuffer() throws IOException {
+ BufferAndAvailability getNextBuffer() throws IOException {
checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
checkError();
- synchronized (receivedBuffers) {
- Buffer buffer = receivedBuffers.poll();
-
- // Sanity check that channel is only queried after a notification
- if (buffer == null) {
- throw new IOException("Queried input channel for data although non is available.");
- }
+ final Buffer next;
+ final int remaining;
- numBytesIn.inc(buffer.getSize());
- return buffer;
+ synchronized (receivedBuffers) {
+ next = receivedBuffers.poll();
+ remaining = receivedBuffers.size();
}
+
+ numBytesIn.inc(next.getSize());
+ return new BufferAndAvailability(next, remaining > 0);
}
// ------------------------------------------------------------------------
@@ -201,14 +194,13 @@ public class RemoteInputChannel extends InputChannel {
// buffers received concurrently with closing are properly recycled.
if (partitionRequestClient != null) {
partitionRequestClient.close(this);
- }
- else {
+ } else {
connectionManager.closeOpenChannelConnections(connectionId);
}
}
}
- public void failPartitionRequest() {
+ private void failPartitionRequest() {
setError(new PartitionNotFoundException(partitionId));
}
@@ -246,20 +238,22 @@ public class RemoteInputChannel extends InputChannel {
synchronized (receivedBuffers) {
if (!isReleased.get()) {
if (expectedSequenceNumber == sequenceNumber) {
+ int available = receivedBuffers.size();
+
receivedBuffers.add(buffer);
expectedSequenceNumber++;
- notifyAvailableBuffer();
+ if (available == 0) {
+ notifyChannelNonEmpty();
+ }
success = true;
- }
- else {
+ } else {
onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
}
}
}
- }
- finally {
+ } finally {
if (!success) {
buffer.recycle();
}
@@ -271,8 +265,7 @@ public class RemoteInputChannel extends InputChannel {
if (!isReleased.get()) {
if (expectedSequenceNumber == sequenceNumber) {
expectedSequenceNumber++;
- }
- else {
+ } else {
onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
}
}
@@ -287,7 +280,7 @@ public class RemoteInputChannel extends InputChannel {
setError(cause);
}
- public static class BufferReorderingException extends IOException {
+ private static class BufferReorderingException extends IOException {
private static final long serialVersionUID = -888282210356266816L;
@@ -295,7 +288,7 @@ public class RemoteInputChannel extends InputChannel {
private final int actualSequenceNumber;
- public BufferReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
+ BufferReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
this.expectedSequenceNumber = expectedSequenceNumber;
this.actualSequenceNumber = actualSequenceNumber;
}
@@ -303,7 +296,7 @@ public class RemoteInputChannel extends InputChannel {
@Override
public String getMessage() {
return String.format("Buffer re-ordering: expected buffer with sequence number %d, but received %d.",
- expectedSequenceNumber, actualSequenceNumber);
+ expectedSequenceNumber, actualSequenceNumber);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index d7ed33c..bcbb2c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import com.google.common.collect.Maps;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -36,22 +34,22 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.taskmanager.TaskActions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.Timer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -136,7 +134,7 @@ public class SingleInputGate implements InputGate {
private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
/** Channels, which notified this input gate about available data. */
- private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>();
+ private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque<>();
private final BitSet channelsWithEndOfPartitionEvents;
@@ -158,9 +156,9 @@ public class SingleInputGate implements InputGate {
private volatile boolean isReleased;
/** Registered listener to forward buffer notifications to. */
- private volatile EventListener<InputGate> registeredListener;
+ private volatile InputGateListener inputGateListener;
- private final List<TaskEvent> pendingEvents = new ArrayList<TaskEvent>();
+ private final List<TaskEvent> pendingEvents = new ArrayList<>();
private int numberOfUninitializedChannels;
@@ -168,14 +166,14 @@ public class SingleInputGate implements InputGate {
private Timer retriggerLocalRequestTimer;
public SingleInputGate(
- String owningTaskName,
- JobID jobId,
- ExecutionAttemptID executionId,
- IntermediateDataSetID consumedResultId,
- int consumedSubpartitionIndex,
- int numberOfInputChannels,
- TaskActions taskActions,
- TaskIOMetricGroup metrics) {
+ String owningTaskName,
+ JobID jobId,
+ ExecutionAttemptID executionId,
+ IntermediateDataSetID consumedResultId,
+ int consumedSubpartitionIndex,
+ int numberOfInputChannels,
+ TaskActions taskActions,
+ TaskIOMetricGroup metrics) {
this.owningTaskName = checkNotNull(owningTaskName);
this.jobId = checkNotNull(jobId);
@@ -263,7 +261,7 @@ public class SingleInputGate implements InputGate {
this.bufferPool = checkNotNull(bufferPool);
}
- public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
+ void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
synchronized (requestLock) {
if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
&& inputChannel.getClass() == UnknownInputChannel.class) {
@@ -355,6 +353,7 @@ public class SingleInputGate implements InputGate {
}
public void releaseAllResources() throws IOException {
+ boolean released = false;
synchronized (requestLock) {
if (!isReleased) {
try {
@@ -381,9 +380,16 @@ public class SingleInputGate implements InputGate {
}
finally {
isReleased = true;
+ released = true;
}
}
}
+
+ if (released) {
+ synchronized (inputChannelsWithData) {
+ inputChannelsWithData.notifyAll();
+ }
+ }
}
@Override
@@ -429,32 +435,50 @@ public class SingleInputGate implements InputGate {
@Override
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-
if (hasReceivedAllEndOfPartitionEvents) {
return null;
}
+ if (isReleased) {
+ throw new IllegalStateException("Released");
+ }
+
requestPartitions();
- InputChannel currentChannel = null;
- while (currentChannel == null) {
- if (isReleased) {
- throw new IllegalStateException("Released");
+ InputChannel currentChannel;
+ boolean moreAvailable;
+
+ synchronized (inputChannelsWithData) {
+ while (inputChannelsWithData.size() == 0) {
+ if (isReleased) {
+ throw new IllegalStateException("Released");
+ }
+
+ inputChannelsWithData.wait();
}
- currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
+ currentChannel = inputChannelsWithData.remove();
+ moreAvailable = inputChannelsWithData.size() > 0;
}
- final Buffer buffer = currentChannel.getNextBuffer();
+ final BufferAndAvailability result = currentChannel.getNextBuffer();
// Sanity check that notifications only happen when data is available
- if (buffer == null) {
+ if (result == null) {
throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
"notified by channel about available data, but none was available.");
}
+ // this channel was now removed from the non-empty channels queue
+ // we re-add it in case it has more data, because in that case no "non-empty" notification
+ // will come for that channel
+ if (result.moreAvailable()) {
+ queueChannel(currentChannel);
+ }
+
+ final Buffer buffer = result.buffer();
if (buffer.isBuffer()) {
- return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
+ return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
}
else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
@@ -471,7 +495,7 @@ public class SingleInputGate implements InputGate {
currentChannel.releaseAllResources();
}
- return new BufferOrEvent(event, currentChannel.getChannelIndex());
+ return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable);
}
}
@@ -493,29 +517,45 @@ public class SingleInputGate implements InputGate {
// ------------------------------------------------------------------------
@Override
- public void registerListener(EventListener<InputGate> listener) {
- if (registeredListener == null) {
- registeredListener = listener;
- }
- else {
+ public void registerListener(InputGateListener inputGateListener) {
+ if (this.inputGateListener == null) {
+ this.inputGateListener = inputGateListener;
+ } else {
throw new IllegalStateException("Multiple listeners");
}
}
- public void onAvailableBuffer(InputChannel channel) {
- inputChannelsWithData.add(channel);
- EventListener<InputGate> listener = registeredListener;
- if (listener != null) {
- listener.onEvent(this);
- }
+ void notifyChannelNonEmpty(InputChannel channel) {
+ queueChannel(checkNotNull(channel));
}
void triggerPartitionStateCheck(ResultPartitionID partitionId) {
taskActions.triggerPartitionStateCheck(
- jobId,
- executionId,
- consumedResultId,
- partitionId);
+ jobId,
+ executionId,
+ consumedResultId,
+ partitionId);
+ }
+
+ private void queueChannel(InputChannel channel) {
+ int availableChannels;
+
+ synchronized (inputChannelsWithData) {
+ availableChannels = inputChannelsWithData.size();
+
+ inputChannelsWithData.add(channel);
+
+ if (availableChannels == 0) {
+ inputChannelsWithData.notify();
+ }
+ }
+
+ if (availableChannels == 0) {
+ InputGateListener listener = inputGateListener;
+ if (listener != null) {
+ listener.notifyInputGateNonEmpty(this);
+ }
+ }
}
// ------------------------------------------------------------------------
@@ -531,13 +571,13 @@ public class SingleInputGate implements InputGate {
* Creates an input gate and all of its input channels.
*/
public static SingleInputGate create(
- String owningTaskName,
- JobID jobId,
- ExecutionAttemptID executionId,
- InputGateDeploymentDescriptor igdd,
- NetworkEnvironment networkEnvironment,
- TaskActions taskActions,
- TaskIOMetricGroup metrics) {
+ String owningTaskName,
+ JobID jobId,
+ ExecutionAttemptID executionId,
+ InputGateDeploymentDescriptor igdd,
+ NetworkEnvironment networkEnvironment,
+ TaskActions taskActions,
+ TaskIOMetricGroup metrics) {
final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
@@ -547,8 +587,8 @@ public class SingleInputGate implements InputGate {
final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
final SingleInputGate inputGate = new SingleInputGate(
- owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
- icdd.length, taskActions, metrics);
+ owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
+ icdd.length, taskActions, metrics);
// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index b1b8911..e8ccbb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -22,15 +22,11 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.util.event.EventListener;
import java.io.IOException;
-import java.util.List;
+import java.util.ArrayDeque;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,19 +59,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* It is possible to recursively union union input gates.
*/
-public class UnionInputGate implements InputGate {
+public class UnionInputGate implements InputGate, InputGateListener {
/** The input gates to union. */
private final InputGate[] inputGates;
private final Set<InputGate> inputGatesWithRemainingData;
- /** Data availability listener across all unioned input gates. */
- private final InputGateListener inputGateListener;
+ /** Gates, which notified this input gate about available data. */
+ private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>();
/** The total number of input channels across all unioned input gates. */
private final int totalNumberOfInputChannels;
+ /** Registered listener to forward input gate notifications to. */
+ private volatile InputGateListener inputGateListener;
+
/**
* A mapping from input gate to (logical) channel index offset. Valid channel indexes go from 0
* (inclusive) to the total number of input channels (exclusive).
@@ -100,11 +99,12 @@ public class UnionInputGate implements InputGate {
inputGatesWithRemainingData.add(inputGate);
currentNumberOfInputChannels += inputGate.getNumberOfInputChannels();
+
+ // Register the union gate as a listener for all input gates
+ inputGate.registerListener(this);
}
this.totalNumberOfInputChannels = currentNumberOfInputChannels;
-
- this.inputGateListener = new InputGateListener(inputGates, this);
}
/**
@@ -139,7 +139,6 @@ public class UnionInputGate implements InputGate {
@Override
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-
if (inputGatesWithRemainingData.isEmpty()) {
return null;
}
@@ -147,17 +146,31 @@ public class UnionInputGate implements InputGate {
// Make sure to request the partitions, if they have not been requested before.
requestPartitions();
- final InputGate inputGate = inputGateListener.getNextInputGateToReadFrom();
+ final InputGate inputGate;
+ synchronized (inputGatesWithData) {
+ while (inputGatesWithData.size() == 0) {
+ inputGatesWithData.wait();
+ }
+
+ inputGate = inputGatesWithData.remove();
+ }
final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
+ if (bufferOrEvent.moreAvailable()) {
+ // this buffer or event was now removed from the non-empty gates queue
+ // we re-add it in case it has more data, because in that case no "non-empty" notification
+ // will come for that gate
+ queueInputGate(inputGate);
+ }
+
if (bufferOrEvent.isEvent()
- && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
- && inputGate.isFinished()) {
+ && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
+ && inputGate.isFinished()) {
if (!inputGatesWithRemainingData.remove(inputGate)) {
throw new IllegalStateException("Couldn't find input gate in set of remaining " +
- "input gates.");
+ "input gates.");
}
}
@@ -177,9 +190,12 @@ public class UnionInputGate implements InputGate {
}
@Override
- public void registerListener(EventListener<InputGate> listener) {
- // This method is called from the consuming task thread.
- inputGateListener.registerListener(listener);
+ public void registerListener(InputGateListener listener) {
+ if (this.inputGateListener == null) {
+ this.inputGateListener = listener;
+ } else {
+ throw new IllegalStateException("Multiple listeners");
+ }
}
@Override
@@ -195,45 +211,29 @@ public class UnionInputGate implements InputGate {
return pageSize;
}
- /**
- * Data availability listener at all unioned input gates.
- *
- * <p> The listener registers itself at each input gate and is notified for *each incoming
- * buffer* at one of the unioned input gates.
- */
- private static class InputGateListener implements EventListener<InputGate> {
-
- private final UnionInputGate unionInputGate;
-
- private final BlockingQueue<InputGate> inputGatesWithData = new LinkedBlockingQueue<InputGate>();
+ @Override
+ public void notifyInputGateNonEmpty(InputGate inputGate) {
+ queueInputGate(checkNotNull(inputGate));
+ }
- private final List<EventListener<InputGate>> registeredListeners = new CopyOnWriteArrayList<EventListener<InputGate>>();
+ private void queueInputGate(InputGate inputGate) {
+ int availableInputGates;
- public InputGateListener(InputGate[] inputGates, UnionInputGate unionInputGate) {
- for (InputGate inputGate : inputGates) {
- inputGate.registerListener(this);
- }
+ synchronized (inputGatesWithData) {
+ availableInputGates = inputGatesWithData.size();
- this.unionInputGate = unionInputGate;
- }
-
- @Override
- public void onEvent(InputGate inputGate) {
- // This method is called from the input channel thread, which can be either the same
- // thread as the consuming task thread or a different one.
inputGatesWithData.add(inputGate);
- for (int i = 0; i < registeredListeners.size(); i++) {
- registeredListeners.get(i).onEvent(unionInputGate);
+ if (availableInputGates == 0) {
+ inputGatesWithData.notify();
}
}
- InputGate getNextInputGateToReadFrom() throws InterruptedException {
- return inputGatesWithData.take();
- }
-
- public void registerListener(EventListener<InputGate> listener) {
- registeredListeners.add(checkNotNull(listener));
+ if (availableInputGates == 0) {
+ InputGateListener listener = inputGateListener;
+ if (listener != null) {
+ listener.notifyInputGateNonEmpty(this);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 08b5044..d887ab6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -23,8 +23,6 @@ import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.reader.BufferReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -36,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* An input channel place holder to be replaced by either a {@link RemoteInputChannel}
* or {@link LocalInputChannel} at runtime.
*/
-public class UnknownInputChannel extends InputChannel {
+class UnknownInputChannel extends InputChannel {
private final ResultPartitionManager partitionManager;
@@ -78,9 +76,9 @@ public class UnknownInputChannel extends InputChannel {
}
@Override
- public Buffer getNextBuffer() throws IOException {
+ public BufferAndAvailability getNextBuffer() throws IOException {
// Nothing to do here
- return null;
+ throw new UnsupportedOperationException("Cannot retrieve a buffer from an UnknownInputChannel");
}
@Override
@@ -93,8 +91,7 @@ public class UnknownInputChannel extends InputChannel {
* <p>
* <strong>Important</strong>: It is important that the method correctly
* always <code>false</code> for unknown input channels in order to not
- * finish the consumption of an intermediate result partition early in
- * {@link BufferReader}.
+ * finish the consumption of an intermediate result partition early.
*/
@Override
public boolean isReleased() {
http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bd8c196..14ef1bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -347,7 +347,6 @@ public class Task implements Runnable, TaskActions {
networkEnvironment.getResultPartitionManager(),
resultPartitionConsumableNotifier,
ioManager,
- networkEnvironment.getDefaultIOMode(),
desc.sendScheduleOrUpdateConsumersMessage());
writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);