You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/27 15:08:15 UTC

[1/2] flink git commit: [hotfix] [tests] Speed up StreamCheckpointNotifierITCase

Repository: flink
Updated Branches:
  refs/heads/master 4fbc6316f -> ef96054d1


[hotfix] [tests] Speed up StreamCheckpointNotifierITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc597f6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc597f6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc597f6f

Branch: refs/heads/master
Commit: fc597f6f516c1d4248b7827853932cd4b2a7cb51
Parents: 4fbc631
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jan 26 17:03:48 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 27 16:02:37 2017 +0100

----------------------------------------------------------------------
 .../StreamCheckpointNotifierITCase.java         | 62 ++++----------------
 1 file changed, 13 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc597f6f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index be3fac5..d76d674 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -21,10 +21,9 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -35,12 +34,11 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +50,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
@@ -73,45 +72,11 @@ import static org.junit.Assert.fail;
  * successfully completed checkpoint.
  */
 @SuppressWarnings("serial")
-public class StreamCheckpointNotifierITCase extends TestLogger {
+public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTestBase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);
-	
-	private static final int NUM_TASK_MANAGERS = 2;
-	private static final int NUM_TASK_SLOTS = 3;
-	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
-	private static LocalFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 ms");
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
-			cluster = new LocalFlinkMiniCluster(config, false);
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
-	}
 
-	@AfterClass
-	public static void stopCluster() {
-		try {
-			cluster.stop();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
-	}
+	private static final int PARALLELISM = 4;
 
 	/**
 	 * Runs the following program:
@@ -123,18 +88,17 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 	@Test
 	public void testProgram() {
 		try {
-			StreamExecutionEnvironment env = 
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-			
-			env.setParallelism(PARALLELISM);
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			assertEquals("test setup broken", PARALLELISM, env.getParallelism());
+
 			env.enableCheckpointing(500);
-			env.getConfig().disableSysoutLogging();
-			
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
 			final int numElements = 10000;
 			final int numTaskTotal = PARALLELISM * 5; 
-			
+
 			DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));
-	
+
 			stream
 					// -------------- first vertex, chained to the src ----------------
 					.filter(new LongRichFilterFunction())


[2/2] flink git commit: [FLINK-5612] [code] Make GlobFilePathFilter serializable

Posted by se...@apache.org.
[FLINK-5612] [code] Make GlobFilePathFilter serializable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef96054d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef96054d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef96054d

Branch: refs/heads/master
Commit: ef96054d18ca1fb35f933fd0c715bb1ecc39b8c9
Parents: fc597f6
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Wed Jan 25 09:24:21 2017 +0000
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 27 16:03:13 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FileInputFormat.java    |  2 +-
 .../flink/api/common/io/GlobFilePathFilter.java | 35 +++++++++++---
 .../api/common/io/GlobFilePathFilterTest.java   | 50 +++++++++++++++-----
 3 files changed, 68 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 785fb3b..4e81dab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -481,7 +481,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 			totalLength += pathFile.getLen();
 		}
 		// returns if unsplittable
-		if(unsplittable) {
+		if (unsplittable) {
 			int splitNum = 0;
 			for (final FileStatus file : files) {
 				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());

http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
index 0ee6f03..748ed28 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
 
 import java.nio.file.FileSystem;
 import java.nio.file.FileSystems;
@@ -52,8 +53,13 @@ public class GlobFilePathFilter extends FilePathFilter {
 
 	private static final long serialVersionUID = 1L;
 
-	private final ArrayList<PathMatcher> includeMatchers;
-	private final ArrayList<PathMatcher> excludeMatchers;
+	private final List<String> includePatterns;
+	private final List<String> excludePatterns;
+
+	// Path matchers are not serializable so we are delaying their
+	// creation until they are used
+	private transient ArrayList<PathMatcher> includeMatchers;
+	private transient ArrayList<PathMatcher> excludeMatchers;
 
 	/**
 	 * Constructor for GlobFilePathFilter that will match all files
@@ -69,8 +75,8 @@ public class GlobFilePathFilter extends FilePathFilter {
 	 * @param excludePatterns glob patterns for files to exclude
 	 */
 	public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) {
-		includeMatchers = buildPatterns(includePatterns);
-		excludeMatchers = buildPatterns(excludePatterns);
+		this.includePatterns = Preconditions.checkNotNull(includePatterns);
+		this.excludePatterns = Preconditions.checkNotNull(excludePatterns);
 	}
 
 	private ArrayList<PathMatcher> buildPatterns(List<String> patterns) {
@@ -86,7 +92,7 @@ public class GlobFilePathFilter extends FilePathFilter {
 
 	@Override
 	public boolean filterPath(Path filePath) {
-		if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) {
+		if (getIncludeMatchers().isEmpty() && getExcludeMatchers().isEmpty()) {
 			return false;
 		}
 
@@ -97,7 +103,7 @@ public class GlobFilePathFilter extends FilePathFilter {
 
 		final java.nio.file.Path nioPath = Paths.get(path);
 
-		for (PathMatcher matcher : includeMatchers) {
+		for (PathMatcher matcher : getIncludeMatchers()) {
 			if (matcher.matches(nioPath)) {
 				return shouldExclude(nioPath);
 			}
@@ -106,12 +112,27 @@ public class GlobFilePathFilter extends FilePathFilter {
 		return true;
 	}
 
+	private ArrayList<PathMatcher> getIncludeMatchers() {
+		if (includeMatchers == null) {
+			includeMatchers = buildPatterns(includePatterns);
+		}
+		return includeMatchers;
+	}
+
+	private ArrayList<PathMatcher> getExcludeMatchers() {
+		if (excludeMatchers == null) {
+			excludeMatchers = buildPatterns(excludePatterns);
+		}
+		return excludeMatchers;
+	}
+
 	private boolean shouldExclude(java.nio.file.Path nioPath) {
-		for (PathMatcher matcher : excludeMatchers) {
+		for (PathMatcher matcher : getExcludeMatchers()) {
 			if (matcher.matches(nioPath)) {
 				return true;
 			}
 		}
 		return false;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef96054d/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
index bced076..c9f8da4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
@@ -18,8 +18,10 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Collections;
 
 import static org.junit.Assert.assertFalse;
@@ -27,13 +29,13 @@ import static org.junit.Assert.assertTrue;
 
 public class GlobFilePathFilterTest {
 	@Test
-	public void defaultConstructorCreateMatchAllFilter() {
+	public void testDefaultConstructorCreateMatchAllFilter() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter();
 		assertFalse(matcher.filterPath(new Path("dir/file.txt")));
 	}
 
 	@Test
-	public void matchAllFilesByDefault() {
+	public void testMatchAllFilesByDefault() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.<String>emptyList(),
 			Collections.<String>emptyList());
@@ -42,7 +44,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void excludeFilesNotInIncludePatterns() {
+	public void testExcludeFilesNotInIncludePatterns() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/*"),
 			Collections.<String>emptyList());
@@ -52,7 +54,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void excludeFilesIfMatchesExclude() {
+	public void testExcludeFilesIfMatchesExclude() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/*"),
 			Collections.singletonList("dir/file.txt"));
@@ -61,7 +63,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void includeFileWithAnyCharacterMatcher() {
+	public void testIncludeFileWithAnyCharacterMatcher() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/?.txt"),
 			Collections.<String>emptyList());
@@ -71,7 +73,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void includeFileWithCharacterSetMatcher() {
+	public void testIncludeFileWithCharacterSetMatcher() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/[acd].txt"),
 			Collections.<String>emptyList());
@@ -83,7 +85,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void includeFileWithCharacterRangeMatcher() {
+	public void testIncludeFileWithCharacterRangeMatcher() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("dir/[a-d].txt"),
 			Collections.<String>emptyList());
@@ -96,7 +98,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void excludeHDFSFile() {
+	public void testExcludeHDFSFile() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("**"),
 			Collections.singletonList("/dir/file2.txt"));
@@ -107,7 +109,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void excludeFilenameWithStart() {
+	public void testExcludeFilenameWithStart() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("**"),
 			Collections.singletonList("\\*"));
@@ -118,7 +120,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void singleStarPattern() {
+	public void testSingleStarPattern() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("*"),
 			Collections.<String>emptyList());
@@ -129,7 +131,7 @@ public class GlobFilePathFilterTest {
 	}
 
 	@Test
-	public void doubleStarPattern() {
+	public void testDoubleStarPattern() {
 		GlobFilePathFilter matcher = new GlobFilePathFilter(
 			Collections.singletonList("**"),
 			Collections.<String>emptyList());
@@ -138,4 +140,30 @@ public class GlobFilePathFilterTest {
 		assertFalse(matcher.filterPath(new Path("a/b")));
 		assertFalse(matcher.filterPath(new Path("a/b/c")));
 	}
+
+	@Test(expected = NullPointerException.class)
+	public void testIncluePatternIsNull() {
+		new GlobFilePathFilter(
+			null,
+			Collections.<String>emptyList());
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void testExcludePatternIsNull() {
+		new GlobFilePathFilter(
+			Collections.singletonList("**"),
+			null);
+	}
+
+	@Test
+	public void testGlobFilterSerializable() throws IOException {
+		GlobFilePathFilter matcher = new GlobFilePathFilter(
+			Collections.singletonList("**"),
+			Collections.<String>emptyList());
+
+		GlobFilePathFilter matcherCopy = CommonTestUtils.createCopySerializable(matcher);
+		assertFalse(matcher.filterPath(new Path("a")));
+		assertFalse(matcher.filterPath(new Path("a/b")));
+		assertFalse(matcher.filterPath(new Path("a/b/c")));
+	}
 }