You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 00:15:49 UTC
[7/8] flink git commit: [FLINK-2406] [streaming] Abstract and improve
stream alignment via the BarrierBuffer
[FLINK-2406] [streaming] Abstract and improve stream alignment via the BarrierBuffer
- Add an interface for the functionality of the barrier buffer (for later addition of other implementations)
- Add broader tests for the BarrierBuffer, including trailing data and barrier races.
- Checkpoint barriers are handled by the buffer directly, rather than being returned and re-injected.
- Simplify logic in the BarrierBuffer and fix certain corner cases.
- Give access to spill directories properly via I/O manager, rather than via GlobalConfiguration singleton.
- Rename the "BarrierBufferIOTest" to "BarrierBufferMassiveRandomTest"
- A lot of code style/robustness fixes (properly define constants, visibility, exception signatures)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0579f90b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0579f90b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0579f90b
Branch: refs/heads/master
Commit: 0579f90bab165a7df336163eb9d6337267020029
Parents: ed30ff4
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 26 18:58:37 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:07 2015 +0200
----------------------------------------------------------------------
.../runtime/io/disk/iomanager/IOManager.java | 12 +-
.../io/network/api/EndOfPartitionEvent.java | 23 +-
.../partition/consumer/SingleInputGate.java | 2 +-
.../streaming/runtime/io/BarrierBuffer.java | 413 +++++-----
.../streaming/runtime/io/BufferSpiller.java | 54 +-
.../runtime/io/CheckpointBarrierHandler.java | 55 ++
.../runtime/io/StreamInputProcessor.java | 69 +-
.../runtime/io/StreamTwoInputProcessor.java | 56 +-
.../runtime/tasks/OneInputStreamTask.java | 37 +-
.../streaming/runtime/tasks/StreamTask.java | 16 +-
.../runtime/tasks/TwoInputStreamTask.java | 74 +-
.../consumer/StreamTestSingleInputGate.java | 13 -
.../runtime/io/BarrierBufferIOTest.java | 159 ----
.../io/BarrierBufferMassiveRandomTest.java | 167 ++++
.../streaming/runtime/io/BarrierBufferTest.java | 775 +++++++++++++++----
.../runtime/io/DummyBufferRecycler.java | 8 +-
.../runtime/io/SpillingBufferOrEventTest.java | 20 +-
17 files changed, 1282 insertions(+), 671 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index c0bd360..45d9b9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -301,9 +301,19 @@ public abstract class IOManager {
*
* @return The number of temporary file directories.
*/
- public int getNumberOfTempDirs() {
+ public int getNumberOfSpillingDirectories() {
return this.paths.length;
}
+
+ /**
+ * Gets the directories that the I/O manager spills to.
+ *
+ * @return The directories that the I/O manager spills to.
+ */
+ public File[] getSpillingDirectories() {
+ return this.paths;
+ }
+
protected int getNextPathNum() {
final int next = this.nextPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
index 49d7958..3ecdb94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -22,19 +22,34 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.task.RuntimeEvent;
-import java.io.IOException;
public class EndOfPartitionEvent extends RuntimeEvent {
public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent();
-
+
+
@Override
- public void read(DataInputView in) throws IOException {
+ public void read(DataInputView in) {
// Nothing to do here
}
@Override
- public void write(DataOutputView out) throws IOException {
+ public void write(DataOutputView out) {
// Nothing to do here
}
+
+ @Override
+ public int hashCode() {
+ return 1965146673;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj != null && obj.getClass() == EndOfPartitionEvent.class;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 78aa6f7..0aebcae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -408,7 +408,7 @@ public class SingleInputGate implements InputGate {
// Sanity check that notifications only happen when data is available
if (buffer == null) {
- throw new IllegalStateException("Bug in input gate/channel logic: input gate got" +
+ throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
"notified by channel about available data, but none was available.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 40e84fc..466b8f7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -19,263 +19,262 @@ package org.apache.flink.streaming.runtime.io;
import java.io.File;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
+import java.util.ArrayDeque;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The barrier buffer is responsible for implementing the blocking behaviour described
- * here: {@link CheckpointBarrier}.
- *
- * <p>
- * To avoid back-pressuring the
- * readers, we buffer up the new data received from the blocked channels until
- * the blocks are released.
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
+ * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
+ * the blocks are released.</p>
*/
-public class BarrierBuffer {
+public class BarrierBuffer implements CheckpointBarrierHandler {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+
+ /** The gate that the buffer draws its input from */
+ private final InputGate inputGate;
+
+ /** Flags that indicate whether a channel is currently blocked/buffered */
+ private final boolean[] blockedChannels;
+
+ /** The total number of channels that this buffer handles data from */
+ private final int totalNumberOfInputChannels;
+
+ private final SpillReader spillReader;
+ private final BufferSpiller bufferSpiller;
+
+ private ArrayDeque<SpillingBufferOrEvent> nonProcessed;
+ private ArrayDeque<SpillingBufferOrEvent> blockedNonProcessed;
+
+ /** Handler that receives the checkpoint notifications */
+ private EventListener<CheckpointBarrier> checkpointHandler;
+
+ /** The ID of the checkpoint for which we expect barriers */
+ private long currentCheckpointId = -1L;
+
+ /** The number of received barriers (= number of blocked/buffered channels) */
+ private long numReceivedBarriers;
+
+ /** Flag to indicate whether we have drawn all available input */
+ private boolean endOfStream;
+
+
+ public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
+ this.inputGate = inputGate;
+ this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+ this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
+
+ this.nonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
+ this.blockedNonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
+
+ this.bufferSpiller = new BufferSpiller(ioManager);
+ this.spillReader = new SpillReader();
+ }
- private Queue<SpillingBufferOrEvent> nonProcessed = new LinkedList<SpillingBufferOrEvent>();
- private Queue<SpillingBufferOrEvent> blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
-
- private Set<Integer> blockedChannels = new HashSet<Integer>();
- private int totalNumberOfInputChannels;
-
- private CheckpointBarrier currentBarrier;
-
- private AbstractReader reader;
-
- private InputGate inputGate;
-
- private SpillReader spillReader;
- private BufferSpiller bufferSpiller;
-
- private boolean inputFinished = false;
+ // ------------------------------------------------------------------------
+ // Buffer and barrier handling
+ // ------------------------------------------------------------------------
- private BufferOrEvent endOfStreamEvent = null;
+ @Override
+ public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ while (true) {
+ // process buffered BufferOrEvents before grabbing new ones
+ final SpillingBufferOrEvent nextBuffered = nonProcessed.pollFirst();
+ final BufferOrEvent next = nextBuffered == null ?
+ inputGate.getNextBufferOrEvent() :
+ nextBuffered.getBufferOrEvent();
+
+ if (next != null) {
+ if (isBlocked(next.getChannelIndex())) {
+ // if the channel is blocked, we just store the BufferOrEvent
+ blockedNonProcessed.add(new SpillingBufferOrEvent(next, bufferSpiller, spillReader));
+ }
+ else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+ return next;
+ }
+ else if (!endOfStream) {
+ // process barriers only if there is a chance of the checkpoint completing
+ processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
+ }
+ }
+ else if (!endOfStream) {
+ // end of stream. we feed the data that is still buffered
+ endOfStream = true;
+ releaseBlocks();
+ return getNextNonBlocked();
+ }
+ else {
+ return null;
+ }
+ }
+ }
+
+ private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+ final long barrierId = receivedBarrier.getId();
+
+ if (numReceivedBarriers > 0) {
+ // subsequent barrier of a checkpoint.
+ if (barrierId == currentCheckpointId) {
+ // regular case
+ onBarrier(channelIndex);
+ }
+ else if (barrierId > currentCheckpointId) {
+ // we did not complete the current checkpoint
+ LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
+ "Skipping current checkpoint.", barrierId, currentCheckpointId);
+
+ releaseBlocks();
+ currentCheckpointId = barrierId;
+ onBarrier(channelIndex);
+ }
+ else {
+ // ignore trailing barrier from aborted checkpoint
+ return;
+ }
+
+ }
+ else if (barrierId > currentCheckpointId) {
+ // first barrier of a new checkpoint
+ currentCheckpointId = barrierId;
+ onBarrier(channelIndex);
+ }
+ else {
+ // trailing barrier from previous (skipped) checkpoint
+ return;
+ }
- private long lastCheckpointId = Long.MIN_VALUE;
+ // check if we have all barriers
+ if (numReceivedBarriers == totalNumberOfInputChannels) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received all barrier, triggering checkpoint {} at {}",
+ receivedBarrier.getId(), receivedBarrier.getTimestamp());
+ }
- public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
- this.inputGate = inputGate;
- totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
- this.reader = reader;
- try {
- this.bufferSpiller = new BufferSpiller();
- this.spillReader = new SpillReader();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if (checkpointHandler != null) {
+ checkpointHandler.onEvent(receivedBarrier);
+ }
+
+ releaseBlocks();
}
-
+ }
+
+ @Override
+ public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
+ if (this.checkpointHandler == null) {
+ this.checkpointHandler = checkpointHandler;
+ }
+ else {
+ throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
}
- /**
- * Get then next non-blocked non-processed {@link BufferOrEvent}. Returns null if
- * none available.
- *
- * @throws IOException
- */
- private BufferOrEvent getNonProcessed() throws IOException {
- SpillingBufferOrEvent nextNonProcessed;
-
- while ((nextNonProcessed = nonProcessed.poll()) != null) {
- BufferOrEvent boe = nextNonProcessed.getBufferOrEvent();
- if (isBlocked(boe.getChannelIndex())) {
- blockedNonProcessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
- } else {
- return boe;
+ @Override
+ public void cleanup() throws IOException {
+ bufferSpiller.close();
+ File spillfile1 = bufferSpiller.getSpillFile();
+ if (spillfile1 != null) {
+ if (!spillfile1.delete()) {
+ LOG.warn("Cannot remove barrier buffer spill file: " + spillfile1.getAbsolutePath());
}
}
- return null;
+ spillReader.close();
+ File spillfile2 = spillReader.getSpillFile();
+ if (spillfile2 != null) {
+ if (!spillfile2.delete()) {
+ LOG.warn("Cannot remove barrier buffer spill file: " + spillfile2.getAbsolutePath());
+ }
+ }
}
-
+
/**
* Checks whether the channel with the given index is blocked.
*
- * @param channelIndex The channel index to check
+ * @param channelIndex The channel index to check.
+ * @return True if the channel is blocked, false if not.
*/
private boolean isBlocked(int channelIndex) {
- return blockedChannels.contains(channelIndex);
+ return blockedChannels[channelIndex];
}
-
- /**
- * Checks whether all channels are blocked meaning that barriers have been
- * received from all channels
- */
- private boolean isAllBlocked() {
- return blockedChannels.size() == totalNumberOfInputChannels;
- }
-
- /**
- * Returns the next non-blocked {@link BufferOrEvent}. This is a blocking operator.
- */
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
- // If there are non-processed buffers from the previously blocked ones,
- // we get the next
- BufferOrEvent bufferOrEvent = getNonProcessed();
-
- if (bufferOrEvent != null) {
- return bufferOrEvent;
- } else if (blockedNonProcessed.isEmpty() && inputFinished) {
- return endOfStreamEvent;
- } else {
- // If no non-processed, get new from input
- while (true) {
- if (!inputFinished) {
- // We read the next buffer from the inputgate
- bufferOrEvent = inputGate.getNextBufferOrEvent();
-
- if (!bufferOrEvent.isBuffer()
- && bufferOrEvent.getEvent() instanceof EndOfPartitionEvent) {
- if (inputGate.isFinished()) {
- // store the event for later if the channel is
- // closed
- endOfStreamEvent = bufferOrEvent;
- inputFinished = true;
- }
-
- } else {
- if (isBlocked(bufferOrEvent.getChannelIndex())) {
- // If channel blocked we just store it
- blockedNonProcessed.add(new SpillingBufferOrEvent(bufferOrEvent,
- bufferSpiller, spillReader));
- } else {
- return bufferOrEvent;
- }
- }
- } else {
- actOnAllBlocked();
- return getNextNonBlocked();
- }
- }
- }
- }
-
+
/**
* Blocks the given channel index, from which a barrier has been received.
*
- * @param channelIndex
- * The channel index to block.
+ * @param channelIndex The channel index to block.
*/
- private void blockChannel(int channelIndex) {
- if (!blockedChannels.contains(channelIndex)) {
- blockedChannels.add(channelIndex);
+ private void onBarrier(int channelIndex) throws IOException {
+ if (!blockedChannels[channelIndex]) {
+ blockedChannels[channelIndex] = true;
+ numReceivedBarriers++;
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Channel blocked with index: " + channelIndex);
- }
- if (isAllBlocked()) {
- actOnAllBlocked();
+ LOG.debug("Received barrier from channel " + channelIndex);
}
-
- } else {
- throw new RuntimeException("Tried to block an already blocked channel");
+ }
+ else {
+ throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
}
}
/**
* Releases the blocks on all channels.
*/
- private void releaseBlocks() {
- if (!nonProcessed.isEmpty()) {
- // sanity check
- throw new RuntimeException("Error in barrier buffer logic");
- }
- nonProcessed = blockedNonProcessed;
- blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
-
- try {
- spillReader.setSpillFile(bufferSpiller.getSpillFile());
- bufferSpiller.resetSpillFile();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- blockedChannels.clear();
- currentBarrier = null;
+ private void releaseBlocks() throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("All barriers received, blocks released");
+ LOG.debug("Releasing blocks");
}
- }
- /**
- * Method that is executed once the barrier has been received from all
- * channels.
- */
- private void actOnAllBlocked() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Publishing barrier to the vertex");
+ for (int i = 0; i < blockedChannels.length; i++) {
+ blockedChannels[i] = false;
}
-
- if (currentBarrier != null && !inputFinished) {
- reader.publish(currentBarrier);
- lastCheckpointId = currentBarrier.getId();
+ numReceivedBarriers = 0;
+
+ if (nonProcessed.isEmpty()) {
+ // swap the queues
+ ArrayDeque<SpillingBufferOrEvent> empty = nonProcessed;
+ nonProcessed = blockedNonProcessed;
+ blockedNonProcessed = empty;
}
-
- releaseBlocks();
- }
-
- /**
- * Processes one {@link org.apache.flink.streaming.runtime.tasks.CheckpointBarrier}
- *
- * @param bufferOrEvent The {@link BufferOrEvent} containing the checkpoint barrier
- */
- public void processBarrier(BufferOrEvent bufferOrEvent) {
- CheckpointBarrier receivedBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
-
- if (receivedBarrier.getId() < lastCheckpointId) {
- // a barrier from an old checkpoint, ignore these
- return;
- }
-
- if (currentBarrier == null) {
- this.currentBarrier = receivedBarrier;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Checkpoint barrier received start waiting for checkpoint: {}", receivedBarrier);
- }
- } else if (receivedBarrier.getId() > currentBarrier.getId()) {
- // we have a barrier from a more recent checkpoint, free all locks and start with
- // this newer checkpoint
- if (LOG.isDebugEnabled()) {
- LOG.debug("Checkpoint barrier received while waiting on checkpoint {}. Restarting waiting with checkpoint {}: ", currentBarrier, receivedBarrier);
- }
- releaseBlocks();
- currentBarrier = receivedBarrier;
-
+ else {
+ throw new IllegalStateException("Unconsumed data from previous checkpoint alignment " +
+ "when starting next checkpoint alignment");
}
- blockChannel(bufferOrEvent.getChannelIndex());
+
+ // roll over the spill files
+ spillReader.setSpillFile(bufferSpiller.getSpillFile());
+ bufferSpiller.resetSpillFile();
}
- public void cleanup() throws IOException {
- bufferSpiller.close();
- File spillfile1 = bufferSpiller.getSpillFile();
- if (spillfile1 != null) {
- spillfile1.delete();
- }
+ // ------------------------------------------------------------------------
+ // For Testing
+ // ------------------------------------------------------------------------
- spillReader.close();
- File spillfile2 = spillReader.getSpillFile();
- if (spillfile2 != null) {
- spillfile2.delete();
- }
+ public long getCurrentCheckpointId() {
+ return this.currentCheckpointId;
}
-
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @Override
public String toString() {
- return nonProcessed.toString() + blockedNonProcessed.toString();
- }
-
- public boolean isEmpty() {
- return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
+ return "Non-Processed: " + nonProcessed + " | Blocked: " + blockedNonProcessed;
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 0d57d05..fda612e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -22,28 +22,33 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.util.StringUtils;
public class BufferSpiller {
+
+ /** The random number generator for temp file names */
+ private static final Random RND = new Random();
- protected static Random rnd = new Random();
+ /** The counter that selects the next directory to spill into */
+ private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
+
+
+ /** The directories to spill to */
+ private final File tempDir;
private File spillFile;
- protected FileChannel spillingChannel;
- private String tempDir;
-
- public BufferSpiller() throws IOException {
- String tempDirString = GlobalConfiguration.getString(
- ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
- String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
-
- tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
-
+
+ private FileChannel spillingChannel;
+
+
+
+ public BufferSpiller(IOManager ioManager) throws IOException {
+ File[] tempDirs = ioManager.getSpillingDirectories();
+ this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length];
createSpillingChannel();
}
@@ -54,24 +59,20 @@ public class BufferSpiller {
try {
spillingChannel.write(buffer.getNioBuffer());
buffer.recycle();
- } catch (IOException e) {
+ }
+ catch (IOException e) {
close();
- throw new IOException(e);
+ throw e;
}
-
}
@SuppressWarnings("resource")
private void createSpillingChannel() throws IOException {
- this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
+ this.spillFile = new File(tempDir, randomString(RND) + ".buffer");
this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
}
- private static String randomString(Random random) {
- final byte[] bytes = new byte[20];
- random.nextBytes(bytes);
- return StringUtils.byteToHexString(bytes);
- }
+
public void close() throws IOException {
if (spillingChannel != null && spillingChannel.isOpen()) {
@@ -87,5 +88,12 @@ public class BufferSpiller {
public File getSpillFile() {
return spillFile;
}
+
+ // ------------------------------------------------------------------------
+ private static String randomString(Random random) {
+ final byte[] bytes = new byte[20];
+ random.nextBytes(bytes);
+ return StringUtils.byteToHexString(bytes);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
new file mode 100644
index 0000000..02dd33d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import java.io.IOException;
+
+/**
+ * The CheckpointBarrierHandler reacts to checkpoint barriers arriving from the input channels.
+ * Different implementations may either simply track barriers, or block certain inputs on
+ * barriers.
+ */
+public interface CheckpointBarrierHandler {
+
+ /**
+ * Returns the next {@link BufferOrEvent} that the operator may consume.
+ * This call blocks until the next BufferOrEvent is available, or until the stream
+ * has been determined to be finished.
+ *
+ * @return The next BufferOrEvent, or {@code null}, if the stream is finished.
+ * @throws java.io.IOException Thrown, if the network or local disk I/O fails.
+ * @throws java.lang.InterruptedException Thrown, if the thread is interrupted while blocking during
+ * waiting for the next BufferOrEvent to become available.
+ */
+ BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+
+ void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+
+ void cleanup() throws IOException;
+
+ /**
+ * Checks if the barrier handler has buffered any data internally.
+ * @return True, if no data is buffered internally, false otherwise.
+ */
+ boolean isEmpty();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 9db0178..4d60375 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -40,6 +42,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +69,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
private boolean isFinished;
- private final BarrierBuffer barrierBuffer;
+ private final CheckpointBarrierHandler barrierHandler;
private final long[] watermarks;
private long lastEmittedWatermark;
@@ -74,10 +77,17 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
private final DeserializationDelegate<Object> deserializationDelegate;
@SuppressWarnings("unchecked")
- public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
+ public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
+ EventListener<CheckpointBarrier> checkpointListener,
+ IOManager ioManager,
+ boolean enableWatermarkMultiplexing) throws IOException {
+
super(InputGateUtil.createInputGate(inputGates));
- barrierBuffer = new BarrierBuffer(inputGate, this);
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ if (checkpointListener != null) {
+ this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+ }
if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
@@ -101,8 +111,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
}
lastEmittedWatermark = Long.MIN_VALUE;
}
-
- @SuppressWarnings("unchecked")
+
+
public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception {
if (isFinished) {
return false;
@@ -137,8 +147,10 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
}
}
continue;
- } else {
+ }
+ else {
// now we can do the actual processing
+ @SuppressWarnings("unchecked")
StreamRecord<IN> record = (StreamRecord<IN>) deserializationDelegate.getInstance();
StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
if (ctx != null) {
@@ -150,32 +162,26 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
}
}
- final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
- if (bufferOrEvent.isBuffer()) {
- currentChannel = bufferOrEvent.getChannelIndex();
- currentRecordDeserializer = recordDeserializers[currentChannel];
- currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
- } else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
-
- if (event instanceof CheckpointBarrier) {
- barrierBuffer.processBarrier(bufferOrEvent);
- } else {
- if (handleEvent(event)) {
- if (inputGate.isFinished()) {
- if (!barrierBuffer.isEmpty()) {
- throw new RuntimeException("BarrierBuffer should be empty at this point");
- }
- isFinished = true;
- return false;
- } else if (hasReachedEndOfSuperstep()) {
- return false;
- } // else: More data is coming...
- }
+ final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
+ if (bufferOrEvent != null) {
+ if (bufferOrEvent.isBuffer()) {
+ currentChannel = bufferOrEvent.getChannelIndex();
+ currentRecordDeserializer = recordDeserializers[currentChannel];
+ currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+ }
+ else {
+ // Event received
+ final AbstractEvent event = bufferOrEvent.getEvent();
+ handleEvent(event);
}
}
+ else {
+ isFinished = true;
+ if (!barrierHandler.isEmpty()) {
+ throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
+ }
+ return false;
+ }
}
}
@@ -195,7 +201,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
}
}
+ @Override
public void cleanup() throws IOException {
- barrierBuffer.cleanup();
+ barrierHandler.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index e235ffe..9668c7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -31,6 +32,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -68,7 +70,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
private boolean isFinished;
- private final BarrierBuffer barrierBuffer;
+ private final CheckpointBarrierHandler barrierHandler;
private final long[] watermarks1;
private long lastEmittedWatermark1;
@@ -87,11 +89,17 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
- boolean enableWatermarkMultiplexing) {
+ EventListener<CheckpointBarrier> checkpointListener,
+ IOManager ioManager,
+ boolean enableWatermarkMultiplexing) throws IOException {
super(InputGateUtil.createInputGate(inputGates1, inputGates2));
- barrierBuffer = new BarrierBuffer(inputGate, this);
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ if (checkpointListener != null) {
+ this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+ }
+
if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
@@ -186,32 +194,26 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
}
}
- final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
- if (bufferOrEvent.isBuffer()) {
- currentChannel = bufferOrEvent.getChannelIndex();
- currentRecordDeserializer = recordDeserializers[currentChannel];
- currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-
- } else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
+ final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
+ if (bufferOrEvent != null) {
- if (event instanceof CheckpointBarrier) {
- barrierBuffer.processBarrier(bufferOrEvent);
+ if (bufferOrEvent.isBuffer()) {
+ currentChannel = bufferOrEvent.getChannelIndex();
+ currentRecordDeserializer = recordDeserializers[currentChannel];
+ currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+
} else {
- if (handleEvent(event)) {
- if (inputGate.isFinished()) {
- if (!barrierBuffer.isEmpty()) {
- throw new RuntimeException("BarrierBuffer should be empty at this point");
- }
- isFinished = true;
- return false;
- } else if (hasReachedEndOfSuperstep()) {
- return false;
- } // else: More data is coming...
- }
+ // Event received
+ final AbstractEvent event = bufferOrEvent.getEvent();
+ handleEvent(event);
+ }
+ }
+ else {
+ isFinished = true;
+ if (!barrierHandler.isEmpty()) {
+ throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
+ return false;
}
}
}
@@ -270,6 +272,6 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
@Override
public void cleanup() throws IOException {
- barrierBuffer.cleanup();
+ barrierHandler.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 9d6e88e..d078320 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -34,22 +34,27 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
@Override
public void registerInputOutput() {
- super.registerInputOutput();
-
- TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- if (numberOfInputs > 0) {
- InputGate[] inputGates = getEnvironment().getAllInputGates();
- inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer, getExecutionConfig().areTimestampsEnabled());
-
- inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
-
- AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
- AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-
- inputProcessor.setReporter(reporter);
+ try {
+ super.registerInputOutput();
+
+ TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
+ int numberOfInputs = configuration.getNumberOfInputs();
+
+ if (numberOfInputs > 0) {
+ InputGate[] inputGates = getEnvironment().getAllInputGates();
+ inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
+ getCheckpointBarrierListener(),
+ getEnvironment().getIOManager(),
+ getExecutionConfig().areTimestampsEnabled());
+
+ // make sure that stream tasks report their I/O statistics
+ AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+ inputProcessor.setReporter(reporter);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 75bdd57..d829833 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
@@ -74,7 +73,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
protected ClassLoader userClassLoader;
- private EventListener<TaskEvent> checkpointBarrierListener;
+ private EventListener<CheckpointBarrier> checkpointBarrierListener;
public StreamTask() {
streamOperator = null;
@@ -106,7 +105,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
streamOperator.setup(outputHandler.getOutput(), headContext);
}
- hasChainedOperators = !(outputHandler.getChainedOperators().size() == 1);
+ hasChainedOperators = outputHandler.getChainedOperators().size() != 1;
}
public String getName() {
@@ -199,7 +198,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
this.isRunning = false;
}
- public EventListener<TaskEvent> getCheckpointBarrierListener() {
+ public EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
return this.checkpointBarrierListener;
}
@@ -211,7 +210,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
@Override
public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
- // We retrieve end restore the states for the chained oeprators.
+ // We retrieve end restore the states for the chained operators.
List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) stateHandle.getState();
// We restore all stateful chained operators
@@ -306,13 +305,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
// ------------------------------------------------------------------------
- private class CheckpointBarrierListener implements EventListener<TaskEvent> {
+ private class CheckpointBarrierListener implements EventListener<CheckpointBarrier> {
@Override
- public void onEvent(TaskEvent event) {
+ public void onEvent(CheckpointBarrier barrier) {
try {
- CheckpointBarrier sStep = (CheckpointBarrier) event;
- triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
+ triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
}
catch (Exception e) {
throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index f981cd5..b4667b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -34,44 +34,52 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
- StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+ private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
@Override
public void registerInputOutput() {
- super.registerInputOutput();
-
- TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
- TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
- ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-
- List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
- for (int i = 0; i < numberOfInputs; i++) {
- int inputType = inEdges.get(i).getTypeNumber();
- InputGate reader = getEnvironment().getInputGate(i);
- switch (inputType) {
- case 1:
- inputList1.add(reader);
- break;
- case 2:
- inputList2.add(reader);
- break;
- default:
- throw new RuntimeException("Invalid input type number: " + inputType);
+ try {
+ super.registerInputOutput();
+
+ TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+ TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+
+ int numberOfInputs = configuration.getNumberOfInputs();
+
+ ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+ ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+
+ List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
+
+ for (int i = 0; i < numberOfInputs; i++) {
+ int inputType = inEdges.get(i).getTypeNumber();
+ InputGate reader = getEnvironment().getInputGate(i);
+ switch (inputType) {
+ case 1:
+ inputList1.add(reader);
+ break;
+ case 2:
+ inputList2.add(reader);
+ break;
+ default:
+ throw new RuntimeException("Invalid input type number: " + inputType);
+ }
}
+
+ this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
+ inputDeserializer1, inputDeserializer2,
+ getCheckpointBarrierListener(),
+ getEnvironment().getIOManager(),
+ getExecutionConfig().areTimestampsEnabled());
+
+ // make sure that stream tasks report their I/O statistics
+ AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+ this.inputProcessor.setReporter(reporter);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
}
-
- inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2, inputDeserializer1, inputDeserializer2, getExecutionConfig().areTimestampsEnabled());
-
- AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
- AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
- inputProcessor.setReporter(reporter);
-
- inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index c479f95..b59ad19 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -20,11 +20,8 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -220,14 +217,4 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
return isEvent;
}
}
-
- public static class DummyEvent extends TaskEvent {
- @Override
- public void write(DataOutputView out) throws IOException {
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
deleted file mode 100644
index d8a3696..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.junit.Test;
-
-public class BarrierBufferIOTest {
-
- @Test
- public void IOTest() throws IOException, InterruptedException {
-
- BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
- BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-
- MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
- new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
- // new BarrierSimulator[] { new CountBarrier(1000), new
- // CountBarrier(1000) });
-
- BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
- new BarrierBufferTest.MockReader(myIG));
-
- try {
- // long time = System.currentTimeMillis();
- for (int i = 0; i < 2000000; i++) {
- BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
- if (boe.isBuffer()) {
- boe.getBuffer().recycle();
- } else {
- barrierBuffer.processBarrier(boe);
- }
- }
- // System.out.println("Ran for " + (System.currentTimeMillis() -
- // time));
- } catch (Exception e) {
- fail();
- } finally {
- barrierBuffer.cleanup();
- }
- }
-
- private static class RandomBarrier implements BarrierGenerator {
- private static Random rnd = new Random();
-
- double threshold;
-
- public RandomBarrier(double expectedEvery) {
- threshold = 1 / expectedEvery;
- }
-
- @Override
- public boolean isNextBarrier() {
- return rnd.nextDouble() < threshold;
- }
- }
-
- private static class CountBarrier implements BarrierGenerator {
-
- long every;
- long c = 0;
-
- public CountBarrier(long every) {
- this.every = every;
- }
-
- @Override
- public boolean isNextBarrier() {
- return c++ % every == 0;
- }
- }
-
- protected static class MockInputGate implements InputGate {
-
- private int numChannels;
- private BufferPool[] bufferPools;
- private int[] currentBarriers;
- BarrierGenerator[] barrierGens;
- int currentChannel = 0;
- long c = 0;
-
- public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
- this.numChannels = bufferPools.length;
- this.currentBarriers = new int[numChannels];
- this.bufferPools = bufferPools;
- this.barrierGens = barrierGens;
- }
-
- @Override
- public int getNumberOfInputChannels() {
- return numChannels;
- }
-
- @Override
- public boolean isFinished() {
- return false;
- }
-
- @Override
- public void requestPartitions() throws IOException, InterruptedException {
- }
-
- @Override
- public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
- currentChannel = (currentChannel + 1) % numChannels;
-
- if (barrierGens[currentChannel].isNextBarrier()) {
- return BarrierBufferTest.createBarrier(++currentBarriers[currentChannel],
- currentChannel);
- } else {
- Buffer buffer = bufferPools[currentChannel].requestBuffer();
- buffer.getMemorySegment().putLong(0, c++);
-
- return new BufferOrEvent(buffer, currentChannel);
- }
-
- }
-
- @Override
- public void sendTaskEvent(TaskEvent event) throws IOException {
- }
-
- @Override
- public void registerListener(EventListener<InputGate> listener) {
- }
-
- }
-
- protected interface BarrierGenerator {
- public boolean isNextBarrier();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
new file mode 100644
index 0000000..c2df4d8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import org.junit.Test;
+
+/**
+ * The test generates two random streams (input channels) which independently
+ * and randomly generate checkpoint barriers. The two streams are very
+ * unaligned, putting heavy work on the BarrierBuffer.
+ */
+public class BarrierBufferMassiveRandomTest {
+
+	/** Size (in bytes) of the memory segments backing the generated buffers. */
+	private static final int PAGE_SIZE = 1024;
+
+	/** Number of buffers in each channel's (fixed-size) buffer pool. */
+	private static final int NUM_BUFFERS_PER_POOL = 100;
+
+	/** Number of times the test pulls the next element from the barrier buffer. */
+	private static final int NUM_ELEMENTS = 2000000;
+
+	/** Expected number of elements between two barriers on one channel. */
+	private static final int BARRIER_INTERVAL = 100000;
+
+	@Test
+	public void testWithTwoChannelsAndRandomBarriers() throws Exception {
+		IOManager ioMan = new IOManagerAsync();
+		try {
+			BufferPool pool1 = new NetworkBufferPool(NUM_BUFFERS_PER_POOL, PAGE_SIZE)
+					.createBufferPool(NUM_BUFFERS_PER_POOL, true);
+			BufferPool pool2 = new NetworkBufferPool(NUM_BUFFERS_PER_POOL, PAGE_SIZE)
+					.createBufferPool(NUM_BUFFERS_PER_POOL, true);
+
+			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
+					new BufferPool[] { pool1, pool2 },
+					new BarrierGenerator[] { new CountBarrier(BARRIER_INTERVAL), new RandomBarrier(BARRIER_INTERVAL) });
+
+			BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, ioMan);
+			try {
+				for (int i = 0; i < NUM_ELEMENTS; i++) {
+					BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
+					// the generating gate never finishes, so there must always be a next element
+					assertNotNull("barrier buffer returned no element although the input never ends", boe);
+					if (boe.isBuffer()) {
+						// hand the memory back, otherwise the small fixed-size pools run dry
+						boe.getBuffer().recycle();
+					}
+				}
+			}
+			finally {
+				// release whatever the barrier buffer allocated via the I/O manager (e.g. spill files)
+				barrierBuffer.cleanup();
+			}
+		}
+		finally {
+			ioMan.shutdown();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mocks and Generators
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Decides, call by call, whether a channel emits a checkpoint barrier next.
+	 */
+	protected interface BarrierGenerator {
+		boolean isNextBarrier();
+	}
+
+	/**
+	 * Emits a barrier with probability {@code 1 / expectedEvery} on each call.
+	 */
+	protected static class RandomBarrier implements BarrierGenerator {
+
+		private static final Random rnd = new Random();
+
+		private final double threshold;
+
+		public RandomBarrier(double expectedEvery) {
+			threshold = 1 / expectedEvery;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return rnd.nextDouble() < threshold;
+		}
+	}
+
+	/**
+	 * Emits a barrier on the first call and then once every {@code every} calls after it.
+	 */
+	private static class CountBarrier implements BarrierGenerator {
+
+		private final long every;
+		private long c = 0;
+
+		public CountBarrier(long every) {
+			this.every = every;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return c++ % every == 0;
+		}
+	}
+
+	/**
+	 * An input gate that never finishes. Each call moves on to the next channel and
+	 * returns either a checkpoint barrier (as decided by that channel's
+	 * {@link BarrierGenerator}) or a buffer from that channel's pool whose first
+	 * eight bytes hold a running sequence number.
+	 */
+	protected static class RandomGeneratingInputGate implements InputGate {
+
+		private final int numChannels;
+		private final BufferPool[] bufferPools;
+		private final int[] currentBarriers;
+		private final BarrierGenerator[] barrierGens;
+		private int currentChannel = 0;
+		private long c = 0;
+
+		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+			this.numChannels = bufferPools.length;
+			this.currentBarriers = new int[numChannels];
+			this.bufferPools = bufferPools;
+			this.barrierGens = barrierGens;
+		}
+
+		@Override
+		public int getNumberOfInputChannels() {
+			return numChannels;
+		}
+
+		@Override
+		public boolean isFinished() {
+			return false;
+		}
+
+		@Override
+		public void requestPartitions() {}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+			currentChannel = (currentChannel + 1) % numChannels;
+
+			if (barrierGens[currentChannel].isNextBarrier()) {
+				return new BufferOrEvent(
+						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()),
+						currentChannel);
+			}
+
+			Buffer buffer = bufferPools[currentChannel].requestBuffer();
+			if (buffer == null) {
+				// presumably requestBuffer() returns null when the pool is exhausted; fail clearly instead of with an NPE
+				throw new IllegalStateException("No buffer available for channel " + currentChannel);
+			}
+			buffer.getMemorySegment().putLong(0, c++);
+			return new BufferOrEvent(buffer, currentChannel);
+		}
+
+		@Override
+		public void sendTaskEvent(TaskEvent event) {}
+
+		@Override
+		public void registerListener(EventListener<InputGate> listener) {}
+	}
+}