You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 00:15:43 UTC
[1/8] flink git commit: [tests] Add a manual test to evaluate impact
of checkpointing on latency
Repository: flink
Updated Branches:
refs/heads/master 571084152 -> 8f87b7164
[tests] Add a manual test to evaluate impact of checkpointing on latency
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed30ff4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed30ff4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed30ff4d
Branch: refs/heads/master
Commit: ed30ff4de46deb524d9449768d7199d99e3cc0f0
Parents: 2d237e1
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 12 19:33:38 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200
----------------------------------------------------------------------
.../manual/StreamingScalabilityAndLatency.java | 154 +++++++++++++++++++
.../apache/flink/test/manual/package-info.java | 4 +-
2 files changed, 156 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed30ff4d/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
new file mode 100644
index 0000000..a34ec15
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+
+import static org.junit.Assert.fail;
+
+public class StreamingScalabilityAndLatency {
+
+ public static void main(String[] args) throws Exception {
+ if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) {
+ throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
+ }
+
+ final int TASK_MANAGERS = 1;
+ final int SLOTS_PER_TASK_MANAGER = 80;
+ final int PARALLELISM = TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+
+ LocalFlinkMiniCluster cluster = null;
+
+ try {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, TASK_MANAGERS);
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 20000);
+
+ config.setInteger("taskmanager.net.server.numThreads", 1);
+ config.setInteger("taskmanager.net.client.numThreads", 1);
+
+ cluster = new LocalFlinkMiniCluster(config, false, StreamingMode.STREAMING);
+
+ runPartitioningProgram(cluster.getJobManagerRPCPort(), PARALLELISM);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private static void runPartitioningProgram(int jobManagerPort, int parallelism) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+ env.setParallelism(parallelism);
+ env.getConfig().enableObjectReuse();
+
+ env.setBufferTimeout(5L);
+// env.enableCheckpointing(1000);
+
+ env
+ .addSource(new TimeStampingSource())
+ .map(new IdMapper<Tuple2<Long, Long>>())
+ .partitionByHash(0)
+ .addSink(new TimestampingSink());
+
+ env.execute("Partitioning Program");
+ }
+
+ public static class TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> {
+
+ private static final long serialVersionUID = -151782334777482511L;
+
+ private volatile boolean running = true;
+
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+ long num = 100;
+ long counter = (long) (Math.random() * 4096);
+
+ while (running) {
+ if (num < 100) {
+ num++;
+ ctx.collect(new Tuple2<Long, Long>(counter++, 0L));
+ }
+ else {
+ num = 0;
+ ctx.collect(new Tuple2<Long, Long>(counter++, System.currentTimeMillis()));
+ }
+
+ Thread.sleep(1);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ public static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> {
+
+ private static final long serialVersionUID = 1876986644706201196L;
+
+ private long maxLatency;
+ private long count;
+
+ @Override
+ public void invoke(Tuple2<Long, Long> value) {
+ long ts = value.f1;
+ if (ts != 0L) {
+ long diff = System.currentTimeMillis() - ts;
+ maxLatency = Math.max(diff, maxLatency);
+ }
+
+ count++;
+ if (count == 5000) {
+ System.out.println("Max latency: " + maxLatency);
+ count = 0;
+ maxLatency = 0;
+ }
+ }
+ }
+
+ public static class IdMapper<T> implements MapFunction<T, T> {
+
+ private static final long serialVersionUID = -6543809409233225099L;
+
+ @Override
+ public T map(T value) {
+ return value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ed30ff4d/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
index 893f3cc..1c5744d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
@@ -18,7 +18,7 @@
/**
* This package contains various tests that are not automatically executed, but
- * need to be manually invoked, because they are extremely heavy of require larger-than-usual
- * JVMs.
+ * need to be manually invoked, because they are extremely heavy, time intensive,
+ * or require larger-than-usual JVMs.
*/
package org.apache.flink.test.manual;
\ No newline at end of file
[7/8] flink git commit: [FLINK-2406] [streaming] Abstract and improve
stream alignment via the BarrierBuffer
Posted by se...@apache.org.
[FLINK-2406] [streaming] Abstract and improve stream alignment via the BarrierBuffer
- Add an interface for the functionality of the barrier buffer (for later addition of other implementations)
- Add broader tests for the BarrierBuffer, including trailing data and barrier races.
- Checkpoint barriers are handled by the buffer directly, rather than being returned and re-injected.
- Simplify logic in the BarrierBuffer and fix certain corner cases.
- Give access to spill directories properly via I/O manager, rather than via GlobalConfiguration singleton.
- Rename the "BarrierBufferIOTest" to "BarrierBufferMassiveRandomTest"
- A lot of code style/robustness fixes (properly define constants, visibility, exception signatures)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0579f90b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0579f90b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0579f90b
Branch: refs/heads/master
Commit: 0579f90bab165a7df336163eb9d6337267020029
Parents: ed30ff4
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 26 18:58:37 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:07 2015 +0200
----------------------------------------------------------------------
.../runtime/io/disk/iomanager/IOManager.java | 12 +-
.../io/network/api/EndOfPartitionEvent.java | 23 +-
.../partition/consumer/SingleInputGate.java | 2 +-
.../streaming/runtime/io/BarrierBuffer.java | 413 +++++-----
.../streaming/runtime/io/BufferSpiller.java | 54 +-
.../runtime/io/CheckpointBarrierHandler.java | 55 ++
.../runtime/io/StreamInputProcessor.java | 69 +-
.../runtime/io/StreamTwoInputProcessor.java | 56 +-
.../runtime/tasks/OneInputStreamTask.java | 37 +-
.../streaming/runtime/tasks/StreamTask.java | 16 +-
.../runtime/tasks/TwoInputStreamTask.java | 74 +-
.../consumer/StreamTestSingleInputGate.java | 13 -
.../runtime/io/BarrierBufferIOTest.java | 159 ----
.../io/BarrierBufferMassiveRandomTest.java | 167 ++++
.../streaming/runtime/io/BarrierBufferTest.java | 775 +++++++++++++++----
.../runtime/io/DummyBufferRecycler.java | 8 +-
.../runtime/io/SpillingBufferOrEventTest.java | 20 +-
17 files changed, 1282 insertions(+), 671 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index c0bd360..45d9b9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -301,9 +301,19 @@ public abstract class IOManager {
*
* @return The number of temporary file directories.
*/
- public int getNumberOfTempDirs() {
+ public int getNumberOfSpillingDirectories() {
return this.paths.length;
}
+
+ /**
+ * Gets the directories that the I/O manager spills to.
+ *
+ * @return The directories that the I/O manager spills to.
+ */
+ public File[] getSpillingDirectories() {
+ return this.paths;
+ }
+
protected int getNextPathNum() {
final int next = this.nextPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
index 49d7958..3ecdb94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -22,19 +22,34 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.task.RuntimeEvent;
-import java.io.IOException;
public class EndOfPartitionEvent extends RuntimeEvent {
public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent();
-
+
+
@Override
- public void read(DataInputView in) throws IOException {
+ public void read(DataInputView in) {
// Nothing to do here
}
@Override
- public void write(DataOutputView out) throws IOException {
+ public void write(DataOutputView out) {
// Nothing to do here
}
+
+ @Override
+ public int hashCode() {
+ return 1965146673;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj != null && obj.getClass() == EndOfPartitionEvent.class;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 78aa6f7..0aebcae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -408,7 +408,7 @@ public class SingleInputGate implements InputGate {
// Sanity check that notifications only happen when data is available
if (buffer == null) {
- throw new IllegalStateException("Bug in input gate/channel logic: input gate got" +
+ throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
"notified by channel about available data, but none was available.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 40e84fc..466b8f7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -19,263 +19,262 @@ package org.apache.flink.streaming.runtime.io;
import java.io.File;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
+import java.util.ArrayDeque;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The barrier buffer is responsible for implementing the blocking behaviour described
- * here: {@link CheckpointBarrier}.
- *
- * <p>
- * To avoid back-pressuring the
- * readers, we buffer up the new data received from the blocked channels until
- * the blocks are released.
+ * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
+ * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
+ * the blocks are released.</p>
*/
-public class BarrierBuffer {
+public class BarrierBuffer implements CheckpointBarrierHandler {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+
+ /** The gate that the buffer draws its input from */
+ private final InputGate inputGate;
+
+ /** Flags that indicate whether a channel is currently blocked/buffered */
+ private final boolean[] blockedChannels;
+
+ /** The total number of channels that this buffer handles data from */
+ private final int totalNumberOfInputChannels;
+
+ private final SpillReader spillReader;
+ private final BufferSpiller bufferSpiller;
+
+ private ArrayDeque<SpillingBufferOrEvent> nonProcessed;
+ private ArrayDeque<SpillingBufferOrEvent> blockedNonProcessed;
+
+ /** Handler that receives the checkpoint notifications */
+ private EventListener<CheckpointBarrier> checkpointHandler;
+
+ /** The ID of the checkpoint for which we expect barriers */
+ private long currentCheckpointId = -1L;
+
+ /** The number of received barriers (= number of blocked/buffered channels) */
+ private long numReceivedBarriers;
+
+ /** Flag to indicate whether we have drawn all available input */
+ private boolean endOfStream;
+
+
+ public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
+ this.inputGate = inputGate;
+ this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+ this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
+
+ this.nonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
+ this.blockedNonProcessed = new ArrayDeque<SpillingBufferOrEvent>();
+
+ this.bufferSpiller = new BufferSpiller(ioManager);
+ this.spillReader = new SpillReader();
+ }
- private Queue<SpillingBufferOrEvent> nonProcessed = new LinkedList<SpillingBufferOrEvent>();
- private Queue<SpillingBufferOrEvent> blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
-
- private Set<Integer> blockedChannels = new HashSet<Integer>();
- private int totalNumberOfInputChannels;
-
- private CheckpointBarrier currentBarrier;
-
- private AbstractReader reader;
-
- private InputGate inputGate;
-
- private SpillReader spillReader;
- private BufferSpiller bufferSpiller;
-
- private boolean inputFinished = false;
+ // ------------------------------------------------------------------------
+ // Buffer and barrier handling
+ // ------------------------------------------------------------------------
- private BufferOrEvent endOfStreamEvent = null;
+ @Override
+ public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ while (true) {
+ // process buffered BufferOrEvents before grabbing new ones
+ final SpillingBufferOrEvent nextBuffered = nonProcessed.pollFirst();
+ final BufferOrEvent next = nextBuffered == null ?
+ inputGate.getNextBufferOrEvent() :
+ nextBuffered.getBufferOrEvent();
+
+ if (next != null) {
+ if (isBlocked(next.getChannelIndex())) {
+ // if the channel is blocked, we just store the BufferOrEvent
+ blockedNonProcessed.add(new SpillingBufferOrEvent(next, bufferSpiller, spillReader));
+ }
+ else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+ return next;
+ }
+ else if (!endOfStream) {
+ // process barriers only if there is a chance of the checkpoint completing
+ processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
+ }
+ }
+ else if (!endOfStream) {
+ // end of stream. we feed the data that is still buffered
+ endOfStream = true;
+ releaseBlocks();
+ return getNextNonBlocked();
+ }
+ else {
+ return null;
+ }
+ }
+ }
+
+ private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+ final long barrierId = receivedBarrier.getId();
+
+ if (numReceivedBarriers > 0) {
+ // subsequent barrier of a checkpoint.
+ if (barrierId == currentCheckpointId) {
+ // regular case
+ onBarrier(channelIndex);
+ }
+ else if (barrierId > currentCheckpointId) {
+ // we did not complete the current checkpoint
+ LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
+ "Skipping current checkpoint.", barrierId, currentCheckpointId);
+
+ releaseBlocks();
+ currentCheckpointId = barrierId;
+ onBarrier(channelIndex);
+ }
+ else {
+ // ignore trailing barrier from aborted checkpoint
+ return;
+ }
+
+ }
+ else if (barrierId > currentCheckpointId) {
+ // first barrier of a new checkpoint
+ currentCheckpointId = barrierId;
+ onBarrier(channelIndex);
+ }
+ else {
+ // trailing barrier from previous (skipped) checkpoint
+ return;
+ }
- private long lastCheckpointId = Long.MIN_VALUE;
+ // check if we have all barriers
+ if (numReceivedBarriers == totalNumberOfInputChannels) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received all barrier, triggering checkpoint {} at {}",
+ receivedBarrier.getId(), receivedBarrier.getTimestamp());
+ }
- public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
- this.inputGate = inputGate;
- totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
- this.reader = reader;
- try {
- this.bufferSpiller = new BufferSpiller();
- this.spillReader = new SpillReader();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if (checkpointHandler != null) {
+ checkpointHandler.onEvent(receivedBarrier);
+ }
+
+ releaseBlocks();
}
-
+ }
+
+ @Override
+ public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
+ if (this.checkpointHandler == null) {
+ this.checkpointHandler = checkpointHandler;
+ }
+ else {
+ throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
}
- /**
- * Get then next non-blocked non-processed {@link BufferOrEvent}. Returns null if
- * none available.
- *
- * @throws IOException
- */
- private BufferOrEvent getNonProcessed() throws IOException {
- SpillingBufferOrEvent nextNonProcessed;
-
- while ((nextNonProcessed = nonProcessed.poll()) != null) {
- BufferOrEvent boe = nextNonProcessed.getBufferOrEvent();
- if (isBlocked(boe.getChannelIndex())) {
- blockedNonProcessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
- } else {
- return boe;
+ @Override
+ public void cleanup() throws IOException {
+ bufferSpiller.close();
+ File spillfile1 = bufferSpiller.getSpillFile();
+ if (spillfile1 != null) {
+ if (!spillfile1.delete()) {
+ LOG.warn("Cannot remove barrier buffer spill file: " + spillfile1.getAbsolutePath());
}
}
- return null;
+ spillReader.close();
+ File spillfile2 = spillReader.getSpillFile();
+ if (spillfile2 != null) {
+ if (!spillfile2.delete()) {
+ LOG.warn("Cannot remove barrier buffer spill file: " + spillfile2.getAbsolutePath());
+ }
+ }
}
-
+
/**
* Checks whether the channel with the given index is blocked.
*
- * @param channelIndex The channel index to check
+ * @param channelIndex The channel index to check.
+ * @return True if the channel is blocked, false if not.
*/
private boolean isBlocked(int channelIndex) {
- return blockedChannels.contains(channelIndex);
+ return blockedChannels[channelIndex];
}
-
- /**
- * Checks whether all channels are blocked meaning that barriers have been
- * received from all channels
- */
- private boolean isAllBlocked() {
- return blockedChannels.size() == totalNumberOfInputChannels;
- }
-
- /**
- * Returns the next non-blocked {@link BufferOrEvent}. This is a blocking operator.
- */
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
- // If there are non-processed buffers from the previously blocked ones,
- // we get the next
- BufferOrEvent bufferOrEvent = getNonProcessed();
-
- if (bufferOrEvent != null) {
- return bufferOrEvent;
- } else if (blockedNonProcessed.isEmpty() && inputFinished) {
- return endOfStreamEvent;
- } else {
- // If no non-processed, get new from input
- while (true) {
- if (!inputFinished) {
- // We read the next buffer from the inputgate
- bufferOrEvent = inputGate.getNextBufferOrEvent();
-
- if (!bufferOrEvent.isBuffer()
- && bufferOrEvent.getEvent() instanceof EndOfPartitionEvent) {
- if (inputGate.isFinished()) {
- // store the event for later if the channel is
- // closed
- endOfStreamEvent = bufferOrEvent;
- inputFinished = true;
- }
-
- } else {
- if (isBlocked(bufferOrEvent.getChannelIndex())) {
- // If channel blocked we just store it
- blockedNonProcessed.add(new SpillingBufferOrEvent(bufferOrEvent,
- bufferSpiller, spillReader));
- } else {
- return bufferOrEvent;
- }
- }
- } else {
- actOnAllBlocked();
- return getNextNonBlocked();
- }
- }
- }
- }
-
+
/**
* Blocks the given channel index, from which a barrier has been received.
*
- * @param channelIndex
- * The channel index to block.
+ * @param channelIndex The channel index to block.
*/
- private void blockChannel(int channelIndex) {
- if (!blockedChannels.contains(channelIndex)) {
- blockedChannels.add(channelIndex);
+ private void onBarrier(int channelIndex) throws IOException {
+ if (!blockedChannels[channelIndex]) {
+ blockedChannels[channelIndex] = true;
+ numReceivedBarriers++;
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Channel blocked with index: " + channelIndex);
- }
- if (isAllBlocked()) {
- actOnAllBlocked();
+ LOG.debug("Received barrier from channel " + channelIndex);
}
-
- } else {
- throw new RuntimeException("Tried to block an already blocked channel");
+ }
+ else {
+ throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
}
}
/**
* Releases the blocks on all channels.
*/
- private void releaseBlocks() {
- if (!nonProcessed.isEmpty()) {
- // sanity check
- throw new RuntimeException("Error in barrier buffer logic");
- }
- nonProcessed = blockedNonProcessed;
- blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
-
- try {
- spillReader.setSpillFile(bufferSpiller.getSpillFile());
- bufferSpiller.resetSpillFile();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- blockedChannels.clear();
- currentBarrier = null;
+ private void releaseBlocks() throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("All barriers received, blocks released");
+ LOG.debug("Releasing blocks");
}
- }
- /**
- * Method that is executed once the barrier has been received from all
- * channels.
- */
- private void actOnAllBlocked() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Publishing barrier to the vertex");
+ for (int i = 0; i < blockedChannels.length; i++) {
+ blockedChannels[i] = false;
}
-
- if (currentBarrier != null && !inputFinished) {
- reader.publish(currentBarrier);
- lastCheckpointId = currentBarrier.getId();
+ numReceivedBarriers = 0;
+
+ if (nonProcessed.isEmpty()) {
+ // swap the queues
+ ArrayDeque<SpillingBufferOrEvent> empty = nonProcessed;
+ nonProcessed = blockedNonProcessed;
+ blockedNonProcessed = empty;
}
-
- releaseBlocks();
- }
-
- /**
- * Processes one {@link org.apache.flink.streaming.runtime.tasks.CheckpointBarrier}
- *
- * @param bufferOrEvent The {@link BufferOrEvent} containing the checkpoint barrier
- */
- public void processBarrier(BufferOrEvent bufferOrEvent) {
- CheckpointBarrier receivedBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
-
- if (receivedBarrier.getId() < lastCheckpointId) {
- // a barrier from an old checkpoint, ignore these
- return;
- }
-
- if (currentBarrier == null) {
- this.currentBarrier = receivedBarrier;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Checkpoint barrier received start waiting for checkpoint: {}", receivedBarrier);
- }
- } else if (receivedBarrier.getId() > currentBarrier.getId()) {
- // we have a barrier from a more recent checkpoint, free all locks and start with
- // this newer checkpoint
- if (LOG.isDebugEnabled()) {
- LOG.debug("Checkpoint barrier received while waiting on checkpoint {}. Restarting waiting with checkpoint {}: ", currentBarrier, receivedBarrier);
- }
- releaseBlocks();
- currentBarrier = receivedBarrier;
-
+ else {
+ throw new IllegalStateException("Unconsumed data from previous checkpoint alignment " +
+ "when starting next checkpoint alignment");
}
- blockChannel(bufferOrEvent.getChannelIndex());
+
+ // roll over the spill files
+ spillReader.setSpillFile(bufferSpiller.getSpillFile());
+ bufferSpiller.resetSpillFile();
}
- public void cleanup() throws IOException {
- bufferSpiller.close();
- File spillfile1 = bufferSpiller.getSpillFile();
- if (spillfile1 != null) {
- spillfile1.delete();
- }
+ // ------------------------------------------------------------------------
+ // For Testing
+ // ------------------------------------------------------------------------
- spillReader.close();
- File spillfile2 = spillReader.getSpillFile();
- if (spillfile2 != null) {
- spillfile2.delete();
- }
+ public long getCurrentCheckpointId() {
+ return this.currentCheckpointId;
}
-
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @Override
public String toString() {
- return nonProcessed.toString() + blockedNonProcessed.toString();
- }
-
- public boolean isEmpty() {
- return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
+ return "Non-Processed: " + nonProcessed + " | Blocked: " + blockedNonProcessed;
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 0d57d05..fda612e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -22,28 +22,33 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.util.StringUtils;
public class BufferSpiller {
+
+ /** The random number generator for temp file names */
+ private static final Random RND = new Random();
- protected static Random rnd = new Random();
+ /** The counter that selects the next directory to spill into */
+ private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
+
+
+ /** The directory to spill to */
+ private final File tempDir;
private File spillFile;
- protected FileChannel spillingChannel;
- private String tempDir;
-
- public BufferSpiller() throws IOException {
- String tempDirString = GlobalConfiguration.getString(
- ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
- String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
-
- tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
-
+
+ private FileChannel spillingChannel;
+
+
+
+ public BufferSpiller(IOManager ioManager) throws IOException {
+ File[] tempDirs = ioManager.getSpillingDirectories();
+ this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length];
createSpillingChannel();
}
@@ -54,24 +59,20 @@ public class BufferSpiller {
try {
spillingChannel.write(buffer.getNioBuffer());
buffer.recycle();
- } catch (IOException e) {
+ }
+ catch (IOException e) {
close();
- throw new IOException(e);
+ throw e;
}
-
}
@SuppressWarnings("resource")
private void createSpillingChannel() throws IOException {
- this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
+ this.spillFile = new File(tempDir, randomString(RND) + ".buffer");
this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
}
- private static String randomString(Random random) {
- final byte[] bytes = new byte[20];
- random.nextBytes(bytes);
- return StringUtils.byteToHexString(bytes);
- }
+
public void close() throws IOException {
if (spillingChannel != null && spillingChannel.isOpen()) {
@@ -87,5 +88,12 @@ public class BufferSpiller {
public File getSpillFile() {
return spillFile;
}
+
+ // ------------------------------------------------------------------------
+ private static String randomString(Random random) {
+ final byte[] bytes = new byte[20];
+ random.nextBytes(bytes);
+ return StringUtils.byteToHexString(bytes);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
new file mode 100644
index 0000000..02dd33d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import java.io.IOException;
+
+/**
+ * The CheckpointBarrierHandler reacts to checkpoint barriers arriving from the input channels.
+ * Different implementations may either simply track barriers, or block certain inputs on
+ * barriers.
+ */
+public interface CheckpointBarrierHandler {
+
+ /**
+ * Returns the next {@link BufferOrEvent} that the operator may consume.
+ * This call blocks until the next BufferOrEvent is available, or until the stream
+ * has been determined to be finished.
+ *
+ * @return The next BufferOrEvent, or {@code null}, if the stream is finished.
+ * @throws java.io.IOException Thrown, if the network or local disk I/O fails.
+ * @throws java.lang.InterruptedException Thrown, if the thread is interrupted while blocked
+ * waiting for the next BufferOrEvent to become available.
+ */
+ BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+
+ void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+
+ void cleanup() throws IOException;
+
+ /**
+ * Checks if the barrier handler has buffered any data internally.
+ * @return True, if no data is buffered internally, false otherwise.
+ */
+ boolean isEmpty();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 9db0178..4d60375 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -40,6 +42,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +69,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
private boolean isFinished;
- private final BarrierBuffer barrierBuffer;
+ private final CheckpointBarrierHandler barrierHandler;
private final long[] watermarks;
private long lastEmittedWatermark;
@@ -74,10 +77,17 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
private final DeserializationDelegate<Object> deserializationDelegate;
@SuppressWarnings("unchecked")
- public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
+ public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
+ EventListener<CheckpointBarrier> checkpointListener,
+ IOManager ioManager,
+ boolean enableWatermarkMultiplexing) throws IOException {
+
super(InputGateUtil.createInputGate(inputGates));
- barrierBuffer = new BarrierBuffer(inputGate, this);
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ if (checkpointListener != null) {
+ this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+ }
if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
@@ -101,8 +111,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
}
lastEmittedWatermark = Long.MIN_VALUE;
}
-
- @SuppressWarnings("unchecked")
+
+
public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception {
if (isFinished) {
return false;
@@ -137,8 +147,10 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
}
}
continue;
- } else {
+ }
+ else {
// now we can do the actual processing
+ @SuppressWarnings("unchecked")
StreamRecord<IN> record = (StreamRecord<IN>) deserializationDelegate.getInstance();
StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
if (ctx != null) {
@@ -150,32 +162,26 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
}
}
- final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
- if (bufferOrEvent.isBuffer()) {
- currentChannel = bufferOrEvent.getChannelIndex();
- currentRecordDeserializer = recordDeserializers[currentChannel];
- currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
- } else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
-
- if (event instanceof CheckpointBarrier) {
- barrierBuffer.processBarrier(bufferOrEvent);
- } else {
- if (handleEvent(event)) {
- if (inputGate.isFinished()) {
- if (!barrierBuffer.isEmpty()) {
- throw new RuntimeException("BarrierBuffer should be empty at this point");
- }
- isFinished = true;
- return false;
- } else if (hasReachedEndOfSuperstep()) {
- return false;
- } // else: More data is coming...
- }
+ final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
+ if (bufferOrEvent != null) {
+ if (bufferOrEvent.isBuffer()) {
+ currentChannel = bufferOrEvent.getChannelIndex();
+ currentRecordDeserializer = recordDeserializers[currentChannel];
+ currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+ }
+ else {
+ // Event received
+ final AbstractEvent event = bufferOrEvent.getEvent();
+ handleEvent(event);
}
}
+ else {
+ isFinished = true;
+ if (!barrierHandler.isEmpty()) {
+ throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
+ }
+ return false;
+ }
}
}
@@ -195,7 +201,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
}
}
+ @Override
public void cleanup() throws IOException {
- barrierBuffer.cleanup();
+ barrierHandler.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index e235ffe..9668c7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
@@ -31,6 +32,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -68,7 +70,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
private boolean isFinished;
- private final BarrierBuffer barrierBuffer;
+ private final CheckpointBarrierHandler barrierHandler;
private final long[] watermarks1;
private long lastEmittedWatermark1;
@@ -87,11 +89,17 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
- boolean enableWatermarkMultiplexing) {
+ EventListener<CheckpointBarrier> checkpointListener,
+ IOManager ioManager,
+ boolean enableWatermarkMultiplexing) throws IOException {
super(InputGateUtil.createInputGate(inputGates1, inputGates2));
- barrierBuffer = new BarrierBuffer(inputGate, this);
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ if (checkpointListener != null) {
+ this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+ }
+
if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
@@ -186,32 +194,26 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
}
}
- final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
- if (bufferOrEvent.isBuffer()) {
- currentChannel = bufferOrEvent.getChannelIndex();
- currentRecordDeserializer = recordDeserializers[currentChannel];
- currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-
- } else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
+ final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
+ if (bufferOrEvent != null) {
- if (event instanceof CheckpointBarrier) {
- barrierBuffer.processBarrier(bufferOrEvent);
+ if (bufferOrEvent.isBuffer()) {
+ currentChannel = bufferOrEvent.getChannelIndex();
+ currentRecordDeserializer = recordDeserializers[currentChannel];
+ currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+
} else {
- if (handleEvent(event)) {
- if (inputGate.isFinished()) {
- if (!barrierBuffer.isEmpty()) {
- throw new RuntimeException("BarrierBuffer should be empty at this point");
- }
- isFinished = true;
- return false;
- } else if (hasReachedEndOfSuperstep()) {
- return false;
- } // else: More data is coming...
- }
+ // Event received
+ final AbstractEvent event = bufferOrEvent.getEvent();
+ handleEvent(event);
+ }
+ }
+ else {
+ isFinished = true;
+ if (!barrierHandler.isEmpty()) {
+ throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
+ return false;
}
}
}
@@ -270,6 +272,6 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
@Override
public void cleanup() throws IOException {
- barrierBuffer.cleanup();
+ barrierHandler.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 9d6e88e..d078320 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -34,22 +34,27 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
@Override
public void registerInputOutput() {
- super.registerInputOutput();
-
- TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- if (numberOfInputs > 0) {
- InputGate[] inputGates = getEnvironment().getAllInputGates();
- inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer, getExecutionConfig().areTimestampsEnabled());
-
- inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
-
- AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
- AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-
- inputProcessor.setReporter(reporter);
+ try {
+ super.registerInputOutput();
+
+ TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
+ int numberOfInputs = configuration.getNumberOfInputs();
+
+ if (numberOfInputs > 0) {
+ InputGate[] inputGates = getEnvironment().getAllInputGates();
+ inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
+ getCheckpointBarrierListener(),
+ getEnvironment().getIOManager(),
+ getExecutionConfig().areTimestampsEnabled());
+
+ // make sure that stream tasks report their I/O statistics
+ AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+ inputProcessor.setReporter(reporter);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 75bdd57..d829833 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
@@ -74,7 +73,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
protected ClassLoader userClassLoader;
- private EventListener<TaskEvent> checkpointBarrierListener;
+ private EventListener<CheckpointBarrier> checkpointBarrierListener;
public StreamTask() {
streamOperator = null;
@@ -106,7 +105,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
streamOperator.setup(outputHandler.getOutput(), headContext);
}
- hasChainedOperators = !(outputHandler.getChainedOperators().size() == 1);
+ hasChainedOperators = outputHandler.getChainedOperators().size() != 1;
}
public String getName() {
@@ -199,7 +198,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
this.isRunning = false;
}
- public EventListener<TaskEvent> getCheckpointBarrierListener() {
+ public EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
return this.checkpointBarrierListener;
}
@@ -211,7 +210,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
@Override
public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
- // We retrieve end restore the states for the chained oeprators.
+ // We retrieve and restore the states for the chained operators.
List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) stateHandle.getState();
// We restore all stateful chained operators
@@ -306,13 +305,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
// ------------------------------------------------------------------------
- private class CheckpointBarrierListener implements EventListener<TaskEvent> {
+ private class CheckpointBarrierListener implements EventListener<CheckpointBarrier> {
@Override
- public void onEvent(TaskEvent event) {
+ public void onEvent(CheckpointBarrier barrier) {
try {
- CheckpointBarrier sStep = (CheckpointBarrier) event;
- triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
+ triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
}
catch (Exception e) {
throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index f981cd5..b4667b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -34,44 +34,52 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
- StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+ private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
@Override
public void registerInputOutput() {
- super.registerInputOutput();
-
- TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
- TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
- ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-
- List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
- for (int i = 0; i < numberOfInputs; i++) {
- int inputType = inEdges.get(i).getTypeNumber();
- InputGate reader = getEnvironment().getInputGate(i);
- switch (inputType) {
- case 1:
- inputList1.add(reader);
- break;
- case 2:
- inputList2.add(reader);
- break;
- default:
- throw new RuntimeException("Invalid input type number: " + inputType);
+ try {
+ super.registerInputOutput();
+
+ TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+ TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+
+ int numberOfInputs = configuration.getNumberOfInputs();
+
+ ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+ ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+
+ List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
+
+ for (int i = 0; i < numberOfInputs; i++) {
+ int inputType = inEdges.get(i).getTypeNumber();
+ InputGate reader = getEnvironment().getInputGate(i);
+ switch (inputType) {
+ case 1:
+ inputList1.add(reader);
+ break;
+ case 2:
+ inputList2.add(reader);
+ break;
+ default:
+ throw new RuntimeException("Invalid input type number: " + inputType);
+ }
}
+
+ this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
+ inputDeserializer1, inputDeserializer2,
+ getCheckpointBarrierListener(),
+ getEnvironment().getIOManager(),
+ getExecutionConfig().areTimestampsEnabled());
+
+ // make sure that stream tasks report their I/O statistics
+ AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+ this.inputProcessor.setReporter(reporter);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
}
-
- inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2, inputDeserializer1, inputDeserializer2, getExecutionConfig().areTimestampsEnabled());
-
- AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
- AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
- inputProcessor.setReporter(reporter);
-
- inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index c479f95..b59ad19 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -20,11 +20,8 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -220,14 +217,4 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
return isEvent;
}
}
-
- public static class DummyEvent extends TaskEvent {
- @Override
- public void write(DataOutputView out) throws IOException {
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
deleted file mode 100644
index d8a3696..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.junit.Test;
-
-public class BarrierBufferIOTest {
-
- @Test
- public void IOTest() throws IOException, InterruptedException {
-
- BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
- BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-
- MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
- new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
- // new BarrierSimulator[] { new CountBarrier(1000), new
- // CountBarrier(1000) });
-
- BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
- new BarrierBufferTest.MockReader(myIG));
-
- try {
- // long time = System.currentTimeMillis();
- for (int i = 0; i < 2000000; i++) {
- BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
- if (boe.isBuffer()) {
- boe.getBuffer().recycle();
- } else {
- barrierBuffer.processBarrier(boe);
- }
- }
- // System.out.println("Ran for " + (System.currentTimeMillis() -
- // time));
- } catch (Exception e) {
- fail();
- } finally {
- barrierBuffer.cleanup();
- }
- }
-
- private static class RandomBarrier implements BarrierGenerator {
- private static Random rnd = new Random();
-
- double threshold;
-
- public RandomBarrier(double expectedEvery) {
- threshold = 1 / expectedEvery;
- }
-
- @Override
- public boolean isNextBarrier() {
- return rnd.nextDouble() < threshold;
- }
- }
-
- private static class CountBarrier implements BarrierGenerator {
-
- long every;
- long c = 0;
-
- public CountBarrier(long every) {
- this.every = every;
- }
-
- @Override
- public boolean isNextBarrier() {
- return c++ % every == 0;
- }
- }
-
- protected static class MockInputGate implements InputGate {
-
- private int numChannels;
- private BufferPool[] bufferPools;
- private int[] currentBarriers;
- BarrierGenerator[] barrierGens;
- int currentChannel = 0;
- long c = 0;
-
- public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
- this.numChannels = bufferPools.length;
- this.currentBarriers = new int[numChannels];
- this.bufferPools = bufferPools;
- this.barrierGens = barrierGens;
- }
-
- @Override
- public int getNumberOfInputChannels() {
- return numChannels;
- }
-
- @Override
- public boolean isFinished() {
- return false;
- }
-
- @Override
- public void requestPartitions() throws IOException, InterruptedException {
- }
-
- @Override
- public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
- currentChannel = (currentChannel + 1) % numChannels;
-
- if (barrierGens[currentChannel].isNextBarrier()) {
- return BarrierBufferTest.createBarrier(++currentBarriers[currentChannel],
- currentChannel);
- } else {
- Buffer buffer = bufferPools[currentChannel].requestBuffer();
- buffer.getMemorySegment().putLong(0, c++);
-
- return new BufferOrEvent(buffer, currentChannel);
- }
-
- }
-
- @Override
- public void sendTaskEvent(TaskEvent event) throws IOException {
- }
-
- @Override
- public void registerListener(EventListener<InputGate> listener) {
- }
-
- }
-
- protected interface BarrierGenerator {
- public boolean isNextBarrier();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
new file mode 100644
index 0000000..c2df4d8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import org.junit.Test;
+
+/**
+ * Stress test: a mock gate serves two input channels round-robin, one emitting a
+ * barrier on a fixed call count and one at random, so the channels are rarely
+ * aligned while the BarrierBuffer is polled two million times.
+ */
+public class BarrierBufferMassiveRandomTest {
+
+ @Test
+ public void testWithTwoChannelsAndRandomBarriers() {
+ IOManager ioMan = null; // declared outside the try so the finally block can shut it down
+ try {
+ ioMan = new IOManagerAsync();
+
+ BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true); // NOTE(review): neither pool nor its NetworkBufferPool is released; finally only shuts down ioMan
+ BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true); // one separate pool per channel
+
+ RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
+ new BufferPool[] { pool1, pool2 },
+ new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) }); // channel 0: every 100000th call; channel 1: probability 1/100000 per call
+
+ BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, ioMan);
+
+ for (int i = 0; i < 2000000; i++) { // the gate never finishes, so the loop count bounds the run
+ BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
+ if (boe.isBuffer()) {
+ boe.getBuffer().recycle(); // presumably returns the buffer to its pool so later requestBuffer() calls succeed - confirm
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage()); // any exception fails the test; only its message is passed on
+ }
+ finally {
+ if (ioMan != null) {
+ ioMan.shutdown();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Mocks and Generators
+ // ------------------------------------------------------------------------
+
+ protected interface BarrierGenerator {
+ public boolean isNextBarrier(); // the gate asks once per element; true makes it emit a barrier instead of a buffer
+ }
+
+ protected static class RandomBarrier implements BarrierGenerator {
+
+ private static final Random rnd = new Random(); // shared and unseeded: barrier positions differ from run to run
+
+ private final double threshold; // per-call probability of answering "barrier"
+
+ public RandomBarrier(double expectedEvery) {
+ threshold = 1 / expectedEvery; // e.g. 100000 -> 1e-5
+ }
+
+ @Override
+ public boolean isNextBarrier() {
+ return rnd.nextDouble() < threshold;
+ }
+ }
+
+ private static class CountBarrier implements BarrierGenerator {
+
+ private final long every;
+ private long c = 0; // number of isNextBarrier() calls so far
+
+ public CountBarrier(long every) {
+ this.every = every;
+ }
+
+ @Override
+ public boolean isNextBarrier() {
+ return c++ % every == 0; // true on calls 0, every, 2*every, ... - so the very first call yields a barrier
+ }
+ }
+
+ protected static class RandomGeneratingInputGate implements InputGate {
+
+ private final int numChannels;
+ private final BufferPool[] bufferPools; // indexed by channel
+ private final int[] currentBarriers; // last barrier id issued per channel
+ private final BarrierGenerator[] barrierGens; // indexed by channel
+ private int currentChannel = 0; // advanced before each use, so the first element comes from channel 1 % numChannels
+ private long c = 0; // gate-wide sequence number written into each data buffer
+
+ public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+ this.numChannels = bufferPools.length; // NOTE(review): barrierGens is assumed to have the same length; not checked
+ this.currentBarriers = new int[numChannels];
+ this.bufferPools = bufferPools;
+ this.barrierGens = barrierGens;
+ }
+
+ @Override
+ public int getNumberOfInputChannels() {
+ return numChannels;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false; // endless source; callers decide when to stop
+ }
+
+ @Override
+ public void requestPartitions() {}
+
+ @Override
+ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+ currentChannel = (currentChannel + 1) % numChannels; // strict round-robin over the channels
+
+ if (barrierGens[currentChannel].isNextBarrier()) {
+ return new BufferOrEvent(
+ new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()), // ids count up from 1 independently per channel
+ currentChannel);
+ } else {
+ Buffer buffer = bufferPools[currentChannel].requestBuffer(); // NOTE(review): used without a null check - confirm requestBuffer() cannot return null here
+ buffer.getMemorySegment().putLong(0, c++); // stamp the sequence number at offset 0
+ return new BufferOrEvent(buffer, currentChannel);
+ }
+ }
+
+ @Override
+ public void sendTaskEvent(TaskEvent event) {}
+
+ @Override
+ public void registerListener(EventListener<InputGate> listener) {}
+ }
+}
[4/8] flink git commit: [FLINK-2421] [streaming] Add tests for basic
utility behavior of StreamRecordSerializer (fixed in previous commit).
Posted by se...@apache.org.
[FLINK-2421] [streaming] Add tests for basic utility behavior of StreamRecordSerializer (fixed in previous commit).
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d237e18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d237e18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d237e18
Branch: refs/heads/master
Commit: 2d237e18a2f7cf21721340933c505bb518c4fc66
Parents: acae9ff
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 18:08:19 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200
----------------------------------------------------------------------
.../StreamRecordSerializerTest.java | 68 ++++++++++++++++++++
1 file changed, 68 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d237e18/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
new file mode 100644
index 0000000..d48f7f4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class StreamRecordSerializerTest { // checks duplicate() and the basic type properties of StreamRecordSerializer
+
+ @Test
+ public void testDeepDuplication() {
+ try {
+ @SuppressWarnings("unchecked")
+ TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) mock(TypeSerializer.class);
+ @SuppressWarnings("unchecked")
+ TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) mock(TypeSerializer.class);
+
+ when(serializer1.duplicate()).thenReturn(serializer2); // duplicating the inner serializer yields a different object
+
+ StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(serializer1);
+ assertEquals(serializer1, streamRecSer.getContainedTypeSerializer()); // the wrapper exposes the serializer it was built with
+
+ StreamRecordSerializer<Long> copy = streamRecSer.duplicate();
+ assertNotEquals(copy, streamRecSer); // duplicate() must not hand back the same wrapper here
+ assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer()); // and the copy must hold a different inner serializer
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage()); // any exception fails the test; only its message is passed on
+ }
+ }
+
+ @Test
+ public void testBasicProperties() {
+ try {
+ StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(LongSerializer.INSTANCE);
+
+ assertFalse(streamRecSer.isImmutableType()); // stream records are reported as mutable
+ assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass()); // createInstance() wraps a Long value
+ assertEquals(LongSerializer.INSTANCE.getLength(), streamRecSer.getLength()); // length equals that of the wrapped serializer
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage()); // any exception fails the test; only its message is passed on
+ }
+ }
+}
[5/8] flink git commit: [hotfix] Fix generics for stream record and
watermark multiplexing.
Posted by se...@apache.org.
[hotfix] Fix generics for stream record and watermark multiplexing.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acae9ff2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acae9ff2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acae9ff2
Branch: refs/heads/master
Commit: acae9ff2583384dada84b40a89d3a068e3b2a00c
Parents: 8ba3213
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 18:07:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200
----------------------------------------------------------------------
.../runtime/plugable/SerializationDelegate.java | 7 +-
.../runtime/io/RecordWriterOutput.java | 21 ++---
.../runtime/io/StreamInputProcessor.java | 25 +++---
.../runtime/io/StreamRecordWriter.java | 2 +-
.../runtime/io/StreamTwoInputProcessor.java | 57 +++++++------
.../MultiplexingStreamRecordSerializer.java | 35 ++++++--
.../runtime/streamrecord/StreamRecord.java | 16 ++--
.../streamrecord/StreamRecordSerializer.java | 65 ++++++++-------
.../streaming/runtime/tasks/OutputHandler.java | 85 +++++++++++---------
.../runtime/tasks/StreamTaskTestHarness.java | 7 +-
10 files changed, 185 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
index 3cbaac3..91b6dd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/SerializationDelegate.java
@@ -26,7 +26,12 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
+/**
+ * The serialization delegate exposes an arbitrary element as a {@link IOReadableWritable} for
+ * serialization, with the help of a type serializer.
+ *
+ * @param <T> The type to be represented as an IOReadableWritable.
+ */
public class SerializationDelegate<T> implements IOReadableWritable {
private T instance;
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index f7d8d47..de8c205 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -39,32 +39,33 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
- private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
- private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
+ private RecordWriter<SerializationDelegate<Object>> recordWriter;
+
+ private SerializationDelegate<Object> serializationDelegate;
@SuppressWarnings("unchecked")
public RecordWriterOutput(
- RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
+ RecordWriter<?> recordWriter,
TypeSerializer<OUT> outSerializer,
boolean enableWatermarkMultiplexing) {
+
Preconditions.checkNotNull(recordWriter);
- this.recordWriter = recordWriter;
+ this.recordWriter = (RecordWriter<SerializationDelegate<Object>>) recordWriter;
- StreamRecordSerializer<OUT> outRecordSerializer;
+ TypeSerializer<Object> outRecordSerializer;
if (enableWatermarkMultiplexing) {
outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
} else {
- outRecordSerializer = new StreamRecordSerializer<OUT>(outSerializer);
+ outRecordSerializer = (TypeSerializer<Object>) (TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
}
if (outSerializer != null) {
- serializationDelegate = new SerializationDelegate(outRecordSerializer);
+ serializationDelegate = new SerializationDelegate<Object>(outRecordSerializer);
}
}
@Override
- @SuppressWarnings("unchecked")
public void collect(StreamRecord<OUT> record) {
serializationDelegate.setInstance(record);
@@ -79,9 +80,9 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
}
@Override
- @SuppressWarnings("unchecked,rawtypes")
public void emitWatermark(Watermark mark) {
- ((SerializationDelegate)serializationDelegate).setInstance(mark);
+ serializationDelegate.setInstance(mark);
+
try {
recordWriter.broadcastEmit(serializationDelegate);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 4c40e5f..9db0178 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -46,9 +46,8 @@ import org.slf4j.LoggerFactory;
/**
* Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
*
- * <p>
- * This also keeps track of {@link Watermark} events and forwards them to event subscribers
- * once the {@link Watermark} from all inputs advances.
+ * <p>This also keeps track of {@link Watermark} events and forwards them to event subscribers
+ * once the {@link Watermark} from all inputs advances.</p>
*
* @param <IN> The type of the record that can be read with this record reader.
*/
@@ -63,33 +62,35 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
- int currentChannel = -1;
+ private int currentChannel = -1;
private boolean isFinished;
private final BarrierBuffer barrierBuffer;
- private long[] watermarks;
+ private final long[] watermarks;
private long lastEmittedWatermark;
- private DeserializationDelegate<Object> deserializationDelegate;
+ private final DeserializationDelegate<Object> deserializationDelegate;
@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
super(InputGateUtil.createInputGate(inputGates));
barrierBuffer = new BarrierBuffer(inputGate, this);
-
- StreamRecordSerializer<IN> inputRecordSerializer;
+
if (enableWatermarkMultiplexing) {
- inputRecordSerializer = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
+ MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
+ this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(ser);
} else {
- inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
+ StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
+ this.deserializationDelegate = (NonReusingDeserializationDelegate<Object>)
+ (NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
}
- this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(inputRecordSerializer);
-
+
// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index b0e2532..321f3b4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -58,7 +58,7 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
super(writer, channelSelector);
- checkArgument(timeout < 0);
+ checkArgument(timeout >= 0);
if (timeout == 0) {
flushAlways = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 82e7936..e235ffe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -64,64 +64,70 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
- int currentChannel = -1;
+ private int currentChannel = -1;
private boolean isFinished;
private final BarrierBuffer barrierBuffer;
- private long[] watermarks1;
+ private final long[] watermarks1;
private long lastEmittedWatermark1;
- private long[] watermarks2;
+ private final long[] watermarks2;
private long lastEmittedWatermark2;
- private int numInputChannels1;
- private int numInputChannels2;
+ private final int numInputChannels1;
- private DeserializationDelegate<Object> deserializationDelegate1;
- private DeserializationDelegate<Object> deserializationDelegate2;
+ private final DeserializationDelegate<Object> deserializationDelegate1;
+ private final DeserializationDelegate<Object> deserializationDelegate2;
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
public StreamTwoInputProcessor(
Collection<InputGate> inputGates1,
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
boolean enableWatermarkMultiplexing) {
+
super(InputGateUtil.createInputGate(inputGates1, inputGates2));
barrierBuffer = new BarrierBuffer(inputGate, this);
-
- StreamRecordSerializer<IN1> inputRecordSerializer1;
+
if (enableWatermarkMultiplexing) {
- inputRecordSerializer1 = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
- } else {
- inputRecordSerializer1 = new StreamRecordSerializer<IN1>(inputSerializer1);
+ MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
+ this.deserializationDelegate1 = new NonReusingDeserializationDelegate<Object>(ser);
}
- this.deserializationDelegate1 = new NonReusingDeserializationDelegate(inputRecordSerializer1);
-
- StreamRecordSerializer<IN2> inputRecordSerializer2;
+ else {
+ StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
+ this.deserializationDelegate1 = (DeserializationDelegate<Object>)
+ (DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
+ }
+
if (enableWatermarkMultiplexing) {
- inputRecordSerializer2 = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
- } else {
- inputRecordSerializer2 = new StreamRecordSerializer<IN2>(inputSerializer2);
+ MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
+ this.deserializationDelegate2 = new NonReusingDeserializationDelegate<Object>(ser);
+ }
+ else {
+ StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
+ this.deserializationDelegate2 = (DeserializationDelegate<Object>)
+ (DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
}
- this.deserializationDelegate2 = new NonReusingDeserializationDelegate(inputRecordSerializer2);
// Initialize one deserializer per input channel
- this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
- .getNumberOfInputChannels()];
+ this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+
for (int i = 0; i < recordDeserializers.length; i++) {
- recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer();
+ recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
}
// determine which unioned channels belong to input 1 and which belong to input 2
- numInputChannels1 = 0;
+ int numInputChannels1 = 0;
for (InputGate gate: inputGates1) {
numInputChannels1 += gate.getNumberOfInputChannels();
}
- numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
+
+ this.numInputChannels1 = numInputChannels1;
+ int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
watermarks1 = new long[numInputChannels1];
for (int i = 0; i < numInputChannels1; i++) {
@@ -262,6 +268,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
}
}
+ @Override
public void cleanup() throws IOException {
barrierBuffer.cleanup();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 715f0d2..075c4fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.streamrecord;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -35,17 +36,36 @@ import java.io.IOException;
*
* @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
*/
-public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSerializer<T> {
-
- private final long IS_WATERMARK = Long.MIN_VALUE;
+public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Object> {
private static final long serialVersionUID = 1L;
+ private static final long IS_WATERMARK = Long.MIN_VALUE;
+
+ protected final TypeSerializer<T> typeSerializer;
+
+
public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
- super(serializer);
- if (serializer instanceof MultiplexingStreamRecordSerializer) {
+ if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
}
+ this.typeSerializer = Preconditions.checkNotNull(serializer);
+ }
+
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<Object> duplicate() {
+ return this;
+ }
+
+ @Override
+ public Object createInstance() {
+ return new StreamRecord<T>(typeSerializer.createInstance(), 0L);
}
@Override
@@ -81,6 +101,11 @@ public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSer
}
@Override
+ public int getLength() {
+ return 0;
+ }
+
+ @Override
@SuppressWarnings("unchecked")
public void serialize(Object value, DataOutputView target) throws IOException {
if (value instanceof StreamRecord) {
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 6521e7f..92ce66f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -19,12 +19,13 @@ package org.apache.flink.streaming.runtime.streamrecord;
/**
* One value in a data stream. This stores the value and the associated timestamp.
+ *
+ * @param <T> The type encapsulated with the stream record.
*/
public class StreamRecord<T> {
-
- // We store it as Object so that we can reuse a StreamElement for emitting
- // elements of a different type while still reusing the timestamp.
- private Object value;
+
+ private T value;
+
private long timestamp;
/**
@@ -52,9 +53,8 @@ public class StreamRecord<T> {
/**
* Returns the value wrapped in this stream value.
*/
- @SuppressWarnings("unchecked")
public T getValue() {
- return (T) value;
+ return value;
}
/**
@@ -74,7 +74,7 @@ public class StreamRecord<T> {
*/
@SuppressWarnings("unchecked")
public <X> StreamRecord<X> replace(X element) {
- this.value = element;
+ this.value = (T) element;
return (StreamRecord<X>) this;
}
@@ -90,7 +90,7 @@ public class StreamRecord<T> {
@SuppressWarnings("unchecked")
public <X> StreamRecord<X> replace(X value, long timestamp) {
this.timestamp = timestamp;
- this.value = value;
+ this.value = (T) value;
return (StreamRecord<X>) this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 2619891..e58d3c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -38,11 +38,12 @@ import org.apache.flink.core.memory.DataOutputView;
*
* @param <T> The type of value in the {@link StreamRecord}
*/
-public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
+public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
private static final long serialVersionUID = 1L;
- protected final TypeSerializer<T> typeSerializer;
+ private final TypeSerializer<T> typeSerializer;
+
public StreamRecordSerializer(TypeSerializer<T> serializer) {
if (serializer instanceof StreamRecordSerializer) {
@@ -51,19 +52,36 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
this.typeSerializer = Preconditions.checkNotNull(serializer);
}
+ public TypeSerializer<T> getContainedTypeSerializer() {
+ return this.typeSerializer;
+ }
+
+ // ------------------------------------------------------------------------
+ // General serializer and type utils
+ // ------------------------------------------------------------------------
+
+ @Override
+ public StreamRecordSerializer<T> duplicate() {
+ TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
+ return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
+ }
+
@Override
public boolean isImmutableType() {
return false;
}
@Override
- @SuppressWarnings("unchecked")
- public StreamRecordSerializer<T> duplicate() {
- return this;
+ public int getLength() {
+ return typeSerializer.getLength();
}
+ // ------------------------------------------------------------------------
+ // Type serialization, copying, instantiation
+ // ------------------------------------------------------------------------
+
@Override
- public Object createInstance() {
+ public StreamRecord<T> createInstance() {
try {
return new StreamRecord<T>(typeSerializer.createInstance());
} catch (Exception e) {
@@ -72,46 +90,31 @@ public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
}
@Override
- @SuppressWarnings("unchecked")
- public Object copy(Object from) {
- StreamRecord<T> fromRecord = (StreamRecord<T>) from;
- return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
+ public StreamRecord<T> copy(StreamRecord<T> from) {
+ return new StreamRecord<T>(typeSerializer.copy(from.getValue()), from.getTimestamp());
}
@Override
- @SuppressWarnings("unchecked")
- public Object copy(Object from, Object reuse) {
- StreamRecord<T> fromRecord = (StreamRecord<T>) from;
- StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
-
- reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), 0);
+ public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
+ reuse.replace(typeSerializer.copy(from.getValue(), reuse.getValue()), 0);
return reuse;
}
@Override
- public int getLength() {
- return -1;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void serialize(Object value, DataOutputView target) throws IOException {
- StreamRecord<T> record = (StreamRecord<T>) value;
- typeSerializer.serialize(record.getValue(), target);
+ public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
+ typeSerializer.serialize(value.getValue(), target);
}
@Override
- public Object deserialize(DataInputView source) throws IOException {
+ public StreamRecord<T> deserialize(DataInputView source) throws IOException {
T element = typeSerializer.deserialize(source);
return new StreamRecord<T>(element, 0);
}
@Override
- @SuppressWarnings("unchecked")
- public Object deserialize(Object reuse, DataInputView source) throws IOException {
- StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
- T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
- reuseRecord.replace(element, 0);
+ public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
+ T element = typeSerializer.deserialize(reuse.getValue(), source);
+ reuse.replace(element, 0);
return reuse;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index aa55151..84614bf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -44,28 +44,30 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OutputHandler<OUT> {
+
private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
- private StreamTask<OUT, ?> vertex;
- private StreamConfig configuration;
- private ClassLoader cl;
- private Output<StreamRecord<OUT>> outerOutput;
+ private final StreamTask<OUT, ?> vertex;
+
+ /** The classloader used to access all user code */
+ private final ClassLoader userCodeClassloader;
+
+
+ private final Output<StreamRecord<OUT>> outerOutput;
- public List<StreamOperator<?>> chainedOperators;
+ public final List<StreamOperator<?>> chainedOperators;
- private Map<StreamEdge, RecordWriterOutput<?>> outputMap;
+ private final Map<StreamEdge, RecordWriterOutput<?>> outputMap;
- private Map<Integer, StreamConfig> chainedConfigs;
- private List<StreamEdge> outEdgesInOrder;
+ private final Map<Integer, StreamConfig> chainedConfigs;
- /**
- * Counters for the number of records emitted and bytes written.
- */
- protected AccumulatorRegistry.Reporter reporter;
+ /** Counters for the number of records emitted and bytes written. */
+ protected final AccumulatorRegistry.Reporter reporter;
public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, Accumulator<?,?>> accumulatorMap,
@@ -73,17 +75,17 @@ public class OutputHandler<OUT> {
// Initialize some fields
this.vertex = vertex;
- this.configuration = new StreamConfig(vertex.getTaskConfiguration());
+ StreamConfig configuration = new StreamConfig(vertex.getTaskConfiguration());
this.chainedOperators = new ArrayList<StreamOperator<?>>();
this.outputMap = new HashMap<StreamEdge, RecordWriterOutput<?>>();
- this.cl = vertex.getUserCodeClassLoader();
+ this.userCodeClassloader = vertex.getUserCodeClassLoader();
// We read the chained configs, and the order of record writer
- // registrations by outputname
- this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
+ // registrations by output name
+ this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
this.chainedConfigs.put(configuration.getVertexID(), configuration);
- this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
+ List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
this.reporter = reporter;
@@ -133,25 +135,24 @@ public class OutputHandler<OUT> {
* @return Returns the output for the chain starting from the given
* config
*/
- @SuppressWarnings({"unchecked", "rawtypes"})
+ @SuppressWarnings("unchecked")
private <X> Output<StreamRecord<X>> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
-
-
+
// We create a wrapper that will encapsulate the chained operators and
// network outputs
- OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl);
+ OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(userCodeClassloader);
CollectorWrapper wrapper = new CollectorWrapper(outputSelectorWrapper);
// Create collectors for the network outputs
- for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
+ for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(userCodeClassloader)) {
Output<?> output = outputMap.get(outputEdge);
wrapper.addCollector(output, outputEdge);
}
// Create collectors for the chained outputs
- for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
+ for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(userCodeClassloader)) {
Integer outputId = outputEdge.getTargetId();
Output<?> output = createChainedCollector(chainedConfigs.get(outputId), accumulatorMap);
@@ -163,11 +164,12 @@ public class OutputHandler<OUT> {
// The current task is the first chained task at this vertex so we
// return the wrapper
return (Output<StreamRecord<X>>) wrapper;
- } else {
+ }
+ else {
// The current task is a part of the chain so we get the chainable
// operator which will be returned and set it up using the wrapper
OneInputStreamOperator chainableOperator =
- chainedTaskConfig.getStreamOperator(vertex.getUserCodeClassLoader());
+ chainedTaskConfig.getStreamOperator(userCodeClassloader);
StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap);
vertex.contexts.add(chainedContext);
@@ -177,14 +179,20 @@ public class OutputHandler<OUT> {
chainedOperators.add(chainableOperator);
if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
return new ChainingOutput<X>(chainableOperator);
- } else {
- StreamRecordSerializer serializerIn1;
+ }
+ else {
+ TypeSerializer<X> typeSer = chainedTaskConfig.getTypeSerializerIn1(userCodeClassloader);
+ TypeSerializer<StreamRecord<X>> inSerializer;
+
if (vertex.getExecutionConfig().areTimestampsEnabled()) {
- serializerIn1 = new MultiplexingStreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
- } else {
- serializerIn1 = new StreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
+ inSerializer = (TypeSerializer<StreamRecord<X>>)
+ (TypeSerializer<?>) new MultiplexingStreamRecordSerializer<X>(typeSer);
}
- return new CopyingChainingOutput<X>(chainableOperator, (TypeSerializer<StreamRecord<X>>) serializerIn1);
+ else {
+ inSerializer = new StreamRecordSerializer<X>(typeSer);
+ }
+
+ return new CopyingChainingOutput<X>(chainableOperator, inSerializer);
}
}
@@ -244,14 +252,14 @@ public class OutputHandler<OUT> {
}
private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
- protected OneInputStreamOperator<T, ?> operator;
+
+ protected final OneInputStreamOperator<T, ?> operator;
public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
this.operator = operator;
}
@Override
- @SuppressWarnings("unchecked")
public void collect(StreamRecord<T> record) {
try {
operator.getRuntimeContext().setNextInput(record);
@@ -268,7 +276,8 @@ public class OutputHandler<OUT> {
public void emitWatermark(Watermark mark) {
try {
operator.processWatermark(mark);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Could not forward element to operator: {}", e);
}
@@ -280,10 +289,12 @@ public class OutputHandler<OUT> {
public void close() {
try {
operator.close();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Could not forward close call to operator.", e);
}
+ throw new RuntimeException(e);
}
}
}
@@ -298,12 +309,12 @@ public class OutputHandler<OUT> {
}
@Override
- @SuppressWarnings("unchecked")
public void collect(StreamRecord<T> record) {
try {
operator.getRuntimeContext().setNextInput(record);
operator.processElement(serializer.copy(record));
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Could not forward element to operator.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acae9ff2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 283243e..435831f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
@@ -79,9 +78,8 @@ public class StreamTaskTestHarness<OUT> {
private AbstractInvokable task;
- private TypeInformation<OUT> outputType;
private TypeSerializer<OUT> outputSerializer;
- private StreamRecordSerializer<OUT> outputStreamRecordSerializer;
+ private TypeSerializer<Object> outputStreamRecordSerializer;
private ConcurrentLinkedQueue<Object> outputList;
@@ -114,7 +112,6 @@ public class StreamTaskTestHarness<OUT> {
streamConfig.setChainStart();
streamConfig.setBufferTimeout(0);
- this.outputType = outputType;
outputSerializer = outputType.createSerializer(executionConfig);
outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
}
@@ -127,7 +124,7 @@ public class StreamTaskTestHarness<OUT> {
@SuppressWarnings("unchecked")
private void initializeOutput() {
- outputList = new ConcurrentLinkedQueue();
+ outputList = new ConcurrentLinkedQueue<Object>();
mockEnv.addOutput(outputList, outputStreamRecordSerializer);
[3/8] flink git commit: [FLINK-2418] [streaming] Add an end-to-end
exactly-once test for Checkpointed functions.
Posted by se...@apache.org.
[FLINK-2418] [streaming] Add an end-to-end exactly-once test for Checkpointed functions.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0556efb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0556efb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0556efb
Branch: refs/heads/master
Commit: a0556efb233f15c6985d17886372a8b4b00392b2
Parents: 5710841
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 15:07:43 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200
----------------------------------------------------------------------
.../checkpointing/StateCheckpoinedITCase.java | 433 +++++++++++++++++++
1 file changed, 433 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a0556efb/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
new file mode 100644
index 0000000..0693665
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled.
+ *
+ * The test triggers a failure after a while and verifies that, after completion, the
+ * state reflects the "exactly once" semantics.
+ */
+@SuppressWarnings("serial")
+public class StateCheckpoinedITCase {
+
+ private static final int NUM_TASK_MANAGERS = 2;
+ private static final int NUM_TASK_SLOTS = 3;
+ private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+ private static ForkableFlinkMiniCluster cluster;
+
+ @BeforeClass
+ public static void startCluster() {
+ try {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+ config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+
+ cluster = new ForkableFlinkMiniCluster(config, false);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed to start test cluster: " + e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void shutdownCluster() {
+ try {
+ cluster.shutdown();
+ cluster = null;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed to stop test cluster: " + e.getMessage());
+ }
+ }
+
+
+ /**
+ * Runs the following program:
+ *
+ * <pre>
+ * [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+ * </pre>
+ */
+ @Test
+ public void runCheckpointedProgram() {
+
+ final long NUM_STRINGS = 10000000L;
+ assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getJobManagerRPCPort());
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(500);
+ env.getConfig().enableSysoutLogging();
+
+ DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
+
+ stream
+ // -------------- first vertex, chained to the source ----------------
+ .filter(new StringRichFilterFunction())
+
+				// -------------- second vertex - one-to-one connected ----------------
+ .map(new StringPrefixCountRichMapFunction())
+ .startNewChain()
+ .map(new StatefulCounterFunction())
+
+ // -------------- third vertex - reducer and the sink ----------------
+ .partitionByHash("prefix")
+ .flatMap(new OnceFailingAggregator(NUM_STRINGS))
+ .addSink(new ValidatingSink());
+
+ env.execute();
+
+ long filterSum = 0;
+ for (long l : StringRichFilterFunction.counts) {
+ filterSum += l;
+ }
+
+ long mapSum = 0;
+ for (long l : StringPrefixCountRichMapFunction.counts) {
+ mapSum += l;
+ }
+
+ long countSum = 0;
+ for (long l : StatefulCounterFunction.counts) {
+ countSum += l;
+ }
+
+ // verify that we counted exactly right
+ assertEquals(NUM_STRINGS, filterSum);
+ assertEquals(NUM_STRINGS, mapSum);
+ assertEquals(NUM_STRINGS, countSum);
+
+ for (Map<Character, Long> map : ValidatingSink.maps) {
+ for (Long count : map.values()) {
+ assertEquals(NUM_STRINGS / 40, count.longValue());
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Functions
+ // --------------------------------------------------------------------------------------------
+
+ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+ implements CheckpointedAsynchronously<Integer>
+ {
+ private final long numElements;
+
+ private int index;
+
+ private volatile boolean isRunning = true;
+
+
+ StringGeneratingSourceFunction(long numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ final Object lockingObject = ctx.getCheckpointLock();
+
+ final Random rnd = new Random();
+ final StringBuilder stringBuilder = new StringBuilder();
+
+ final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+ if (index == 0) {
+ index = getRuntimeContext().getIndexOfThisSubtask();
+ }
+
+ while (isRunning && index < numElements) {
+ char first = (char) ((index % 40) + 40);
+
+ stringBuilder.setLength(0);
+ stringBuilder.append(first);
+
+ String result = randomString(stringBuilder, rnd);
+
+ synchronized (lockingObject) {
+ index += step;
+ ctx.collect(result);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ private static String randomString(StringBuilder bld, Random rnd) {
+ final int len = rnd.nextInt(10) + 5;
+
+ for (int i = 0; i < len; i++) {
+ char next = (char) (rnd.nextInt(20000) + 33);
+ bld.append(next);
+ }
+
+ return bld.toString();
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return index;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ index = state;
+ }
+ }
+
+ private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+
+ static final long[] counts = new long[PARALLELISM];
+
+ private long count;
+
+ @Override
+ public boolean filter(String value) {
+ count++;
+ return value.length() < 100; // should be always true
+ }
+
+ @Override
+ public void close() {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+ }
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
+ }
+ }
+
+ private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
+ implements CheckpointedAsynchronously<Long> {
+
+ static final long[] counts = new long[PARALLELISM];
+
+ private long count;
+
+ @Override
+ public PrefixCount map(String value) {
+ count++;
+ return new PrefixCount(value.substring(0, 1), value, 1L);
+ }
+
+ @Override
+ public void close() {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+ }
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
+ }
+ }
+
+ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+
+ static final long[] counts = new long[PARALLELISM];
+
+ private OperatorState<Long> count;
+
+ @Override
+ public PrefixCount map(PrefixCount value) throws Exception {
+ count.update(count.value() + 1);
+ return value;
+ }
+
+ @Override
+ public void open(Configuration conf) throws IOException {
+ count = getRuntimeContext().getOperatorState("count", 0L, false);
+ }
+
+ @Override
+ public void close() throws IOException {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+ }
+
+ }
+
+ private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount>
+ implements Checkpointed<HashMap<String, PrefixCount>> {
+
+ private static volatile boolean hasFailed = false;
+
+ private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
+
+ private final long numElements;
+
+ private long failurePos;
+ private long count;
+
+
+ OnceFailingAggregator(long numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+ long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+ failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+ count = 0;
+ }
+
+ @Override
+ public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception {
+ count++;
+ if (!hasFailed && count >= failurePos) {
+ hasFailed = true;
+ throw new Exception("Test Failure");
+ }
+
+ PrefixCount curr = aggregationMap.get(value.prefix);
+ if (curr == null) {
+ aggregationMap.put(value.prefix, value);
+ out.collect(value);
+ }
+ else {
+ curr.count += value.count;
+ out.collect(curr);
+ }
+ }
+
+ @Override
+ public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
+ return aggregationMap;
+ }
+
+ @Override
+ public void restoreState(HashMap<String, PrefixCount> state) {
+ aggregationMap.putAll(state);
+ }
+ }
+
+ private static class ValidatingSink extends RichSinkFunction<PrefixCount>
+ implements Checkpointed<HashMap<Character, Long>> {
+
+ @SuppressWarnings("unchecked")
+ private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
+
+ private HashMap<Character, Long> counts = new HashMap<Character, Long>();
+
+ @Override
+ public void invoke(PrefixCount value) {
+ Character first = value.prefix.charAt(0);
+ Long previous = counts.get(first);
+ if (previous == null) {
+ counts.put(first, value.count);
+ } else {
+ counts.put(first, Math.max(previous, value.count));
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ maps[getRuntimeContext().getIndexOfThisSubtask()] = counts;
+ }
+
+ @Override
+ public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
+ return counts;
+ }
+
+ @Override
+ public void restoreState(HashMap<Character, Long> state) {
+ counts.putAll(state);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Type Classes
+ // --------------------------------------------------------------------------------------------
+
+ public static class PrefixCount implements Serializable {
+
+ public String prefix;
+ public String value;
+ public long count;
+
+ public PrefixCount() {}
+
+ public PrefixCount(String prefix, String value, long count) {
+ this.prefix = prefix;
+ this.value = value;
+ this.count = count;
+ }
+
+ @Override
+ public String toString() {
+ return prefix + " / " + value;
+ }
+ }
+}
[8/8] flink git commit: [FLINK-2402] [streaming] Add a stream
checkpoint barrier tracker.
Posted by se...@apache.org.
[FLINK-2402] [streaming] Add a stream checkpoint barrier tracker.
The BarrierTracker is non-blocking and only counts barriers.
That way, it does not increase latency of records in the stream, but can only be
used to obtain "at least once" processing guarantees.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f87b716
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f87b716
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f87b716
Branch: refs/heads/master
Commit: 8f87b7164b644ea8f1708f7eb76567e58341b224
Parents: 0579f90
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 26 19:05:30 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:07 2015 +0200
----------------------------------------------------------------------
.../streaming/runtime/io/BarrierTracker.java | 194 +++++++++
.../runtime/io/BarrierTrackerTest.java | 404 +++++++++++++++++++
2 files changed, 598 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8f87b716/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
new file mode 100644
index 0000000..6b24556
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+/**
+ * The BarrierTracker keeps track of how many checkpoint barriers have been received
+ * for each pending checkpoint, counted across all input channels.
+ *
+ * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
+ * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
+ * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
+ */
+public class BarrierTracker implements CheckpointBarrierHandler {
+
+ private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
+
+ private final InputGate inputGate;
+
+ private final int totalNumberOfInputChannels;
+
+ private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
+
+ private EventListener<CheckpointBarrier> checkpointHandler;
+
+ private long latestPendingCheckpointID = -1;
+
+ public BarrierTracker(InputGate inputGate) {
+ this.inputGate = inputGate;
+ this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+ this.pendingCheckpoints = new ArrayDeque<CheckpointBarrierCount>();
+ }
+
+ @Override
+ public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ while (true) {
+ BufferOrEvent next = inputGate.getNextBufferOrEvent();
+ if (next == null) {
+ return null;
+ }
+ else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+ return next;
+ }
+ else {
+ processBarrier((CheckpointBarrier) next.getEvent());
+ }
+ }
+ }
+
+ @Override
+ public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
+ if (this.checkpointHandler == null) {
+ this.checkpointHandler = checkpointHandler;
+ }
+ else {
+ throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ pendingCheckpoints.clear();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return pendingCheckpoints.isEmpty();
+ }
+
+ private void processBarrier(CheckpointBarrier receivedBarrier) {
+ // fast path for single channel trackers
+ if (totalNumberOfInputChannels == 1) {
+ if (checkpointHandler != null) {
+ checkpointHandler.onEvent(receivedBarrier);
+ }
+ return;
+ }
+
+ // general path for multiple input channels
+ final long barrierId = receivedBarrier.getId();
+
+		// find the checkpoint barrier in the queue of pending barriers
+ CheckpointBarrierCount cbc = null;
+ int pos = 0;
+
+ for (CheckpointBarrierCount next : pendingCheckpoints) {
+ if (next.checkpointId == barrierId) {
+ cbc = next;
+ break;
+ }
+ pos++;
+ }
+
+ if (cbc != null) {
+			// add one to the count of that barrier and check for completion
+ int numBarriersNew = cbc.incrementBarrierCount();
+ if (numBarriersNew == totalNumberOfInputChannels) {
+ // checkpoint can be triggered
+				// first, remove this checkpoint and all prior pending
+ // checkpoints (which are now subsumed)
+ for (int i = 0; i <= pos; i++) {
+ pendingCheckpoints.pollFirst();
+ }
+
+ // notify the listener
+ if (checkpointHandler != null) {
+ checkpointHandler.onEvent(receivedBarrier);
+ }
+ }
+ }
+ else {
+ // first barrier for that checkpoint ID
+ // add it only if it is newer than the latest checkpoint.
+ // if it is not newer than the latest checkpoint ID, then there cannot be a
+ // successful checkpoint for that ID anyways
+ if (barrierId > latestPendingCheckpointID) {
+ latestPendingCheckpointID = barrierId;
+ pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
+
+ // make sure we do not track too many checkpoints
+ if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
+ pendingCheckpoints.pollFirst();
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ //
+ // ------------------------------------------------------------------------
+
+ /**
+ * Simple class for a checkpoint ID with a barrier counter.
+ */
+ private static final class CheckpointBarrierCount {
+
+ private final long checkpointId;
+
+ private int barrierCount;
+
+ private CheckpointBarrierCount(long checkpointId) {
+ this.checkpointId = checkpointId;
+ this.barrierCount = 1;
+ }
+
+ public int incrementBarrierCount() {
+ return ++barrierCount;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof CheckpointBarrierCount) {
+ CheckpointBarrierCount that = (CheckpointBarrierCount) obj;
+ return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount;
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f87b716/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
new file mode 100644
index 0000000..b2c570e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the behavior of the barrier tracker.
+ */
+public class BarrierTrackerTest {
+
+ @Test
+ public void testSingleChannelNoBarriers() {
+ try {
+ BufferOrEvent[] sequence = { createBuffer(0), createBuffer(0), createBuffer(0) };
+
+ MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+
+ for (BufferOrEvent boe : sequence) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+
+ assertNull(tracker.getNextNonBlocked());
+ assertNull(tracker.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultiChannelNoBarriers() {
+ try {
+ BufferOrEvent[] sequence = { createBuffer(2), createBuffer(2), createBuffer(0),
+ createBuffer(1), createBuffer(0), createBuffer(3),
+ createBuffer(1), createBuffer(1), createBuffer(2)
+ };
+
+ MockInputGate gate = new MockInputGate(4, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+
+ for (BufferOrEvent boe : sequence) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+
+ assertNull(tracker.getNextNonBlocked());
+ assertNull(tracker.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSingleChannelWithBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0), createBuffer(0), createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
+ createBarrier(2, 0), createBarrier(3, 0),
+ createBuffer(0), createBuffer(0),
+ createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
+ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+
+ CheckpointSequenceValidator validator =
+ new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
+ tracker.registerCheckpointEventHandler(validator);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+ }
+
+ assertNull(tracker.getNextNonBlocked());
+ assertNull(tracker.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSingleChannelWithSkippedBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(0), createBuffer(0),
+ createBarrier(3, 0), createBuffer(0),
+ createBarrier(4, 0), createBarrier(6, 0), createBuffer(0),
+ createBarrier(7, 0), createBuffer(0), createBarrier(10, 0),
+ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+
+ CheckpointSequenceValidator validator =
+ new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
+ tracker.registerCheckpointEventHandler(validator);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+ }
+
+ assertNull(tracker.getNextNonBlocked());
+ assertNull(tracker.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultiChannelWithBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1),
+ createBarrier(1, 0),
+
+ createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
+ createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
+
+ createBuffer(2), createBuffer(2),
+ createBarrier(3, 2),
+ createBuffer(2), createBuffer(2),
+ createBarrier(3, 0), createBarrier(3, 1),
+
+ createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0),
+
+ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+
+ CheckpointSequenceValidator validator =
+ new CheckpointSequenceValidator(1, 2, 3, 4);
+ tracker.registerCheckpointEventHandler(validator);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+ }
+
+ assertNull(tracker.getNextNonBlocked());
+ assertNull(tracker.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultiChannelSkippingCheckpoints() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1),
+ createBarrier(1, 0),
+
+ createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
+ createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
+
+ createBuffer(2), createBuffer(2),
+ createBarrier(3, 2),
+ createBuffer(2), createBuffer(2),
+
+ // jump to checkpoint 4
+ createBarrier(4, 0),
+ createBuffer(0), createBuffer(1), createBuffer(2),
+ createBarrier(4, 1),
+ createBuffer(1),
+ createBarrier(4, 2),
+
+ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+
+ CheckpointSequenceValidator validator =
+ new CheckpointSequenceValidator(1, 2, 4);
+ tracker.registerCheckpointEventHandler(validator);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+ }
+
+ assertNull(tracker.getNextNonBlocked());
+ assertNull(tracker.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * This test validates that the barrier tracker does not immediately
+ * discard a pending checkpoint as soon as it sees a barrier from a
+ * later checkpoint from some channel.
+ *
+ * This behavior is crucial, otherwise topologies where different inputs
+ * have different latency (and that latency is close to or higher than the
+ * checkpoint interval) may skip many checkpoints, or fail to complete a
+ * checkpoint altogether.
+ */
+ @Test
+ public void testCompleteCheckpointsOnLateBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ // checkpoint 2
+ createBuffer(1), createBuffer(1), createBuffer(0), createBuffer(2),
+ createBarrier(2, 1), createBarrier(2, 0), createBarrier(2, 2),
+
+ // incomplete checkpoint 3
+ createBuffer(1), createBuffer(0),
+ createBarrier(3, 1), createBarrier(3, 2),
+
+ // some barriers from checkpoint 4
+ createBuffer(1), createBuffer(0),
+ createBarrier(4, 2), createBarrier(4, 1),
+ createBuffer(1), createBuffer(2),
+
+ // last barrier from checkpoint 3
+ createBarrier(3, 0),
+
+ // complete checkpoint 4
+ createBuffer(0), createBarrier(4, 0),
+
+ // regular checkpoint 5
+ createBuffer(1), createBuffer(2), createBarrier(5, 1),
+ createBuffer(0), createBarrier(5, 0),
+ createBuffer(1), createBarrier(5, 2),
+
+ // checkpoint 6 (incomplete)
+ createBuffer(1), createBarrier(6, 1),
+ createBuffer(0), createBarrier(6, 0),
+
+ // checkpoint 7, with early barriers for checkpoints 8 and 9
+ createBuffer(1), createBarrier(7, 1),
+ createBuffer(0), createBarrier(7, 2),
+ createBuffer(2), createBarrier(8, 2),
+ createBuffer(0), createBarrier(8, 1),
+ createBuffer(1), createBarrier(9, 1),
+
+ // complete checkpoint 7, first barriers from checkpoint 10
+ createBarrier(7, 0),
+ createBuffer(0), createBarrier(9, 2),
+ createBuffer(2), createBarrier(10, 2),
+
+ // complete checkpoints 8 and 9
+ createBarrier(8, 0),
+ createBuffer(1), createBuffer(2), createBarrier(9, 0),
+
+ // trailing data
+ createBuffer(1), createBuffer(0), createBuffer(2)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+
+ CheckpointSequenceValidator validator =
+ new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9);
+ tracker.registerCheckpointEventHandler(validator);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+ }
+
+ assertNull(tracker.getNextNonBlocked());
+ assertNull(tracker.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utils
+ // ------------------------------------------------------------------------
+
+ private static BufferOrEvent createBarrier(long id, int channel) {
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ }
+
+ private static BufferOrEvent createBuffer(int channel) {
+ return new BufferOrEvent(
+ new Buffer(new MemorySegment(new byte[] { 1 }), DummyBufferRecycler.INSTANCE), channel);
+ }
+
+ // ------------------------------------------------------------------------
+ // Testing Mocks
+ // ------------------------------------------------------------------------
+
+ private static class MockInputGate implements InputGate {
+
+ private final int numChannels;
+ private final Queue<BufferOrEvent> boes;
+
+ public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
+ this.numChannels = numChannels;
+ this.boes = new ArrayDeque<BufferOrEvent>(boes);
+ }
+
+ @Override
+ public int getNumberOfInputChannels() {
+ return numChannels;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return boes.isEmpty();
+ }
+
+ @Override
+ public void requestPartitions() {}
+
+ @Override
+ public BufferOrEvent getNextBufferOrEvent() {
+ return boes.poll();
+ }
+
+ @Override
+ public void sendTaskEvent(TaskEvent event) {}
+
+ @Override
+ public void registerListener(EventListener<InputGate> listener) {}
+ }
+
+ private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
+
+ private final long[] checkpointIDs;
+
+ private int i = 0;
+
+ private CheckpointSequenceValidator(long... checkpointIDs) {
+ this.checkpointIDs = checkpointIDs;
+ }
+
+ @Override
+ public void onEvent(CheckpointBarrier barrier) {
+ assertTrue("More checkpoints than expected", i < checkpointIDs.length);
+ assertNotNull(barrier);
+ assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
+ assertTrue(barrier.getTimestamp() > 0);
+ }
+ }
+}
[6/8] flink git commit: [FLINK-2406] [streaming] Abstract and improve
stream alignment via the BarrierBuffer
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index cb5e046..ad61c6f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,152 +18,652 @@
package org.apache.flink.streaming.runtime.io;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
-public class BarrierBufferTest {
-
- @Test
- public void testWithoutBarriers() throws IOException, InterruptedException {
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
- List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
- input.add(createBuffer(0));
- input.add(createBuffer(0));
- input.add(createBuffer(0));
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
- InputGate mockIG = new MockInputGate(1, input);
- AbstractReader mockAR = new MockReader(mockIG);
+/**
+ * Tests for the behavior of the {@link BarrierBuffer}.
+ */
+public class BarrierBufferTest {
- BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+ private static int SIZE_COUNTER = 0;
+
+ private static IOManager IO_MANAGER;
- assertEquals(input.get(0), bb.getNextNonBlocked());
- assertEquals(input.get(1), bb.getNextNonBlocked());
- assertEquals(input.get(2), bb.getNextNonBlocked());
+ @BeforeClass
+ public static void setup() {
+ IO_MANAGER = new IOManagerAsync();
+ SIZE_COUNTER = 1;
+ }
- bb.cleanup();
+ @AfterClass
+ public static void shutdownIOManager() {
+ IO_MANAGER.shutdown();
}
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ /**
+ * Validates that the buffer behaves correctly if no checkpoint barriers come,
+ * for a single input channel.
+ */
@Test
- public void testOneChannelBarrier() throws IOException, InterruptedException {
+ public void testSingleChannelNoBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0), createBuffer(0), createBuffer(0),
+ createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ for (BufferOrEvent boe : sequence) {
+ assertEquals(boe, buffer.getNextNonBlocked());
+ }
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
- input.add(createBuffer(0));
- input.add(createBuffer(0));
- input.add(createBarrier(1, 0));
- input.add(createBuffer(0));
- input.add(createBuffer(0));
- input.add(createBarrier(2, 0));
- input.add(createBuffer(0));
+ /**
+ * Validates that the buffer behaves correctly if no checkpoint barriers come,
+ * for an input with multiple input channels.
+ */
+ @Test
+ public void testMultiChannelNoBarriers() {
+ try {
+ BufferOrEvent[] sequence = { createBuffer(2), createBuffer(2), createBuffer(0),
+ createBuffer(1), createBuffer(0), createEndOfPartition(0),
+ createBuffer(3), createBuffer(1), createEndOfPartition(3),
+ createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2)
+ };
+
+ MockInputGate gate = new MockInputGate(4, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ for (BufferOrEvent boe : sequence) {
+ assertEquals(boe, buffer.getNextNonBlocked());
+ }
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- InputGate mockIG = new MockInputGate(1, input);
- AbstractReader mockAR = new MockReader(mockIG);
+ /**
+ * Validates that the buffer preserves the order of elements for an
+ * input with a single input channel, and checkpoint events.
+ */
+ @Test
+ public void testSingleChannelWithBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0), createBuffer(0), createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
+ createBarrier(2, 0), createBarrier(3, 0),
+ createBuffer(0), createBuffer(0),
+ createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
+ createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
+ assertEquals(boe, buffer.getNextNonBlocked());
+ }
+ }
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
- BufferOrEvent nextBoe;
+ /**
+ * Validates that the buffer correctly aligns the streams for inputs with
+ * multiple input channels, by buffering and blocking certain inputs.
+ */
+ @Test
+ public void testMultiChannelWithBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ // checkpoint with blocked data
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(1, 0),
+
+ // checkpoint without blocked data
+ createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
+ createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
+
+ // checkpoint with data only from one channel
+ createBuffer(2), createBuffer(2),
+ createBarrier(3, 2),
+ createBuffer(2), createBuffer(2),
+ createBarrier(3, 0), createBarrier(3, 1),
+
+ // empty checkpoint
+ createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0),
+
+ // checkpoint with blocked data in mixed order
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(5, 1),
+ createBuffer(2), createBuffer(0), createBuffer(2), createBuffer(1),
+ createBarrier(5, 2),
+ createBuffer(1), createBuffer(0), createBuffer(2), createBuffer(1),
+ createBarrier(5, 0),
+
+ // some trailing data
+ createBuffer(0),
+ createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ // pre checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+ // blocking while aligning for checkpoint 1
+ check(sequence[7], buffer.getNextNonBlocked());
+ assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+ // checkpoint 1 done, returning buffered data
+ check(sequence[5], buffer.getNextNonBlocked());
+ assertEquals(2L, handler.getNextExpectedCheckpointId());
+ check(sequence[6], buffer.getNextNonBlocked());
+
+ // pre checkpoint 2
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+ check(sequence[11], buffer.getNextNonBlocked());
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[13], buffer.getNextNonBlocked());
+ assertEquals(2L, handler.getNextExpectedCheckpointId());
+
+ // checkpoint 2 barriers come together
+ check(sequence[17], buffer.getNextNonBlocked());
+ assertEquals(3L, handler.getNextExpectedCheckpointId());
+ check(sequence[18], buffer.getNextNonBlocked());
+
+ // checkpoint 3 starts, data buffered
+ check(sequence[20], buffer.getNextNonBlocked());
+ assertEquals(4L, handler.getNextExpectedCheckpointId());
+ check(sequence[21], buffer.getNextNonBlocked());
+
+ // checkpoint 4 happens without extra data
+
+ // pre checkpoint 5
+ check(sequence[27], buffer.getNextNonBlocked());
+ assertEquals(5L, handler.getNextExpectedCheckpointId());
+ check(sequence[28], buffer.getNextNonBlocked());
+ check(sequence[29], buffer.getNextNonBlocked());
+
+ // checkpoint 5 aligning
+ check(sequence[31], buffer.getNextNonBlocked());
+ check(sequence[32], buffer.getNextNonBlocked());
+ check(sequence[33], buffer.getNextNonBlocked());
+ check(sequence[37], buffer.getNextNonBlocked());
+
+ // buffered data from checkpoint 5 alignment
+ check(sequence[34], buffer.getNextNonBlocked());
+ check(sequence[36], buffer.getNextNonBlocked());
+ check(sequence[38], buffer.getNextNonBlocked());
+ check(sequence[39], buffer.getNextNonBlocked());
+
+ // remaining data
+ check(sequence[41], buffer.getNextNonBlocked());
+ check(sequence[42], buffer.getNextNonBlocked());
+ check(sequence[43], buffer.getNextNonBlocked());
+ check(sequence[44], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
- assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+ @Test
+ public void testMultiChannelTrailingBlockedData() {
+ try {
+ BufferOrEvent[] sequence = {
+ createBuffer(0), createBuffer(1), createBuffer(2),
+ createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
+
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(2, 1),
+ createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
+ createBarrier(2, 2),
+ createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ // pre-checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ assertEquals(1L, handler.getNextExpectedCheckpointId());
+
+ // pre-checkpoint 2
+ check(sequence[6], buffer.getNextNonBlocked());
+ assertEquals(2L, handler.getNextExpectedCheckpointId());
+ check(sequence[7], buffer.getNextNonBlocked());
+ check(sequence[8], buffer.getNextNonBlocked());
+
+ // checkpoint 2 alignment
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[14], buffer.getNextNonBlocked());
+ check(sequence[18], buffer.getNextNonBlocked());
+ check(sequence[19], buffer.getNextNonBlocked());
+
+ // end of stream: remaining buffered contents
+ check(sequence[10], buffer.getNextNonBlocked());
+ check(sequence[11], buffer.getNextNonBlocked());
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[17], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- bb.cleanup();
+ /**
+ * Validates that the buffer correctly aligns the streams in cases
+ * where some channels receive barriers from multiple successive checkpoints
+ * before the pending checkpoint is complete.
+ */
+ @Test
+ public void testMultiChannelWithQueuedFutureBarriers() {
+ try {
+ BufferOrEvent[] sequence = {
+ // checkpoint 1 - with blocked data
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(1), createBuffer(0),
+
+ // checkpoint 2 - where future checkpoint barriers come before
+ // the current checkpoint is complete
+ createBarrier(2, 1),
+ createBuffer(1), createBuffer(2), createBarrier(2, 0),
+ createBarrier(3, 0), createBuffer(0),
+ createBarrier(3, 1), createBuffer(0), createBuffer(1), createBuffer(2),
+ createBarrier(4, 1), createBuffer(1), createBuffer(2),
+
+ // complete checkpoint 2, send a barrier for checkpoints 4 and 5
+ createBarrier(2, 2),
+ createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
+ createBarrier(4, 0),
+ createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
+ createBarrier(5, 1),
+
+ // complete checkpoint 3
+ createBarrier(3, 2),
+ createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
+ createBarrier(6, 1),
+
+ // complete checkpoint 4, checkpoint 5 remains not fully triggered
+ createBarrier(4, 2),
+ createBuffer(2),
+ createBuffer(1), createEndOfPartition(1),
+ createBuffer(2), createEndOfPartition(2),
+ createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ // around checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ check(sequence[7], buffer.getNextNonBlocked());
+
+ check(sequence[5], buffer.getNextNonBlocked());
+ assertEquals(2L, handler.getNextExpectedCheckpointId());
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+
+ // alignment of checkpoint 2 - buffering also some barriers for
+ // checkpoints 3 and 4
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[20], buffer.getNextNonBlocked());
+ check(sequence[23], buffer.getNextNonBlocked());
+
+ // checkpoint 2 completed
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[25], buffer.getNextNonBlocked());
+ check(sequence[27], buffer.getNextNonBlocked());
+ check(sequence[30], buffer.getNextNonBlocked());
+ check(sequence[32], buffer.getNextNonBlocked());
+
+ // checkpoint 3 completed (emit buffered)
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[18], buffer.getNextNonBlocked());
+ check(sequence[19], buffer.getNextNonBlocked());
+ check(sequence[28], buffer.getNextNonBlocked());
+
+ // past checkpoint 3
+ check(sequence[36], buffer.getNextNonBlocked());
+ check(sequence[38], buffer.getNextNonBlocked());
+
+ // checkpoint 4 completed (emit buffered)
+ check(sequence[22], buffer.getNextNonBlocked());
+ check(sequence[26], buffer.getNextNonBlocked());
+ check(sequence[31], buffer.getNextNonBlocked());
+ check(sequence[33], buffer.getNextNonBlocked());
+ check(sequence[39], buffer.getNextNonBlocked());
+
+ // past checkpoint 4, alignment for checkpoint 5
+ check(sequence[42], buffer.getNextNonBlocked());
+ check(sequence[45], buffer.getNextNonBlocked());
+ check(sequence[46], buffer.getNextNonBlocked());
+ check(sequence[47], buffer.getNextNonBlocked());
+ check(sequence[48], buffer.getNextNonBlocked());
+
+ // end of input, emit remainder
+ check(sequence[37], buffer.getNextNonBlocked());
+ check(sequence[43], buffer.getNextNonBlocked());
+ check(sequence[44], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
+ /**
+ * Validates that the buffer skips over the current checkpoint if it
+ * receives a barrier from a later checkpoint on a non-blocked input.
+ */
@Test
- public void testMultiChannelBarrier() throws IOException, InterruptedException {
-
- List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
- input.add(createBuffer(0));
- input.add(createBuffer(1));
- input.add(createBarrier(1, 0));
- input.add(createBarrier(2, 0));
- input.add(createBuffer(0));
- input.add(createBarrier(3, 0));
- input.add(createBuffer(0));
- input.add(createBuffer(1));
- input.add(createBarrier(1, 1));
- input.add(createBuffer(0));
- input.add(createBuffer(1));
- input.add(createBarrier(2, 1));
- input.add(createBarrier(3, 1));
- input.add(createBarrier(4, 0));
- input.add(createBuffer(0));
- input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
-
+ public void testMultiChannelSkippingCheckpoints() {
+ try {
+ BufferOrEvent[] sequence = {
+ // checkpoint 1 - with blocked data
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(1), createBuffer(0),
+
+ // checkpoint 2 will not complete: premature barrier from checkpoint 3
+ createBarrier(2, 1),
+ createBuffer(1), createBuffer(2),
+ createBarrier(2, 0),
+ createBuffer(2), createBuffer(0),
+ createBarrier(3, 2),
+
+ createBuffer(2),
+ createBuffer(1), createEndOfPartition(1),
+ createBuffer(2), createEndOfPartition(2),
+ createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
+ buffer.registerCheckpointEventHandler(handler);
+ handler.setNextExpectedCheckpointId(1L);
+
+ // checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ check(sequence[7], buffer.getNextNonBlocked());
+ assertEquals(1L, buffer.getCurrentCheckpointId());
+
+ check(sequence[5], buffer.getNextNonBlocked());
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+
+ // alignment of checkpoint 2
+ check(sequence[13], buffer.getNextNonBlocked());
+ assertEquals(2L, buffer.getCurrentCheckpointId());
+ check(sequence[15], buffer.getNextNonBlocked());
+
+ // checkpoint 2 aborted, checkpoint 3 started
+ check(sequence[12], buffer.getNextNonBlocked());
+ assertEquals(3L, buffer.getCurrentCheckpointId());
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[19], buffer.getNextNonBlocked());
+ check(sequence[20], buffer.getNextNonBlocked());
+ check(sequence[23], buffer.getNextNonBlocked());
+ check(sequence[24], buffer.getNextNonBlocked());
+
+ // end of input, emit remainder
+ check(sequence[18], buffer.getNextNonBlocked());
+ check(sequence[21], buffer.getNextNonBlocked());
+ check(sequence[22], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- InputGate mockIG1 = new MockInputGate(2, input);
- AbstractReader mockAR1 = new MockReader(mockIG1);
+ /**
+ * Validates that the buffer skips over a later checkpoint if it
+ * receives a barrier from an even later checkpoint on a blocked input.
+ *
+ * NOTE: This test currently fails, because the barrier buffer does not support
+ * unblocking inputs before all previously unblocked data is consumed.
+ *
+ * Since this test checks only that the buffer behaves "failsafe" in cases of
+ * corrupt checkpoint barrier propagation (a situation that does not occur
+ * under the current model), we ignore it for the moment.
+ */
+// @Test
+ public void testMultiChannelSkippingCheckpointsViaBlockedInputs() {
+ try {
+ BufferOrEvent[] sequence = {
+ // checkpoint 1 - with blocked data
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1), createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(1), createBuffer(0),
+
+ // checkpoint 2 will not complete: premature barrier from checkpoint 3
+ createBarrier(2, 1),
+ createBuffer(1), createBuffer(2),
+ createBarrier(2, 0),
+ createBuffer(1), createBuffer(0),
+
+ createBarrier(4, 1), // premature barrier on blocked input
+ createBarrier(3, 0), // queued barrier, ignored on replay
+
+ // complete checkpoint 2
+ createBarrier(2, 0),
+ createBuffer(0),
+
+ createBarrier(3, 2), // should be ignored
+ createBuffer(2),
+ createBarrier(4, 0),
+ createBuffer(0), createBuffer(1), createBuffer(2),
+ createBarrier(4, 1),
+
+ createBuffer(1), createEndOfPartition(1),
+ createBuffer(2), createEndOfPartition(2),
+ createBuffer(0), createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ // checkpoint 1
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ check(sequence[7], buffer.getNextNonBlocked());
+ assertEquals(1L, buffer.getCurrentCheckpointId());
+ check(sequence[5], buffer.getNextNonBlocked());
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+
+ // alignment of checkpoint 2
+ check(sequence[13], buffer.getNextNonBlocked());
+ assertEquals(2L, buffer.getCurrentCheckpointId());
+
+ // checkpoint 2 completed
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[15], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+
+ // checkpoint 3 skipped, alignment for 4 started
+ check(sequence[20], buffer.getNextNonBlocked());
+ assertEquals(4L, buffer.getCurrentCheckpointId());
+ check(sequence[22], buffer.getNextNonBlocked());
+ check(sequence[26], buffer.getNextNonBlocked());
+
+ // checkpoint 4 completed
+ check(sequence[24], buffer.getNextNonBlocked());
+ check(sequence[25], buffer.getNextNonBlocked());
+
+ check(sequence[28], buffer.getNextNonBlocked());
+ check(sequence[29], buffer.getNextNonBlocked());
+ check(sequence[30], buffer.getNextNonBlocked());
+ check(sequence[31], buffer.getNextNonBlocked());
+ check(sequence[32], buffer.getNextNonBlocked());
+ check(sequence[33], buffer.getNextNonBlocked());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
- BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
- BufferOrEvent nextBoe;
+ // ------------------------------------------------------------------------
+ // Utils
+ // ------------------------------------------------------------------------
- check(input.get(0), nextBoe = bb.getNextNonBlocked());
- check(input.get(1), nextBoe = bb.getNextNonBlocked());
- check(input.get(2), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(7), nextBoe = bb.getNextNonBlocked());
- check(input.get(8), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(3), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(10), nextBoe = bb.getNextNonBlocked());
- check(input.get(11), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(4), nextBoe = bb.getNextNonBlocked());
- check(input.get(5), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(12), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(6), nextBoe = bb.getNextNonBlocked());
- check(input.get(9), nextBoe = bb.getNextNonBlocked());
- check(input.get(13), nextBoe = bb.getNextNonBlocked());
- bb.processBarrier(nextBoe);
- check(input.get(14), nextBoe = bb.getNextNonBlocked());
- check(input.get(15), nextBoe = bb.getNextNonBlocked());
+ private static BufferOrEvent createBarrier(long id, int channel) {
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ }
- bb.cleanup();
+ private static BufferOrEvent createBuffer(int channel) {
+ // since we have no access to the contents, we need to use the size as an
+ // identifier to validate correctness here
+ return new BufferOrEvent(
+ new Buffer(new MemorySegment(new byte[SIZE_COUNTER++]), DummyBufferRecycler.INSTANCE), channel);
}
- private static void check(BufferOrEvent expected, BufferOrEvent actual) {
- assertEquals(expected.isBuffer(), actual.isBuffer());
- assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
- if (expected.isEvent()) {
- assertEquals(expected.getEvent(), actual.getEvent());
+ private static BufferOrEvent createEndOfPartition(int channel) {
+ return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, channel);
+ }
+
+ private static void check(BufferOrEvent expected, BufferOrEvent present) {
+ assertNotNull(expected);
+ assertNotNull(present);
+ assertEquals(expected.isBuffer(), present.isBuffer());
+
+ if (expected.isBuffer()) {
+ // since we have no access to the contents, we need to use the size as an
+ // identifier to validate correctness here
+ assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+ }
+ else {
+ assertEquals(expected.getEvent(), present.getEvent());
}
}
+
+ // ------------------------------------------------------------------------
+ // Testing Mocks
+ // ------------------------------------------------------------------------
- protected static class MockInputGate implements InputGate {
+ private static class MockInputGate implements InputGate {
- private int numChannels;
- private Queue<BufferOrEvent> boes;
+ private final int numChannels;
+ private final Queue<BufferOrEvent> boes;
public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
this.numChannels = numChannels;
- this.boes = new LinkedList<BufferOrEvent>(boes);
+ this.boes = new ArrayDeque<BufferOrEvent>(boes);
}
@Override
@@ -176,48 +677,38 @@ public class BarrierBufferTest {
}
@Override
- public void requestPartitions() throws IOException, InterruptedException {
- }
+ public void requestPartitions() {}
@Override
- public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
- return boes.remove();
+ public BufferOrEvent getNextBufferOrEvent() {
+ return boes.poll();
}
@Override
- public void sendTaskEvent(TaskEvent event) throws IOException {
- }
+ public void sendTaskEvent(TaskEvent event) {}
@Override
- public void registerListener(EventListener<InputGate> listener) {
- }
-
+ public void registerListener(EventListener<InputGate> listener) {}
}
- protected static class MockReader extends AbstractReader {
+ private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
+
+ private long nextExpectedCheckpointId = -1L;
- protected MockReader(InputGate inputGate) {
- super(inputGate);
+ public void setNextExpectedCheckpointId(long nextExpectedCheckpointId) {
+ this.nextExpectedCheckpointId = nextExpectedCheckpointId;
}
- @Override
- public void setReporter(AccumulatorRegistry.Reporter reporter) {
-
+ public long getNextExpectedCheckpointId() {
+ return nextExpectedCheckpointId;
}
- }
-
- protected static BufferOrEvent createBarrier(long id, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
- }
- protected static BufferOrEvent createBuffer(int channel) {
- return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
- new BufferRecycler() {
-
- @Override
- public void recycle(MemorySegment memorySegment) {
- }
- }), channel);
+ @Override
+ public void onEvent(CheckpointBarrier barrier) {
+ assertNotNull(barrier);
+ assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
+ assertTrue(barrier.getTimestamp() > 0);
+ nextExpectedCheckpointId++;
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
index 23ca86d..3f815ef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
@@ -25,10 +25,10 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
* A BufferRecycler that does nothing.
*/
public class DummyBufferRecycler implements BufferRecycler {
-
+
public static final BufferRecycler INSTANCE = new DummyBufferRecycler();
-
-
+
+
@Override
public void recycle(MemorySegment memorySegment) {}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0579f90b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
index 9934bd9..b6cd656 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
@@ -22,18 +22,36 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
public class SpillingBufferOrEventTest {
+
+ private static IOManager IO_MANAGER;
+
+ @BeforeClass
+ public static void createIOManager() {
+ IO_MANAGER = new IOManagerAsync();
+ }
+
+ @AfterClass
+ public static void shutdownIOManager() {
+ IO_MANAGER.shutdown();
+ }
+ // ------------------------------------------------------------------------
+
@Test
public void testSpilling() throws IOException, InterruptedException {
- BufferSpiller bsp = new BufferSpiller();
+ BufferSpiller bsp = new BufferSpiller(IO_MANAGER);
SpillReader spr = new SpillReader();
BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
[2/8] flink git commit: [FLINK-2420] [streaming] StreamRecordWriter
properly reports exceptions on flush.
Posted by se...@apache.org.
[FLINK-2420] [streaming] StreamRecordWriter properly reports exceptions on flush.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ba32133
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ba32133
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ba32133
Branch: refs/heads/master
Commit: 8ba321332b994579f387add8bd0855bd29cb33ec
Parents: a0556ef
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 28 15:38:20 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 28 22:58:06 2015 +0200
----------------------------------------------------------------------
.../runtime/io/RecordWriterOutput.java | 13 +-
.../runtime/io/StreamRecordWriter.java | 142 ++++++++++++++-----
.../runtime/io/DummyBufferRecycler.java | 34 +++++
.../runtime/io/SpillingBufferOrEventTest.java | 4 +-
.../runtime/io/StreamRecordWriterTest.java | 132 +++++++++++++++++
.../checkpointing/StateCheckpoinedITCase.java | 4 +-
.../StreamCheckpointingITCase.java | 28 ++--
7 files changed, 296 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index b656bb5..f7d8d47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -94,15 +94,16 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
@Override
public void close() {
- if (recordWriter instanceof StreamRecordWriter) {
- ((StreamRecordWriter<?>) recordWriter).close();
- } else {
- try {
+ try {
+ if (recordWriter instanceof StreamRecordWriter) {
+ ((StreamRecordWriter<?>) recordWriter).close();
+ } else {
recordWriter.flush();
- } catch (IOException e) {
- e.printStackTrace();
}
}
+ catch (IOException e) {
+ throw new RuntimeException("Failed to flush final output", e);
+ }
}
public void clearBuffers() {
http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index abae9a4..b0e2532 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -23,38 +23,61 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
-
- private long timeout;
- private boolean flushAlways = false;
+import static com.google.common.base.Preconditions.checkArgument;
- private OutputFlusher outputFlusher;
+/**
+ * This record writer keeps data in buffers at most for a certain timeout. It spawns a separate thread
+ * that flushes the outputs in a defined interval, to make sure data does not linger in the buffers for too long.
+ *
+ * @param <T> The type of elements written.
+ */
+public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
- public StreamRecordWriter(ResultPartitionWriter writer) {
- this(writer, new RoundRobinChannelSelector<T>(), 1000);
- }
+ /** Default name for the output flush thread, if no name with a task reference is given */
+ private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+
+
+ /** The thread that periodically flushes the output, to give an upper latency bound */
+ private final OutputFlusher outputFlusher;
+
+ /** Flag indicating whether the output should be flushed after every element */
+ private final boolean flushAlways;
- public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
- this(writer, channelSelector, 1000);
+ /** The exception encountered in the flushing thread */
+ private Throwable flusherException;
+
+
+
+ public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
+ this(writer, channelSelector, timeout, null);
}
-
+
public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
- long timeout) {
+ long timeout, String taskName) {
+
super(writer, channelSelector);
-
- this.timeout = timeout;
+
+ checkArgument(timeout >= 0); // 0 = flush after every record; positive = interval of the periodic flusher thread
+
if (timeout == 0) {
flushAlways = true;
- } else {
- this.outputFlusher = new OutputFlusher();
+ outputFlusher = null;
+ }
+ else {
+ flushAlways = false;
+
+ String threadName = taskName == null ?
+ DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName;
+
+ outputFlusher = new OutputFlusher(threadName, timeout);
outputFlusher.start();
}
}
@Override
public void emit(T record) throws IOException, InterruptedException {
+ checkErroneous();
super.emit(record);
if (flushAlways) {
flush();
@@ -63,46 +86,101 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
@Override
public void broadcastEmit(T record) throws IOException, InterruptedException {
+ checkErroneous();
super.broadcastEmit(record);
if (flushAlways) {
flush();
}
}
- public void close() {
- try {
- if (outputFlusher != null) {
+ /**
+ * Closes the writer. This stops the flushing thread (if there is one) and flushes all pending outputs.
+ *
+ * @throws IOException I/O errors may happen during the final flush of the buffers.
+ */
+ public void close() throws IOException {
+ // propagate exceptions
+ flush();
+
+ if (outputFlusher != null) {
+ try {
outputFlusher.terminate();
outputFlusher.join();
}
+ catch (InterruptedException e) {
+ // ignore on close
+ }
+ }
- flush();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- // Do nothing here
+ // final check for asynchronous errors, before we exit with a green light
+ checkErroneous();
+ }
+
+ /**
+ * Notifies the writer that the output flusher thread encountered an exception.
+ *
+ * @param t The exception to report.
+ */
+ void notifyFlusherException(Throwable t) {
+ if (this.flusherException == null) {
+ this.flusherException = t;
+ }
+ }
+
+ private void checkErroneous() throws IOException {
+ if (flusherException != null) {
+ throw new IOException("An exception happened while flushing the outputs", flusherException);
}
}
+ // ------------------------------------------------------------------------
+
+ /**
+ * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+ *
+ * The thread is daemonic, because it is only a utility thread.
+ */
private class OutputFlusher extends Thread {
+
+ private final long timeout;
+
private volatile boolean running = true;
+
+ OutputFlusher(String name, long timeout) {
+ super(name);
+ setDaemon(true);
+ this.timeout = timeout;
+ }
+
public void terminate() {
running = false;
+ interrupt();
}
@Override
public void run() {
- while (running) {
- try {
+ try {
+ while (running) {
+ try {
+ Thread.sleep(timeout);
+ }
+ catch (InterruptedException e) {
+ // propagate this if we are still running, because it should not happen
+ // in that case
+ if (running) {
+ throw new Exception(e);
+ }
+ }
+
+ // any errors here should let the thread come to a halt and be
+ // recognized by the writer
flush();
- Thread.sleep(timeout);
- } catch (InterruptedException e) {
- // Do nothing here
- } catch (IOException e) {
- throw new RuntimeException(e);
}
}
+ catch (Throwable t) {
+ notifyFlusherException(t);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
new file mode 100644
index 0000000..23ca86d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/DummyBufferRecycler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+/**
+ * A BufferRecycler that does nothing.
+ */
+public class DummyBufferRecycler implements BufferRecycler {
+
+ public static final BufferRecycler INSTANCE = new DummyBufferRecycler();
+
+
+ @Override
+ public void recycle(MemorySegment memorySegment) {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
index e0fab17..9934bd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
@@ -26,9 +26,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.streaming.runtime.io.BufferSpiller;
-import org.apache.flink.streaming.runtime.io.SpillReader;
-import org.apache.flink.streaming.runtime.io.SpillingBufferOrEvent;
+
import org.junit.Test;
public class SpillingBufferOrEventTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
new file mode 100644
index 0000000..b5bece7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * This test uses the PowerMockRunner runner to work around the fact that the
+ * {@link ResultPartitionWriter} class is final.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+public class StreamRecordWriterTest {
+
+ /**
+ * Verifies that exceptions during flush from the output flush thread are
+ * recognized in the writer.
+ */
+ @Test
+ public void testPropagateAsyncFlushError() {
+ FailingWriter<LongValue> testWriter = null;
+ try {
+ ResultPartitionWriter mockResultPartitionWriter = getMockWriter(5);
+
+ // test writer that flushes every 5ms and fails after 3 flushes
+ testWriter = new FailingWriter<LongValue>(mockResultPartitionWriter,
+ new RoundRobinChannelSelector<LongValue>(), 5, 3);
+
+ try {
+ long deadline = System.currentTimeMillis() + 20000; // in max 20 seconds (conservative)
+ long l = 0L;
+
+ while (System.currentTimeMillis() < deadline) {
+ testWriter.emit(new LongValue(l++));
+ }
+
+ fail("This should have failed with an exception");
+ }
+ catch (IOException e) {
+ assertNotNull(e.getCause());
+ assertTrue(e.getCause().getMessage().contains("Test Exception"));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ if (testWriter != null) {
+ try {
+ testWriter.close();
+ }
+ catch (IOException e) {
+ // ignore in tests
+ }
+ }
+ }
+ }
+
+ private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception {
+ BufferProvider mockProvider = mock(BufferProvider.class);
+ when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
+ @Override
+ public Buffer answer(InvocationOnMock invocation) {
+ return new Buffer(new MemorySegment(new byte[4096]), DummyBufferRecycler.INSTANCE);
+ }
+ });
+
+ ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
+ when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
+ when(mockWriter.getNumberOfOutputChannels()).thenReturn(numPartitions);
+
+
+ return mockWriter;
+ }
+
+
+ // ------------------------------------------------------------------------
+
+ private static class FailingWriter<T extends IOReadableWritable> extends StreamRecordWriter<T> {
+
+ private int flushesBeforeException;
+
+ private FailingWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
+ long timeout, int flushesBeforeException) {
+ super(writer, channelSelector, timeout);
+ this.flushesBeforeException = flushesBeforeException;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (flushesBeforeException-- <= 0) {
+ throw new IOException("Test Exception");
+ }
+ super.flush();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 0693665..39ff2e5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -31,8 +31,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
import org.apache.flink.util.Collector;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -110,7 +110,7 @@ public class StateCheckpoinedITCase {
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM);
env.enableCheckpointing(500);
- env.getConfig().enableSysoutLogging();
+ env.getConfig().disableSysoutLogging();
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
http://git-wip-us.apache.org/repos/asf/flink/blob/8ba32133/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 1730c63..438e980 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -18,40 +18,32 @@
package org.apache.flink.test.checkpointing;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Collector;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* A simple test that runs a streaming topology with checkpointing enabled.