You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/13 12:44:21 UTC
[4/4] flink git commit: [FLINK-5163] Port the ContinuousFileMonitoringFunction to the new state abstractions.
[FLINK-5163] Port the ContinuousFileMonitoringFunction to the new state abstractions.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/685c4f83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/685c4f83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/685c4f83
Branch: refs/heads/master
Commit: 685c4f836bdb79181fd1f62642736606eb81d847
Parents: 3698379
Author: kl0u <kk...@gmail.com>
Authored: Thu Nov 17 14:54:08 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 13 13:38:18 2016 +0100
----------------------------------------------------------------------
.../ContinuousFileProcessingITCase.java | 2 +-
.../hdfstests/ContinuousFileProcessingTest.java | 95 ++++++++++++++++++--
.../environment/StreamExecutionEnvironment.java | 4 +-
.../ContinuousFileMonitoringFunction.java | 79 +++++++++++++---
4 files changed, 157 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
index 3211a20..df68a76 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
@@ -124,7 +124,7 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
env.setParallelism(PARALLELISM);
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+ new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_CONTINUOUSLY,
env.getParallelism(), INTERVAL);
http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 6454c11..0cb1bad 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -35,9 +35,11 @@ import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOpera
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -117,10 +119,10 @@ public class ContinuousFileProcessingTest {
public void testInvalidPathSpecification() throws Exception {
String invalidPath = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/invalid/";
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TextInputFormat format = new TextInputFormat(new Path(invalidPath));
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, invalidPath,
+ new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
try {
monitoringFunction.run(new DummySourceContext() {
@@ -135,7 +137,7 @@ public class ContinuousFileProcessingTest {
Assert.fail("Test passed with an invalid path.");
} catch (FileNotFoundException e) {
- Assert.assertEquals("The provided file path " + invalidPath + " does not exist.", e.getMessage());
+ Assert.assertEquals("The provided file path " + format.getFilePath().toString() + " does not exist.", e.getMessage());
}
}
@@ -491,6 +493,8 @@ public class ContinuousFileProcessingTest {
private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
+ private static final long serialVersionUID = -6727603565381560267L;
+
private final OneShotLatch latch;
private FileInputSplit split;
@@ -556,6 +560,9 @@ public class ContinuousFileProcessingTest {
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilesFilter(new FilePathFilter() {
+
+ private static final long serialVersionUID = 2611449927338589804L;
+
@Override
public boolean filterPath(Path filePath) {
return filePath.getName().startsWith("**");
@@ -563,7 +570,7 @@ public class ContinuousFileProcessingTest {
});
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+ new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
final FileVerifyingSourceContext context =
@@ -601,7 +608,7 @@ public class ContinuousFileProcessingTest {
FileInputSplit[] splits = format.createInputSplits(1);
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+ new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);
@@ -633,7 +640,7 @@ public class ContinuousFileProcessingTest {
format.setFilesFilter(FilePathFilter.createDefaultFilter());
final ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+ new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);
@@ -683,6 +690,80 @@ public class ContinuousFileProcessingTest {
}
@Test
+ public void testFunctionRestore() throws Exception {
+
+ org.apache.hadoop.fs.Path path = null;
+ long fileModTime = Long.MIN_VALUE;
+ for (int i = 0; i < 1; i++) {
+ Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+ path = file.f0;
+ fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
+ }
+
+ TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+ final ContinuousFileMonitoringFunction<String> monitoringFunction =
+ new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+ StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+ new StreamSource<>(monitoringFunction);
+
+ final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+ new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+ testHarness.open();
+
+ final Throwable[] error = new Throwable[1];
+
+ final OneShotLatch latch = new OneShotLatch();
+
+ // run the source asynchronously
+ Thread runner = new Thread() {
+ @Override
+ public void run() {
+ try {
+ monitoringFunction.run(new DummySourceContext() {
+ @Override
+ public void collect(TimestampedFileInputSplit element) {
+ latch.trigger();
+ }
+ });
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ error[0] = t;
+ }
+ }
+ };
+ runner.start();
+
+ if (!latch.isTriggered()) {
+ latch.await();
+ }
+
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+ monitoringFunction.cancel();
+ runner.join();
+
+ testHarness.close();
+
+ final ContinuousFileMonitoringFunction<String> monitoringFunctionCopy =
+ new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+ StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> srcCopy =
+ new StreamSource<>(monitoringFunctionCopy);
+
+ AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarnessCopy =
+ new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
+ testHarnessCopy.initializeState(snapshot);
+ testHarnessCopy.open();
+
+ Assert.assertNull(error[0]);
+ Assert.assertEquals(fileModTime, monitoringFunctionCopy.getGlobalModificationTime());
+
+ hdfs.delete(path, false);
+ }
+
+ @Test
public void testProcessContinuously() throws Exception {
final OneShotLatch latch = new OneShotLatch();
@@ -698,7 +779,7 @@ public class ContinuousFileProcessingTest {
format.setFilesFilter(FilePathFilter.createDefaultFilter());
final ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+ new ContinuousFileMonitoringFunction<>(format,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES
http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 08e17a1..99784e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1351,9 +1351,7 @@ public abstract class StreamExecutionEnvironment {
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
ContinuousFileMonitoringFunction<OUT> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(
- inputFormat, inputFormat.getFilePath().toString(),
- monitoringMode, getParallelism(), interval);
+ new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
ContinuousFileReaderOperator<OUT> reader =
new ContinuousFileReaderOperator<>(inputFormat);
http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 54ab0ab..8723853 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -17,14 +17,20 @@
package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +64,7 @@ import java.util.TreeMap;
*/
@Internal
public class ContinuousFileMonitoringFunction<OUT>
- extends RichSourceFunction<TimestampedFileInputSplit> implements Checkpointed<Long> {
+ extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
@@ -92,10 +98,13 @@ public class ContinuousFileMonitoringFunction<OUT>
private volatile boolean isRunning = true;
+ private transient ListState<Long> checkpointedState;
+
public ContinuousFileMonitoringFunction(
- FileInputFormat<OUT> format, String path,
+ FileInputFormat<OUT> format,
FileProcessingMode watchType,
- int readerParallelism, long interval) {
+ int readerParallelism,
+ long interval) {
Preconditions.checkArgument(
watchType == FileProcessingMode.PROCESS_ONCE || interval >= MIN_MONITORING_INTERVAL,
@@ -104,7 +113,7 @@ public class ContinuousFileMonitoringFunction<OUT>
);
this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
- this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
+ this.path = Preconditions.checkNotNull(format.getFilePath().toString(), "Unspecified Path.");
this.interval = interval;
this.watchType = watchType;
@@ -112,13 +121,56 @@ public class ContinuousFileMonitoringFunction<OUT>
this.globalModificationTime = Long.MIN_VALUE;
}
+ @VisibleForTesting
+ public long getGlobalModificationTime() {
+ return this.globalModificationTime;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+
+ Preconditions.checkState(this.checkpointedState == null,
+ "The " + getClass().getSimpleName() + " has already been initialized.");
+
+ this.checkpointedState = context.getOperatorStateStore().getOperatorState(
+ new ListStateDescriptor<>(
+ "file-monitoring-state",
+ LongSerializer.INSTANCE
+ )
+ );
+
+ if (context.isRestored()) {
+ LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+ List<Long> retrievedStates = new ArrayList<>();
+ for (Long entry : this.checkpointedState.get()) {
+ retrievedStates.add(entry);
+ }
+
+ // given that the parallelism of the function is 1, we can only have 1 state
+ Preconditions.checkArgument(retrievedStates.size() == 1,
+ getClass().getSimpleName() + " retrieved invalid state.");
+
+ this.globalModificationTime = retrievedStates.get(0);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} retrieved a global mod time of {}.",
+ getClass().getSimpleName(), globalModificationTime);
+ }
+
+ } else {
+ LOG.info("No state to restore for the {}.", getClass().getSimpleName());
+ }
+ }
+
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
format.configure(parameters);
if (LOG.isDebugEnabled()) {
- LOG.debug("Opened File Monitoring Source for path: " + path + ".");
+ LOG.debug("Opened {} (taskIdx= {}) for path: {}",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), path);
}
}
@@ -294,12 +346,15 @@ public class ContinuousFileMonitoringFunction<OUT>
// --------------------- Checkpointing --------------------------
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return this.globalModificationTime;
- }
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ Preconditions.checkState(this.checkpointedState != null,
+ "The " + getClass().getSimpleName() + " state has not been properly initialized.");
- @Override
- public void restoreState(Long state) throws Exception {
- this.globalModificationTime = state;
+ this.checkpointedState.clear();
+ this.checkpointedState.add(this.globalModificationTime);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
+ }
}
}