You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/27 15:08:15 UTC
[1/2] flink git commit: [hotfix] [tests] Speed up StreamCheckpointNotifierITCase
Repository: flink
Updated Branches:
refs/heads/master 4fbc6316f -> ef96054d1
[hotfix] [tests] Speed up StreamCheckpointNotifierITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc597f6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc597f6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc597f6f
Branch: refs/heads/master
Commit: fc597f6f516c1d4248b7827853932cd4b2a7cb51
Parents: 4fbc631
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jan 26 17:03:48 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 27 16:02:37 2017 +0100
----------------------------------------------------------------------
.../StreamCheckpointNotifierITCase.java | 62 ++++----------------
1 file changed, 13 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fc597f6f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index be3fac5..d76d674 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -21,10 +21,9 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -35,12 +34,11 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +50,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@@ -73,45 +72,11 @@ import static org.junit.Assert.fail;
* successfully completed checkpoint.
*/
@SuppressWarnings("serial")
-public class StreamCheckpointNotifierITCase extends TestLogger {
+public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTestBase {
private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);
-
- private static final int NUM_TASK_MANAGERS = 2;
- private static final int NUM_TASK_SLOTS = 3;
- private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
- private static LocalFlinkMiniCluster cluster;
-
- @BeforeClass
- public static void startCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
- config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 ms");
- config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
- cluster = new LocalFlinkMiniCluster(config, false);
- cluster.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to start test cluster: " + e.getMessage());
- }
- }
- @AfterClass
- public static void stopCluster() {
- try {
- cluster.stop();
- cluster = null;
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to stop test cluster: " + e.getMessage());
- }
- }
+ private static final int PARALLELISM = 4;
/**
* Runs the following program:
@@ -123,18 +88,17 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
@Test
public void testProgram() {
try {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
- env.setParallelism(PARALLELISM);
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ assertEquals("test setup broken", PARALLELISM, env.getParallelism());
+
env.enableCheckpointing(500);
- env.getConfig().disableSysoutLogging();
-
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
final int numElements = 10000;
final int numTaskTotal = PARALLELISM * 5;
-
+
DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));
-
+
stream
// -------------- first vertex, chained to the src ----------------
.filter(new LongRichFilterFunction())
[2/2] flink git commit: [FLINK-5612] [code] Make GlobPathFilter serializable
Posted by se...@apache.org.
[FLINK-5612] [code] Make GlobPathFilter serializable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef96054d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef96054d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef96054d
Branch: refs/heads/master
Commit: ef96054d18ca1fb35f933fd0c715bb1ecc39b8c9
Parents: fc597f6
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Wed Jan 25 09:24:21 2017 +0000
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 27 16:03:13 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/io/FileInputFormat.java | 2 +-
.../flink/api/common/io/GlobFilePathFilter.java | 35 +++++++++++---
.../api/common/io/GlobFilePathFilterTest.java | 50 +++++++++++++++-----
3 files changed, 68 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 785fb3b..4e81dab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -481,7 +481,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
totalLength += pathFile.getLen();
}
// returns if unsplittable
- if(unsplittable) {
+ if (unsplittable) {
int splitNum = 0;
for (final FileStatus file : files) {
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
index 0ee6f03..748ed28 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
@@ -52,8 +53,13 @@ public class GlobFilePathFilter extends FilePathFilter {
private static final long serialVersionUID = 1L;
- private final ArrayList<PathMatcher> includeMatchers;
- private final ArrayList<PathMatcher> excludeMatchers;
+ private final List<String> includePatterns;
+ private final List<String> excludePatterns;
+
+ // Path matchers are not serializable so we are delaying their
+ // creation until they are used
+ private transient ArrayList<PathMatcher> includeMatchers;
+ private transient ArrayList<PathMatcher> excludeMatchers;
/**
* Constructor for GlobFilePathFilter that will match all files
@@ -69,8 +75,8 @@ public class GlobFilePathFilter extends FilePathFilter {
* @param excludePatterns glob patterns for files to exclude
*/
public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {
- includeMatchers = buildPatterns(includePatterns);
- excludeMatchers = buildPatterns(excludePatterns);
+ this.includePatterns = Preconditions.checkNotNull(includePatterns);
+ this.excludePatterns = Preconditions.checkNotNull(excludePatterns);
}
private ArrayList<PathMatcher> buildPatterns(List<String> patterns) {
@@ -86,7 +92,7 @@ public class GlobFilePathFilter extends FilePathFilter {
@Override
public boolean filterPath(Path filePath) {
- if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) {
+ if (getIncludeMatchers().isEmpty() && getExcludeMatchers().isEmpty()) {
return false;
}
@@ -97,7 +103,7 @@ public class GlobFilePathFilter extends FilePathFilter {
final java.nio.file.Path nioPath = Paths.get(path);
- for (PathMatcher matcher : includeMatchers) {
+ for (PathMatcher matcher : getIncludeMatchers()) {
if (matcher.matches(nioPath)) {
return shouldExclude(nioPath);
}
@@ -106,12 +112,27 @@ public class GlobFilePathFilter extends FilePathFilter {
return true;
}
+ private ArrayList<PathMatcher> getIncludeMatchers() {
+ if (includeMatchers == null) {
+ includeMatchers = buildPatterns(includePatterns);
+ }
+ return includeMatchers;
+ }
+
+ private ArrayList<PathMatcher> getExcludeMatchers() {
+ if (excludeMatchers == null) {
+ excludeMatchers = buildPatterns(excludePatterns);
+ }
+ return excludeMatchers;
+ }
+
private boolean shouldExclude(java.nio.file.Path nioPath) {
- for (PathMatcher matcher : excludeMatchers) {
+ for (PathMatcher matcher : getExcludeMatchers()) {
if (matcher.matches(nioPath)) {
return true;
}
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
index bced076..c9f8da4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
@@ -18,8 +18,10 @@
package org.apache.flink.api.common.io;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
+import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.assertFalse;
@@ -27,13 +29,13 @@ import static org.junit.Assert.assertTrue;
public class GlobFilePathFilterTest {
@Test
- public void defaultConstructorCreateMatchAllFilter() {
+ public void testDefaultConstructorCreateMatchAllFilter() {
GlobFilePathFilter matcher = new GlobFilePathFilter();
assertFalse(matcher.filterPath(new Path("dir/file.txt")));
}
@Test
- public void matchAllFilesByDefault() {
+ public void testMatchAllFilesByDefault() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.<String>emptyList(),
Collections.<String>emptyList());
@@ -42,7 +44,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void excludeFilesNotInIncludePatterns() {
+ public void testExcludeFilesNotInIncludePatterns() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/*"),
Collections.<String>emptyList());
@@ -52,7 +54,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void excludeFilesIfMatchesExclude() {
+ public void testExcludeFilesIfMatchesExclude() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/*"),
Collections.singletonList("dir/file.txt"));
@@ -61,7 +63,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void includeFileWithAnyCharacterMatcher() {
+ public void testIncludeFileWithAnyCharacterMatcher() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/?.txt"),
Collections.<String>emptyList());
@@ -71,7 +73,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void includeFileWithCharacterSetMatcher() {
+ public void testIncludeFileWithCharacterSetMatcher() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/[acd].txt"),
Collections.<String>emptyList());
@@ -83,7 +85,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void includeFileWithCharacterRangeMatcher() {
+ public void testIncludeFileWithCharacterRangeMatcher() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("dir/[a-d].txt"),
Collections.<String>emptyList());
@@ -96,7 +98,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void excludeHDFSFile() {
+ public void testExcludeHDFSFile() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.singletonList("/dir/file2.txt"));
@@ -107,7 +109,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void excludeFilenameWithStart() {
+ public void testExcludeFilenameWithStart() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.singletonList("\\*"));
@@ -118,7 +120,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void singleStarPattern() {
+ public void testSingleStarPattern() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("*"),
Collections.<String>emptyList());
@@ -129,7 +131,7 @@ public class GlobFilePathFilterTest {
}
@Test
- public void doubleStarPattern() {
+ public void testDoubleStarPattern() {
GlobFilePathFilter matcher = new GlobFilePathFilter(
Collections.singletonList("**"),
Collections.<String>emptyList());
@@ -138,4 +140,30 @@ public class GlobFilePathFilterTest {
assertFalse(matcher.filterPath(new Path("a/b")));
assertFalse(matcher.filterPath(new Path("a/b/c")));
}
+
+ @Test(expected = NullPointerException.class)
+ public void testIncluePatternIsNull() {
+ new GlobFilePathFilter(
+ null,
+ Collections.<String>emptyList());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testExcludePatternIsNull() {
+ new GlobFilePathFilter(
+ Collections.singletonList("**"),
+ null);
+ }
+
+ @Test
+ public void testGlobFilterSerializable() throws IOException {
+ GlobFilePathFilter matcher = new GlobFilePathFilter(
+ Collections.singletonList("**"),
+ Collections.<String>emptyList());
+
+ GlobFilePathFilter matcherCopy = CommonTestUtils.createCopySerializable(matcher);
+ assertFalse(matcher.filterPath(new Path("a")));
+ assertFalse(matcher.filterPath(new Path("a/b")));
+ assertFalse(matcher.filterPath(new Path("a/b/c")));
+ }
}