You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/10/27 12:22:24 UTC
[2/3] flink git commit: [FLINK-4800] Introduce the
TimestampedFileInputSplit for Continuous File Processing
http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 2f0a16a..c8e9846 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -22,11 +22,9 @@ import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -43,44 +41,42 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.PriorityQueue;
import java.util.Queue;
+import static org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit.EOS;
import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from
- * the preceding {@link ContinuousFileMonitoringFunction}. This operator can have parallelism
- * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} which has
- * a parallelism of 1.
+ * The operator that reads the {@link TimestampedFileInputSplit splits} received from the preceding
+ * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction}
+ * which has a parallelism of 1, this operator can have a parallelism greater than 1.
* <p/>
- * This operator will receive the split descriptors, put them in a queue, and have another
- * thread read the actual data from the split. This architecture allows the separation of the
- * reading thread, from the one emitting the checkpoint barriers, thus removing any potential
+ * As soon as a split descriptor is received, it is put in a queue, and another
+ * thread reads the actual data of the split. This architecture allows the separation of the
+ * reading thread from the one emitting the checkpoint barriers, thus removing any potential
* back-pressure.
*/
@Internal
-public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
- implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, StreamCheckpointedOperator {
+public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
+ implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, StreamCheckpointedOperator {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
- /** A value that serves as a kill-pill to stop the reading thread when no more splits remain. */
- private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
-
private FileInputFormat<OUT> format;
private TypeSerializer<OUT> serializer;
private transient Object checkpointLock;
- private transient SplitReader<S, OUT> reader;
+ private transient SplitReader<OUT> reader;
private transient SourceFunction.SourceContext<OUT> readerContext;
- private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
+ private List<TimestampedFileInputSplit> restoredReaderState;
public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
this.format = checkNotNull(format);
@@ -110,13 +106,13 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval);
// and initialize the split reading thread
- this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState);
- this.readerState = null;
+ this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);
+ this.restoredReaderState = null;
this.reader.start();
}
@Override
- public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
+ public void processElement(StreamRecord<TimestampedFileInputSplit> element) throws Exception {
reader.addSplit(element.getValue());
}
@@ -157,7 +153,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
reader = null;
readerContext = null;
- readerState = null;
+ restoredReaderState = null;
format = null;
serializer = null;
}
@@ -190,7 +186,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
output.close();
}
- private class SplitReader<S extends Serializable, OT> extends Thread {
+ private class SplitReader<OT> extends Thread {
private volatile boolean isRunning;
@@ -200,44 +196,39 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private final Object checkpointLock;
private final SourceFunction.SourceContext<OT> readerContext;
- private final Queue<FileInputSplit> pendingSplits;
-
- private FileInputSplit currentSplit = null;
+ private final Queue<TimestampedFileInputSplit> pendingSplits;
- private S restoredFormatState = null;
+ private TimestampedFileInputSplit currentSplit;
- private volatile boolean isSplitOpen = false;
+ private volatile boolean isSplitOpen;
private SplitReader(FileInputFormat<OT> format,
TypeSerializer<OT> serializer,
SourceFunction.SourceContext<OT> readerContext,
Object checkpointLock,
- Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
+ List<TimestampedFileInputSplit> restoredState) {
this.format = checkNotNull(format, "Unspecified FileInputFormat.");
this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
- this.pendingSplits = new ArrayDeque<>();
this.isRunning = true;
- // this is the case where a task recovers from a previous failed attempt
- if (restoredState != null) {
- List<FileInputSplit> pending = restoredState.f0;
- FileInputSplit current = restoredState.f1;
- S formatState = restoredState.f2;
-
- for (FileInputSplit split : pending) {
- pendingSplits.add(split);
+ this.pendingSplits = new PriorityQueue<>(10, new Comparator<TimestampedFileInputSplit>() {
+ @Override
+ public int compare(TimestampedFileInputSplit o1, TimestampedFileInputSplit o2) {
+ return o1.compareTo(o2);
}
+ });
- this.currentSplit = current;
- this.restoredFormatState = formatState;
+ // this is the case where a task recovers from a previous failed attempt
+ if (restoredState != null) {
+ this.pendingSplits.addAll(restoredState);
}
}
- private void addSplit(FileInputSplit split) {
+ private void addSplit(TimestampedFileInputSplit split) {
checkNotNull(split, "Cannot insert a null value in the pending splits queue.");
synchronized (checkpointLock) {
this.pendingSplits.add(split);
@@ -259,43 +250,32 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
synchronized (checkpointLock) {
- if (this.currentSplit != null) {
-
- if (currentSplit.equals(EOS)) {
- isRunning = false;
- break;
- }
-
- if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
-
- @SuppressWarnings("unchecked")
- CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
- (CheckpointableInputFormat<FileInputSplit, S>) this.format;
-
- checkpointableFormat.reopen(currentSplit, restoredFormatState);
- } else {
- // this is the case of a non-checkpointable input format that will reprocess the last split.
- LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing.");
- format.open(currentSplit);
- }
- // reset the restored state to null for the next iteration
- this.restoredFormatState = null;
- } else {
-
- // get the next split to read.
+ if (currentSplit == null) {
currentSplit = this.pendingSplits.poll();
-
if (currentSplit == null) {
checkpointLock.wait(50);
continue;
}
+ }
- if (currentSplit.equals(EOS)) {
- isRunning = false;
- break;
- }
+ if (currentSplit.equals(EOS)) {
+ isRunning = false;
+ break;
+ }
+
+ if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
+ // recovering after a node failure with an input
+ // format that supports resetting the offset
+ ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).
+ reopen(currentSplit, currentSplit.getSplitState());
+ } else {
+ // we either have a new split, or we recovered from a node
+ // failure but the input format does not support resetting the offset.
this.format.open(currentSplit);
}
+
+ // reset the restored state to null for the next iteration
+ this.currentSplit.resetSplitState();
this.isSplitOpen = true;
}
@@ -348,34 +328,17 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
}
- private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
- List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
- for (FileInputSplit split: this.pendingSplits) {
- snapshot.add(split);
- }
-
- // remove the current split from the list if inside.
- if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) {
- this.pendingSplits.remove();
- }
-
- if (this.currentSplit != null) {
- if (this.format instanceof CheckpointableInputFormat) {
- @SuppressWarnings("unchecked")
- CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
- (CheckpointableInputFormat<FileInputSplit, S>) this.format;
-
- S formatState = this.isSplitOpen ?
- checkpointableFormat.getCurrentState() :
- restoredFormatState;
- return new Tuple3<>(snapshot, currentSplit, formatState);
- } else {
- LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
- return new Tuple3<>(snapshot, currentSplit, null);
+ private List<TimestampedFileInputSplit> getReaderState() throws IOException {
+ List<TimestampedFileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
+ if (currentSplit != null ) {
+ if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
+ Serializable formatState = ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState();
+ this.currentSplit.setSplitState(formatState);
}
- } else {
- return new Tuple3<>(snapshot, null, null);
+ snapshot.add(this.currentSplit);
}
+ snapshot.addAll(this.pendingSplits);
+ return snapshot;
}
public void cancel() {
@@ -389,45 +352,27 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
public void snapshotState(FSDataOutputStream os, long checkpointId, long timestamp) throws Exception {
final ObjectOutputStream oos = new ObjectOutputStream(os);
- Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState();
- List<FileInputSplit> pendingSplits = readerState.f0;
- FileInputSplit currSplit = readerState.f1;
- S formatState = readerState.f2;
-
- // write the current split
- oos.writeObject(currSplit);
- oos.writeInt(pendingSplits.size());
- for (FileInputSplit split : pendingSplits) {
+ List<TimestampedFileInputSplit> readerState = this.reader.getReaderState();
+ oos.writeInt(readerState.size());
+ for (TimestampedFileInputSplit split : readerState) {
oos.writeObject(split);
}
-
- // write the state of the reading channel
- oos.writeObject(formatState);
oos.flush();
}
@Override
public void restoreState(FSDataInputStream is) throws Exception {
- final ObjectInputStream ois = new ObjectInputStream(is);
- // read the split that was being read
- FileInputSplit currSplit = (FileInputSplit) ois.readObject();
+ checkState(this.restoredReaderState == null,
+ "The reader state has already been initialized.");
+
+ final ObjectInputStream ois = new ObjectInputStream(is);
- // read the pending splits list
- List<FileInputSplit> pendingSplits = new ArrayList<>();
int noOfSplits = ois.readInt();
+ List<TimestampedFileInputSplit> pendingSplits = new ArrayList<>(noOfSplits);
for (int i = 0; i < noOfSplits; i++) {
- FileInputSplit split = (FileInputSplit) ois.readObject();
- pendingSplits.add(split);
+ pendingSplits.add((TimestampedFileInputSplit) ois.readObject());
}
-
- // read the state of the format
- @SuppressWarnings("unchecked")
- S formatState = (S) ois.readObject();
-
- // set the whole reader state for the open() to find.
- checkState(this.readerState == null, "The reader state has already been initialized.");
-
- this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
+ this.restoredReaderState = pendingSplits;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
index cdbeb2b..f8c4fba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
@@ -20,12 +20,15 @@ package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.annotation.PublicEvolving;
/**
- * Specifies when the computation of the {@link ContinuousFileMonitoringFunction}
- * will be triggered.
+ * The mode in which the {@link ContinuousFileMonitoringFunction} operates.
+ * This can be either {@link #PROCESS_ONCE} or {@link #PROCESS_CONTINUOUSLY}.
*/
@PublicEvolving
public enum FileProcessingMode {
- PROCESS_ONCE, // Processes the current content of a file/path only ONCE, and stops monitoring.
- PROCESS_CONTINUOUSLY // Reprocesses the whole file when new data is appended.
+ /** Processes the current contents of the path and exits. */
+ PROCESS_ONCE,
+
+ /** Periodically scans the path for new data. */
+ PROCESS_CONTINUOUSLY
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
new file mode 100644
index 0000000..323b3ab
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ * <li>The modification time of the file this split belongs to.</li>
+ * <li>When checkpointing, the state of the split at the moment of the checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class TimestampedFileInputSplit extends FileInputSplit implements Comparable<TimestampedFileInputSplit>{
+
+ /** The modification time of the file this split belongs to. */
+ private final long modificationTime;
+
+ /**
+ * The state of the split. This information is used when
+ * restoring from a checkpoint and makes it possible to resume reading the
+ * underlying file from the point we left off.
+ * */
+ private Serializable splitState;
+
+ /** A special {@link TimestampedFileInputSplit} signaling the end of the stream of splits.*/
+ public static final TimestampedFileInputSplit EOS =
+ new TimestampedFileInputSplit(Long.MIN_VALUE, -1, null, -1, -1, null);
+
+ /**
+ * Creates a {@link TimestampedFileInputSplit} based on the file modification time and
+ * the rest of the information of the {@link FileInputSplit}, as returned by the
+ * underlying filesystem.
+ *
+ * @param modificationTime the modification time of the file this split belongs to
+ * @param num the number of this input split
+ * @param file the file name
+ * @param start the position of the first byte in the file to process
+ * @param length the number of bytes in the file to process (-1 is flag for "read whole file")
+ * @param hosts the list of hosts containing the block, possibly {@code null}
+ */
+ public TimestampedFileInputSplit(long modificationTime, int num, Path file, long start, long length, String[] hosts) {
+ super(num, file, start, length, hosts);
+
+ Preconditions.checkArgument(modificationTime >= 0 || modificationTime == Long.MIN_VALUE,
+ "Invalid File Split Modification Time: "+ modificationTime +".");
+
+ this.modificationTime = modificationTime;
+ }
+
+ /**
+ * Sets the state of the split. This information is used when
+ * restoring from a checkpoint and makes it possible to resume reading the
+ * underlying file from the point we left off.
+ * <p>
+ * This is applicable to {@link org.apache.flink.api.common.io.FileInputFormat FileInputFormats}
+ * that implement the {@link org.apache.flink.api.common.io.CheckpointableInputFormat
+ * CheckpointableInputFormat} interface.
+ * */
+ public void setSplitState(Serializable state) {
+ this.splitState = state;
+ }
+
+ /**
+ * Sets the state of the split to {@code null}.
+ */
+ public void resetSplitState() {
+ this.setSplitState(null);
+ }
+
+ /** @return the state of the split. */
+ public Serializable getSplitState() {
+ return this.splitState;
+ }
+
+ /** @return The modification time of the file this split belongs to. */
+ public long getModificationTime() {
+ return this.modificationTime;
+ }
+
+ @Override
+ public int compareTo(TimestampedFileInputSplit o) {
+ long modTimeComp = this.modificationTime - o.modificationTime;
+ if (modTimeComp != 0L) {
+ // we cannot just cast the modTimeComp to int
+ // because it may overflow
+ return modTimeComp > 0 ? 1 : -1;
+ }
+
+ int pathComp = this.getPath().compareTo(o.getPath());
+ return pathComp != 0 ? pathComp : this.getSplitNumber() - o.getSplitNumber();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o != null && o instanceof TimestampedFileInputSplit && super.equals(o)) {
+ TimestampedFileInputSplit that = (TimestampedFileInputSplit) o;
+ return this.modificationTime == that.modificationTime;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int res = 37 * (int)(this.modificationTime ^ (this.modificationTime >>> 32));
+ return 37 * res + super.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "[" + getSplitNumber() + "] " + getPath() +" mod@ "+
+ modificationTime + " : " + getStart() + " + " + getLength();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index a265c0a..0e9b054 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -57,18 +57,17 @@ import static org.junit.Assert.fail;
public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase {
- private static final int NO_OF_FILES = 9;
- private static final int LINES_PER_FILE = 200;
+ private static final int NO_OF_FILES = 5;
+ private static final int LINES_PER_FILE = 150;
private static final int NO_OF_RETRIES = 3;
- private static final int PARALLELISM = 4;
- private static final long INTERVAL = 2000;
+ private static final long INTERVAL = 100;
private static File baseDir;
- private static org.apache.hadoop.fs.FileSystem fs;
+ private static org.apache.hadoop.fs.FileSystem localFs;
private static String localFsURI;
private FileCreator fc;
- private static Map<Integer, List<String>> finalCollectedContent = new HashMap<>();
+ private static Map<Integer, Set<String>> actualCollectedContent = new HashMap<>();
@BeforeClass
public static void createHDFS() {
@@ -79,7 +78,7 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
localFsURI = "file:///" + baseDir +"/";
- fs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
+ localFs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
} catch(Throwable e) {
e.printStackTrace();
@@ -100,22 +99,22 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
public void testProgram(StreamExecutionEnvironment env) {
// set the restart strategy.
- env.getConfig().setRestartStrategy(
- RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
- env.enableCheckpointing(20);
- env.setParallelism(PARALLELISM);
+ env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
+ env.enableCheckpointing(10);
// create and start the file creating thread.
fc = new FileCreator();
fc.start();
// create the monitoring source along with the necessary readers.
- TestingSinkFunction sink = new TestingSinkFunction();
TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
+
DataStream<String> inputStream = env.readFile(format, localFsURI,
FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);
+ TestingSinkFunction sink = new TestingSinkFunction();
+
inputStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
@@ -126,12 +125,17 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
@Override
public void postSubmit() throws Exception {
- Map<Integer, List<String>> collected = finalCollectedContent;
+
+ // be sure that the file creating thread is done.
+ fc.join();
+
+ Map<Integer, Set<String>> collected = actualCollectedContent;
Assert.assertEquals(collected.size(), fc.getFileContent().size());
+
for (Integer fileIdx: fc.getFileContent().keySet()) {
Assert.assertTrue(collected.keySet().contains(fileIdx));
- List<String> cntnt = collected.get(fileIdx);
+ List<String> cntnt = new ArrayList<>(collected.get(fileIdx));
Collections.sort(cntnt, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
@@ -147,105 +151,34 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
}
collected.clear();
- finalCollectedContent.clear();
+ actualCollectedContent.clear();
fc.clean();
}
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
- Assert.assertTrue(tkns.length == 6);
return Integer.parseInt(tkns[tkns.length - 1]);
}
- // --------------------------------------------------------------------------------------------
- // Custom Functions
- // --------------------------------------------------------------------------------------------
-
- // ------------------------- FILE CREATION -------------------------------
-
- /**
- * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
- * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
- * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
- * */
- private class FileCreator extends Thread {
-
- private final Set<Path> filesCreated = new HashSet<>();
- private final Map<Integer, String> fileContents = new HashMap<>();
-
- public void run() {
- try {
- for(int i = 0; i < NO_OF_FILES; i++) {
- Tuple2<org.apache.hadoop.fs.Path, String> file =
- fillWithData(localFsURI, "file", i, "This is test line.");
- filesCreated.add(file.f0);
- fileContents.put(i, file.f1);
-
- Thread.sleep((int) (INTERVAL / (3.0/2)));
- }
- } catch (IOException | InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- void clean() throws IOException {
- assert (fs != null);
- for (org.apache.hadoop.fs.Path path: filesCreated) {
- fs.delete(path, false);
- }
- fileContents.clear();
- }
-
- Map<Integer, String> getFileContent() {
- return this.fileContents;
- }
- }
-
- /**
- * Fill the file with content and put the content in the {@code hdPathContents} list.
- * */
- private Tuple2<Path, String> fillWithData(
- String base, String fileName, int fileIdx, String sampleLine) throws IOException {
-
- assert (fs != null);
-
- org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-
- org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
- FSDataOutputStream stream = fs.create(tmp);
- StringBuilder str = new StringBuilder();
- for(int i = 0; i < LINES_PER_FILE; i++) {
- String line = fileIdx +": "+ sampleLine + " " + i +"\n";
- str.append(line);
- stream.write(line.getBytes());
- }
- stream.close();
-
- Assert.assertTrue("Result file present", !fs.exists(file));
- fs.rename(tmp, file);
- Assert.assertTrue("No result file present", fs.exists(file));
- return new Tuple2<>(file, str.toString());
- }
-
// -------------------------- Task Sink ------------------------------
private static class TestingSinkFunction extends RichSinkFunction<String>
implements Checkpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener {
- private static volatile boolean hasFailed = false;
+ private boolean hasFailed;
- private volatile int numSuccessfulCheckpoints;
-
- private long count;
+ private volatile boolean hasSuccessfulCheckpoints;
private long elementsToFailure;
- private long elementCounter = 0;
+ private long elementCounter;
- private Map<Integer, Set<String>> collectedContent = new HashMap<>();
+ private Map<Integer, Set<String>> actualContent = new HashMap<>();
TestingSinkFunction() {
hasFailed = false;
+ elementCounter = 0;
+ hasSuccessfulCheckpoints = false;
}
@Override
@@ -257,74 +190,157 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
long failurePosMax = (long) (0.7 * LINES_PER_FILE);
elementsToFailure = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-
- if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
- finalCollectedContent = new HashMap<>();
- for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
- finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
- }
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() {
- try {
- super.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
}
@Override
public void invoke(String value) throws Exception {
- int fileIdx = Character.getNumericValue(value.charAt(0));
+ int fileIdx = getFileIdx(value);
- Set<String> content = collectedContent.get(fileIdx);
+ Set<String> content = actualContent.get(fileIdx);
if (content == null) {
content = new HashSet<>();
- collectedContent.put(fileIdx, content);
+ actualContent.put(fileIdx, content);
}
+ // detect duplicate lines.
if (!content.add(value + "\n")) {
fail("Duplicate line: " + value);
System.exit(0);
}
-
elementCounter++;
+
+ // all expected elements have arrived: publish the collected content and terminate
if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
- finalCollectedContent = new HashMap<>();
- for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
- finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
- }
+ actualCollectedContent = actualContent;
throw new SuccessException();
}
- count++;
- if (!hasFailed) {
- Thread.sleep(2);
- if (numSuccessfulCheckpoints >= 1 && count >= elementsToFailure) {
- hasFailed = true;
- throw new Exception("Task Failure");
- }
+ // add some latency so that at least one checkpoint completes before the simulated failure
+ if (!hasFailed && !hasSuccessfulCheckpoints) {
+ Thread.sleep(5);
+ }
+
+ // simulate a node failure
+ if (!hasFailed && hasSuccessfulCheckpoints && elementCounter >= elementsToFailure) {
+ throw new Exception("Task Failure @ elem: " + elementCounter + " / " + elementsToFailure);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ super.close();
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
@Override
public Tuple2<Long, Map<Integer, Set<String>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return new Tuple2<>(elementCounter, collectedContent);
+ return new Tuple2<>(elementCounter, actualContent);
}
@Override
public void restoreState(Tuple2<Long, Map<Integer, Set<String>>> state) throws Exception {
+ this.hasFailed = true;
this.elementCounter = state.f0;
- this.collectedContent = state.f1;
+ this.actualContent = state.f1;
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- numSuccessfulCheckpoints++;
+ hasSuccessfulCheckpoints = true;
}
+
+		private int getFileIdx(String line) {
+			// the file index is the token that precedes the first ':' of the line
+			return Integer.parseInt(line.split(":")[0]);
+		}
+ }
+
+ // ------------------------- FILE CREATION -------------------------------
+
+	/**
+	 * A separate thread creating {@link #NO_OF_FILES} files, sleeping 50 ms before each attempt and retrying until the new file's modification time exceeds the previous one's.
+	 * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
+	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
+	 */
+	private class FileCreator extends Thread {
+
+		private final Set<Path> filesCreated = new HashSet<>();
+		private final Map<Integer, String> fileContents = new HashMap<>();
+
+		/** The modification time of the last created file. */
+		private long lastCreatedModTime = Long.MIN_VALUE;
+
+		@Override
+		public void run() {
+			try {
+				for (int i = 0; i < NO_OF_FILES; i++) {
+					Tuple2<org.apache.hadoop.fs.Path, String> tmpFile;
+					long modTime;
+					do {
+						// give it some time so that the files have
+						// different modification timestamps.
+						Thread.sleep(50);
+						tmpFile = fillWithData(localFsURI, "file", i, "This is test line.");
+						modTime = localFs.getFileStatus(tmpFile.f0).getModificationTime();
+						if (modTime <= lastCreatedModTime) {
+							// delete the last created file to recreate it with a different timestamp
+							localFs.delete(tmpFile.f0, false);
+						}
+					} while (modTime <= lastCreatedModTime);
+					lastCreatedModTime = modTime;
+
+					// move the dot-prefixed temporary file to its final name
+					org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(localFsURI + "/file" + i);
+					localFs.rename(tmpFile.f0, file);
+					Assert.assertTrue(localFs.exists(file));
+					filesCreated.add(file);
+					fileContents.put(i, tmpFile.f1);
+				}
+			} catch (IOException e) {
+				e.printStackTrace();
+			} catch (InterruptedException e) {
+				// restore the interrupt status that was cleared when the exception was thrown
+				Thread.currentThread().interrupt();
+				e.printStackTrace();
+			}
+		}
+
+		void clean() throws IOException {
+			assert (localFs != null);
+			for (org.apache.hadoop.fs.Path path: filesCreated) {
+				localFs.delete(path, false);
+			}
+			fileContents.clear();
+		}
+
+		Map<Integer, String> getFileContent() {
+			return this.fileContents;
+		}
+	}
+
+	/**
+	 * Writes {@code LINES_PER_FILE} lines of the form {@code "<fileIdx>: <sampleLine> <lineNo>"} to the dot-prefixed temporary file {@code <base>/.<fileName><fileIdx>} and returns that path together with the written content.
+	 */
+	private Tuple2<Path, String> fillWithData(
+		String base, String fileName, int fileIdx, String sampleLine) throws IOException, InterruptedException {
+		assert (localFs != null);
+
+		org.apache.hadoop.fs.Path tmp =
+			new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+
+		StringBuilder str = new StringBuilder();
+		// try-with-resources closes the stream even when a write fails
+		try (FSDataOutputStream stream = localFs.create(tmp)) {
+			for (int i = 0; i < LINES_PER_FILE; i++) {
+				String line = fileIdx + ": " + sampleLine + " " + i + "\n";
+				str.append(line);
+				stream.write(line.getBytes());
+			}
+		}
+		return new Tuple2<>(tmp, str.toString());
 	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
new file mode 100644
index 0000000..88bd822
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the equality, the ordering and the constructor argument validation of the {@link TimestampedFileInputSplit}.
+ */
+public class TimestampedFileInputSplitTest {
+
+	@Test
+	public void testSplitEquality() {
+		TimestampedFileInputSplit eos1 = TimestampedFileInputSplit.EOS;
+		TimestampedFileInputSplit eos2 = TimestampedFileInputSplit.EOS;
+		Assert.assertEquals(eos1, eos2);
+
+		TimestampedFileInputSplit richFirstSplit =
+			new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null);
+		Assert.assertNotEquals(eos1, richFirstSplit);
+
+		TimestampedFileInputSplit richSecondSplit =
+			new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null);
+		Assert.assertEquals(richFirstSplit, richSecondSplit);
+
+		TimestampedFileInputSplit richModSecondSplit =
+			new TimestampedFileInputSplit(11, 2, new Path("test"), 0, 100, null);
+		Assert.assertNotEquals(richSecondSplit, richModSecondSplit);
+
+		TimestampedFileInputSplit richThirdSplit =
+			new TimestampedFileInputSplit(10, 2, new Path("test/test1"), 0, 100, null);
+		Assert.assertEquals(10, richThirdSplit.getModificationTime());
+		Assert.assertNotEquals(richFirstSplit, richThirdSplit);
+
+		TimestampedFileInputSplit richThirdSplitCopy =
+			new TimestampedFileInputSplit(10, 2, new Path("test/test1"), 0, 100, null);
+		Assert.assertEquals(richThirdSplitCopy, richThirdSplit);
+	}
+
+	@Test
+	public void testSplitComparison() {
+		TimestampedFileInputSplit richFirstSplit =
+			new TimestampedFileInputSplit(10, 3, new Path("test/test1"), 0, 100, null);
+
+		TimestampedFileInputSplit richSecondSplit =
+			new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 0, 100, null);
+
+		TimestampedFileInputSplit richThirdSplit =
+			new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+		TimestampedFileInputSplit richFourthSplit =
+			new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+		// same modification time, so the splits are ordered lexicographically by path
+		Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 0);
+		Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0);
+
+		// same mod time, same file so smaller split number first
+		Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 0);
+
+		// smaller modification time first
+		Assert.assertTrue(richThirdSplit.compareTo(richFourthSplit) < 0);
+	}
+
+	@Test
+	public void testIllegalArgument() {
+		try {
+			new TimestampedFileInputSplit(-10, 2, new Path("test"), 0, 100, null); // invalid modification time
+			Assert.fail("A negative modification time should have been rejected.");
+		} catch (IllegalArgumentException expected) {
+			// expected; Assert.fail() throws an AssertionError, which this catch does not swallow
+		}
+	}
+}