You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/13 06:17:29 UTC
[3/5] flink git commit: [FLINK-9563][cep][tests] Migrate CEPITCase to
collect()
[FLINK-9563][cep][tests] Migrate CEPITCase to collect()
This closes #6170.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40f9131e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40f9131e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40f9131e
Branch: refs/heads/master
Commit: 40f9131e9136f4f956c59e4c0c837afba8b9bb4d
Parents: ca0fa96
Author: Deepak Sharnma <de...@gmail.com>
Authored: Tue Jun 12 22:41:27 2018 -0400
Committer: zentol <ch...@apache.org>
Committed: Fri Jul 13 06:11:54 2018 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/cep/CEPITCase.java | 141 ++++++++++---------
1 file changed, 75 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40f9131e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 9b35788..6d1013c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -25,9 +25,9 @@ import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -35,45 +35,22 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Either;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
+
/**
* End to end tests of both CEP operators and {@link NFA}.
*/
@SuppressWarnings("serial")
public class CEPITCase extends AbstractTestBase {
- private String resultPath;
- private String expected;
-
- private String lateEventPath;
- private String expectedLateEvents;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception {
- resultPath = tempFolder.newFile().toURI().toString();
- expected = "";
-
- lateEventPath = tempFolder.newFile().toURI().toString();
- expectedLateEvents = "";
- }
-
- @After
- public void after() throws Exception {
- compareResultsByLinesInMemory(expected, resultPath);
- compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
- }
-
/**
* Checks that a certain event sequence is recognized.
*
@@ -133,12 +110,11 @@ public class CEPITCase extends AbstractTestBase {
}
});
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<String> resultList = new ArrayList<>();
- // expected sequence of matching event ids
- expected = "2,6,8";
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
- env.execute();
+ assertEquals(Arrays.asList("2,6,8"), resultList);
}
@Test
@@ -208,12 +184,13 @@ public class CEPITCase extends AbstractTestBase {
}
});
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<String> resultList = new ArrayList<>();
+
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
- // the expected sequences of matching event ids
- expected = "2,2,2\n3,3,3\n42,42,42";
+ resultList.sort(String::compareTo);
- env.execute();
+ assertEquals(Arrays.asList("2,2,2", "3,3,3", "42,42,42"), resultList);
}
@Test
@@ -286,12 +263,13 @@ public class CEPITCase extends AbstractTestBase {
}
);
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<String> resultList = new ArrayList<>();
- // the expected sequence of matching event ids
- expected = "1,5,4";
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
- env.execute();
+ resultList.sort(String::compareTo);
+
+ assertEquals(Arrays.asList("1,5,4"), resultList);
}
@Test
@@ -375,12 +353,13 @@ public class CEPITCase extends AbstractTestBase {
}
);
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<String> resultList = new ArrayList<>();
+
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
- // the expected sequences of matching event ids
- expected = "1,1,1\n2,2,2";
+ resultList.sort(String::compareTo);
- env.execute();
+ assertEquals(Arrays.asList("1,1,1", "2,2,2"), resultList);
}
@Test
@@ -408,11 +387,11 @@ public class CEPITCase extends AbstractTestBase {
}
});
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<Tuple2<Integer, Integer>> resultList = new ArrayList<>();
- expected = "(0,1)";
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
- env.execute();
+ assertEquals(Arrays.asList(new Tuple2<>(0, 1)), resultList);
}
@Test
@@ -431,11 +410,11 @@ public class CEPITCase extends AbstractTestBase {
}
});
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<Integer> resultList = new ArrayList<>();
- expected = "3";
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
- env.execute();
+ assertEquals(Arrays.asList(3), resultList);
}
@Test
@@ -512,12 +491,20 @@ public class CEPITCase extends AbstractTestBase {
}
);
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<Either<String, String>> resultList = new ArrayList<>();
+
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
- // the expected sequences of matching event ids
- expected = "Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)";
+ resultList.sort(Comparator.comparing(either -> either.toString()));
- env.execute();
+ List<Either<String, String>> expected = Arrays.asList(
+ Either.Left.of("1.0"),
+ Either.Left.of("2.0"),
+ Either.Left.of("2.0"),
+ Either.Right.of("2.0,2.0,2.0")
+ );
+
+ assertEquals(expected, resultList);
}
/**
@@ -580,12 +567,22 @@ public class CEPITCase extends AbstractTestBase {
}
});
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<String> resultList = new ArrayList<>();
+
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+ List<String> expected = Arrays.asList(
+ "1,5,6",
+ "1,2,3",
+ "4,5,6",
+ "1,2,6"
+ );
+
+ expected.sort(String::compareTo);
- // expected sequence of matching event ids
- expected = "1,5,6\n1,2,3\n4,5,6\n1,2,6";
+ resultList.sort(String::compareTo);
- env.execute();
+ assertEquals(expected, resultList);
}
/**
@@ -666,12 +663,20 @@ public class CEPITCase extends AbstractTestBase {
}
);
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<String> resultList = new ArrayList<>();
- // the expected sequence of matching event ids
- expected = "1,6,4\n1,5,4";
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
- env.execute();
+ List<String> expected = Arrays.asList(
+ "1,6,4",
+ "1,5,4"
+ );
+
+ expected.sort(String::compareTo);
+
+ resultList.sort(String::compareTo);
+
+ assertEquals(expected, resultList);
}
private static class CustomEventComparator implements EventComparator<Event> {
@@ -708,10 +713,14 @@ public class CEPITCase extends AbstractTestBase {
}
});
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ List<Tuple2<Integer, String>> resultList = new ArrayList<>();
+
+ DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+ resultList.sort(Comparator.comparing(tuple2 -> tuple2.toString()));
- expected = "(1,a)\n(3,a)";
+ List<Tuple2<Integer, String>> expected = Arrays.asList(Tuple2.of(1, "a"), Tuple2.of(3, "a"));
- env.execute();
+ assertEquals(expected, resultList);
}
}