Posted to commits@flink.apache.org by mx...@apache.org on 2016/10/05 15:57:12 UTC
flink git commit: [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling
Repository: flink
Updated Branches:
refs/heads/release-1.1 fddd89bcd -> bab59dfa7
[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling
This closes #2593.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bab59dfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bab59dfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bab59dfa
Branch: refs/heads/release-1.1
Commit: bab59dfa7cf94f4c392c3205ee180e72e1ad7814
Parents: fddd89b
Author: kl0u <kk...@gmail.com>
Authored: Tue Oct 4 15:27:59 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Oct 5 17:52:26 2016 +0200
----------------------------------------------------------------------
.../ContinuousFileMonitoringFunctionITCase.java | 15 +-
.../hdfstests/ContinuousFileMonitoringTest.java | 240 +++++++++++++++----
.../source/ContinuousFileReaderOperator.java | 108 +++++----
.../api/operators/AsyncExceptionChecker.java | 27 +++
.../streaming/api/operators/StreamSource.java | 53 ++--
.../util/OneInputStreamOperatorTestHarness.java | 9 +
6 files changed, 335 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
index e6cd5d9..dd4dc5a 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -130,7 +130,7 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
- TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction);
+ TestingSinkFunction sink = new TestingSinkFunction();
DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
@@ -161,16 +161,10 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
private static class TestingSinkFunction extends RichSinkFunction<String> {
- private final ContinuousFileMonitoringFunction src;
-
private int elementCounter = 0;
private Map<Integer, Integer> elementCounters = new HashMap<>();
private Map<Integer, List<String>> collectedContent = new HashMap<>();
- TestingSinkFunction(ContinuousFileMonitoringFunction monitoringFunction) {
- this.src = monitoringFunction;
- }
-
@Override
public void open(Configuration parameters) throws Exception {
// this sink can only work with DOP 1
@@ -200,13 +194,6 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
Assert.assertEquals(cntntStr.toString(), expectedContents.get(fileIdx));
}
expectedContents.clear();
-
- src.cancel();
- try {
- src.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
}
private int getLineNo(String line) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index def9378..9358ba9 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
@@ -51,8 +53,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
public class ContinuousFileMonitoringTest {
@@ -106,10 +108,162 @@ public class ContinuousFileMonitoringTest {
// TESTS
@Test
+ public void testFileReadingOperatorWithIngestionTime() throws Exception {
+ Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+ Map<Integer, String> expectedFileContents = new HashMap<>();
+
+ for (int i = 0; i < NO_OF_FILES; i++) {
+ Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+ filesCreated.add(file.f0);
+ expectedFileContents.put(i, file.f1);
+ }
+
+ TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+ final long watermarkInterval = 10;
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+ ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+ reader.setOutputType(typeInfo, executionConfig);
+
+ final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+ final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+ new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+ tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ tester.open();
+
+ Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
+
+ // test that watermarks are correctly emitted
+
+ ConcurrentLinkedQueue<Object> output = tester.getOutput();
+
+ timeServiceProvider.setCurrentTime(201);
+ Assert.assertTrue(output.peek() instanceof Watermark);
+ Assert.assertEquals(200, ((Watermark) output.poll()).getTimestamp());
+
+ timeServiceProvider.setCurrentTime(301);
+ Assert.assertTrue(output.peek() instanceof Watermark);
+ Assert.assertEquals(300, ((Watermark) output.poll()).getTimestamp());
+
+ timeServiceProvider.setCurrentTime(401);
+ Assert.assertTrue(output.peek() instanceof Watermark);
+ Assert.assertEquals(400, ((Watermark) output.poll()).getTimestamp());
+
+ timeServiceProvider.setCurrentTime(501);
+ Assert.assertTrue(output.peek() instanceof Watermark);
+ Assert.assertEquals(500, ((Watermark) output.poll()).getTimestamp());
+
+ Assert.assertTrue(output.isEmpty());
+
+ // create the necessary splits for the test
+ FileInputSplit[] splits = format.createInputSplits(
+ reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+ // and feed them to the operator
+ Map<Integer, List<String>> actualFileContents = new HashMap<>();
+
+ long lastSeenWatermark = Long.MIN_VALUE;
+ int lineCounter = 0; // counter for the lines read from the splits
+ int watermarkCounter = 0;
+
+ for (FileInputSplit split: splits) {
+
+ // set the next "current processing time".
+ long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+ timeServiceProvider.setCurrentTime(nextTimestamp);
+
+ // send the next split to be read and wait until it is fully read; the +1 in the check below accounts for the watermark.
+ tester.processElement(new StreamRecord<>(split));
+
+ // NOTE: the following check works because each file fits in one split;
+ // otherwise it would block forever. This assumption holds for this test only.
+ while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+ Thread.sleep(10);
+ }
+
+ // verify that the results are the expected ones
+ for (Object line: tester.getOutput()) {
+ if (line instanceof StreamRecord) {
+ StreamRecord<String> element = (StreamRecord<String>) line;
+ lineCounter++;
+
+ Assert.assertEquals(nextTimestamp, element.getTimestamp());
+
+ int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+ List<String> content = actualFileContents.get(fileIdx);
+ if (content == null) {
+ content = new ArrayList<>();
+ actualFileContents.put(fileIdx, content);
+ }
+ content.add(element.getValue() + "\n");
+ } else if (line instanceof Watermark) {
+ long watermark = ((Watermark) line).getTimestamp();
+
+ Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
+ Assert.assertTrue(watermark > lastSeenWatermark);
+ watermarkCounter++;
+
+ lastSeenWatermark = watermark;
+ } else {
+ Assert.fail("Unknown element in the list.");
+ }
+ }
+
+ // clean the output to be ready for the next split
+ tester.getOutput().clear();
+ }
+
+ // now we are processing one split after the other,
+ // so all the elements must be here by now.
+ Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
+
+ // because we expect one watermark per split.
+ Assert.assertEquals(splits.length, watermarkCounter);
+
+ // then close the reader gracefully so that the Long.MAX watermark is emitted
+ synchronized (tester.getCheckpointLock()) {
+ tester.close();
+ }
+
+ for (org.apache.hadoop.fs.Path file: filesCreated) {
+ hdfs.delete(file, false);
+ }
+
+ // check if the last element is the LongMax watermark (by now this must be the only element)
+ Assert.assertEquals(1, tester.getOutput().size());
+ Assert.assertTrue(tester.getOutput().peek() instanceof Watermark);
+ Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp());
+
+ // check if the elements are the expected ones.
+ Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
+ for (Integer fileIdx: expectedFileContents.keySet()) {
+ Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
+
+ List<String> cntnt = actualFileContents.get(fileIdx);
+ Collections.sort(cntnt, new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ return getLineNo(o1) - getLineNo(o2);
+ }
+ });
+
+ StringBuilder cntntStr = new StringBuilder();
+ for (String line: cntnt) {
+ cntntStr.append(line);
+ }
+ Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
+ }
+ }
+
+ @Test
public void testFileReadingOperator() throws Exception {
Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
Map<Integer, String> expectedFileContents = new HashMap<>();
- for(int i = 0; i < NO_OF_FILES; i++) {
+ for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
filesCreated.add(file.f0);
expectedFileContents.put(i, file.f1);
@@ -119,10 +273,11 @@ public class ContinuousFileMonitoringTest {
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+ reader.setOutputType(typeInfo, new ExecutionConfig());
+
OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
new OneInputStreamOperatorTestHarness<>(reader);
-
- reader.setOutputType(typeInfo, new ExecutionConfig());
+ tester.setTimeCharacteristic(TimeCharacteristic.EventTime);
tester.open();
// create the necessary splits for the test
@@ -130,42 +285,42 @@ public class ContinuousFileMonitoringTest {
reader.getRuntimeContext().getNumberOfParallelSubtasks());
// and feed them to the operator
- for(FileInputSplit split: splits) {
+ for (FileInputSplit split: splits) {
tester.processElement(new StreamRecord<>(split));
}
- // then close the reader gracefully
+ // then close the reader gracefully (and wait to finish reading)
synchronized (tester.getCheckpointLock()) {
tester.close();
}
- /*
- * Given that the reader is multithreaded, the test finishes before the reader thread finishes
- * reading. This results in files being deleted by the test before being read, thus throwing an exception.
- * In addition, even if file deletion happens at the end, the results are not ready for testing.
- * To face this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s.
- */
+ // the lines received must be the elements in the files, +1 for the Long.MAX watermark.
+ // we are in event time, which emits no automatic watermarks, so the final watermark marks the end
+ // of the input stream.
- long start = System.currentTimeMillis();
- Queue<Object> output;
- do {
- output = tester.getOutput();
- Thread.sleep(50);
- } while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
+ Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, tester.getOutput().size());
Map<Integer, List<String>> actualFileContents = new HashMap<>();
- for(Object line: tester.getOutput()) {
- StreamRecord<String> element = (StreamRecord<String>) line;
-
- int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
- List<String> content = actualFileContents.get(fileIdx);
- if(content == null) {
- content = new ArrayList<>();
- actualFileContents.put(fileIdx, content);
+ Object lastElement = null;
+ for (Object line: tester.getOutput()) {
+ lastElement = line;
+ if (line instanceof StreamRecord) {
+ StreamRecord<String> element = (StreamRecord<String>) line;
+
+ int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+ List<String> content = actualFileContents.get(fileIdx);
+ if (content == null) {
+ content = new ArrayList<>();
+ actualFileContents.put(fileIdx, content);
+ }
+ content.add(element.getValue() + "\n");
}
- content.add(element.getValue() +"\n");
}
+ // check if the last element is the LongMax watermark
+ Assert.assertTrue(lastElement instanceof Watermark);
+ Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
+
Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
for (Integer fileIdx: expectedFileContents.keySet()) {
Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
@@ -185,7 +340,7 @@ public class ContinuousFileMonitoringTest {
Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
}
- for(org.apache.hadoop.fs.Path file: filesCreated) {
+ for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
@@ -223,13 +378,13 @@ public class ContinuousFileMonitoringTest {
monitoringFunction.open(new Configuration());
monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
- Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
- for(int i = 0; i < NO_OF_FILES; i++) {
+ Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
+ for (int i = 0; i < NO_OF_FILES; i++) {
org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
}
- for(org.apache.hadoop.fs.Path file: filesCreated) {
+ for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
@@ -255,9 +410,10 @@ public class ContinuousFileMonitoringTest {
uniqFilesFound.wait(7 * INTERVAL);
}
}
+ fc.join();
- Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
- Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+ Assert.assertEquals(NO_OF_FILES, fc.getFilesCreated().size());
+ Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
Set<String> fileNamesCreated = new HashSet<>();
@@ -265,11 +421,11 @@ public class ContinuousFileMonitoringTest {
fileNamesCreated.add(path.toString());
}
- for(String file: uniqFilesFound) {
+ for (String file: uniqFilesFound) {
Assert.assertTrue(fileNamesCreated.contains(file));
}
- for(org.apache.hadoop.fs.Path file: filesCreated) {
+ for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
@@ -301,7 +457,7 @@ public class ContinuousFileMonitoringTest {
// wait until all the files are created
fc.join();
- Assert.assertTrue(filesCreated.size() == NO_OF_FILES);
+ Assert.assertEquals(NO_OF_FILES, filesCreated.size());
Set<String> fileNamesCreated = new HashSet<>();
for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
@@ -309,11 +465,11 @@ public class ContinuousFileMonitoringTest {
}
Assert.assertTrue(uniqFilesFound.size() >= 1 && uniqFilesFound.size() < fileNamesCreated.size());
- for(String file: uniqFilesFound) {
+ for (String file: uniqFilesFound) {
Assert.assertTrue(fileNamesCreated.contains(file));
}
- for(org.apache.hadoop.fs.Path file: filesCreated) {
+ for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
@@ -322,7 +478,7 @@ public class ContinuousFileMonitoringTest {
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
- Assert.assertTrue(tkns.length == 6);
+ Assert.assertEquals(6, tkns.length);
return Integer.parseInt(tkns[tkns.length - 1]);
}
@@ -345,7 +501,7 @@ public class ContinuousFileMonitoringTest {
public void run() {
try {
- for(int i = 0; i < NO_OF_FILES; i++) {
+ for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file =
fillWithData(hdfsURI, "file", i, "This is test line.");
@@ -427,12 +583,12 @@ public class ContinuousFileMonitoringTest {
assert (hdfs != null);
org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
- Assert.assertTrue (!hdfs.exists(file));
+ Assert.assertFalse(hdfs.exists(file));
org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
FSDataOutputStream stream = hdfs.create(tmp);
StringBuilder str = new StringBuilder();
- for(int i = 0; i < LINES_PER_FILE; i++) {
+ for (int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx +": "+ sampleLine + " " + i +"\n";
str.append(line);
stream.write(line.getBytes());
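
The watermark values asserted in testFileReadingOperatorWithIngestionTime above all
follow from one piece of arithmetic: the automatic watermark context rounds the current
processing time down to the nearest multiple of the watermark interval. A standalone
sketch of that rule (illustrative names, not part of the Flink API):

    public class WatermarkRoundingSketch {

        // round the processing time down to a multiple of the interval
        static long watermarkFor(long processingTime, long watermarkInterval) {
            return processingTime - (processingTime % watermarkInterval);
        }

        public static void main(String[] args) {
            final long interval = 10;
            // matches the assertions in the test above
            System.out.println(watermarkFor(201, interval)); // 200
            System.out.println(watermarkFor(301, interval)); // 300
            // and the per-split check: nextTimestamp - (nextTimestamp % watermarkInterval)
            System.out.println(watermarkFor(275, interval)); // 270
        }
    }
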
http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index fda5efd..923943f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -29,15 +29,15 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AsyncExceptionChecker;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,39 +46,44 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* This is the operator that reads the {@link FileInputSplit FileInputSplits} received from
- * the preceding {@link ContinuousFileMonitoringFunction}. This operator will receive just the split descriptors
- * and then read and emit records. This may lead to increased backpressure. To avoid this, we have another
- * thread ({@link SplitReader}) actually reading the splits and emitting the elements, which is separate from
- * the thread forwarding the checkpoint barriers. The two threads sync on the {@link StreamTask#getCheckpointLock()}
- * so that the checkpoints reflect the current state.
+ * the preceding {@link ContinuousFileMonitoringFunction}. This operator can have a parallelism
+ * greater than 1, in contrast to the {@link ContinuousFileMonitoringFunction}, which has
+ * a parallelism of 1.
+ * <p/>
+ * This operator will receive the split descriptors, put them in a queue, and have another
+ * thread read the actual data from the splits. This architecture separates the reading
+ * thread from the one emitting the checkpoint barriers, thus removing any potential
+ * back-pressure.
*/
@Internal
public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
- implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+ implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, AsyncExceptionChecker {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
+ /** A value that serves as a poison pill to stop the reading thread when no more splits remain. */
private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
- private transient SplitReader<S, OUT> reader;
- private transient TimestampedCollector<OUT> collector;
-
private FileInputFormat<OUT> format;
private TypeSerializer<OUT> serializer;
private transient Object checkpointLock;
+ private transient SplitReader<S, OUT> reader;
+ private transient SourceFunction.SourceContext<OUT> readerContext;
private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
@@ -94,25 +99,34 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
public void open() throws Exception {
super.open();
- if (this.serializer == null) {
- throw new IllegalStateException("The serializer has not been set. " +
- "Probably the setOutputType() was not called and this should not have happened. " +
- "Please report it.");
- }
+ checkState(this.reader == null, "The reader is already initialized.");
+ checkState(this.serializer != null, "The serializer has not been set. " +
+ "Probably the setOutputType() was not called. Please report it.");
this.format.setRuntimeContext(getRuntimeContext());
this.format.configure(new Configuration());
-
- this.collector = new TimestampedCollector<>(output);
this.checkpointLock = getContainingTask().getCheckpointLock();
- Preconditions.checkState(reader == null, "The reader is already initialized.");
-
- this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
+ // set the reader context based on the time characteristic
+ final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
+
+ switch (timeCharacteristic) {
+ case EventTime:
+ this.readerContext = new StreamSource.ManualWatermarkContext<>(this, this.checkpointLock, this.output);
+ break;
+ case IngestionTime:
+ final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+ this.readerContext = new StreamSource.AutomaticWatermarkContext<>(this, this.checkpointLock, this.output, watermarkInterval);
+ break;
+ case ProcessingTime:
+ this.readerContext = new StreamSource.NonTimestampContext<>(this, this.checkpointLock, this.output);
+ break;
+ default:
+ throw new Exception(String.valueOf(timeCharacteristic));
+ }
- // the readerState is needed for the initialization of the reader
- // when recovering from a failure. So after the initialization,
- // we can set it to null.
+ // and initialize the split reading thread
+ this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState);
this.readerState = null;
this.reader.start();
}
@@ -124,7 +138,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
@Override
public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
+ // we do nothing because we emit our own watermarks if needed.
}
@Override
@@ -158,7 +172,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
}
reader = null;
- collector = null;
+ readerContext = null;
+ readerState = null;
format = null;
serializer = null;
}
@@ -179,7 +194,21 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
// called by the StreamTask while having it.
checkpointLock.wait();
}
- collector.close();
+
+ // finally, if we are closed normally and we are operating on
+ // event or ingestion time, emit the max watermark indicating
+ // the end of the stream, as a normal source would do.
+
+ if (readerContext != null) {
+ readerContext.emitWatermark(Watermark.MAX_WATERMARK);
+ readerContext.close();
+ }
+ output.close();
+ }
+
+ @Override
+ public void checkAsyncException() {
+ // do nothing
}
private class SplitReader<S extends Serializable, OT> extends Thread {
@@ -190,7 +219,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private final TypeSerializer<OT> serializer;
private final Object checkpointLock;
- private final TimestampedCollector<OT> collector;
+ private final SourceFunction.SourceContext<OT> readerContext;
private final Queue<FileInputSplit> pendingSplits;
@@ -202,16 +231,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private SplitReader(FileInputFormat<OT> format,
TypeSerializer<OT> serializer,
- TimestampedCollector<OT> collector,
+ SourceFunction.SourceContext<OT> readerContext,
Object checkpointLock,
Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
this.format = checkNotNull(format, "Unspecified FileInputFormat.");
this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
+ this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
+ this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
- this.pendingSplits = new LinkedList<>();
- this.collector = collector;
- this.checkpointLock = checkpointLock;
+ this.pendingSplits = new ArrayDeque<>();
this.isRunning = true;
// this is the case where a task recovers from a previous failed attempt
@@ -221,7 +250,6 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
S formatState = restoredState.f2;
for (FileInputSplit split : pending) {
- Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
pendingSplits.add(split);
}
@@ -231,9 +259,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
private void addSplit(FileInputSplit split) {
- Preconditions.checkNotNull(split);
+ checkNotNull(split, "Cannot insert a null value in the pending splits queue.");
synchronized (checkpointLock) {
- Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
this.pendingSplits.add(split);
}
}
@@ -269,7 +296,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
checkpointableFormat.reopen(currentSplit, restoredFormatState);
} else {
// this is the case of a non-checkpointable input format that will reprocess the last split.
- LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
+ LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing.");
format.open(currentSplit);
}
// reset the restored state to null for the next iteration
@@ -301,7 +328,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
synchronized (checkpointLock) {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
- collector.collect(nextElement);
+ readerContext.collect(nextElement);
} else {
break;
}
@@ -360,7 +387,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
restoredFormatState;
return new Tuple3<>(snapshot, currentSplit, formatState);
} else {
- LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+ LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
return new Tuple3<>(snapshot, currentSplit, null);
}
} else {
@@ -431,8 +458,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
S formatState = (S) ois.readObject();
// set the whole reader state for the open() to find.
- Preconditions.checkState(this.readerState == null,
- "The reader state has already been initialized.");
+ checkState(this.readerState == null, "The reader state has already been initialized.");
this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
div.close();
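
The Javadoc above describes the operator's threading model: the operator thread only
queues incoming split descriptors, while a dedicated reader thread drains the queue and
emits the records, with both sides synchronizing on the checkpoint lock so that snapshots
see a consistent queue. A minimal sketch of that producer/consumer pattern, using
illustrative names rather than the actual Flink classes:

    import java.util.ArrayDeque;
    import java.util.Queue;

    class SplitQueueSketch<SPLIT> {

        private final Object checkpointLock = new Object();
        private final Queue<SPLIT> pendingSplits = new ArrayDeque<>();

        // called on the operator thread for every incoming split descriptor
        void processElement(SPLIT split) {
            synchronized (checkpointLock) {
                pendingSplits.add(split);
            }
        }

        // run on the dedicated reader thread
        void readLoop() throws InterruptedException {
            while (true) {
                SPLIT next;
                synchronized (checkpointLock) {
                    next = pendingSplits.poll();
                }
                if (next == null) {
                    Thread.sleep(10); // nothing pending yet
                    continue;
                }
                read(next);
            }
        }

        void read(SPLIT split) { /* open the split and emit records under the lock */ }
    }
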
http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
new file mode 100644
index 0000000..12018a7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+/**
+ * An interface used by sources that may throw exceptions
+ * asynchronously, such as the {@link StreamSource}.
+ */
+public interface AsyncExceptionChecker {
+
+ /** Checks if an asynchronous exception was thrown. */
+ void checkAsyncException();
+}
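
To show the intended contract, here is a hypothetical caller of the new interface
(illustrative, not Flink code): an emitting context polls its owner before every record
it forwards, so that an exception raised on a timer thread is rethrown on the thread
that drives the data flow.

    class CheckedEmitter<T> {

        private final AsyncExceptionChecker owner;

        CheckedEmitter(AsyncExceptionChecker owner) {
            this.owner = owner;
        }

        void emit(T element) {
            // rethrows any exception a timer thread has reported in the meantime
            owner.checkAsyncException();
            // ... forward the element downstream ...
        }
    }
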
http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 38c948b..e914240 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ScheduledFuture;
*/
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
- extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
+ extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT>, AsyncExceptionChecker {
private static final long serialVersionUID = 1L;
@@ -70,7 +70,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
ctx = new NonTimestampContext<>(this, lockingObject, collector);
break;
default:
- throw new Exception(String.valueOf(timeCharacteristic));
+ throw new Exception("Invalid time characteristic: " + String.valueOf(timeCharacteristic));
}
// copy to a field to give the 'cancel()' method access
@@ -127,7 +127,8 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
* has caused an exception. If one of these threads caused an exception, this method will
* throw that exception.
*/
- void checkAsyncException() {
+ @Override
+ public void checkAsyncException() {
getContainingTask().checkTimerException();
}
@@ -141,12 +142,12 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
*/
public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
- private final StreamSource<?, ?> owner;
+ private final AsyncExceptionChecker owner;
private final Object lockingObject;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
- public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
+ public NonTimestampContext(AsyncExceptionChecker owner, Object lockingObject, Output<StreamRecord<T>> output) {
this.owner = owner;
this.lockingObject = lockingObject;
this.output = output;
@@ -188,18 +189,19 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
*/
public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
- private final StreamSource<?, ?> owner;
+ private final AbstractStreamOperator<T> owner;
private final Object lockingObject;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
-
+ private final AsyncExceptionChecker source;
+
private final ScheduledFuture<?> watermarkTimer;
private final long watermarkInterval;
private volatile long nextWatermarkTime;
public AutomaticWatermarkContext(
- final StreamSource<?, ?> owner,
+ final AbstractStreamOperator<T> owner,
final Object lockingObjectParam,
final Output<StreamRecord<T>> outputParam,
final long watermarkInterval) {
@@ -214,6 +216,16 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
this.watermarkInterval = watermarkInterval;
this.reuse = new StreamRecord<T>(null);
+ // if it is a source, then we cast and cache it
+ // here so that we do not have to do it in every collect(),
+ // collectWithTimestamp() and emitWatermark()
+
+ if (!(owner instanceof AsyncExceptionChecker)) {
+ throw new IllegalStateException("The AutomaticWatermarkContext can only be used " +
+ "with sources that implement the AsyncExceptionChecker interface.");
+ }
+ this.source = (AsyncExceptionChecker) owner;
+
long now = owner.getCurrentProcessingTime();
this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
@@ -221,7 +233,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
@Override
public void collect(T element) {
- owner.checkAsyncException();
+ source.checkAsyncException();
synchronized (lockingObject) {
final long currentTime = owner.getCurrentProcessingTime();
@@ -250,7 +262,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
@Override
public void emitWatermark(Watermark mark) {
- owner.checkAsyncException();
+ source.checkAsyncException();
if (mark.getTimestamp() == Long.MAX_VALUE) {
// allow it since this is the special end-watermark that for example the Kafka source emits
@@ -260,7 +272,9 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
}
// we can shutdown the timer now, no watermarks will be needed any more
- watermarkTimer.cancel(true);
+ if (watermarkTimer != null) {
+ watermarkTimer.cancel(true);
+ }
}
}
@@ -271,16 +285,18 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
@Override
public void close() {
- watermarkTimer.cancel(true);
+ if (watermarkTimer != null) {
+ watermarkTimer.cancel(true);
+ }
}
private class WatermarkEmittingTask implements Triggerable {
- private final StreamSource<?, ?> owner;
+ private final AbstractStreamOperator<T> owner;
private final Object lockingObject;
private final Output<StreamRecord<T>> output;
- private WatermarkEmittingTask(StreamSource<?, ?> src, Object lock, Output<StreamRecord<T>> output) {
+ private WatermarkEmittingTask(AbstractStreamOperator<T> src, Object lock, Output<StreamRecord<T>> output) {
this.owner = src;
this.lockingObject = lock;
this.output = output;
@@ -299,7 +315,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
synchronized (lockingObject) {
if (currentTime > nextWatermarkTime) {
output.emitWatermark(new Watermark(watermarkTime));
- nextWatermarkTime += watermarkInterval;
+ nextWatermarkTime = watermarkTime + watermarkInterval;
}
}
}
@@ -320,12 +336,12 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
*/
public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
- private final StreamSource<?, ?> owner;
+ private final AsyncExceptionChecker owner;
private final Object lockingObject;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
- public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
+ public ManualWatermarkContext(AsyncExceptionChecker owner, Object lockingObject, Output<StreamRecord<T>> output) {
this.owner = owner;
this.lockingObject = lockingObject;
this.output = output;
@@ -335,7 +351,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
@Override
public void collect(T element) {
owner.checkAsyncException();
-
synchronized (lockingObject) {
output.collect(reuse.replace(element));
}
@@ -344,7 +359,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
@Override
public void collectWithTimestamp(T element, long timestamp) {
owner.checkAsyncException();
-
synchronized (lockingObject) {
output.collect(reuse.replace(element, timestamp));
}
@@ -353,7 +367,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
@Override
public void emitWatermark(Watermark mark) {
owner.checkAsyncException();
-
synchronized (lockingObject) {
output.emitWatermark(mark);
}
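
The one-line change in WatermarkEmittingTask above is easy to miss: when the timer fires
late and processing time has jumped several intervals ahead, advancing nextWatermarkTime
by a single interval leaves it in the past, while realigning it with the watermark that
was actually emitted keeps the schedule current. A small numeric sketch (illustrative
values; watermarkTime is assumed to be the current time rounded down to the interval, as
in the surrounding code):

    public class WatermarkAdvanceSketch {
        public static void main(String[] args) {
            final long interval = 10;
            long nextWatermarkTime = 200;
            long currentTime = 275;                                       // timer fired late
            long watermarkTime = currentTime - (currentTime % interval);  // 270

            long oldNext = nextWatermarkTime + interval; // 210, still in the past
            long newNext = watermarkTime + interval;     // 280, realigned with "now"

            System.out.println(oldNext + " vs " + newNext);
        }
    }
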
http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 40086c5..5708639 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -156,6 +157,14 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}).when(mockTask).getCurrentProcessingTime();
}
+ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+ this.config.setTimeCharacteristic(timeCharacteristic);
+ }
+
+ public TimeCharacteristic getTimeCharacteristic() {
+ return this.config.getTimeCharacteristic();
+ }
+
public void setStateBackend(AbstractStateBackend stateBackend) {
this.stateBackend = stateBackend;
}
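
For reference, the new accessors are used by the ingestion-time test earlier in this
commit; condensed, the pattern is:

    OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
        new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
    tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
    tester.open();

    // the operator config now reports the characteristic the test selected
    Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());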