You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 22:16:56 UTC
[06/17] flink git commit: [FLINK-4329] [streaming api] Fix Streaming
File Source Timestamps/Watermarks Handling
[FLINK-4329] [streaming api] Fix Streaming File Source Timestamps/Watermarks Handling
This closes #2546
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ff451be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ff451be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ff451be
Branch: refs/heads/master
Commit: 8ff451bec58e9f5800eb77c74c1d7457b776cc94
Parents: c62776f
Author: kl0u <kk...@gmail.com>
Authored: Thu Aug 25 17:38:49 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 19:36:13 2016 +0200
----------------------------------------------------------------------
.../state/RocksDBAsyncSnapshotTest.java | 11 +-
.../ContinuousFileMonitoringFunctionITCase.java | 17 +-
.../hdfstests/ContinuousFileMonitoringTest.java | 209 ++++++++++++--
.../fs/bucketing/BucketingSinkTest.java | 4 +-
.../source/ContinuousFileReaderOperator.java | 96 ++++---
.../streaming/api/operators/StreamSource.java | 275 +-----------------
.../api/operators/StreamSourceContexts.java | 284 +++++++++++++++++++
.../runtime/tasks/AsyncExceptionHandler.java | 8 +-
.../tasks/DefaultTimeServiceProvider.java | 11 +-
.../runtime/tasks/OneInputStreamTask.java | 2 +-
.../streaming/runtime/tasks/StreamTask.java | 54 +---
.../runtime/tasks/TwoInputStreamTask.java | 2 +-
.../operators/StreamSourceOperatorTest.java | 17 +-
.../runtime/operators/TimeProviderTest.java | 79 ++++--
...AlignedProcessingTimeWindowOperatorTest.java | 34 ++-
...AlignedProcessingTimeWindowOperatorTest.java | 36 ++-
.../runtime/tasks/StreamMockEnvironment.java | 8 +-
.../KeyedOneInputStreamOperatorTestHarness.java | 4 +-
.../util/OneInputStreamOperatorTestHarness.java | 23 +-
19 files changed, 694 insertions(+), 480 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index bccbabc..2ebd84a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -86,7 +86,7 @@ public class RocksDBAsyncSnapshotTest {
}
/**
- * This ensures that asynchronous state handles are actually materialized asynchonously.
+ * This ensures that asynchronous state handles are actually materialized asynchronously.
*
* <p>We use latches to block at various stages and see if the code still continues through
* the parts that are not asynchronous. If the checkpoint is not done asynchronously the
@@ -168,7 +168,6 @@ public class RocksDBAsyncSnapshotTest {
while (!field.getBoolean(task)) {
Thread.sleep(10);
}
-
}
}
@@ -189,7 +188,9 @@ public class RocksDBAsyncSnapshotTest {
Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
testHarness.waitForTaskCompletion();
- task.checkTimerException();
+ if (mockEnv.wasFailedExternally()) {
+ Assert.fail("Unexpected exception during execution.");
+ }
}
/**
@@ -261,8 +262,10 @@ public class RocksDBAsyncSnapshotTest {
threadPool.shutdown();
Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
testHarness.waitForTaskCompletion();
- task.checkTimerException();
+ if (mockEnv.wasFailedExternally()) {
+ throw new AsynchronousException(new InterruptedException("Exception was thrown as expected."));
+ }
Assert.fail("Operation completed. Cancel failed.");
} catch (Exception expected) {
AsynchronousException asynchronousException = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
index 663345c..079bf04 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -120,7 +120,7 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(4);
format.setFilesFilter(FilePathFilter.createDefaultFilter());
ContinuousFileMonitoringFunction<String> monitoringFunction =
@@ -130,7 +130,7 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
- TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction);
+ TestingSinkFunction sink = new TestingSinkFunction();
DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
@@ -161,16 +161,10 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
private static class TestingSinkFunction extends RichSinkFunction<String> {
- private final ContinuousFileMonitoringFunction src;
-
private int elementCounter = 0;
private Map<Integer, Integer> elementCounters = new HashMap<>();
private Map<Integer, List<String>> collectedContent = new HashMap<>();
- TestingSinkFunction(ContinuousFileMonitoringFunction monitoringFunction) {
- this.src = monitoringFunction;
- }
-
@Override
public void open(Configuration parameters) throws Exception {
// this sink can only work with DOP 1
@@ -200,13 +194,6 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
Assert.assertEquals(cntntStr.toString(), expectedContents.get(fileIdx));
}
expectedContents.clear();
-
- src.cancel();
- try {
- src.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
}
private int getLineNo(String line) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 8a700f5..36b5c5e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
@@ -51,7 +53,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
public class ContinuousFileMonitoringTest {
@@ -106,6 +107,155 @@ public class ContinuousFileMonitoringTest {
// TESTS
@Test
+ public void testFileReadingOperatorWithIngestionTime() throws Exception {
+ Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+ Map<Integer, String> expectedFileContents = new HashMap<>();
+
+ for(int i = 0; i < NO_OF_FILES; i++) {
+ Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+ filesCreated.add(file.f0);
+ expectedFileContents.put(i, file.f1);
+ }
+
+ TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+ final long watermarkInterval = 10;
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+ ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+ reader.setOutputType(typeInfo, executionConfig);
+
+ final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+ final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+ new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+ tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ tester.open();
+
+ Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
+
+ // test that watermarks are correctly emitted
+
+ timeServiceProvider.setCurrentTime(201);
+ timeServiceProvider.setCurrentTime(301);
+ timeServiceProvider.setCurrentTime(401);
+ timeServiceProvider.setCurrentTime(501);
+
+ int i = 0;
+ for(Object line: tester.getOutput()) {
+ if (!(line instanceof Watermark)) {
+ Assert.fail("Only watermarks are expected here ");
+ }
+ Watermark w = (Watermark) line;
+ Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+ i++;
+ }
+
+ // clear the output to get the elements only and the final watermark
+ tester.getOutput().clear();
+ Assert.assertEquals(0, tester.getOutput().size());
+
+ // create the necessary splits for the test
+ FileInputSplit[] splits = format.createInputSplits(
+ reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+ // and feed them to the operator
+ Map<Integer, List<String>> actualFileContents = new HashMap<>();
+
+ long lastSeenWatermark = Long.MIN_VALUE;
+ int lineCounter = 0; // counter for the lines read from the splits
+ int watermarkCounter = 0;
+
+ for(FileInputSplit split: splits) {
+
+ // set the next "current processing time".
+ long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+ timeServiceProvider.setCurrentTime(nextTimestamp);
+
+ // send the next split to be read and wait until it is fully read.
+ tester.processElement(new StreamRecord<>(split));
+ synchronized (tester.getCheckpointLock()) {
+ while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+ tester.getCheckpointLock().wait(10);
+ }
+ }
+
+ // verify that the results are as expected
+ for(Object line: tester.getOutput()) {
+ if (line instanceof StreamRecord) {
+ StreamRecord<String> element = (StreamRecord<String>) line;
+ lineCounter++;
+
+ Assert.assertEquals(nextTimestamp, element.getTimestamp());
+
+ int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+ List<String> content = actualFileContents.get(fileIdx);
+ if (content == null) {
+ content = new ArrayList<>();
+ actualFileContents.put(fileIdx, content);
+ }
+ content.add(element.getValue() + "\n");
+ } else if (line instanceof Watermark) {
+ long watermark = ((Watermark) line).getTimestamp();
+
+ Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
+ Assert.assertTrue(watermark > lastSeenWatermark);
+ watermarkCounter++;
+
+ lastSeenWatermark = watermark;
+ } else {
+ Assert.fail("Unknown element in the list.");
+ }
+ }
+
+ // clean the output to be ready for the next split
+ tester.getOutput().clear();
+ }
+
+ // now we are processing one split after the other,
+ // so all the elements must be here by now.
+ Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
+
+ // because we expect one watermark per split.
+ Assert.assertEquals(NO_OF_FILES, watermarkCounter);
+
+ // then close the reader gracefully so that the Long.MAX watermark is emitted
+ synchronized (tester.getCheckpointLock()) {
+ tester.close();
+ }
+
+ for(org.apache.hadoop.fs.Path file: filesCreated) {
+ hdfs.delete(file, false);
+ }
+
+ // check if the last element is the LongMax watermark (by now this must be the only element)
+ Assert.assertEquals(1, tester.getOutput().size());
+ Assert.assertTrue(tester.getOutput().peek() instanceof Watermark);
+ Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp());
+
+ // check if the elements are the expected ones.
+ Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
+ for (Integer fileIdx: expectedFileContents.keySet()) {
+ Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
+
+ List<String> cntnt = actualFileContents.get(fileIdx);
+ Collections.sort(cntnt, new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ return getLineNo(o1) - getLineNo(o2);
+ }
+ });
+
+ StringBuilder cntntStr = new StringBuilder();
+ for (String line: cntnt) {
+ cntntStr.append(line);
+ }
+ Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
+ }
+ }
+
+ @Test
public void testFileReadingOperator() throws Exception {
Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
Map<Integer, String> expectedFileContents = new HashMap<>();
@@ -119,10 +269,11 @@ public class ContinuousFileMonitoringTest {
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+ reader.setOutputType(typeInfo, new ExecutionConfig());
+
OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
new OneInputStreamOperatorTestHarness<>(reader);
-
- reader.setOutputType(typeInfo, new ExecutionConfig());
+ tester.setTimeCharacteristic(TimeCharacteristic.EventTime);
tester.open();
// create the necessary splits for the test
@@ -134,38 +285,38 @@ public class ContinuousFileMonitoringTest {
tester.processElement(new StreamRecord<>(split));
}
- // then close the reader gracefully
+ // then close the reader gracefully (and wait to finish reading)
synchronized (tester.getCheckpointLock()) {
tester.close();
}
- /*
- * Given that the reader is multithreaded, the test finishes before the reader thread finishes
- * reading. This results in files being deleted by the test before being read, thus throwing an exception.
- * In addition, even if file deletion happens at the end, the results are not ready for testing.
- * To face this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s.
- */
+ // the lines received must be the elements in the files +1 for the longMax watermark
+ // we are in event time, which emits no watermarks, so the last watermark will mark the
+ // end of the input stream.
- long start = System.currentTimeMillis();
- Queue<Object> output;
- do {
- output = tester.getOutput();
- Thread.sleep(50);
- } while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
+ Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, tester.getOutput().size());
Map<Integer, List<String>> actualFileContents = new HashMap<>();
+ Object lastElement = null;
for(Object line: tester.getOutput()) {
- StreamRecord<String> element = (StreamRecord<String>) line;
-
- int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
- List<String> content = actualFileContents.get(fileIdx);
- if(content == null) {
- content = new ArrayList<>();
- actualFileContents.put(fileIdx, content);
+ lastElement = line;
+ if (line instanceof StreamRecord) {
+ StreamRecord<String> element = (StreamRecord<String>) line;
+
+ int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+ List<String> content = actualFileContents.get(fileIdx);
+ if (content == null) {
+ content = new ArrayList<>();
+ actualFileContents.put(fileIdx, content);
+ }
+ content.add(element.getValue() + "\n");
}
- content.add(element.getValue() +"\n");
}
+ // check if the last element is the LongMax watermark
+ Assert.assertTrue(lastElement instanceof Watermark);
+ Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
+
Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
for (Integer fileIdx: expectedFileContents.keySet()) {
Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
@@ -224,7 +375,7 @@ public class ContinuousFileMonitoringTest {
monitoringFunction.open(new Configuration());
monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
- Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+ Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
for(int i = 0; i < NO_OF_FILES; i++) {
org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
@@ -268,8 +419,8 @@ public class ContinuousFileMonitoringTest {
t.interrupt();
fc.join();
- Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
- Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+ Assert.assertEquals(NO_OF_FILES, fc.getFilesCreated().size());
+ Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
Set<String> fileNamesCreated = new HashSet<>();
@@ -316,7 +467,7 @@ public class ContinuousFileMonitoringTest {
// wait until all the files are created
fc.join();
- Assert.assertTrue(filesCreated.size() == NO_OF_FILES);
+ Assert.assertEquals(NO_OF_FILES, filesCreated.size());
Set<String> fileNamesCreated = new HashSet<>();
for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
@@ -337,7 +488,7 @@ public class ContinuousFileMonitoringTest {
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
- Assert.assertTrue(tkns.length == 6);
+ Assert.assertEquals(6, tkns.length);
return Integer.parseInt(tkns[tkns.length - 1]);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index e274fdd..ac1e3f0 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -70,7 +70,7 @@ public class BucketingSinkTest {
private static org.apache.hadoop.fs.FileSystem dfs;
private static String hdfsURI;
- private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, TimeServiceProvider clock) {
+ private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, TestTimeServiceProvider clock) {
BucketingSink<String> sink = new BucketingSink<String>(dataDir.getAbsolutePath())
.setBucketer(new Bucketer<String>() {
private static final long serialVersionUID = 1L;
@@ -91,7 +91,7 @@ public class BucketingSinkTest {
}
private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(BucketingSink<T> sink,
- TimeServiceProvider clock) {
+ TestTimeServiceProvider clock) {
return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig(), clock);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 838bee6..35e72a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -29,14 +29,13 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,20 +43,24 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* This is the operator that reads the {@link FileInputSplit FileInputSplits} received from
- * the preceding {@link ContinuousFileMonitoringFunction}. This operator will receive just the split descriptors
- * and then read and emit records. This may lead to increased backpressure. To avoid this, we have another
- * thread ({@link SplitReader}) actually reading the splits and emitting the elements, which is separate from
- * the thread forwarding the checkpoint barriers. The two threads sync on the {@link StreamTask#getCheckpointLock()}
- * so that the checkpoints reflect the current state.
+ * the preceding {@link ContinuousFileMonitoringFunction}. This operator can have parallelism
+ * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} which has
+ * a parallelism of 1.
+ * <p>
+ * This operator will receive the split descriptors, put them in a queue, and have another
+ * thread read the actual data from the split. This architecture allows the separation of the
+ * reading thread from the one emitting the checkpoint barriers, thus removing any potential
+ * back-pressure.
*/
@Internal
public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
@@ -67,16 +70,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
+ /** A value that serves as a poison pill to stop the reading thread when no more splits remain. */
private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
- private transient SplitReader<S, OUT> reader;
- private transient TimestampedCollector<OUT> collector;
-
private FileInputFormat<OUT> format;
private TypeSerializer<OUT> serializer;
private transient Object checkpointLock;
+ private transient SplitReader<S, OUT> reader;
+ private transient SourceFunction.SourceContext<OUT> readerContext;
private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
@@ -92,25 +95,22 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
public void open() throws Exception {
super.open();
- if (this.serializer == null) {
- throw new IllegalStateException("The serializer has not been set. " +
- "Probably the setOutputType() was not called and this should not have happened. " +
- "Please report it.");
- }
+ checkState(this.reader == null, "The reader is already initialized.");
+ checkState(this.serializer != null, "The serializer has not been set. " +
+ "Probably the setOutputType() was not called. Please report it.");
this.format.setRuntimeContext(getRuntimeContext());
this.format.configure(new Configuration());
-
- this.collector = new TimestampedCollector<>(output);
this.checkpointLock = getContainingTask().getCheckpointLock();
- Preconditions.checkState(reader == null, "The reader is already initialized.");
+ // set the reader context based on the time characteristic
+ final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
+ final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+ this.readerContext = StreamSourceContexts.getSourceContext(
+ timeCharacteristic, getTimerService(), checkpointLock, output, watermarkInterval);
- this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
-
- // the readerState is needed for the initialization of the reader
- // when recovering from a failure. So after the initialization,
- // we can set it to null.
+ // and initialize the split reading thread
+ this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState);
this.readerState = null;
this.reader.start();
}
@@ -122,7 +122,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
@Override
public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
+ // we do nothing because we emit our own watermarks if needed.
}
@Override
@@ -156,7 +156,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
}
reader = null;
- collector = null;
+ readerContext = null;
+ readerState = null;
format = null;
serializer = null;
}
@@ -177,7 +178,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
// called by the StreamTask while having it.
checkpointLock.wait();
}
- collector.close();
+
+ // finally if we are closed normally and we are operating on
+ // event or ingestion time, emit the max watermark indicating
+ // the end of the stream, like a normal source would do.
+
+ if (readerContext != null) {
+ readerContext.emitWatermark(Watermark.MAX_WATERMARK);
+ readerContext.close();
+ }
+ output.close();
}
private class SplitReader<S extends Serializable, OT> extends Thread {
@@ -188,7 +198,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private final TypeSerializer<OT> serializer;
private final Object checkpointLock;
- private final TimestampedCollector<OT> collector;
+ private final SourceFunction.SourceContext<OT> readerContext;
private final Queue<FileInputSplit> pendingSplits;
@@ -200,16 +210,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private SplitReader(FileInputFormat<OT> format,
TypeSerializer<OT> serializer,
- TimestampedCollector<OT> collector,
+ SourceFunction.SourceContext<OT> readerContext,
Object checkpointLock,
Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
this.format = checkNotNull(format, "Unspecified FileInputFormat.");
this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
+ this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
+ this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
- this.pendingSplits = new LinkedList<>();
- this.collector = collector;
- this.checkpointLock = checkpointLock;
+ this.pendingSplits = new ArrayDeque<>();
this.isRunning = true;
// this is the case where a task recovers from a previous failed attempt
@@ -219,7 +229,6 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
S formatState = restoredState.f2;
for (FileInputSplit split : pending) {
- Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
pendingSplits.add(split);
}
@@ -229,9 +238,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
private void addSplit(FileInputSplit split) {
- Preconditions.checkNotNull(split);
+ checkNotNull(split, "Cannot insert a null value in the pending splits queue.");
synchronized (checkpointLock) {
- Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
this.pendingSplits.add(split);
}
}
@@ -267,7 +275,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
checkpointableFormat.reopen(currentSplit, restoredFormatState);
} else {
// this is the case of a non-checkpointable input format that will reprocess the last split.
- LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
+ LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing.");
format.open(currentSplit);
}
// reset the restored state to null for the next iteration
@@ -299,7 +307,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
synchronized (checkpointLock) {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
- collector.collect(nextElement);
+ readerContext.collect(nextElement);
} else {
break;
}
@@ -318,10 +326,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
} catch (Throwable e) {
- if (isRunning) {
- LOG.error("Caught exception processing split: ", currentSplit);
- }
- getContainingTask().failExternally(e);
+ getContainingTask().handleAsyncException("Caught exception when processing split: " + currentSplit, e);
} finally {
synchronized (checkpointLock) {
LOG.info("Reader terminated, and exiting...");
@@ -358,7 +363,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
restoredFormatState;
return new Tuple3<>(snapshot, currentSplit, formatState);
} else {
- LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+ LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
return new Tuple3<>(snapshot, currentSplit, null);
}
} else {
@@ -404,7 +409,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
FileInputSplit currSplit = (FileInputSplit) ois.readObject();
// read the pending splits list
- List<FileInputSplit> pendingSplits = new LinkedList<>();
+ List<FileInputSplit> pendingSplits = new ArrayList<>();
int noOfSplits = ois.readInt();
for (int i = 0; i < noOfSplits; i++) {
FileInputSplit split = (FileInputSplit) ois.readObject();
@@ -416,8 +421,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
S formatState = (S) ois.readObject();
// set the whole reader state for the open() to find.
- Preconditions.checkState(this.readerState == null,
- "The reader state has already been initialized.");
+ checkState(this.readerState == null, "The reader state has already been initialized.");
this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 22987ab..1409ae4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,11 +21,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import java.util.concurrent.ScheduledFuture;
/**
* {@link StreamOperator} for streaming sources.
@@ -57,26 +53,11 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
- final SourceFunction.SourceContext<OUT> ctx;
-
- switch (timeCharacteristic) {
- case EventTime:
- ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
- break;
- case IngestionTime:
- ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
- getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
- break;
- case ProcessingTime:
- ctx = new NonTimestampContext<>(this, lockingObject, collector);
- break;
- default:
- throw new Exception(String.valueOf(timeCharacteristic));
- }
+ final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+
+ this.ctx = StreamSourceContexts.getSourceContext(
+ timeCharacteristic, getTimerService(), lockingObject, collector, watermarkInterval);
- // copy to a field to give the 'cancel()' method access
- this.ctx = ctx;
-
try {
userFunction.run(ctx);
@@ -122,252 +103,4 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
protected boolean isCanceledOrStopped() {
return canceledOrStopped;
}
-
- /**
- * Checks whether any asynchronous thread (checkpoint trigger, timer, watermark generator, ...)
- * has caused an exception. If one of these threads caused an exception, this method will
- * throw that exception.
- */
- void checkAsyncException() {
- getContainingTask().checkTimerException();
- }
-
- // ------------------------------------------------------------------------
- // Source contexts for various stream time characteristics
- // ------------------------------------------------------------------------
-
- /**
- * A source context that attached {@code -1} as a timestamp to all records, and that
- * does not forward watermarks.
- */
- public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
-
- private final StreamSource<?, ?> owner;
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
- this.owner = owner;
- this.lockingObject = lockingObject;
- this.output = output;
- this.reuse = new StreamRecord<T>(null);
- }
-
- @Override
- public void collect(T element) {
- owner.checkAsyncException();
- synchronized (lockingObject) {
- output.collect(reuse.replace(element));
- }
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- // ignore the timestamp
- collect(element);
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- owner.checkAsyncException();
- // do nothing else
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {}
- }
-
- /**
- * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
- * and watermark emission.
- */
- public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
- private final StreamSource<?, ?> owner;
- private final TimeServiceProvider timeService;
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- private final ScheduledFuture<?> watermarkTimer;
- private final long watermarkInterval;
-
- private volatile long nextWatermarkTime;
-
- public AutomaticWatermarkContext(
- final StreamSource<?, ?> owner,
- final Object lockingObjectParam,
- final Output<StreamRecord<T>> outputParam,
- final long watermarkInterval) {
-
- if (watermarkInterval < 1L) {
- throw new IllegalArgumentException("The watermark interval cannot be smaller than one.");
- }
-
- this.owner = owner;
- this.timeService = owner.getTimerService();
- this.lockingObject = lockingObjectParam;
- this.output = outputParam;
- this.watermarkInterval = watermarkInterval;
- this.reuse = new StreamRecord<T>(null);
-
- long now = this.timeService.getCurrentProcessingTime();
- this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
- new WatermarkEmittingTask(this.timeService, lockingObjectParam, outputParam));
- }
-
- @Override
- public void collect(T element) {
- owner.checkAsyncException();
-
- synchronized (lockingObject) {
- final long currentTime = this.timeService.getCurrentProcessingTime();
- output.collect(reuse.replace(element, currentTime));
-
- // this is to avoid lock contention in the lockingObject by
- // sending the watermark before the firing of the watermark
- // emission task.
-
- if (currentTime > nextWatermarkTime) {
- // in case we jumped some watermarks, recompute the next watermark time
- final long watermarkTime = currentTime - (currentTime % watermarkInterval);
- nextWatermarkTime = watermarkTime + watermarkInterval;
- output.emitWatermark(new Watermark(watermarkTime));
-
- // we do not need to register another timer here
- // because the emitting task will do so.
- }
- }
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- collect(element);
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- owner.checkAsyncException();
-
- if (mark.getTimestamp() == Long.MAX_VALUE) {
- // allow it since this is the special end-watermark that for example the Kafka source emits
- synchronized (lockingObject) {
- nextWatermarkTime = Long.MAX_VALUE;
- output.emitWatermark(mark);
- }
-
- // we can shutdown the timer now, no watermarks will be needed any more
- watermarkTimer.cancel(true);
- }
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {
- watermarkTimer.cancel(true);
- }
-
- private class WatermarkEmittingTask implements Triggerable {
-
- private final TimeServiceProvider timeService;
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
-
- private WatermarkEmittingTask(TimeServiceProvider timeService, Object lock, Output<StreamRecord<T>> output) {
- this.timeService = timeService;
- this.lockingObject = lock;
- this.output = output;
- }
-
- @Override
- public void trigger(long timestamp) {
- final long currentTime = this.timeService.getCurrentProcessingTime();
-
- if (currentTime > nextWatermarkTime) {
- // align the watermarks across all machines. this will ensure that we
- // don't have watermarks that creep along at different intervals because
- // the machine clocks are out of sync
- final long watermarkTime = currentTime - (currentTime % watermarkInterval);
-
- synchronized (lockingObject) {
- if (currentTime > nextWatermarkTime) {
- output.emitWatermark(new Watermark(watermarkTime));
- nextWatermarkTime += watermarkInterval;
- }
- }
- }
-
- this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + watermarkInterval,
- new WatermarkEmittingTask(this.timeService, lockingObject, output));
- }
- }
- }
-
- /**
- * A SourceContext for event time. Sources may directly attach timestamps and generate
- * watermarks, but if records are emitted without timestamps, no timestamps are automatically
- * generated and attached. The records will simply have no timestamp in that case.
- *
- * Streaming topologies can use timestamp assigner functions to override the timestamps
- * assigned here.
- */
- public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
- private final StreamSource<?, ?> owner;
- private final Object lockingObject;
- private final Output<StreamRecord<T>> output;
- private final StreamRecord<T> reuse;
-
- public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
- this.owner = owner;
- this.lockingObject = lockingObject;
- this.output = output;
- this.reuse = new StreamRecord<T>(null);
- }
-
- @Override
- public void collect(T element) {
- owner.checkAsyncException();
-
- synchronized (lockingObject) {
- output.collect(reuse.replace(element));
- }
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- owner.checkAsyncException();
-
- synchronized (lockingObject) {
- output.collect(reuse.replace(element, timestamp));
- }
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- owner.checkAsyncException();
-
- synchronized (lockingObject) {
- output.emitWatermark(mark);
- }
- }
-
- @Override
- public Object getCheckpointLock() {
- return lockingObject;
- }
-
- @Override
- public void close() {}
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
new file mode 100644
index 0000000..abaf4e7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Source contexts for various stream time characteristics.
+ */
+public class StreamSourceContexts {
+
+ /**
+  * Creates the {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}
+  * that corresponds to the given {@link TimeCharacteristic}:
+  * <ul>
+  *     <li>{@link TimeCharacteristic#EventTime} maps to {@link ManualWatermarkContext}</li>
+  *     <li>{@link TimeCharacteristic#IngestionTime} maps to {@link AutomaticWatermarkContext}</li>
+  *     <li>{@link TimeCharacteristic#ProcessingTime} maps to {@link NonTimestampContext}</li>
+  * </ul>
+  *
+  * @param timeCharacteristic the time characteristic that selects the context type
+  * @param timeService the time service; only handed to the ingestion-time context
+  * @param checkpointLock the lock object handed to the created context
+  * @param output the output handed to the created context
+  * @param watermarkInterval the watermark interval; only handed to the ingestion-time context
+  * @return a new source context for the given time characteristic
+  * @throws IllegalArgumentException if the time characteristic is none of the three listed above
+  */
+ public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
+         TimeCharacteristic timeCharacteristic, TimeServiceProvider timeService,
+         Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval) {
+
+     switch (timeCharacteristic) {
+         case EventTime:
+             return new ManualWatermarkContext<>(checkpointLock, output);
+         case IngestionTime:
+             return new AutomaticWatermarkContext<>(timeService, checkpointLock, output, watermarkInterval);
+         case ProcessingTime:
+             return new NonTimestampContext<>(checkpointLock, output);
+         default:
+             throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
+     }
+ }
+
+ /**
+  * A source context that forwards every element without attaching a timestamp
+  * and that does not forward watermarks.
+  */
+ private static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
+
+     /** Lock under which elements are emitted; also returned by {@link #getCheckpointLock()}. */
+     private final Object checkpointLock;
+
+     /** Receiver of the emitted records. */
+     private final Output<StreamRecord<T>> downstream;
+
+     /** Single record instance that is re-filled for every emitted element. */
+     private final StreamRecord<T> reusedRecord;
+
+     private NonTimestampContext(Object checkpointLock, Output<StreamRecord<T>> output) {
+         this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+         this.downstream = Preconditions.checkNotNull(output, "The output cannot be null.");
+         this.reusedRecord = new StreamRecord<>(null);
+     }
+
+     /** Emits the element while holding the checkpoint lock. */
+     @Override
+     public void collect(T element) {
+         synchronized (checkpointLock) {
+             downstream.collect(reusedRecord.replace(element));
+         }
+     }
+
+     /** Emits the element exactly like {@link #collect(Object)}; the timestamp is dropped. */
+     @Override
+     public void collectWithTimestamp(T element, long timestamp) {
+         collect(element);
+     }
+
+     /** Intentionally a no-op: this context never forwards watermarks. */
+     @Override
+     public void emitWatermark(Watermark mark) {
+     }
+
+     @Override
+     public Object getCheckpointLock() {
+         return checkpointLock;
+     }
+
+     /** Nothing to release. */
+     @Override
+     public void close() {
+     }
+ }
+
+ /**
+  * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
+  * and watermark emission.
+  *
+  * <p>Every element is stamped with the current processing time of the time service.
+  * Watermarks are emitted both from {@link #collect(Object)} and from a periodically
+  * re-registered timer; their timestamps are aligned to multiples of the watermark interval.
+  */
+ private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+     private final TimeServiceProvider timeService;
+     private final Object lock;
+     private final Output<StreamRecord<T>> output;
+     private final StreamRecord<T> reuse;
+
+     private final long watermarkInterval;
+
+     /**
+      * Handle of the most recently registered watermark timer. It is re-assigned every time
+      * the emitting task re-registers itself, so that cancelling it stops the periodic
+      * emission instead of cancelling a timer that has already fired.
+      */
+     private volatile ScheduledFuture<?> watermarkTimer;
+
+     private volatile long nextWatermarkTime;
+
+     /**
+      * Creates the context and registers the first watermark timer.
+      *
+      * @param timeService provider of the processing time and of the timers
+      * @param checkpointLock lock under which elements and watermarks are emitted
+      * @param output receiver of the emitted elements and watermarks
+      * @param watermarkInterval interval between watermark timers; must be at least 1
+      * @throws NullPointerException if the time service, the lock or the output is null
+      * @throws IllegalArgumentException if the watermark interval is smaller than 1
+      */
+     private AutomaticWatermarkContext(
+             final TimeServiceProvider timeService,
+             final Object checkpointLock,
+             final Output<StreamRecord<T>> output,
+             final long watermarkInterval) {
+
+         this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
+         this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+         this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
+
+         // an interval of exactly 1 is valid; only values below 1 are rejected
+         Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
+         this.watermarkInterval = watermarkInterval;
+
+         this.reuse = new StreamRecord<>(null);
+
+         long now = this.timeService.getCurrentProcessingTime();
+         this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
+             new WatermarkEmittingTask(this.timeService, lock, output));
+     }
+
+     /**
+      * Emits the element with the current processing time as its timestamp and, if the
+      * next watermark is due, emits that watermark right away.
+      */
+     @Override
+     public void collect(T element) {
+         synchronized (lock) {
+             final long currentTime = this.timeService.getCurrentProcessingTime();
+             output.collect(reuse.replace(element, currentTime));
+
+             // this is to avoid lock contention in the lockingObject by
+             // sending the watermark before the firing of the watermark
+             // emission task.
+
+             if (currentTime > nextWatermarkTime) {
+                 // in case we jumped some watermarks, recompute the next watermark time
+                 final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+                 nextWatermarkTime = watermarkTime + watermarkInterval;
+                 output.emitWatermark(new Watermark(watermarkTime));
+
+                 // we do not need to register another timer here
+                 // because the emitting task will do so.
+             }
+         }
+     }
+
+     /** Emits the element like {@link #collect(Object)}; the given timestamp is ignored. */
+     @Override
+     public void collectWithTimestamp(T element, long timestamp) {
+         collect(element);
+     }
+
+     /**
+      * Forwards only the watermark with timestamp {@code Long.MAX_VALUE} and cancels the
+      * watermark timer afterwards. All other watermarks are ignored.
+      */
+     @Override
+     public void emitWatermark(Watermark mark) {
+
+         if (mark.getTimestamp() == Long.MAX_VALUE) {
+             // allow it since this is the special end-watermark that for example the Kafka source emits
+             synchronized (lock) {
+                 nextWatermarkTime = Long.MAX_VALUE;
+                 output.emitWatermark(mark);
+             }
+
+             // we can shutdown the timer now, no watermarks will be needed any more
+             cancelWatermarkTimer();
+         }
+     }
+
+     @Override
+     public Object getCheckpointLock() {
+         return lock;
+     }
+
+     /** Cancels the most recently registered watermark timer. */
+     @Override
+     public void close() {
+         cancelWatermarkTimer();
+     }
+
+     /** Cancels the most recently registered watermark timer, if there is one. */
+     private void cancelWatermarkTimer() {
+         // read the volatile field once so the null check and the cancel see the same future
+         final ScheduledFuture<?> timer = watermarkTimer;
+         if (timer != null) {
+             timer.cancel(true);
+         }
+     }
+
+     /**
+      * Timer callback that emits an aligned watermark when one is due and then registers
+      * its successor for one watermark interval later.
+      */
+     private class WatermarkEmittingTask implements Triggerable {
+
+         private final TimeServiceProvider timeService;
+         private final Object lock;
+         private final Output<StreamRecord<T>> output;
+
+         private WatermarkEmittingTask(TimeServiceProvider timeService, Object checkpointLock, Output<StreamRecord<T>> output) {
+             this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
+             this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+             this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
+         }
+
+         @Override
+         public void trigger(long timestamp) {
+             final long currentTime = timeService.getCurrentProcessingTime();
+
+             if (currentTime > nextWatermarkTime) {
+                 // align the watermarks across all machines. this will ensure that we
+                 // don't have watermarks that creep along at different intervals because
+                 // the machine clocks are out of sync
+                 final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+
+                 synchronized (lock) {
+                     // re-check under the lock: collect() may have emitted this watermark already
+                     if (currentTime > nextWatermarkTime) {
+                         output.emitWatermark(new Watermark(watermarkTime));
+                         nextWatermarkTime = watermarkTime + watermarkInterval;
+                     }
+                 }
+             }
+
+             long nextWatermark = currentTime + watermarkInterval;
+             // remember the new timer so that close() / the end-watermark can cancel it
+             watermarkTimer = this.timeService.registerTimer(
+                 nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output));
+         }
+     }
+ }
+
+ /**
+  * A SourceContext for event time. The source attaches timestamps and emits watermarks
+  * itself; elements emitted through {@link #collect(Object)} are forwarded without a
+  * timestamp, none is generated for them here.
+  *
+  * <p>Streaming topologies can use timestamp assigner functions to override the timestamps
+  * assigned here.
+  */
+ private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+     /** Lock under which elements and watermarks are emitted. */
+     private final Object checkpointLock;
+
+     /** Receiver of the emitted records and watermarks. */
+     private final Output<StreamRecord<T>> downstream;
+
+     /** Single record instance that is re-filled for every emitted element. */
+     private final StreamRecord<T> reusedRecord;
+
+     private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
+         this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+         this.downstream = Preconditions.checkNotNull(output, "The output cannot be null.");
+         this.reusedRecord = new StreamRecord<>(null);
+     }
+
+     /** Emits the element without a timestamp while holding the checkpoint lock. */
+     @Override
+     public void collect(T element) {
+         synchronized (checkpointLock) {
+             downstream.collect(reusedRecord.replace(element));
+         }
+     }
+
+     /** Emits the element with the given timestamp while holding the checkpoint lock. */
+     @Override
+     public void collectWithTimestamp(T element, long timestamp) {
+         synchronized (checkpointLock) {
+             downstream.collect(reusedRecord.replace(element, timestamp));
+         }
+     }
+
+     /** Forwards the watermark unchanged while holding the checkpoint lock. */
+     @Override
+     public void emitWatermark(Watermark mark) {
+         synchronized (checkpointLock) {
+             downstream.emitWatermark(mark);
+         }
+     }
+
+     @Override
+     public Object getCheckpointLock() {
+         return checkpointLock;
+     }
+
+     /** Nothing to release. */
+     @Override
+     public void close() {
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
index c7ec2ed..4c55055 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -18,12 +18,14 @@
package org.apache.flink.streaming.runtime.tasks;
/**
- * Interface for reporting exceptions that are thrown in (possibly) a different thread.
+ * An interface marking a task as capable of handling exceptions thrown
+ * by different threads, other than the one executing the task itself.
*/
public interface AsyncExceptionHandler {
/**
- * Registers the given exception.
+ * Handles an exception thrown by another thread (e.g. a TriggerTask),
+ * other than the one executing the main task.
*/
- void registerAsyncException(AsynchronousException exception);
+ void handleAsyncException(String message, Throwable exception);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index ea2b07f..9534b3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.util.Preconditions;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -49,9 +50,9 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
private DefaultTimeServiceProvider(AsyncExceptionHandler task,
ScheduledExecutorService threadPoolExecutor,
Object checkpointLock) {
- this.task = task;
- this.timerService = threadPoolExecutor;
- this.checkpointLock = checkpointLock;
+ this.task = Preconditions.checkNotNull(task);
+ this.timerService = Preconditions.checkNotNull(threadPoolExecutor);
+ this.checkpointLock = Preconditions.checkNotNull(checkpointLock);
}
@Override
@@ -99,7 +100,7 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
target.trigger(timestamp);
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
- exceptionHandler.registerAsyncException(asyncException);
+ exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
}
}
}
@@ -109,7 +110,7 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
@Override
- public void registerAsyncException(AsynchronousException exception) {
+ public void handleAsyncException(String message, Throwable exception) {
exception.printStackTrace();
}
}, executor, checkpointLock);
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d6d2fb5..cf8853e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -64,7 +64,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
final Object lock = getCheckpointLock();
while (running && inputProcessor.processInput(operator, lock)) {
- checkTimerException();
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9802a16..33317fa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -53,6 +53,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,11 +159,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private List<Collection<OperatorStateHandle>> lazyRestoreOperatorState;
- /**
- * This field is used to forward an exception that is caught in the timer thread or other
- * asynchronous Threads. Subclasses must ensure that exceptions stored here get thrown on the
- * actual execution Thread. */
- private volatile AsynchronousException asyncException;
/** The currently active background materialization threads */
private final ClosableRegistry cancelables = new ClosableRegistry();
@@ -301,9 +297,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// still let the computation fail
tryDisposeAllOperators();
disposed = true;
-
- // Don't forget to check and throw exceptions that happened in async thread one last time
- checkTimerException();
}
finally {
// clean up everything we initialized
@@ -354,19 +347,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- /**
- * Marks task execution failed for an external reason (a reason other than the task code itself
- * throwing an exception). If the task is already in a terminal state
- * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
- * Otherwise it sets the state to FAILED, and, if the invokable code is running,
- * starts an asynchronous thread that aborts that code.
- *
- * <p>This method never blocks.</p>
- */
- public void failExternally(Throwable cause) {
- getEnvironment().failExternally(cause);
- }
-
@Override
public final void cancel() throws Exception {
isRunning = false;
@@ -898,27 +878,21 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
/**
- * Check whether an exception was thrown in a Thread other than the main Thread. (For example
- * in the processing-time trigger Thread). This will rethrow that exception in case on
- * occurred.
+ * Handles an exception thrown by another thread (e.g. a TriggerTask),
+ * other than the one executing the main task by failing the task entirely.
+ *
+ * In more detail, it marks task execution failed for an external reason
+ * (a reason other than the task code itself throwing an exception). If the task
+ * is already in a terminal state (such as FINISHED, CANCELED, FAILED), or if the
+ * task is already canceling this does nothing. Otherwise it sets the state to
+ * FAILED, and, if the invokable code is running, starts an asynchronous thread
+ * that aborts that code.
*
- * <p>This must be called in the main loop of {@code StreamTask} subclasses to ensure
- * that we propagate failures.
+ * <p>This method never blocks.</p>
*/
- public void checkTimerException() throws AsynchronousException {
- if (asyncException != null) {
- throw asyncException;
- }
- }
-
@Override
- public void registerAsyncException(AsynchronousException exception) {
- if (isRunning) {
- LOG.error("Asynchronous exception registered.", exception);
- }
- if (this.asyncException == null) {
- this.asyncException = exception;
- }
+ public void handleAsyncException(String message, Throwable exception) {
+     // failExternally() only receives the exception, so the caller-supplied context
+     // (e.g. which split was being processed) is logged here rather than dropped.
+     // As before this change, nothing is logged once the task is no longer running.
+     if (isRunning) {
+         LOG.error(message, exception);
+     }
+     getEnvironment().failExternally(exception);
}
// ------------------------------------------------------------------------
@@ -1030,7 +1004,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
catch (Exception e) {
// registers the exception and tries to fail the whole task
AsynchronousException asyncException = new AsynchronousException(e);
- owner.registerAsyncException(asyncException);
+ owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
}
finally {
cancelables.unregisterClosable(this);
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 9252063..0197c53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -88,7 +88,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
final Object lock = getCheckpointLock();
while (running && inputProcessor.processInput(operator, lock)) {
- checkTimerException();
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index e8663f5..10b30d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -187,12 +188,11 @@ public class StreamSourceOperatorTest {
final List<StreamElement> output = new ArrayList<>();
- StreamSource.AutomaticWatermarkContext<String> ctx =
- new StreamSource.AutomaticWatermarkContext<>(
- operator,
- operator.getContainingTask().getCheckpointLock(),
- new CollectorOutput<String>(output),
- operator.getExecutionConfig().getAutoWatermarkInterval());
+ StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime,
+ operator.getContainingTask().getTimerService(),
+ operator.getContainingTask().getCheckpointLock(),
+ new CollectorOutput<String>(output),
+ operator.getExecutionConfig().getAutoWatermarkInterval());
// periodically emit the watermarks
// even though we start from 1 the watermarks are still
@@ -218,7 +218,7 @@ public class StreamSourceOperatorTest {
private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
TimeCharacteristic timeChar,
long watermarkInterval,
- final TimeServiceProvider timeProvider) {
+ final TestTimeServiceProvider timeProvider) {
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setAutoWatermarkInterval(watermarkInterval);
@@ -241,9 +241,6 @@ public class StreamSourceOperatorTest {
doAnswer(new Answer<TimeServiceProvider>() {
@Override
public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable {
- if (timeProvider == null) {
- throw new RuntimeException("The time provider is null.");
- }
return timeProvider;
}
}).when(mockTask).getTimerService();
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 60850d8..0351978 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
-import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
@@ -39,6 +38,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
@@ -52,28 +52,24 @@ public class TimeProviderTest {
final OneShotLatch latch = new OneShotLatch();
final Object lock = new Object();
- TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider.create(
- new AsyncExceptionHandler() {
- @Override
- public void registerAsyncException(AsynchronousException exception) {
- exception.printStackTrace();
- }
- },
- Executors.newSingleThreadScheduledExecutor(),
- lock);
+ TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
+ .createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
final List<Long> timestamps = new ArrayList<>();
- long start = System.currentTimeMillis();
long interval = 50L;
-
final long noOfTimers = 20;
// we add 2 timers per iteration minus the first that would have a negative timestamp
- final long expectedNoOfTimers = 2 * noOfTimers - 1;
+ final long expectedNoOfTimers = 2 * noOfTimers;
for (int i = 0; i < noOfTimers; i++) {
- double nextTimer = start + i * interval;
+
+ // we add a delay (100ms) so that both timers are inserted before the first is processed.
+ // If not, and given that we add timers out of order, we may have a timer firing
+ // before the next one (with smaller timestamp) is added.
+
+ double nextTimer = timeServiceProvider.getCurrentProcessingTime() + 100 + i * interval;
timeServiceProvider.registerTimer((long) nextTimer, new Triggerable() {
@Override
@@ -88,17 +84,15 @@ public class TimeProviderTest {
// also add out-of-order tasks to verify that eventually
// they will be executed in the correct order.
- if (i > 0) {
- timeServiceProvider.registerTimer((long) (nextTimer - 10), new Triggerable() {
- @Override
- public void trigger(long timestamp) throws Exception {
- timestamps.add(timestamp);
- if (timestamps.size() == expectedNoOfTimers) {
- latch.trigger();
- }
+ timeServiceProvider.registerTimer((long) (nextTimer - 10L), new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ timestamps.add(timestamp);
+ if (timestamps.size() == expectedNoOfTimers) {
+ latch.trigger();
}
- });
- }
+ }
+ });
}
if (!latch.isTriggered()) {
@@ -114,15 +108,46 @@ public class TimeProviderTest {
long lastTs = Long.MIN_VALUE;
for (long timestamp: timestamps) {
Assert.assertTrue(timestamp >= lastTs);
+ if (lastTs != Long.MIN_VALUE && counter % 2 == 1) {
+ Assert.assertEquals((timestamp - lastTs), 10);
+ }
lastTs = timestamp;
-
- long expectedTs = start + (counter/2) * interval;
- Assert.assertEquals(timestamp, (expectedTs + ((counter % 2 == 0) ? 0 : 40)));
counter++;
}
}
@Test
+ public void testDefaultTimeProviderExceptionHandling() throws InterruptedException {
+ final OneShotLatch latch = new OneShotLatch();
+
+ final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
+
+ final Object lock = new Object();
+
+ TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
+ .create(new AsyncExceptionHandler() {
+ @Override
+ public void handleAsyncException(String message, Throwable exception) {
+ exceptionWasThrown.compareAndSet(false, true);
+ latch.trigger();
+ }
+ }, Executors.newSingleThreadScheduledExecutor(), lock);
+
+ long now = System.currentTimeMillis();
+ timeServiceProvider.registerTimer(now, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ throw new Exception("Exception in Timer");
+ }
+ });
+
+ if (!latch.isTriggered()) {
+ latch.await();
+ }
+ Assert.assertTrue(exceptionWasThrown.get());
+ }
+
+ @Test
public void testTimerSorting() throws Exception {
final List<Long> result = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index f33da89..30f38e3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -183,13 +183,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowTriggerTimeAlignment() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+ TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
Executors.newSingleThreadScheduledExecutor(), lock);
try {
@SuppressWarnings("unchecked")
final Output<StreamRecord<String>> mockOut = mock(Output.class);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
AccumulatingProcessingTimeWindowOperator<String, String, String> op;
@@ -201,6 +201,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
+ timerService.shutdownService();
+ timerService = DefaultTimeServiceProvider.createForTesting(
+ Executors.newSingleThreadScheduledExecutor(), lock);
+ mockTask = createMockTaskWithTimer(timerService, lock);
+
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -209,6 +214,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
+ timerService.shutdownService();
+ timerService = DefaultTimeServiceProvider.createForTesting(
+ Executors.newSingleThreadScheduledExecutor(), lock);
+ mockTask = createMockTaskWithTimer(timerService, lock);
+
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -217,6 +227,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
+ timerService.shutdownService();
+ timerService = DefaultTimeServiceProvider.createForTesting(
+ Executors.newSingleThreadScheduledExecutor(), lock);
+ mockTask = createMockTaskWithTimer(timerService, lock);
+
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -243,7 +258,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
try {
final int windowSize = 50;
final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -297,7 +312,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -359,7 +374,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -416,7 +431,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -743,7 +758,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static StreamTask<?, ?> createMockTask() {
+ private static StreamTask<?, ?> createMockTask(Object lock) {
Configuration configuration = new Configuration();
configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
@@ -751,6 +766,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
when(task.getName()).thenReturn("Test task name");
when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(task.getCheckpointLock()).thenReturn(lock);
final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -765,9 +781,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
private static StreamTask<?, ?> createMockTaskWithTimer(
- final TimeServiceProvider timerService)
+ final TimeServiceProvider timerService, final Object lock)
{
- StreamTask<?, ?> mockTask = createMockTask();
+ StreamTask<?, ?> mockTask = createMockTask(lock);
when(mockTask.getTimerService()).thenReturn(timerService);
return mockTask;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 826b230..7539c2d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -191,13 +191,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowTriggerTimeAlignment() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+ TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
Executors.newSingleThreadScheduledExecutor(), lock);
try {
@SuppressWarnings("unchecked")
final Output<StreamRecord<String>> mockOut = mock(Output.class);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
AggregatingProcessingTimeWindowOperator<String, String> op;
@@ -209,6 +209,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
+ timerService.shutdownService();
+ timerService = DefaultTimeServiceProvider.createForTesting(
+ Executors.newSingleThreadScheduledExecutor(), lock);
+ mockTask = createMockTaskWithTimer(timerService, lock);
+
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -217,6 +222,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
+ timerService.shutdownService();
+ timerService = DefaultTimeServiceProvider.createForTesting(
+ Executors.newSingleThreadScheduledExecutor(), lock);
+ mockTask = createMockTaskWithTimer(timerService, lock);
+
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -225,6 +235,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
+ timerService.shutdownService();
+ timerService = DefaultTimeServiceProvider.createForTesting(
+ Executors.newSingleThreadScheduledExecutor(), lock);
+ mockTask = createMockTaskWithTimer(timerService, lock);
+
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -257,7 +272,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
op.open();
@@ -309,7 +324,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final int windowSize = 50;
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
@@ -377,7 +392,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -448,7 +463,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -508,7 +523,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100);
@@ -929,7 +944,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static StreamTask<?, ?> createMockTask() {
+ private static StreamTask<?, ?> createMockTask(Object lock) {
Configuration configuration = new Configuration();
configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
@@ -937,6 +952,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
when(task.getName()).thenReturn("Test task name");
when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(task.getCheckpointLock()).thenReturn(lock);
final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -947,9 +963,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
return task;
}
- private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService)
+ private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService, final Object lock)
{
- StreamTask<?, ?> mockTask = createMockTask();
+ StreamTask<?, ?> mockTask = createMockTask(lock);
when(mockTask.getTimerService()).thenReturn(timerService);
return mockTask;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index f638ddd..9b773d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -102,6 +102,8 @@ public class StreamMockEnvironment implements Environment {
private final ExecutionConfig executionConfig;
+ private volatile boolean wasFailedExternally = false;
+
public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.taskInfo = new TaskInfo("", 1, 0, 1, 0);
@@ -325,7 +327,11 @@ public class StreamMockEnvironment implements Environment {
@Override
public void failExternally(Throwable cause) {
- throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure.");
+ this.wasFailedExternally = true;
+ }
+
+ public boolean wasFailedExternally() {
+ return wasFailedExternally;
}
@Override