You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/27 15:08:16 UTC

[2/2] flink git commit: [FLINK-5612] [code] Make GlobPathFilter serializable

[FLINK-5612] [code] Make GlobPathFilter serializable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef96054d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef96054d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef96054d

Branch: refs/heads/master
Commit: ef96054d18ca1fb35f933fd0c715bb1ecc39b8c9
Parents: fc597f6
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Wed Jan 25 09:24:21 2017 +0000
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 27 16:03:13 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FileInputFormat.java    |  2 +-
 .../flink/api/common/io/GlobFilePathFilter.java | 35 +++++++++++---
 .../api/common/io/GlobFilePathFilterTest.java   | 50 +++++++++++++++-----
 3 files changed, 68 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 785fb3b..4e81dab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -481,7 +481,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 			totalLength += pathFile.getLen();
 		}
 		// returns if unsplittable
-		if(unsplittable) {
+		if (unsplittable) {
 			int splitNum = 0;
 			for (final FileStatus file : files) {
 				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());

http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
index 0ee6f03..748ed28 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
 
 import java.nio.file.FileSystem;
 import java.nio.file.FileSystems;
@@ -52,8 +53,13 @@ public class GlobFilePathFilter extends FilePathFilter {
 
 	private static final long serialVersionUID = 1L;
 
-	private final ArrayList<PathMatcher> includeMatchers;
-	private final ArrayList<PathMatcher> excludeMatchers;
+	private final List<String> includePatterns;
+	private final List<String> excludePatterns;
+
+	// Path matchers are not serializable so we are delaying their
+	// creation until they are used
+	private transient ArrayList<PathMatcher> includeMatchers;
+	private transient ArrayList<PathMatcher> excludeMatchers;
 
 	/**
 	 * Constructor for GlobFilePathFilter that will match all files
@@ -69,8 +75,8 @@ public class GlobFilePathFilter extends FilePathFilter {
 	 * @param excludePatterns glob patterns for files to exclude
 	 */
 	public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {
-		includeMatchers = buildPatterns(includePatterns);
-		excludeMatchers = buildPatterns(excludePatterns);
+		this.includePatterns = Preconditions.checkNotNull(includePatterns);
+		this.excludePatterns = Preconditions.checkNotNull(excludePatterns);
 	}
 
 	private ArrayList<PathMatcher> buildPatterns(List<String> patterns) {
@@ -86,7 +92,7 @@ public class GlobFilePathFilter extends FilePathFilter {
 
 	@Override
 	public boolean filterPath(Path filePath) {
-		if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) {
+		if (getIncludeMatchers().isEmpty() && getExcludeMatchers().isEmpty()) {
 			return false;
 		}
 
@@ -97,7 +103,7 @@ public class GlobFilePathFilter extends FilePathFilter {
 
 		final java.nio.file.Path nioPath = Paths.get(path);
 
-		for (PathMatcher matcher : includeMatchers) {
+		for (PathMatcher matcher : getIncludeMatchers()) {
 			if (matcher.matches(nioPath)) {
 				return shouldExclude(nioPath);
 			}
@@ -106,12 +112,27 @@ public class GlobFilePathFilter extends FilePathFilter {
 		return true;
 	}
 
+	private ArrayList<PathMatcher> getIncludeMatchers() {
+		if (includeMatchers == null) {
+			includeMatchers = buildPatterns(includePatterns);
+		}
+		return includeMatchers;
+	}
+
+	private ArrayList<PathMatcher> getExcludeMatchers() {
+		if (excludeMatchers == null) {
+			excludeMatchers = buildPatterns(excludePatterns);
+		}
+		return excludeMatchers;
+	}
+
 	private boolean shouldExclude(java.nio.file.Path nioPath) {
-		for (PathMatcher matcher : excludeMatchers) {
+		for (PathMatcher matcher : getExcludeMatchers()) {
 			if (matcher.matches(nioPath)) {
 				return true;
 			}
 		}
 		return false;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
index bced076..c9f8da4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
@@ -18,8 +18,10 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Collections;
 
 import static org.junit.Assert.assertFalse;
@@ -27,13 +29,13 @@ import static org.junit.Assert.assertTrue;
 
 public class GlobFilePathFilterTest {
 	@Test
-	public void defaultConstructorCreateMatchAllFilter() {
+	public void testDefaultConstructorCreateMatchAllFilter() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter();
 		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
 	}
 
 	@Test
-	public void matchAllFilesByDefault() {
+	public void testMatchAllFilesByDefault() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.<String>emptyList(),
 			Collections.<String>emptyList());
@@ -42,7 +44,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void excludeFilesNotInIncludePatterns() {
+	public void testExcludeFilesNotInIncludePatterns() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/*"),
 			Collections.<String>emptyList());
@@ -52,7 +54,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void excludeFilesIfMatchesExclude() {
+	public void testExcludeFilesIfMatchesExclude() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/*"),
 			Collections.singletonList("dir/file.txt"));
@@ -61,7 +63,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void includeFileWithAnyCharacterMatcher() {
+	public void testIncludeFileWithAnyCharacterMatcher() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/?.txt"),
 			Collections.<String>emptyList());
@@ -71,7 +73,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void includeFileWithCharacterSetMatcher() {
+	public void testIncludeFileWithCharacterSetMatcher() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/[acd].txt"),
 			Collections.<String>emptyList());
@@ -83,7 +85,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void includeFileWithCharacterRangeMatcher() {
+	public void testIncludeFileWithCharacterRangeMatcher() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/[a-d].txt"),
 			Collections.<String>emptyList());
@@ -96,7 +98,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void excludeHDFSFile() {
+	public void testExcludeHDFSFile() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("**"),
 			Collections.singletonList("/dir/file2.txt"));
@@ -107,7 +109,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void excludeFilenameWithStart() {
+	public void testExcludeFilenameWithStart() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("**"),
 			Collections.singletonList("\\*"));
@@ -118,7 +120,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void singleStarPattern() {
+	public void testSingleStarPattern() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("*"),
 			Collections.<String>emptyList());
@@ -129,7 +131,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void doubleStarPattern() {
+	public void testDoubleStarPattern() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("**"),
 			Collections.<String>emptyList());
@@ -138,4 +140,30 @@ public class GlobFilePathFilterTest {
 		assertFalse(matcher.filterPath(new Path("a/b")));
 		assertFalse(matcher.filterPath(new Path("a/b/c")));
 	}
+
+	@Test(expected = NullPointerException.class)
+	public void testIncluePatternIsNull() {
+		new GlobFilePathFilter(
+			null,
+			Collections.<String>emptyList());
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void testExcludePatternIsNull() {
+		new GlobFilePathFilter(
+			Collections.singletonList("**"),
+			null);
+	}
+
+	@Test
+	public void testGlobFilterSerializable() throws IOException {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("**"),
+			Collections.<String>emptyList());
+		// Assert on the deserialized copy: its transient matchers must be rebuilt from the patterns.
+		GlobFilePathFilter matcherCopy = CommonTestUtils.createCopySerializable(matcher);
+		assertFalse(matcherCopy.filterPath(new Path("a")));
+		assertFalse(matcherCopy.filterPath(new Path("a/b")));
+		assertFalse(matcherCopy.filterPath(new Path("a/b/c")));
+	}
 }