You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/27 15:08:16 UTC
[2/2] flink git commit: [FLINK-5612] [code] Make GlobPathFilter
serializable
[FLINK-5612] [code] Make GlobPathFilter serializable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef96054d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef96054d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef96054d
Branch: refs/heads/master
Commit: ef96054d18ca1fb35f933fd0c715bb1ecc39b8c9
Parents: fc597f6
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Wed Jan 25 09:24:21 2017 +0000
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 27 16:03:13 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/io/FileInputFormat.java | 2 +-
.../flink/api/common/io/GlobFilePathFilter.java | 35 +++++++++++---
.../api/common/io/GlobFilePathFilterTest.java | 50 +++++++++++++++-----
3 files changed, 68 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 785fb3b..4e81dab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -481,7 +481,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
totalLength += pathFile.getLen();
}
// returns if unsplittable
- if(unsplittable) {
+ if (unsplittable) {
int splitNum = 0;
for (final FileStatus file : files) {
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
index 0ee6f03..748ed28 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
@@ -52,8 +53,13 @@ public class GlobFilePathFilter extends FilePathFilter {
private static final long serialVersionUID = 1L;
- private final ArrayList<PathMatcher> includeMatchers;
- private final ArrayList<PathMatcher> excludeMatchers;
+ private final List<String> includePatterns;
+ private final List<String> excludePatterns;
+
+ // Path matchers are not serializable so we are delaying their
+ // creation until they are used
+ private transient ArrayList<PathMatcher> includeMatchers;
+ private transient ArrayList<PathMatcher> excludeMatchers;
/**
* Constructor for GlobFilePathFilter that will match all files
@@ -69,8 +75,8 @@ public class GlobFilePathFilter extends FilePathFilter {
* @param excludePatterns glob patterns for files to exclude
*/
public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {
- includeMatchers = buildPatterns(includePatterns);
- excludeMatchers = buildPatterns(excludePatterns);
+ this.includePatterns = Preconditions.checkNotNull(includePatterns);
+ this.excludePatterns = Preconditions.checkNotNull(excludePatterns);
}
private ArrayList<PathMatcher> buildPatterns(List<String> patterns) {
@@ -86,7 +92,7 @@ public class GlobFilePathFilter extends FilePathFilter {
@Override
public boolean filterPath(Path filePath) {
- if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) {
+ if (getIncludeMatchers().isEmpty() && getExcludeMatchers().isEmpty()) {
return false;
}
@@ -97,7 +103,7 @@ public class GlobFilePathFilter extends FilePathFilter {
final java.nio.file.Path nioPath = Paths.get(path);
- for (PathMatcher matcher : includeMatchers) {
+ for (PathMatcher matcher : getIncludeMatchers()) {
if (matcher.matches(nioPath)) {
return shouldExclude(nioPath);
}
@@ -106,12 +112,27 @@ public class GlobFilePathFilter extends FilePathFilter {
return true;
}
+ private ArrayList<PathMatcher> getIncludeMatchers() {
+ if (includeMatchers == null) {
+ includeMatchers = buildPatterns(includePatterns);
+ }
+ return includeMatchers;
+ }
+
+ private ArrayList<PathMatcher> getExcludeMatchers() {
+ if (excludeMatchers == null) {
+ excludeMatchers = buildPatterns(excludePatterns);
+ }
+ return excludeMatchers;
+ }
+
private boolean shouldExclude(java.nio.file.Path nioPath) {
- for (PathMatcher matcher : excludeMatchers) {
+ for (PathMatcher matcher : getExcludeMatchers()) {
if (matcher.matches(nioPath)) {
return true;
}
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
index bced076..c9f8da4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
@@ -18,8 +18,10 @@
package org.apache.flink.api.common.io;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
+import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.assertFalse;
@@ -27,13 +29,13 @@ import static org.junit.Assert.assertTrue;
public class GlobFilePathFilterTest {
@Test
- public void defaultConstructorCreateMatchAllFilter() {
+ public void testDefaultConstructorCreateMatchAllFilter() {
GlobFilePathFilter matcher = new GlobFilePathFilter();
assertFalse(matcher.filterPath(new Path("dir/file.txt")));
}
@Test
- public void matchAllFilesByDefault() {
+ public void testMatchAllFilesByDefault() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.<String>emptyList(),
Collections.<String>emptyList());
@@ -42,7 +44,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void excludeFilesNotInIncludePatterns() {
+ public void testExcludeFilesNotInIncludePatterns() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/*"),
Collections.<String>emptyList());
@@ -52,7 +54,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void excludeFilesIfMatchesExclude() {
+ public void testExcludeFilesIfMatchesExclude() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/*"),
Collections.singletonList("dir/file.txt"));
@@ -61,7 +63,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void includeFileWithAnyCharacterMatcher() {
+ public void testIncludeFileWithAnyCharacterMatcher() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/?.txt"),
Collections.<String>emptyList());
@@ -71,7 +73,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void includeFileWithCharacterSetMatcher() {
+ public void testIncludeFileWithCharacterSetMatcher() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/[acd].txt"),
Collections.<String>emptyList());
@@ -83,7 +85,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void includeFileWithCharacterRangeMatcher() {
+ public void testIncludeFileWithCharacterRangeMatcher() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/[a-d].txt"),
Collections.<String>emptyList());
@@ -96,7 +98,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void excludeHDFSFile() {
+ public void testExcludeHDFSFile() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.singletonList("/dir/file2.txt"));
@@ -107,7 +109,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void excludeFilenameWithStart() {
+ public void testExcludeFilenameWithStart() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.singletonList("\\*"));
@@ -118,7 +120,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void singleStarPattern() {
+ public void testSingleStarPattern() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("*"),
Collections.<String>emptyList());
@@ -129,7 +131,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void doubleStarPattern() {
+ public void testDoubleStarPattern() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.<String>emptyList());
@@ -138,4 +140,30 @@ public class GlobFilePathFilterTest {
assertFalse(matcher.filterPath(new Path("a/b")));
assertFalse(matcher.filterPath(new Path("a/b/c")));
}
+
+ @Test(expected = NullPointerException.class)
+ public void testIncluePatternIsNull() {
+ new GlobFilePathFilter(
+ null,
+ Collections.<String>emptyList());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testExcludePatternIsNull() {
+ new GlobFilePathFilter(
+ Collections.singletonList("**"),
+ null);
+ }
+
+ @Test
+ public void testGlobFilterSerializable() throws IOException {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("**"),
+ Collections.<String>emptyList());
+
+ GlobFilePathFilter matcherCopy = CommonTestUtils.createCopySerializable(matcher);
+ assertFalse(matcher.filterPath(new Path("a")));
+ assertFalse(matcher.filterPath(new Path("a/b")));
+ assertFalse(matcher.filterPath(new Path("a/b/c")));
+ }
}