You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/13 06:17:29 UTC

[3/5] flink git commit: [FLINK-9563][cep][tests] Migrate CEPITCase to collect()

[FLINK-9563][cep][tests] Migrate CEPITCase to collect()

This closes #6170.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40f9131e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40f9131e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40f9131e

Branch: refs/heads/master
Commit: 40f9131e9136f4f956c59e4c0c837afba8b9bb4d
Parents: ca0fa96
Author: Deepak Sharnma <de...@gmail.com>
Authored: Tue Jun 12 22:41:27 2018 -0400
Committer: zentol <ch...@apache.org>
Committed: Fri Jul 13 06:11:54 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/cep/CEPITCase.java    | 141 ++++++++++---------
 1 file changed, 75 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40f9131e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 9b35788..6d1013c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -25,9 +25,9 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -35,45 +35,22 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Either;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * End to end tests of both CEP operators and {@link NFA}.
  */
 @SuppressWarnings("serial")
 public class CEPITCase extends AbstractTestBase {
 
-	private String resultPath;
-	private String expected;
-
-	private String lateEventPath;
-	private String expectedLateEvents;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-		expected = "";
-
-		lateEventPath = tempFolder.newFile().toURI().toString();
-		expectedLateEvents = "";
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-		compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
-	}
-
 	/**
 	 * Checks that a certain event sequence is recognized.
 	 *
@@ -133,12 +110,11 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		});
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<String> resultList = new ArrayList<>();
 
-		// expected sequence of matching event ids
-		expected = "2,6,8";
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-		env.execute();
+		assertEquals(Arrays.asList("2,6,8"), resultList);
 	}
 
 	@Test
@@ -208,12 +184,13 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		});
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<String> resultList = new ArrayList<>();
+
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-		// the expected sequences of matching event ids
-		expected = "2,2,2\n3,3,3\n42,42,42";
+		resultList.sort(String::compareTo);
 
-		env.execute();
+		assertEquals(Arrays.asList("2,2,2", "3,3,3", "42,42,42"), resultList);
 	}
 
 	@Test
@@ -286,12 +263,13 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		);
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<String> resultList = new ArrayList<>();
 
-		// the expected sequence of matching event ids
-		expected = "1,5,4";
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-		env.execute();
+		resultList.sort(String::compareTo);
+
+		assertEquals(Arrays.asList("1,5,4"), resultList);
 	}
 
 	@Test
@@ -375,12 +353,13 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		);
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<String> resultList = new ArrayList<>();
+
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-		// the expected sequences of matching event ids
-		expected = "1,1,1\n2,2,2";
+		resultList.sort(String::compareTo);
 
-		env.execute();
+		assertEquals(Arrays.asList("1,1,1", "2,2,2"), resultList);
 	}
 
 	@Test
@@ -408,11 +387,11 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		});
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<Tuple2<Integer, Integer>> resultList = new ArrayList<>();
 
-		expected = "(0,1)";
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-		env.execute();
+		assertEquals(Arrays.asList(new Tuple2<>(0, 1)), resultList);
 	}
 
 	@Test
@@ -431,11 +410,11 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		});
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<Integer> resultList = new ArrayList<>();
 
-		expected = "3";
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-		env.execute();
+		assertEquals(Arrays.asList(3), resultList);
 	}
 
 	@Test
@@ -512,12 +491,20 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		);
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<Either<String, String>> resultList = new ArrayList<>();
+
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-		// the expected sequences of matching event ids
-		expected = "Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)";
+		resultList.sort(Comparator.comparing(either -> either.toString()));
 
-		env.execute();
+		List<Either<String, String>> expected = Arrays.asList(
+			Either.Left.of("1.0"),
+			Either.Left.of("2.0"),
+			Either.Left.of("2.0"),
+			Either.Right.of("2.0,2.0,2.0")
+		);
+
+		assertEquals(expected, resultList);
 	}
 
 	/**
@@ -580,12 +567,22 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		});
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<String> resultList = new ArrayList<>();
+
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+		List<String> expected = Arrays.asList(
+			"1,5,6",
+			"1,2,3",
+			"4,5,6",
+			"1,2,6"
+		);
+
+		expected.sort(String::compareTo);
 
-		// expected sequence of matching event ids
-		expected = "1,5,6\n1,2,3\n4,5,6\n1,2,6";
+		resultList.sort(String::compareTo);
 
-		env.execute();
+		assertEquals(expected, resultList);
 	}
 
 	/**
@@ -666,12 +663,20 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		);
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<String> resultList = new ArrayList<>();
 
-		// the expected sequence of matching event ids
-		expected = "1,6,4\n1,5,4";
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
 
-		env.execute();
+		List<String> expected = Arrays.asList(
+			"1,6,4",
+			"1,5,4"
+		);
+
+		expected.sort(String::compareTo);
+
+		resultList.sort(String::compareTo);
+
+		assertEquals(expected, resultList);
 	}
 
 	private static class CustomEventComparator implements EventComparator<Event> {
@@ -708,10 +713,14 @@ public class CEPITCase extends AbstractTestBase {
 			}
 		});
 
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		List<Tuple2<Integer, String>> resultList = new ArrayList<>();
+
+		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+		resultList.sort(Comparator.comparing(tuple2 -> tuple2.toString()));
 
-		expected = "(1,a)\n(3,a)";
+		List<Tuple2<Integer, String>> expected = Arrays.asList(Tuple2.of(1, "a"), Tuple2.of(3, "a"));
 
-		env.execute();
+		assertEquals(expected, resultList);
 	}
 }