You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/25 18:49:13 UTC

[70/89] [abbrv] flink git commit: [FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns

[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns

This closes #2109


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48109104
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48109104
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48109104

Branch: refs/heads/flip-6
Commit: 4810910431e01bf143ae77a6e93a86f2fafbccd0
Parents: 259a3a5
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Tue Jun 14 22:44:19 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Aug 25 16:08:18 2016 +0200

----------------------------------------------------------------------
 flink-core/pom.xml                              |   7 +
 .../flink/api/common/io/FileInputFormat.java    |  20 ++-
 .../flink/api/common/io/FilePathFilter.java     |  69 ++++++++
 .../flink/api/common/io/GlobFilePathFilter.java | 111 ++++++++++++
 .../java/org/apache/flink/core/fs/Path.java     |  10 +-
 .../flink/core/fs/local/LocalFileStatus.java    |   8 +
 .../flink/api/common/io/DefaultFilterTest.java  |  70 ++++++++
 .../api/common/io/FileInputFormatTest.java      | 174 ++++++++++++-------
 .../api/common/io/GlobFilePathFilterTest.java   | 141 +++++++++++++++
 .../ContinuousFileMonitoringFunctionITCase.java |   4 +-
 .../hdfstests/ContinuousFileMonitoringTest.java |  13 +-
 .../environment/StreamExecutionEnvironment.java |  72 ++++++--
 .../ContinuousFileMonitoringFunction.java       |   8 +-
 .../api/functions/source/FilePathFilter.java    |  66 -------
 .../api/scala/StreamExecutionEnvironment.scala  |  43 ++++-
 ...ontinuousFileProcessingCheckpointITCase.java |   5 +-
 16 files changed, 650 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 9e290a0..dcb2599 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -103,6 +103,13 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 72d6061..d0f5166 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +71,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 	 * The fraction that the last split may be larger than the others.
 	 */
 	private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-	
+
 	/**
 	 * The timeout (in milliseconds) to wait for a filesystem stream to respond.
 	 */
@@ -218,7 +219,12 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 	 * structure is enabled.
 	 */
 	protected boolean enumerateNestedFiles = false;
-	
+
+	/**
+	 * Files filter for determining what files/directories should be included.
+	 */
+	private FilePathFilter filesFilter = new GlobFilePathFilter();
+
 	// --------------------------------------------------------------------------------------------
 	//  Constructors
 	// --------------------------------------------------------------------------------------------	
@@ -332,6 +338,10 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 		return splitLength;
 	}
 
+	public void setFilesFilter(FilePathFilter filesFilter) {
+		this.filesFilter = Preconditions.checkNotNull(filesFilter, "Files filter should not be null");
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Pre-flight: Configuration, Splits, Sampling
 	// --------------------------------------------------------------------------------------------
@@ -625,7 +635,9 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 	 */
 	protected boolean acceptFile(FileStatus fileStatus) {
 		final String name = fileStatus.getPath().getName();
-		return !name.startsWith("_") && !name.startsWith(".");
+		return !name.startsWith("_")
+			&& !name.startsWith(".")
+			&& !filesFilter.filterPath(fileStatus.getPath());
 	}
 
 	/**
@@ -735,7 +747,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 			"File Input (unknown file)" :
 			"File Input (" + this.filePath.toString() + ')';
 	}
-	
+
 	// ============================================================================================
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
new file mode 100644
index 0000000..4ab896c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
+ * processing or not. This can serve to exclude temporary or partial files that
+ * are still being written.
+ */
+@PublicEvolving
+public abstract class FilePathFilter implements Serializable {
+
+	// Name of an unfinished Hadoop file
+	public static final String HADOOP_COPYING = "_COPYING_";
+
+	public static FilePathFilter createDefaultFilter() {
+		return new DefaultFilter();
+	}
+
+	/**
+	 * Returns {@code true} if the {@code filePath} given is to be
+	 * ignored when processing a directory, e.g.
+	 * <pre>
+	 * {@code
+	 *
+	 * public boolean filterPath(Path filePath) {
+	 *     return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_");
+	 * }
+	 * }</pre>
+	 */
+	public abstract boolean filterPath(Path filePath);
+
+	/**
+	 * The default file path filter, used if no other filter is provided.
+	 * This filter leaves out files whose names start with "." or "_",
+	 * or contain "_COPYING_".
+	 */
+	public static class DefaultFilter extends FilePathFilter {
+
+		DefaultFilter() {}
+
+		@Override
+		public boolean filterPath(Path filePath) {
+			return filePath == null ||
+				filePath.getName().startsWith(".") ||
+				filePath.getName().startsWith("_") ||
+				filePath.getName().contains(HADOOP_COPYING);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
new file mode 100644
index 0000000..4aaf481
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Class for determining if a particular file should be included or excluded
+ * based on a set of include and exclude glob filters.
+ *
+ * Glob filters support the following expressions:
+ * <ul>
+ *     <li>* - matches any number of any characters including none</li>
+ *     <li>** - matches any file in all subdirectories</li>
+ *     <li>? - matches any single character</li>
+ *     <li>[abc] - matches one of the characters listed in the brackets</li>
+ *     <li>[a-z] - matches one character from the range given in the brackets</li>
+ * </ul>
+ *
+ * <p> If a file does not match any include pattern it is excluded. If it matches an include
+ * pattern but also matches an exclude pattern it is excluded.
+ *
+ * <p> If no patterns are provided, all files are included.
+ */
+@Internal
+public class GlobFilePathFilter extends FilePathFilter {
+
+	private static final long serialVersionUID = 1L;
+
+	private final List<PathMatcher> includeMatchers;
+	private final List<PathMatcher> excludeMatchers;
+
+	/**
+	 * Constructor for GlobFilePathFilter that will match all files
+	 */
+	public GlobFilePathFilter() {
+		this(Collections.<String>emptyList(), Collections.<String>emptyList());
+	}
+
+	/**
+	 * Constructor for GlobFilePathFilter
+	 *
+	 * @param includePatterns glob patterns for files to include
+	 * @param excludePatterns glob patterns for files to exclude
+	 */
+	public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {
+		includeMatchers = buildPatterns(includePatterns);
+		excludeMatchers = buildPatterns(excludePatterns);
+	}
+
+	private List<PathMatcher> buildPatterns(List<String> patterns) {
+		FileSystem fileSystem = FileSystems.getDefault();
+		List<PathMatcher> matchers = new ArrayList<>();
+
+		for (String patternStr : patterns) {
+			matchers.add(fileSystem.getPathMatcher("glob:" + patternStr));
+		}
+
+		return matchers;
+	}
+
+	@Override
+	public boolean filterPath(Path filePath) {
+		if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) {
+			return false;
+		}
+
+		for (PathMatcher mather : includeMatchers) {
+			if (mather.matches(Paths.get(filePath.getPath()))) {
+				return shouldExclude(filePath);
+			}
+		}
+
+		return true;
+	}
+
+	private boolean shouldExclude(Path filePath) {
+		for (PathMatcher matcher : excludeMatchers) {
+			if (matcher.matches(Paths.get(filePath.getPath()))) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 4c77199..7adfa42 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -145,7 +145,7 @@ public class Path implements IOReadableWritable, Serializable {
 	}
 
 	/**
- 	 * Checks if the provided path string is either null or has zero length and throws
+	 * Checks if the provided path string is either null or has zero length and throws
 	 * a {@link IllegalArgumentException} if any of the two conditions apply.
 	 * In addition, leading and tailing whitespaces are removed.
 	 *
@@ -333,6 +333,14 @@ public class Path implements IOReadableWritable, Serializable {
 	}
 
 	/**
+	 * Return full path.
+	 * @return full path
+	 */
+	public String getPath() {
+		return uri.getPath();
+	}
+
+	/**
 	 * Returns the parent of a path, i.e., everything that precedes the last separator
 	 * or <code>null</code> if at root.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
index 0aebd75..3e127ff 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
@@ -102,4 +102,12 @@ public class LocalFileStatus implements FileStatus {
 	public File getFile() {
 		return this.file;
 	}
+
+	@Override
+	public String toString() {
+		return "LocalFileStatus{" +
+			"file=" + file +
+			", path=" + path +
+			'}';
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
new file mode 100644
index 0000000..6956518
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class DefaultFilterTest {
+	@Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+			{"file.txt",			false},
+
+			{".file.txt",			true},
+			{"dir/.file.txt",		true},
+			{".dir/file.txt",		false},
+
+			{"_file.txt",			true},
+			{"dir/_file.txt",		true},
+			{"_dir/file.txt",		false},
+
+			// Check filtering Hadoop's unfinished files
+			{FilePathFilter.HADOOP_COPYING,			true},
+			{"dir/" + FilePathFilter.HADOOP_COPYING,		true},
+			{FilePathFilter.HADOOP_COPYING + "/file.txt",	false},
+		});
+	}
+
+	private final boolean shouldFilter;
+	private final String filePath;
+
+	public DefaultFilterTest(String filePath, boolean shouldFilter) {
+		this.filePath = filePath;
+		this.shouldFilter = shouldFilter;
+	}
+
+	@Test
+	public void test() {
+		FilePathFilter defaultFilter = FilePathFilter.createDefaultFilter();
+		Path path = new Path(filePath);
+		assertEquals(
+			String.format("File: %s", filePath),
+			shouldFilter,
+			defaultFilter.filterPath(path));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index ae8802b..dcd6583 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.io;
 
+import com.google.common.collect.Lists;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
@@ -27,16 +28,17 @@ import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.types.IntValue;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedOutputStream;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.Collections;
 
 import static org.junit.Assert.*;
 
@@ -45,6 +47,9 @@ import static org.junit.Assert.*;
  */
 public class FileInputFormatTest {
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	// ------------------------------------------------------------------------
 	//  Statistics
 	// ------------------------------------------------------------------------
@@ -257,41 +262,21 @@ public class FileInputFormatTest {
 	public void testIgnoredUnderscoreFiles() {
 		try {
 			final String contents = "CONTENTS";
-			
+
 			// create some accepted, some ignored files
-			
-			File tempDir = new File(System.getProperty("java.io.tmpdir"));
-			File f;
-			do {
-				f = new File(tempDir, TestFileUtils.randomFileName(""));
-			}
-			while (f.exists());
 
-			assertTrue(f.mkdirs());
-			f.deleteOnExit();
-			
-			File child1 = new File(f, "dataFile1.txt");
-			File child2 = new File(f, "another_file.bin");
-			File luigiFile = new File(f, "_luigi");
-			File success = new File(f, "_SUCCESS");
-			
-			File[] files = { child1, child2, luigiFile, success };
-			
-			for (File child : files) {
-				child.deleteOnExit();
-			
-				BufferedWriter out = new BufferedWriter(new FileWriter(child));
-				try { 
-					out.write(contents);
-				} finally {
-					out.close();
-				}
-			}
-			
+
+			File child1 = temporaryFolder.newFile("dataFile1.txt");
+			File child2 = temporaryFolder.newFile("another_file.bin");
+			File luigiFile = temporaryFolder.newFile("_luigi");
+			File success = temporaryFolder.newFile("_SUCCESS");
+
+			createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
+
 			// test that only the valid files are accepted
 			
 			final DummyFileInputFormat format = new DummyFileInputFormat();
-			format.setFilePath(f.toURI().toString());
+			format.setFilePath(temporaryFolder.getRoot().toURI().toString());
 			format.configure(new Configuration());
 			FileInputSplit[] splits = format.createInputSplits(1);
 			
@@ -314,43 +299,95 @@ public class FileInputFormatTest {
 	}
 
 	@Test
+	public void testExcludeFiles() {
+		try {
+			final String contents = "CONTENTS";
+
+			// create some accepted, some ignored files
+
+			File child1 = temporaryFolder.newFile("dataFile1.txt");
+			File child2 = temporaryFolder.newFile("another_file.bin");
+
+			File[] files = { child1, child2 };
+
+			createTempFiles(contents.getBytes(), files);
+
+			// test that only the valid files are accepted
+
+			Configuration configuration = new Configuration();
+
+			final DummyFileInputFormat format = new DummyFileInputFormat();
+			format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+			format.configure(configuration);
+			format.setFilesFilter(new GlobFilePathFilter(
+				Collections.singletonList("**"),
+				Collections.singletonList("**/another_file.bin")));
+			FileInputSplit[] splits = format.createInputSplits(1);
+
+			Assert.assertEquals(1, splits.length);
+
+			final URI uri1 = splits[0].getPath().toUri();
+
+			final URI childUri1 = child1.toURI();
+
+			Assert.assertEquals(uri1, childUri1);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testReadMultiplePatterns() {
+		try {
+			final String contents = "CONTENTS";
+
+			// create some accepted, some ignored files
+
+			File child1 = temporaryFolder.newFile("dataFile1.txt");
+			File child2 = temporaryFolder.newFile("another_file.bin");
+			createTempFiles(contents.getBytes(), child1, child2);
+
+			// test that only the valid files are accepted
+
+			Configuration configuration = new Configuration();
+
+			final DummyFileInputFormat format = new DummyFileInputFormat();
+			format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+			format.configure(configuration);
+			format.setFilesFilter(new GlobFilePathFilter(
+				Collections.singletonList("**"),
+				Lists.newArrayList("**/another_file.bin", "**/dataFile1.txt")
+			));
+			FileInputSplit[] splits = format.createInputSplits(1);
+
+			Assert.assertEquals(0, splits.length);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testGetStatsIgnoredUnderscoreFiles() {
 		try {
-			final long SIZE = 2048;
+			final int SIZE = 2048;
 			final long TOTAL = 2*SIZE;
 
 			// create two accepted and two ignored files
-			File tempDir = new File(System.getProperty("java.io.tmpdir"));
-			File f;
-			do {
-				f = new File(tempDir, TestFileUtils.randomFileName(""));
-			}
-			while (f.exists());
-			
-			assertTrue(f.mkdirs());
-			f.deleteOnExit();
-
-			File child1 = new File(f, "dataFile1.txt");
-			File child2 = new File(f, "another_file.bin");
-			File luigiFile = new File(f, "_luigi");
-			File success = new File(f, "_SUCCESS");
-
-			File[] files = { child1, child2, luigiFile, success };
+			File child1 = temporaryFolder.newFile("dataFile1.txt");
+			File child2 = temporaryFolder.newFile("another_file.bin");
+			File luigiFile = temporaryFolder.newFile("_luigi");
+			File success = temporaryFolder.newFile("_SUCCESS");
 
-			for (File child : files) {
-				child.deleteOnExit();
+			createTempFiles(new byte[SIZE], child1, child2, luigiFile, success);
 
-				BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child));
-				try {
-					for (long bytes = SIZE; bytes > 0; bytes--) {
-						out.write(0);
-					}
-				} finally {
-					out.close();
-				}
-			}
 			final DummyFileInputFormat format = new DummyFileInputFormat();
-			format.setFilePath(f.toURI().toString());
+			format.setFilePath(temporaryFolder.getRoot().toURI().toString());
 			format.configure(new Configuration());
 
 			// check that only valid files are used for statistics computation
@@ -406,7 +443,20 @@ public class FileInputFormatTest {
 	}
 	
 	// ------------------------------------------------------------------------
-	
+
+	private void createTempFiles(byte[] contents, File... files) throws IOException {
+		for (File child : files) {
+			child.deleteOnExit();
+
+			BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child));
+			try {
+				out.write(contents);
+			} finally {
+				out.close();
+			}
+		}
+	}
+
 	private class DummyFileInputFormat extends FileInputFormat<IntValue> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
new file mode 100644
index 0000000..bced076
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
+	@Test
+	public void defaultConstructorCreateMatchAllFilter() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter();
+		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+	}
+
+	@Test
+	public void matchAllFilesByDefault() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.<String>emptyList(),
+			Collections.<String>emptyList());
+
+		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+	}
+
+	@Test
+	public void excludeFilesNotInIncludePatterns() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("dir/*"),
+			Collections.<String>emptyList());
+
+		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+		assertTrue(matcher.filterPath(new Path("dir1/file.txt")));
+	}
+
+	@Test
+	public void excludeFilesIfMatchesExclude() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("dir/*"),
+			Collections.singletonList("dir/file.txt"));
+
+		assertTrue(matcher.filterPath(new Path("dir/file.txt")));
+	}
+
+	@Test
+	public void includeFileWithAnyCharacterMatcher() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("dir/?.txt"),
+			Collections.<String>emptyList());
+
+		assertFalse(matcher.filterPath(new Path("dir/a.txt")));
+		assertTrue(matcher.filterPath(new Path("dir/aa.txt")));
+	}
+
+	@Test
+	public void includeFileWithCharacterSetMatcher() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("dir/[acd].txt"),
+			Collections.<String>emptyList());
+
+		assertFalse(matcher.filterPath(new Path("dir/a.txt")));
+		assertFalse(matcher.filterPath(new Path("dir/c.txt")));
+		assertFalse(matcher.filterPath(new Path("dir/d.txt")));
+		assertTrue(matcher.filterPath(new Path("dir/z.txt")));
+	}
+
+	@Test
+	public void includeFileWithCharacterRangeMatcher() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("dir/[a-d].txt"),
+			Collections.<String>emptyList());
+
+		assertFalse(matcher.filterPath(new Path("dir/a.txt")));
+		assertFalse(matcher.filterPath(new Path("dir/b.txt")));
+		assertFalse(matcher.filterPath(new Path("dir/c.txt")));
+		assertFalse(matcher.filterPath(new Path("dir/d.txt")));
+		assertTrue(matcher.filterPath(new Path("dir/z.txt")));
+	}
+
+	@Test
+	public void excludeHDFSFile() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("**"),
+			Collections.singletonList("/dir/file2.txt"));
+
+		assertFalse(matcher.filterPath(new Path("hdfs:///dir/file1.txt")));
+		assertTrue(matcher.filterPath(new Path("hdfs:///dir/file2.txt")));
+		assertFalse(matcher.filterPath(new Path("hdfs:///dir/file3.txt")));
+	}
+
+	@Test
+	public void excludeFilenameWithStart() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("**"),
+			Collections.singletonList("\\*"));
+
+		assertTrue(matcher.filterPath(new Path("*")));
+		assertFalse(matcher.filterPath(new Path("**")));
+		assertFalse(matcher.filterPath(new Path("other.txt")));
+	}
+
+	@Test
+	public void singleStarPattern() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("*"),
+			Collections.<String>emptyList());
+
+		assertFalse(matcher.filterPath(new Path("a")));
+		assertTrue(matcher.filterPath(new Path("a/b")));
+		assertTrue(matcher.filterPath(new Path("a/b/c")));
+	}
+
+	@Test
+	public void doubleStarPattern() {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("**"),
+			Collections.<String>emptyList());
+
+		assertFalse(matcher.filterPath(new Path("a")));
+		assertFalse(matcher.filterPath(new Path("a/b")));
+		assertFalse(matcher.filterPath(new Path("a/b/c")));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
index e6cd5d9..663345c 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
@@ -122,9 +122,9 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(1);
 
+			format.setFilesFilter(FilePathFilter.createDefaultFilter());
 			ContinuousFileMonitoringFunction<String> monitoringFunction =
 				new ContinuousFileMonitoringFunction<>(format, hdfsURI,
-					FilePathFilter.createDefaultFilter(),
 					FileProcessingMode.PROCESS_CONTINUOUSLY,
 					env.getParallelism(), INTERVAL);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index def9378..4aadaec 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
@@ -216,8 +216,9 @@ public class ContinuousFileMonitoringTest {
 		}
 
 		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilesFilter(new PathFilter());
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI, new PathFilter(),
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
 		monitoringFunction.open(new Configuration());
@@ -242,8 +243,9 @@ public class ContinuousFileMonitoringTest {
 		fc.start();
 
 		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(),
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
 				FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
 
 		monitoringFunction.open(new Configuration());
@@ -291,8 +293,9 @@ public class ContinuousFileMonitoringTest {
 		Assert.assertTrue(fc.getFilesCreated().size() >= 1);
 
 		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(),
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
 				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
 		monitoringFunction.open(new Configuration());
@@ -427,7 +430,7 @@ public class ContinuousFileMonitoringTest {
 		assert (hdfs != null);
 
 		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-		Assert.assertTrue (!hdfs.exists(file));
+		Assert.assertFalse(hdfs.exists(file));
 
 		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
 		FSDataOutputStream stream = hdfs.create(tmp);

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 1913a36..ead9564 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -52,7 +52,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
@@ -917,11 +917,11 @@ public abstract class StreamExecutionEnvironment {
 		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
 
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
 		format.setCharsetName(charsetName);
 
-		return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1,
-			FilePathFilter.createDefaultFilter(), typeInfo);
+		return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
 	}
 
 	/**
@@ -952,7 +952,52 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
 												String filePath) {
-		return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());
+		return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1);
+	}
+
+	/**
+	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat},
+	 * processing the path once or monitoring it periodically depending on the provided
+	 * {@link FileProcessingMode}. The given {@code filter} is installed on {@code inputFormat} before reading.
+	 * <p>
+	 * See {@link #readFile(FileInputFormat, String, FileProcessingMode, long)}
+	 *
+	 * @param inputFormat
+	 * 		The input format used to create the data stream
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param watchType
+	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+	 * @param interval
+	 * 		In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+	 * @param filter
+	 * 		The files to be excluded from the processing
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream that represents the data read from the given file
+	 * @throws InvalidProgramException if the type produced by the input format cannot be determined automatically
+	 * @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and
+	 * 		{@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)}
+	 */
+	@PublicEvolving
+	@Deprecated
+	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+												String filePath,
+												FileProcessingMode watchType,
+												long interval,
+												FilePathFilter filter) {
+		// Filtering is configured on the input format itself, so install the filter there before delegating.
+		inputFormat.setFilesFilter(filter);
+
+		TypeInformation<OUT> typeInformation;
+		try {
+			typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
+		} catch (Exception e) {
+			throw new InvalidProgramException("The type returned by the input format could not be " +
+				"automatically determined. Please specify the TypeInformation of the produced type " +
+				"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
+		}
+		return readFile(inputFormat, filePath, watchType, interval, typeInformation);
 	}
 
 	/**
@@ -986,8 +1031,6 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
 	 * @param interval
 	 * 		In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
-	 * @param filter
-	 * 		The files to be excluded from the processing
 	 * @param <OUT>
 	 * 		The type of the returned data stream
 	 * @return The data stream that represents the data read from the given file
@@ -996,8 +1039,7 @@ public abstract class StreamExecutionEnvironment {
 	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
 												String filePath,
 												FileProcessingMode watchType,
-												long interval,
-												FilePathFilter filter) {
+												long interval) {
 
 		TypeInformation<OUT> typeInformation;
 		try {
@@ -1007,7 +1049,7 @@ public abstract class StreamExecutionEnvironment {
 				"automatically determined. Please specify the TypeInformation of the produced type " +
 				"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
 		}
-		return readFile(inputFormat, filePath, watchType, interval, filter, typeInformation);
+		return readFile(inputFormat, filePath, watchType, interval, typeInformation);
 	}
 
 	/**
@@ -1057,8 +1099,6 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
 	 * @param watchType
 	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
-	 * @param filter
-	 * 		The files to be excluded from the processing
 	 * @param typeInformation
 	 * 		Information on the type of the elements in the output stream
 	 * @param interval
@@ -1072,7 +1112,6 @@ public abstract class StreamExecutionEnvironment {
 												String filePath,
 												FileProcessingMode watchType,
 												long interval,
-												FilePathFilter filter,
 												TypeInformation<OUT> typeInformation) {
 
 		Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
@@ -1080,7 +1119,7 @@ public abstract class StreamExecutionEnvironment {
 		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
 
 		inputFormat.setFilePath(filePath);
-		return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, filter, interval);
+		return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
 	}
 
 	/**
@@ -1250,8 +1289,7 @@ public abstract class StreamExecutionEnvironment {
 		if (inputFormat instanceof FileInputFormat) {
 			FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;
 			source = createFileInput(format, typeInfo, "Custom File source",
-				FileProcessingMode.PROCESS_ONCE,
-				FilePathFilter.createDefaultFilter(),  -1);
+				FileProcessingMode.PROCESS_ONCE, -1);
 		} else {
 			source = createInput(inputFormat, typeInfo, "Custom Source");
 		}
@@ -1270,14 +1308,12 @@ public abstract class StreamExecutionEnvironment {
 														TypeInformation<OUT> typeInfo,
 														String sourceName,
 														FileProcessingMode monitoringMode,
-														FilePathFilter pathFilter,
 														long interval) {
 
 		Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
 		Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
 		Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
 		Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
-		Preconditions.checkNotNull(pathFilter, "Unspecified path name filtering function.");
 
 		Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
 			interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
@@ -1286,7 +1322,7 @@ public abstract class StreamExecutionEnvironment {
 
 		ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(
 			inputFormat, inputFormat.getFilePath().toString(),
-			pathFilter, monitoringMode, getParallelism(), interval);
+			monitoringMode, getParallelism(), interval);
 
 		ContinuousFileReaderOperator<OUT, ?> reader = new ContinuousFileReaderOperator<>(inputFormat);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 8ff4a2a..d36daab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -81,15 +82,13 @@ public class ContinuousFileMonitoringFunction<OUT>
 
 	private Long globalModificationTime;
 
-	private FilePathFilter pathFilter;
-
 	private transient Object checkpointLock;
 
 	private volatile boolean isRunning = true;
 
 	public ContinuousFileMonitoringFunction(
 		FileInputFormat<OUT> format, String path,
-		FilePathFilter filter, FileProcessingMode watchType,
+		FileProcessingMode watchType,
 		int readerParallelism, long interval) {
 
 		if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) {
@@ -98,7 +97,6 @@ public class ContinuousFileMonitoringFunction<OUT>
 		}
 		this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
 		this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
-		this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter.");
 
 		this.interval = interval;
 		this.watchType = watchType;
@@ -274,7 +272,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 	 */
 	private boolean shouldIgnore(Path filePath, long modificationTime) {
 		assert (Thread.holdsLock(checkpointLock));
-		boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
+		boolean shouldIgnore = modificationTime <= globalModificationTime;
 		if (shouldIgnore) {
 			LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
deleted file mode 100644
index 1a359ab..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.fs.Path;
-
-import java.io.Serializable;
-
-/**
- * An interface to be implemented by the user when using the {@link ContinuousFileMonitoringFunction}.
- * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
- * processing or not. This can serve to exclude temporary or partial files that
- * are still being written.
- */
-@PublicEvolving
-public abstract class FilePathFilter implements Serializable {
-
-	public static FilePathFilter createDefaultFilter() {
-		return new DefaultFilter();
-	}
-	/**
-	 * Returns {@code true} if the {@code filePath} given is to be
-	 * ignored when processing a directory, e.g.
-	 * <pre>
-	 * {@code
-	 *
-	 * public boolean filterPaths(Path filePath) {
-	 *     return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_");
-	 * }
-	 * }</pre>
-	 */
-	public abstract boolean filterPath(Path filePath);
-
-	/**
-	 * The default file path filtering method and is used
-	 * if no other such function is provided. This filter leaves out
-	 * files starting with ".", "_", and "_COPYING_".
-	 */
-	public static class DefaultFilter extends FilePathFilter {
-
-		DefaultFilter() {}
-
-		@Override
-		public boolean filterPath(Path filePath) {
-			return filePath == null ||
-				filePath.getName().startsWith(".") ||
-				filePath.getName().startsWith("_") ||
-				filePath.getName().contains("_COPYING_");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index f6dab1e..9cb36a5 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
-import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
+import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -467,6 +467,40 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
     * Reads the contents of the user-specified path based on the given [[FileInputFormat]].
+    * Depending on the provided [[FileProcessingMode]], the path is processed once or monitored.
+    *
+    * @param inputFormat
+    *          The input format used to create the data stream
+    * @param filePath
+    *          The path of the file, as a URI (e.g., "file:///some/local/file" or
+    *          "hdfs://host:port/file/path")
+    * @param watchType
+    *          The mode in which the source should operate, i.e. monitor path and react
+    *          to new data, or process once and exit
+    * @param interval
+    *          In the case of periodic path monitoring, this specifies the interval (in millis)
+    *          between consecutive path scans
+    * @param filter
+    *          The files to be excluded from the processing
+    * @return The data stream that represents the data read from the given file
+    *
+    * @deprecated Use [[FileInputFormat]]'s `setFilesFilter(FilePathFilter)` to set a filter and
+    *         the `readFile(FileInputFormat, String, FileProcessingMode, Long)` overload of
+    *         this class instead.
+    */
+  @PublicEvolving
+  @Deprecated
+  def readFile[T: TypeInformation](
+                                    inputFormat: FileInputFormat[T],
+                                    filePath: String,
+                                    watchType: FileProcessingMode,
+                                    interval: Long,
+                                    filter: FilePathFilter): DataStream[T] = {
+    asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter))
+  }
+
+  /**
+    * Reads the contents of the user-specified path based on the given [[FileInputFormat]].
     * Depending on the provided [[FileProcessingMode]], the source
     * may periodically monitor (every `interval` ms) the path for new data
     * ([[FileProcessingMode.PROCESS_CONTINUOUSLY]]), or process
@@ -496,8 +530,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     * @param interval
     *          In the case of periodic path monitoring, this specifies the interval (in millis)
     *          between consecutive path scans
-    * @param filter
-    *          The files to be excluded from the processing
     * @return The data stream that represents the data read from the given file
     */
   @PublicEvolving
@@ -505,10 +537,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
       inputFormat: FileInputFormat[T],
       filePath: String,
       watchType: FileProcessingMode,
-      interval: Long,
-      filter: FilePathFilter): DataStream[T] = {
+      interval: Long): DataStream[T] = {
     val typeInfo = implicitly[TypeInformation[T]]
-    asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter, typeInfo))
+    asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index d540a92..a265c0a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
@@ -112,8 +112,9 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 		// create the monitoring source along with the necessary readers.
 		TestingSinkFunction sink = new TestingSinkFunction();
 		TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 		DataStream<String> inputStream = env.readFile(format, localFsURI,
-			FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL, FilePathFilter.createDefaultFilter());
+			FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);
 
 		inputStream.flatMap(new FlatMapFunction<String, String>() {
 			@Override