You are viewing a plain-text version of this content. The link to the canonical version (labeled "here" in the original page) is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/13 12:44:21 UTC

[4/4] flink git commit: [FLINK-5163] Port the ContinuousFileMonitoringFunction to the new state abstractions.

[FLINK-5163] Port the ContinuousFileMonitoringFunction to the new state abstractions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/685c4f83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/685c4f83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/685c4f83

Branch: refs/heads/master
Commit: 685c4f836bdb79181fd1f62642736606eb81d847
Parents: 3698379
Author: kl0u <kk...@gmail.com>
Authored: Thu Nov 17 14:54:08 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 13 13:38:18 2016 +0100

----------------------------------------------------------------------
 .../ContinuousFileProcessingITCase.java         |  2 +-
 .../hdfstests/ContinuousFileProcessingTest.java | 95 ++++++++++++++++++--
 .../environment/StreamExecutionEnvironment.java |  4 +-
 .../ContinuousFileMonitoringFunction.java       | 79 +++++++++++++---
 4 files changed, 157 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
index 3211a20..df68a76 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
@@ -124,7 +124,7 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
 		env.setParallelism(PARALLELISM);
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_CONTINUOUSLY,
 				env.getParallelism(), INTERVAL);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 6454c11..0cb1bad 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -35,9 +35,11 @@ import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOpera
 import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -117,10 +119,10 @@ public class ContinuousFileProcessingTest {
 	public void testInvalidPathSpecification() throws Exception {
 
 		String invalidPath = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/invalid/";
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(invalidPath));
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, invalidPath,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 		try {
 			monitoringFunction.run(new DummySourceContext() {
@@ -135,7 +137,7 @@ public class ContinuousFileProcessingTest {
 			Assert.fail("Test passed with an invalid path.");
 
 		} catch (FileNotFoundException e) {
-			Assert.assertEquals("The provided file path " + invalidPath + " does not exist.", e.getMessage());
+			Assert.assertEquals("The provided file path " + format.getFilePath().toString() + " does not exist.", e.getMessage());
 		}
 	}
 
@@ -491,6 +493,8 @@ public class ContinuousFileProcessingTest {
 
 	private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
 
+		private static final long serialVersionUID = -6727603565381560267L;
+
 		private final OneShotLatch latch;
 
 		private FileInputSplit split;
@@ -556,6 +560,9 @@ public class ContinuousFileProcessingTest {
 
 		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
 		format.setFilesFilter(new FilePathFilter() {
+
+			private static final long serialVersionUID = 2611449927338589804L;
+
 			@Override
 			public boolean filterPath(Path filePath) {
 				return filePath.getName().startsWith("**");
@@ -563,7 +570,7 @@ public class ContinuousFileProcessingTest {
 		});
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
 		final FileVerifyingSourceContext context =
@@ -601,7 +608,7 @@ public class ContinuousFileProcessingTest {
 		FileInputSplit[] splits = format.createInputSplits(1);
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
 		ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);
@@ -633,7 +640,7 @@ public class ContinuousFileProcessingTest {
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
 		final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);
@@ -683,6 +690,80 @@ public class ContinuousFileProcessingTest {
 	}
 
 	@Test
+	public void testFunctionRestore() throws Exception {
+
+		org.apache.hadoop.fs.Path path = null;
+		long fileModTime = Long.MIN_VALUE;
+		for (int i = 0; i < 1; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			path = file.f0;
+			fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+			new StreamSource<>(monitoringFunction);
+
+		final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+
+		final OneShotLatch latch = new OneShotLatch();
+
+		// run the source asynchronously
+		Thread runner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					monitoringFunction.run(new DummySourceContext() {
+						@Override
+						public void collect(TimestampedFileInputSplit element) {
+							latch.trigger();
+						}
+					});
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+					error[0] = t;
+				}
+			}
+		};
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+		monitoringFunction.cancel();
+		runner.join();
+
+		testHarness.close();
+
+		final ContinuousFileMonitoringFunction<String> monitoringFunctionCopy =
+			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> srcCopy =
+			new StreamSource<>(monitoringFunctionCopy);
+
+		AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarnessCopy =
+			new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
+		testHarnessCopy.initializeState(snapshot);
+		testHarnessCopy.open();
+
+		Assert.assertNull(error[0]);
+		Assert.assertEquals(fileModTime, monitoringFunctionCopy.getGlobalModificationTime());
+
+		hdfs.delete(path, false);
+	}
+
+	@Test
 	public void testProcessContinuously() throws Exception {
 		final OneShotLatch latch = new OneShotLatch();
 
@@ -698,7 +779,7 @@ public class ContinuousFileProcessingTest {
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+			new ContinuousFileMonitoringFunction<>(format,
 				FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
 
 		final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES

http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 08e17a1..99784e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1351,9 +1351,7 @@ public abstract class StreamExecutionEnvironment {
 					ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
 
 		ContinuousFileMonitoringFunction<OUT> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(
-				inputFormat, inputFormat.getFilePath().toString(),
-				monitoringMode, getParallelism(), interval);
+			new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
 
 		ContinuousFileReaderOperator<OUT> reader =
 			new ContinuousFileReaderOperator<>(inputFormat);

http://git-wip-us.apache.org/repos/asf/flink/blob/685c4f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 54ab0ab..8723853 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -17,14 +17,20 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +64,7 @@ import java.util.TreeMap;
  */
 @Internal
 public class ContinuousFileMonitoringFunction<OUT>
-	extends RichSourceFunction<TimestampedFileInputSplit> implements Checkpointed<Long> {
+	extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction {
 
 	private static final long serialVersionUID = 1L;
 
@@ -92,10 +98,13 @@ public class ContinuousFileMonitoringFunction<OUT>
 
 	private volatile boolean isRunning = true;
 
+	private transient ListState<Long> checkpointedState;
+
 	public ContinuousFileMonitoringFunction(
-		FileInputFormat<OUT> format, String path,
+		FileInputFormat<OUT> format,
 		FileProcessingMode watchType,
-		int readerParallelism, long interval) {
+		int readerParallelism,
+		long interval) {
 
 		Preconditions.checkArgument(
 			watchType == FileProcessingMode.PROCESS_ONCE || interval >= MIN_MONITORING_INTERVAL,
@@ -104,7 +113,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 		);
 
 		this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
-		this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
+		this.path = Preconditions.checkNotNull(format.getFilePath().toString(), "Unspecified Path.");
 
 		this.interval = interval;
 		this.watchType = watchType;
@@ -112,13 +121,56 @@ public class ContinuousFileMonitoringFunction<OUT>
 		this.globalModificationTime = Long.MIN_VALUE;
 	}
 
+	@VisibleForTesting
+	public long getGlobalModificationTime() {
+		return this.globalModificationTime;
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+
+		Preconditions.checkState(this.checkpointedState == null,
+			"The " + getClass().getSimpleName() + " has already been initialized.");
+
+		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
+			new ListStateDescriptor<>(
+				"file-monitoring-state",
+				LongSerializer.INSTANCE
+			)
+		);
+
+		if (context.isRestored()) {
+			LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+			List<Long> retrievedStates = new ArrayList<>();
+			for (Long entry : this.checkpointedState.get()) {
+				retrievedStates.add(entry);
+			}
+
+			// given that the parallelism of the function is 1, we can only have 1 state
+			Preconditions.checkArgument(retrievedStates.size() == 1,
+				getClass().getSimpleName() + " retrieved invalid state.");
+
+			this.globalModificationTime = retrievedStates.get(0);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("{} retrieved a global mod time of {}.",
+					getClass().getSimpleName(), globalModificationTime);
+			}
+
+		} else {
+			LOG.info("No state to restore for the {}.", getClass().getSimpleName());
+		}
+	}
+
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		format.configure(parameters);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Opened File Monitoring Source for path: " + path + ".");
+			LOG.debug("Opened {} (taskIdx= {}) for path: {}",
+				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), path);
 		}
 	}
 
@@ -294,12 +346,15 @@ public class ContinuousFileMonitoringFunction<OUT>
 	//	---------------------			Checkpointing			--------------------------
 
 	@Override
-	public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		return this.globalModificationTime;
-	}
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		Preconditions.checkState(this.checkpointedState != null,
+			"The " + getClass().getSimpleName() + " state has not been properly initialized.");
 
-	@Override
-	public void restoreState(Long state) throws Exception {
-		this.globalModificationTime = state;
+		this.checkpointedState.clear();
+		this.checkpointedState.add(this.globalModificationTime);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
+		}
 	}
 }