You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/25 14:09:49 UTC
flink git commit: [FLINK-3677] FileInputFormat: Allow to specify
include/exclude file name patterns
Repository: flink
Updated Branches:
refs/heads/master 259a3a556 -> 481091043
[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns
This closes #2109
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48109104
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48109104
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48109104
Branch: refs/heads/master
Commit: 4810910431e01bf143ae77a6e93a86f2fafbccd0
Parents: 259a3a5
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Tue Jun 14 22:44:19 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Aug 25 16:08:18 2016 +0200
----------------------------------------------------------------------
flink-core/pom.xml | 7 +
.../flink/api/common/io/FileInputFormat.java | 20 ++-
.../flink/api/common/io/FilePathFilter.java | 69 ++++++++
.../flink/api/common/io/GlobFilePathFilter.java | 111 ++++++++++++
.../java/org/apache/flink/core/fs/Path.java | 10 +-
.../flink/core/fs/local/LocalFileStatus.java | 8 +
.../flink/api/common/io/DefaultFilterTest.java | 70 ++++++++
.../api/common/io/FileInputFormatTest.java | 174 ++++++++++++-------
.../api/common/io/GlobFilePathFilterTest.java | 141 +++++++++++++++
.../ContinuousFileMonitoringFunctionITCase.java | 4 +-
.../hdfstests/ContinuousFileMonitoringTest.java | 13 +-
.../environment/StreamExecutionEnvironment.java | 72 ++++++--
.../ContinuousFileMonitoringFunction.java | 8 +-
.../api/functions/source/FilePathFilter.java | 66 -------
.../api/scala/StreamExecutionEnvironment.scala | 43 ++++-
...ontinuousFileProcessingCheckpointITCase.java | 5 +-
16 files changed, 650 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 9e290a0..dcb2599 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -103,6 +103,13 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 72d6061..d0f5166 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +71,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
* The fraction that the last split may be larger than the others.
*/
private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-
+
/**
* The timeout (in milliseconds) to wait for a filesystem stream to respond.
*/
@@ -218,7 +219,12 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
* structure is enabled.
*/
protected boolean enumerateNestedFiles = false;
-
+
+ /**
+ * Files filter for determining what files/directories should be included.
+ */
+ private FilePathFilter filesFilter = new GlobFilePathFilter();
+
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
@@ -332,6 +338,10 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
return splitLength;
}
+ public void setFilesFilter(FilePathFilter filesFilter) {
+ this.filesFilter = Preconditions.checkNotNull(filesFilter, "Files filter should not be null");
+ }
+
// --------------------------------------------------------------------------------------------
// Pre-flight: Configuration, Splits, Sampling
// --------------------------------------------------------------------------------------------
@@ -625,7 +635,9 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
*/
protected boolean acceptFile(FileStatus fileStatus) {
final String name = fileStatus.getPath().getName();
- return !name.startsWith("_") && !name.startsWith(".");
+ return !name.startsWith("_")
+ && !name.startsWith(".")
+ && !filesFilter.filterPath(fileStatus.getPath());
}
/**
@@ -735,7 +747,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
"File Input (unknown file)" :
"File Input (" + this.filePath.toString() + ')';
}
-
+
// ============================================================================================
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
new file mode 100644
index 0000000..4ab896c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
+ * processing or not. This can serve to exclude temporary or partial files that
+ * are still being written.
+ */
+@PublicEvolving
+public abstract class FilePathFilter implements Serializable {
+
+ // Name of an unfinished Hadoop file
+ public static final String HADOOP_COPYING = "_COPYING_";
+
+ public static FilePathFilter createDefaultFilter() {
+ return new DefaultFilter();
+ }
+
+ /**
+ * Returns {@code true} if the {@code filePath} given is to be
+ * ignored when processing a directory, e.g.
+ * <pre>
+ * {@code
+ *
+ * public boolean filterPath(Path filePath) {
+ * return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_");
+ * }
+ * }</pre>
+ */
+ public abstract boolean filterPath(Path filePath);
+
+ /**
+ * The default file path filter, which is used
+ * if no other filter is provided. This filter leaves out
+ * files whose names start with "." or "_", or contain "_COPYING_".
+ */
+ public static class DefaultFilter extends FilePathFilter {
+
+ DefaultFilter() {}
+
+ @Override
+ public boolean filterPath(Path filePath) {
+ return filePath == null ||
+ filePath.getName().startsWith(".") ||
+ filePath.getName().startsWith("_") ||
+ filePath.getName().contains(HADOOP_COPYING);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
new file mode 100644
index 0000000..4aaf481
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Class for determining if a particular file should be included or excluded
+ * based on a set of include and exclude glob filters.
+ *
+ * Glob filters support the following expressions:
+ * <ul>
+ * <li>* - matches any number of any characters including none</li>
+ * <li>** - matches any file in all subdirectories</li>
+ * <li>? - matches any single character</li>
+ * <li>[abc] - matches one of the characters listed in the brackets</li>
+ * <li>[a-z] - matches one character from the range given in the brackets</li>
+ * </ul>
+ *
+ * <p> If a path does not match any include pattern it is excluded. If it matches an include
+ * pattern but also matches an exclude pattern it is excluded.
+ *
+ * <p> If no patterns are provided all files are included.
+ */
+@Internal
+public class GlobFilePathFilter extends FilePathFilter {
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<PathMatcher> includeMatchers;
+ private final List<PathMatcher> excludeMatchers;
+
+ /**
+ * Constructor for GlobFilePathFilter that will match all files
+ */
+ public GlobFilePathFilter() {
+ this(Collections.<String>emptyList(), Collections.<String>emptyList());
+ }
+
+ /**
+ * Constructor for GlobFilePathFilter
+ *
+ * @param includePatterns glob patterns for files to include
+ * @param excludePatterns glob patterns for files to exclude
+ */
+ public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {
+ includeMatchers = buildPatterns(includePatterns);
+ excludeMatchers = buildPatterns(excludePatterns);
+ }
+
+ private List<PathMatcher> buildPatterns(List<String> patterns) {
+ FileSystem fileSystem = FileSystems.getDefault();
+ List<PathMatcher> matchers = new ArrayList<>();
+
+ for (String patternStr : patterns) {
+ matchers.add(fileSystem.getPathMatcher("glob:" + patternStr));
+ }
+
+ return matchers;
+ }
+
+ @Override
+ public boolean filterPath(Path filePath) {
+ if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) {
+ return false;
+ }
+
+ for (PathMatcher mather : includeMatchers) {
+ if (mather.matches(Paths.get(filePath.getPath()))) {
+ return shouldExclude(filePath);
+ }
+ }
+
+ return true;
+ }
+
+ private boolean shouldExclude(Path filePath) {
+ for (PathMatcher matcher : excludeMatchers) {
+ if (matcher.matches(Paths.get(filePath.getPath()))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 4c77199..7adfa42 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -145,7 +145,7 @@ public class Path implements IOReadableWritable, Serializable {
}
/**
- * Checks if the provided path string is either null or has zero length and throws
+ * Checks if the provided path string is either null or has zero length and throws
* a {@link IllegalArgumentException} if any of the two conditions apply.
* In addition, leading and trailing whitespace is removed.
*
@@ -333,6 +333,14 @@ public class Path implements IOReadableWritable, Serializable {
}
/**
+ * Returns the path component of the underlying URI.
+ * @return the path component of the URI
+ */
+ public String getPath() {
+ return uri.getPath();
+ }
+
+ /**
* Returns the parent of a path, i.e., everything that precedes the last separator
* or <code>null</code> if at root.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
index 0aebd75..3e127ff 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
@@ -102,4 +102,12 @@ public class LocalFileStatus implements FileStatus {
public File getFile() {
return this.file;
}
+
+ @Override
+ public String toString() {
+ return "LocalFileStatus{" +
+ "file=" + file +
+ ", path=" + path +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
new file mode 100644
index 0000000..6956518
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class DefaultFilterTest {
+ @Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"file.txt", false},
+
+ {".file.txt", true},
+ {"dir/.file.txt", true},
+ {".dir/file.txt", false},
+
+ {"_file.txt", true},
+ {"dir/_file.txt", true},
+ {"_dir/file.txt", false},
+
+ // Check filtering Hadoop's unfinished files
+ {FilePathFilter.HADOOP_COPYING, true},
+ {"dir/" + FilePathFilter.HADOOP_COPYING, true},
+ {FilePathFilter.HADOOP_COPYING + "/file.txt", false},
+ });
+ }
+
+ private final boolean shouldFilter;
+ private final String filePath;
+
+ public DefaultFilterTest(String filePath, boolean shouldFilter) {
+ this.filePath = filePath;
+ this.shouldFilter = shouldFilter;
+ }
+
+ @Test
+ public void test() {
+ FilePathFilter defaultFilter = FilePathFilter.createDefaultFilter();
+ Path path = new Path(filePath);
+ assertEquals(
+ String.format("File: %s", filePath),
+ shouldFilter,
+ defaultFilter.filterPath(path));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index ae8802b..dcd6583 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.common.io;
+import com.google.common.collect.Lists;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
@@ -27,16 +28,17 @@ import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.types.IntValue;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.BufferedOutputStream;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.Collections;
import static org.junit.Assert.*;
@@ -45,6 +47,9 @@ import static org.junit.Assert.*;
*/
public class FileInputFormatTest {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
// ------------------------------------------------------------------------
// Statistics
// ------------------------------------------------------------------------
@@ -257,41 +262,21 @@ public class FileInputFormatTest {
public void testIgnoredUnderscoreFiles() {
try {
final String contents = "CONTENTS";
-
+
// create some accepted, some ignored files
-
- File tempDir = new File(System.getProperty("java.io.tmpdir"));
- File f;
- do {
- f = new File(tempDir, TestFileUtils.randomFileName(""));
- }
- while (f.exists());
- assertTrue(f.mkdirs());
- f.deleteOnExit();
-
- File child1 = new File(f, "dataFile1.txt");
- File child2 = new File(f, "another_file.bin");
- File luigiFile = new File(f, "_luigi");
- File success = new File(f, "_SUCCESS");
-
- File[] files = { child1, child2, luigiFile, success };
-
- for (File child : files) {
- child.deleteOnExit();
-
- BufferedWriter out = new BufferedWriter(new FileWriter(child));
- try {
- out.write(contents);
- } finally {
- out.close();
- }
- }
-
+
+ File child1 = temporaryFolder.newFile("dataFile1.txt");
+ File child2 = temporaryFolder.newFile("another_file.bin");
+ File luigiFile = temporaryFolder.newFile("_luigi");
+ File success = temporaryFolder.newFile("_SUCCESS");
+
+ createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
+
// test that only the valid files are accepted
final DummyFileInputFormat format = new DummyFileInputFormat();
- format.setFilePath(f.toURI().toString());
+ format.setFilePath(temporaryFolder.getRoot().toURI().toString());
format.configure(new Configuration());
FileInputSplit[] splits = format.createInputSplits(1);
@@ -314,43 +299,95 @@ public class FileInputFormatTest {
}
@Test
+ public void testExcludeFiles() {
+ try {
+ final String contents = "CONTENTS";
+
+ // create some accepted, some ignored files
+
+ File child1 = temporaryFolder.newFile("dataFile1.txt");
+ File child2 = temporaryFolder.newFile("another_file.bin");
+
+ File[] files = { child1, child2 };
+
+ createTempFiles(contents.getBytes(), files);
+
+ // test that only the valid files are accepted
+
+ Configuration configuration = new Configuration();
+
+ final DummyFileInputFormat format = new DummyFileInputFormat();
+ format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+ format.configure(configuration);
+ format.setFilesFilter(new GlobFilePathFilter(
+ Collections.singletonList("**"),
+ Collections.singletonList("**/another_file.bin")));
+ FileInputSplit[] splits = format.createInputSplits(1);
+
+ Assert.assertEquals(1, splits.length);
+
+ final URI uri1 = splits[0].getPath().toUri();
+
+ final URI childUri1 = child1.toURI();
+
+ Assert.assertEquals(uri1, childUri1);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testReadMultiplePatterns() {
+ try {
+ final String contents = "CONTENTS";
+
+ // create some accepted, some ignored files
+
+ File child1 = temporaryFolder.newFile("dataFile1.txt");
+ File child2 = temporaryFolder.newFile("another_file.bin");
+ createTempFiles(contents.getBytes(), child1, child2);
+
+ // test that only the valid files are accepted
+
+ Configuration configuration = new Configuration();
+
+ final DummyFileInputFormat format = new DummyFileInputFormat();
+ format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+ format.configure(configuration);
+ format.setFilesFilter(new GlobFilePathFilter(
+ Collections.singletonList("**"),
+ Lists.newArrayList("**/another_file.bin", "**/dataFile1.txt")
+ ));
+ FileInputSplit[] splits = format.createInputSplits(1);
+
+ Assert.assertEquals(0, splits.length);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
public void testGetStatsIgnoredUnderscoreFiles() {
try {
- final long SIZE = 2048;
+ final int SIZE = 2048;
final long TOTAL = 2*SIZE;
// create two accepted and two ignored files
- File tempDir = new File(System.getProperty("java.io.tmpdir"));
- File f;
- do {
- f = new File(tempDir, TestFileUtils.randomFileName(""));
- }
- while (f.exists());
-
- assertTrue(f.mkdirs());
- f.deleteOnExit();
-
- File child1 = new File(f, "dataFile1.txt");
- File child2 = new File(f, "another_file.bin");
- File luigiFile = new File(f, "_luigi");
- File success = new File(f, "_SUCCESS");
-
- File[] files = { child1, child2, luigiFile, success };
+ File child1 = temporaryFolder.newFile("dataFile1.txt");
+ File child2 = temporaryFolder.newFile("another_file.bin");
+ File luigiFile = temporaryFolder.newFile("_luigi");
+ File success = temporaryFolder.newFile("_SUCCESS");
- for (File child : files) {
- child.deleteOnExit();
+ createTempFiles(new byte[SIZE], child1, child2, luigiFile, success);
- BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child));
- try {
- for (long bytes = SIZE; bytes > 0; bytes--) {
- out.write(0);
- }
- } finally {
- out.close();
- }
- }
final DummyFileInputFormat format = new DummyFileInputFormat();
- format.setFilePath(f.toURI().toString());
+ format.setFilePath(temporaryFolder.getRoot().toURI().toString());
format.configure(new Configuration());
// check that only valid files are used for statistics computation
@@ -406,7 +443,20 @@ public class FileInputFormatTest {
}
// ------------------------------------------------------------------------
-
+
+ private void createTempFiles(byte[] contents, File... files) throws IOException {
+ for (File child : files) {
+ child.deleteOnExit();
+
+ BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child));
+ try {
+ out.write(contents);
+ } finally {
+ out.close();
+ }
+ }
+ }
+
private class DummyFileInputFormat extends FileInputFormat<IntValue> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
new file mode 100644
index 0000000..bced076
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
+ @Test
+ public void defaultConstructorCreateMatchAllFilter() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter();
+ assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+ }
+
+ @Test
+ public void matchAllFilesByDefault() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.<String>emptyList(),
+ Collections.<String>emptyList());
+
+ assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+ }
+
+ @Test
+ public void excludeFilesNotInIncludePatterns() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("dir/*"),
+ Collections.<String>emptyList());
+
+ assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+ assertTrue(matcher.filterPath(new Path("dir1/file.txt")));
+ }
+
+ @Test
+ public void excludeFilesIfMatchesExclude() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("dir/*"),
+ Collections.singletonList("dir/file.txt"));
+
+ assertTrue(matcher.filterPath(new Path("dir/file.txt")));
+ }
+
+ @Test
+ public void includeFileWithAnyCharacterMatcher() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("dir/?.txt"),
+ Collections.<String>emptyList());
+
+ assertFalse(matcher.filterPath(new Path("dir/a.txt")));
+ assertTrue(matcher.filterPath(new Path("dir/aa.txt")));
+ }
+
+ @Test
+ public void includeFileWithCharacterSetMatcher() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("dir/[acd].txt"),
+ Collections.<String>emptyList());
+
+ assertFalse(matcher.filterPath(new Path("dir/a.txt")));
+ assertFalse(matcher.filterPath(new Path("dir/c.txt")));
+ assertFalse(matcher.filterPath(new Path("dir/d.txt")));
+ assertTrue(matcher.filterPath(new Path("dir/z.txt")));
+ }
+
+ @Test
+ public void includeFileWithCharacterRangeMatcher() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("dir/[a-d].txt"),
+ Collections.<String>emptyList());
+
+ assertFalse(matcher.filterPath(new Path("dir/a.txt")));
+ assertFalse(matcher.filterPath(new Path("dir/b.txt")));
+ assertFalse(matcher.filterPath(new Path("dir/c.txt")));
+ assertFalse(matcher.filterPath(new Path("dir/d.txt")));
+ assertTrue(matcher.filterPath(new Path("dir/z.txt")));
+ }
+
+ @Test
+ public void excludeHDFSFile() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("**"),
+ Collections.singletonList("/dir/file2.txt"));
+
+ assertFalse(matcher.filterPath(new Path("hdfs:///dir/file1.txt")));
+ assertTrue(matcher.filterPath(new Path("hdfs:///dir/file2.txt")));
+ assertFalse(matcher.filterPath(new Path("hdfs:///dir/file3.txt")));
+ }
+
+ @Test
+ public void excludeFilenameWithStart() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("**"),
+ Collections.singletonList("\\*"));
+
+ assertTrue(matcher.filterPath(new Path("*")));
+ assertFalse(matcher.filterPath(new Path("**")));
+ assertFalse(matcher.filterPath(new Path("other.txt")));
+ }
+
+ @Test
+ public void singleStarPattern() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("*"),
+ Collections.<String>emptyList());
+
+ assertFalse(matcher.filterPath(new Path("a")));
+ assertTrue(matcher.filterPath(new Path("a/b")));
+ assertTrue(matcher.filterPath(new Path("a/b/c")));
+ }
+
+ @Test
+ public void doubleStarPattern() {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("**"),
+ Collections.<String>emptyList());
+
+ assertFalse(matcher.filterPath(new Path("a")));
+ assertFalse(matcher.filterPath(new Path("a/b")));
+ assertFalse(matcher.filterPath(new Path("a/b/c")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
index e6cd5d9..663345c 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
@@ -122,9 +122,9 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
- FilePathFilter.createDefaultFilter(),
FileProcessingMode.PROCESS_CONTINUOUSLY,
env.getParallelism(), INTERVAL);
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index def9378..4aadaec 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
@@ -216,8 +216,9 @@ public class ContinuousFileMonitoringTest {
}
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ format.setFilesFilter(new PathFilter());
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, hdfsURI, new PathFilter(),
+ new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
monitoringFunction.open(new Configuration());
@@ -242,8 +243,9 @@ public class ContinuousFileMonitoringTest {
fc.start();
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(),
+ new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
monitoringFunction.open(new Configuration());
@@ -291,8 +293,9 @@ public class ContinuousFileMonitoringTest {
Assert.assertTrue(fc.getFilesCreated().size() >= 1);
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(),
+ new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
monitoringFunction.open(new Configuration());
@@ -427,7 +430,7 @@ public class ContinuousFileMonitoringTest {
assert (hdfs != null);
org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
- Assert.assertTrue (!hdfs.exists(file));
+ Assert.assertFalse(hdfs.exists(file));
org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
FSDataOutputStream stream = hdfs.create(tmp);
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 1913a36..ead9564 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -52,7 +52,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
@@ -917,11 +917,11 @@ public abstract class StreamExecutionEnvironment {
Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);
- return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1,
- FilePathFilter.createDefaultFilter(), typeInfo);
+ return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
/**
@@ -952,7 +952,52 @@ public abstract class StreamExecutionEnvironment {
*/
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
String filePath) {
- return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());
+ return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1);
+ }
+
+ /**
+ * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat},
+ * after setting {@code filter} as the format's files filter. Depending on the provided
+ * {@link FileProcessingMode}, the source monitors the path for new data or processes it once and exits.
+ * <p>
+ * See {@link #readFile(FileInputFormat, String, FileProcessingMode, long)}
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param watchType
+ * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+ * @param interval
+ * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+ * @param filter
+ * The files to be excluded from the processing
+ * @param <OUT>
+ * The type of the returned data stream
+ * @return The data stream that represents the data read from the given file
+ * @throws InvalidProgramException if the produced type cannot be determined from the input format
+ * @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and
+ * {@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)} instead.
+ *
+ */
+ @PublicEvolving
+ @Deprecated
+ public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+ String filePath,
+ FileProcessingMode watchType,
+ long interval,
+ FilePathFilter filter) {
+ // The filter is carried by the input format itself; it is no longer passed on separately.
+ inputFormat.setFilesFilter(filter);
+ TypeInformation<OUT> typeInformation;
+ try {
+ typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
+ } catch (Exception e) {
+ throw new InvalidProgramException("The type returned by the input format could not be " +
+ "automatically determined. Please specify the TypeInformation of the produced type " +
+ "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
+ }
+ return readFile(inputFormat, filePath, watchType, interval, typeInformation);
}
/**
@@ -986,8 +1031,6 @@ public abstract class StreamExecutionEnvironment {
* The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
* @param interval
* In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
- * @param filter
- * The files to be excluded from the processing
* @param <OUT>
* The type of the returned data stream
* @return The data stream that represents the data read from the given file
@@ -996,8 +1039,7 @@ public abstract class StreamExecutionEnvironment {
public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
String filePath,
FileProcessingMode watchType,
- long interval,
- FilePathFilter filter) {
+ long interval) {
TypeInformation<OUT> typeInformation;
try {
@@ -1007,7 +1049,7 @@ public abstract class StreamExecutionEnvironment {
"automatically determined. Please specify the TypeInformation of the produced type " +
"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
}
- return readFile(inputFormat, filePath, watchType, interval, filter, typeInformation);
+ return readFile(inputFormat, filePath, watchType, interval, typeInformation);
}
/**
@@ -1057,8 +1099,6 @@ public abstract class StreamExecutionEnvironment {
* The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
* @param watchType
* The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
- * @param filter
- * The files to be excluded from the processing
* @param typeInformation
* Information on the type of the elements in the output stream
* @param interval
@@ -1072,7 +1112,6 @@ public abstract class StreamExecutionEnvironment {
String filePath,
FileProcessingMode watchType,
long interval,
- FilePathFilter filter,
TypeInformation<OUT> typeInformation) {
Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
@@ -1080,7 +1119,7 @@ public abstract class StreamExecutionEnvironment {
Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
inputFormat.setFilePath(filePath);
- return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, filter, interval);
+ return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
}
/**
@@ -1250,8 +1289,7 @@ public abstract class StreamExecutionEnvironment {
if (inputFormat instanceof FileInputFormat) {
FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;
source = createFileInput(format, typeInfo, "Custom File source",
- FileProcessingMode.PROCESS_ONCE,
- FilePathFilter.createDefaultFilter(), -1);
+ FileProcessingMode.PROCESS_ONCE, -1);
} else {
source = createInput(inputFormat, typeInfo, "Custom Source");
}
@@ -1270,14 +1308,12 @@ public abstract class StreamExecutionEnvironment {
TypeInformation<OUT> typeInfo,
String sourceName,
FileProcessingMode monitoringMode,
- FilePathFilter pathFilter,
long interval) {
Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
- Preconditions.checkNotNull(pathFilter, "Unspecified path name filtering function.");
Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
@@ -1286,7 +1322,7 @@ public abstract class StreamExecutionEnvironment {
ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(
inputFormat, inputFormat.getFilePath().toString(),
- pathFilter, monitoringMode, getParallelism(), interval);
+ monitoringMode, getParallelism(), interval);
ContinuousFileReaderOperator<OUT, ?> reader = new ContinuousFileReaderOperator<>(inputFormat);
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 8ff4a2a..d36daab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
@@ -81,15 +82,13 @@ public class ContinuousFileMonitoringFunction<OUT>
private Long globalModificationTime;
- private FilePathFilter pathFilter;
-
private transient Object checkpointLock;
private volatile boolean isRunning = true;
public ContinuousFileMonitoringFunction(
FileInputFormat<OUT> format, String path,
- FilePathFilter filter, FileProcessingMode watchType,
+ FileProcessingMode watchType,
int readerParallelism, long interval) {
if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) {
@@ -98,7 +97,6 @@ public class ContinuousFileMonitoringFunction<OUT>
}
this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
- this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter.");
this.interval = interval;
this.watchType = watchType;
@@ -274,7 +272,7 @@ public class ContinuousFileMonitoringFunction<OUT>
*/
private boolean shouldIgnore(Path filePath, long modificationTime) {
assert (Thread.holdsLock(checkpointLock));
- boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
+ boolean shouldIgnore = modificationTime <= globalModificationTime;
if (shouldIgnore) {
LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
deleted file mode 100644
index 1a359ab..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.fs.Path;
-
-import java.io.Serializable;
-
-/**
- * An interface to be implemented by the user when using the {@link ContinuousFileMonitoringFunction}.
- * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
- * processing or not. This can serve to exclude temporary or partial files that
- * are still being written.
- */
-@PublicEvolving
-public abstract class FilePathFilter implements Serializable {
-
- public static FilePathFilter createDefaultFilter() {
- return new DefaultFilter();
- }
- /**
- * Returns {@code true} if the {@code filePath} given is to be
- * ignored when processing a directory, e.g.
- * <pre>
- * {@code
- *
- * public boolean filterPaths(Path filePath) {
- * return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_");
- * }
- * }</pre>
- */
- public abstract boolean filterPath(Path filePath);
-
- /**
- * The default file path filtering method and is used
- * if no other such function is provided. This filter leaves out
- * files starting with ".", "_", and "_COPYING_".
- */
- public static class DefaultFilter extends FilePathFilter {
-
- DefaultFilter() {}
-
- @Override
- public boolean filterPath(Path filePath) {
- return filePath == null ||
- filePath.getName().startsWith(".") ||
- filePath.getName().startsWith("_") ||
- filePath.getName().contains("_COPYING_");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index f6dab1e..9cb36a5 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
-import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
+import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -467,6 +467,40 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Reads the contents of the user-specified path based on the given [[FileInputFormat]].
+ * Depending on the provided [[FileProcessingMode]], the path is monitored or processed once.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
+ * "hdfs://host:port/file/path")
+ * @param watchType
+ * The mode in which the source should operate, i.e. monitor path and react
+ * to new data, or process once and exit
+ * @param interval
+ * In the case of periodic path monitoring, this specifies the interval (in millis)
+ * between consecutive path scans
+ * @param filter
+ * The files to be excluded from the processing
+ * @return The data stream that represents the data read from the given file
+ *
+ * @deprecated Set the filter with `FileInputFormat#setFilesFilter(FilePathFilter)` and
+ * call the `readFile(inputFormat, filePath, watchType, interval)`
+ * overload instead.
+ */
+ @PublicEvolving
+ @Deprecated
+ def readFile[T: TypeInformation](
+ inputFormat: FileInputFormat[T],
+ filePath: String,
+ watchType: FileProcessingMode,
+ interval: Long,
+ filter: FilePathFilter): DataStream[T] = {
+ asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter))
+ }
+
+ /**
+ * Reads the contents of the user-specified path based on the given [[FileInputFormat]].
* Depending on the provided [[FileProcessingMode]], the source
* may periodically monitor (every `interval` ms) the path for new data
* ([[FileProcessingMode.PROCESS_CONTINUOUSLY]]), or process
@@ -496,8 +530,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* @param interval
* In the case of periodic path monitoring, this specifies the interval (in millis)
* between consecutive path scans
- * @param filter
- * The files to be excluded from the processing
* @return The data stream that represents the data read from the given file
*/
@PublicEvolving
@@ -505,10 +537,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
inputFormat: FileInputFormat[T],
filePath: String,
watchType: FileProcessingMode,
- interval: Long,
- filter: FilePathFilter): DataStream[T] = {
+ interval: Long): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
- asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter, typeInfo))
+ asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index d540a92..a265c0a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
@@ -112,8 +112,9 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
// create the monitoring source along with the necessary readers.
TestingSinkFunction sink = new TestingSinkFunction();
TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
DataStream<String> inputStream = env.readFile(format, localFsURI,
- FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL, FilePathFilter.createDefaultFilter());
+ FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);
inputStream.flatMap(new FlatMapFunction<String, String>() {
@Override