Posted to issues@flink.apache.org by deepaks4077 <gi...@git.apache.org> on 2018/06/14 21:05:32 UTC

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

GitHub user deepaks4077 opened a pull request:

    https://github.com/apache/flink/pull/6170

    [FLINK-9563]: Using a custom sink function for tests in CEPITCase instead of writing to disk

    ## What is the purpose of the change
    
    This change modifies the CEPITCase integration test to collect and compare test results with a custom sink function instead of writing them to a file. It does not add or remove any tests.
    
    ## Brief change log
    
    - Removed the `@Before` and `@After` JUnit annotations
    - Added a custom sink function with a static `ArrayList` to collect and compare test results
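A minimal sketch of such a sink, under stated assumptions: `SinkFunction` below is a hypothetical stand-in for Flink's interface so the snippet compiles without Flink, and `CollectSinkSketch` is an illustrative name. The list is static because Flink serializes the sink and runs deserialized copies of it, so an instance field would not be visible to the test; this only works when the job runs in the same JVM as the test, as with a local mini cluster.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectSinkSketch {

	// Hypothetical stand-in for Flink's SinkFunction so that the sketch compiles without Flink.
	interface SinkFunction<IN> {
		void invoke(IN value) throws Exception;
	}

	// Collects every record into a static list shared by all sink instances.
	static class CollectSink implements SinkFunction<String> {
		static final List<String> VALUES = new ArrayList<>();

		@Override
		public void invoke(String value) {
			// Lock the shared list rather than the instance, since parallel sinks are separate objects.
			synchronized (VALUES) {
				VALUES.add(value);
			}
		}
	}

	public static void main(String[] args) {
		// Reset the shared state before each test instead of creating and deleting a result file.
		CollectSink.VALUES.clear();

		CollectSink sink = new CollectSink();
		sink.invoke("1,2,3");
		sink.invoke("4,5,6");

		System.out.println(CollectSink.VALUES);
	}
}
```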
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as CEPITCase, which is the end-to-end test of the CEP API.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/deepaks4077/flink FLINK-9563

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6170
    
----
commit 8fc629a557af4a56ab7638cc5eb519e163267cdc
Author: Deepak Sharnma <de...@...>
Date:   2018-06-13T02:41:27Z

    [FLINK-9563]: Using a custom sink function for tests in CEPITCase

----


---

[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

Posted by deepaks4077 <gi...@git.apache.org>.
Github user deepaks4077 commented on the issue:

    https://github.com/apache/flink/pull/6170
  
    @zentol, do you have some time this week to review this? I can proceed with the other tests once I know that this is the right way to handle it :)


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r195823948
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -360,27 +345,30 @@ public boolean filter(Event value) throws Exception {
     		});
     
     		DataStream<String> result = CEP.pattern(input, pattern).select(
    -			new PatternSelectFunction<Event, String>() {
    +				new PatternSelectFunction<Event, String>() {
     
    -				@Override
    -				public String select(Map<String, List<Event>> pattern) {
    -					StringBuilder builder = new StringBuilder();
    +					@Override
    --- End diff --
    
    Indentation changes that weren't reverted.


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r197717301
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -580,12 +563,17 @@ public String select(Map<String, List<Event>> pattern) {
     			}
     		});
     
    -		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +		List<String> resultList = new ArrayList<>();
    +
    +		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
    +
    +		List<String> expected = Arrays.asList("1,5,6\n1,2,3\n4,5,6\n1,2,6".split("\n"));
    --- End diff --
    
    Change this line to work like the other tests: inline `Arrays.asList` into the `assertEquals` call, and list the elements individually instead of building them with `split()`.
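A minimal sketch of the requested style, assuming plain string results as in the quoted test. `InlineExpectedList` is an illustrative name, the `assertEquals` method is a hypothetical stand-in for JUnit's so the snippet runs without JUnit, and the hard-coded `resultList` stands in for the output of `DataStreamUtils.collect(result)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InlineExpectedList {

	// Minimal stand-in for org.junit.Assert.assertEquals so the sketch runs without JUnit.
	static void assertEquals(Object expected, Object actual) {
		if (!expected.equals(actual)) {
			throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
		}
	}

	public static void main(String[] args) {
		// Stand-in for the results gathered via DataStreamUtils.collect(result).
		List<String> resultList = new ArrayList<>(Arrays.asList("1,5,6", "1,2,3", "4,5,6", "1,2,6"));

		// Original style: expected values packed into one string and split at runtime.
		List<String> viaSplit = Arrays.asList("1,5,6\n1,2,3\n4,5,6\n1,2,6".split("\n"));

		// Requested style: each element written out and the list inlined into the assertion.
		assertEquals(Arrays.asList("1,5,6", "1,2,3", "4,5,6", "1,2,6"), resultList);

		// Both spellings describe the same list; the inline form is easier to read and diff.
		System.out.println(viaSplit.equals(resultList));
	}
}
```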


---

[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6170
  
    alrighty, looks good to me, merging.


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r195824130
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -573,19 +577,25 @@ public String select(Map<String, List<Event>> pattern) {
     				StringBuilder builder = new StringBuilder();
     
     				builder.append(pattern.get("start").get(0).getId()).append(",")
    -					.append(pattern.get("middle").get(0).getId()).append(",")
    -					.append(pattern.get("end").get(0).getId());
    +						.append(pattern.get("middle").get(0).getId()).append(",")
    --- End diff --
    
    indentation


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r195824580
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -708,10 +724,32 @@ public boolean filter(Tuple2<Integer, String> rec) throws Exception {
     			}
     		});
     
    -		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +		CollectSink.VALUES.clear();
     
    -		expected = "(1,a)\n(3,a)";
    +		result.map(new MapFunction<Tuple2<Integer, String>, String>() {
    +			@Override
    +			public String map(Tuple2<Integer, String> value) throws Exception {
    +				return value.toString();
    --- End diff --
    
    Please do not compare as strings. This was only done for simplicity in the previous code, so that we didn't have to re-parse the contents of the text file.
    
    This also applies to the other tests.
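A sketch of comparing typed records instead of their string forms. As an assumption so that it runs without Flink, `java.util.AbstractMap.SimpleImmutableEntry` stands in for Flink's `Tuple2<Integer, String>`; in the real test the sink would collect the `Tuple2` values directly and the expected list would be built with `Tuple2.of(1, "a")` and so on. `CompareTypedResults` and `pair` are illustrative names.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CompareTypedResults {

	// Stand-in for Flink's Tuple2<Integer, String>: an immutable pair with value-based equals().
	static Map.Entry<Integer, String> pair(int f0, String f1) {
		return new SimpleImmutableEntry<>(f0, f1);
	}

	public static void main(String[] args) {
		// The sink keeps the typed records instead of mapping them through toString() first.
		List<Map.Entry<Integer, String>> values = new ArrayList<>(Arrays.asList(pair(3, "a"), pair(1, "a")));

		// Order by the integer field itself; a string comparison would,
		// for example, put "(10,a)" before "(3,a)".
		values.sort(Map.Entry.comparingByKey());

		List<Map.Entry<Integer, String>> expected = Arrays.asList(
			pair(1, "a"),
			pair(3, "a"));

		System.out.println(expected.equals(values));
	}
}
```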


---

[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

Posted by deepaks4077 <gi...@git.apache.org>.
Github user deepaks4077 commented on the issue:

    https://github.com/apache/flink/pull/6170
  
    @zentol , please take a look when you get the chance.
    
    I think testSimpleAfterMatchSkip() is failing [here](https://api.travis-ci.org/v3/job/392551505/log.txt) because the program reached the assert statement before the whole result stream had been added to the sink. This test passed in the Travis CI build of my fork. Maybe we could add a `CountDownLatch` that waits until all results have been added to `VALUES`? A rich sink function would then count down the latch when it closes. What would you suggest?
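One way the proposed latch could look, sketched under assumptions: `Sink` is a hypothetical stand-in for a rich sink function whose `close()` counts the latch down, and plain threads simulate parallel sink subtasks. The list and latch are passed through the constructor only so the sketch can be rerun; in a real Flink job they would have to be static fields like `CollectSink.VALUES`, because the sink is serialized before it runs. Note that, as zentol points out in this thread, `env.execute()` should already block until the job has finished, so the latch may turn out to be unnecessary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchedCollectSink {

	// Hypothetical stand-in for a rich sink: invoke() per record, close() when the task ends.
	static class Sink {
		private final List<String> values;
		private final CountDownLatch closed;

		Sink(List<String> values, CountDownLatch closed) {
			this.values = values;
			this.closed = closed;
		}

		void invoke(String value) {
			synchronized (values) {
				values.add(value);
			}
		}

		void close() {
			closed.countDown();
		}
	}

	// Simulates `parallelism` sink subtasks on separate threads and waits until all have closed.
	static List<String> runAndCollect(int parallelism) {
		List<String> values = new ArrayList<>();
		CountDownLatch closed = new CountDownLatch(parallelism);
		for (int subtask = 0; subtask < parallelism; subtask++) {
			String payload = "record-" + subtask;
			new Thread(() -> {
				Sink sink = new Sink(values, closed);
				sink.invoke(payload);
				sink.close();
			}).start();
		}
		try {
			// Bounded wait, so a stuck job fails the test instead of hanging it.
			if (!closed.await(10, TimeUnit.SECONDS)) {
				throw new AssertionError("not all sinks closed in time");
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new AssertionError("interrupted while waiting for sinks", e);
		}
		synchronized (values) {
			return new ArrayList<>(values);
		}
	}

	public static void main(String[] args) {
		List<String> collected = runAndCollect(4);
		Collections.sort(collected);
		System.out.println(collected); // [record-0, record-1, record-2, record-3]
	}
}
```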


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6170


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r197716343
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -658,20 +668,26 @@ public String select(Map<String, List<Event>> pattern) {
     					StringBuilder builder = new StringBuilder();
     
     					builder.append(pattern.get("start").get(0).getId()).append(",")
    -						.append(pattern.get("middle").get(0).getId()).append(",")
    -						.append(pattern.get("end").get(0).getId());
    +							.append(pattern.get("middle").get(0).getId()).append(",")
    --- End diff --
    
    still needs to be reverted


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r195824079
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -504,20 +498,30 @@ public String select(Map<String, List<Event>> pattern) {
     					StringBuilder builder = new StringBuilder();
     
     					builder.append(pattern.get("start").get(0).getPrice()).append(",")
    -						.append(pattern.get("middle").get(0).getPrice()).append(",")
    -						.append(pattern.get("end").get(0).getPrice());
    +							.append(pattern.get("middle").get(0).getPrice()).append(",")
    +							.append(pattern.get("end").get(0).getPrice());
    --- End diff --
    
    indentation change, revert like the others


---

[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

Posted by deepaks4077 <gi...@git.apache.org>.
Github user deepaks4077 commented on the issue:

    https://github.com/apache/flink/pull/6170
  
    Hi @zentol, just a friendly reminder about this pull request :)


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r197716849
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -512,12 +491,16 @@ public String select(Map<String, List<Event>> pattern) {
     			}
     		);
     
    -		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +		List<Either<String, String>> resultList = new ArrayList<>();
    +
    +		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
     
    -		// the expected sequences of matching event ids
    -		expected = "Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)";
    +		resultList.sort(Comparator.comparing(either -> either.toString()));
     
    -		env.execute();
    +		List<Either<String, String>> expected = Arrays.asList(Either.Left.of("1.0"), Either.Left.of("2.0"),
    --- End diff --
    
    Put each element on a new line:
    ```
    Arrays.asList(
    	a,
    	b,
    	c)
    ```
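The requested layout, combined with the order-insensitive comparison used in the diff (sorting the collected list by `toString()` before comparing), can be sketched as follows. As an assumption so the snippet runs without Flink, plain strings stand in for the `Either` values; `ExpectedListLayout` and `sortedByString` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ExpectedListLayout {

	// Sorts a copy of the collected results so the comparison does not depend on
	// the order in which parallel tasks emitted them.
	static <T> List<T> sortedByString(List<T> actual) {
		List<T> copy = new ArrayList<>(actual);
		copy.sort(Comparator.comparing(Object::toString));
		return copy;
	}

	public static void main(String[] args) {
		// Stand-in for the collected results, in arbitrary arrival order.
		List<String> actual = Arrays.asList("Right(2.0,2.0,2.0)", "Left(2.0)", "Left(1.0)", "Left(2.0)");

		// One element per line, indented by a single tab, as requested in the review.
		List<String> expected = Arrays.asList(
			"Left(1.0)",
			"Left(2.0)",
			"Left(2.0)",
			"Right(2.0,2.0,2.0)");

		System.out.println(expected.equals(sortedByString(actual)));
	}
}
```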


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r197716214
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -229,7 +206,7 @@ public void testSimplePatternEventTime() throws Exception {
     			Tuple2.of(new Event(4, "end", 4.0), 10L),
     			Tuple2.of(new Event(5, "middle", 5.0), 7L),
     			// last element for high final watermark
    -			Tuple2.of(new Event(5, "middle", 5.0), 100L)
    +			Tuple2.of(new Event(6, "middle", 5.0), 100L)
    --- End diff --
    
    why is this change necessary?


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r195826270
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -708,10 +724,32 @@ public boolean filter(Tuple2<Integer, String> rec) throws Exception {
     			}
     		});
     
    -		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +		CollectSink.VALUES.clear();
     
    -		expected = "(1,a)\n(3,a)";
    +		result.map(new MapFunction<Tuple2<Integer, String>, String>() {
    +			@Override
    +			public String map(Tuple2<Integer, String> value) throws Exception {
    +				return value.toString();
    +			}
    +		}).addSink(new CollectSink());
     
     		env.execute();
    +
    +		CollectSink.VALUES.sort(String::compareTo);
    +
    +		List<String> expected = Arrays.asList("(1,a)\n(3,a)".split("\n"));
    +
    +		assertEquals(expected, CollectSink.VALUES);
     	}
    +
    +	private static class CollectSink implements SinkFunction<String> {
    +
    +		public static final List<String> VALUES = new ArrayList<>();
    +
    +		@Override
    +		public synchronized void invoke(String value) throws Exception {
    --- End diff --
    
    This synchronization may not work correctly if multiple instances of this function exist, because a `synchronized` method locks on the instance, not on the shared static list. Synchronize on `VALUES` instead.
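A small sketch of why locking the shared list matters, assuming a hypothetical `CollectSink` stand-in (not Flink's API) and plain threads in place of parallel subtasks, each with its own sink instance. With `synchronized (VALUES)` every instance uses the same monitor, so no additions are lost; `SharedListSync` and `collectConcurrently` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedListSync {

	// Hypothetical sink; in a parallel job each subtask gets its own instance.
	static class CollectSink {
		static final List<Integer> VALUES = new ArrayList<>();

		// A synchronized method would lock `this`, i.e. a different monitor per instance,
		// so two instances could still call VALUES.add() concurrently and lose entries.
		// Locking the shared list gives every instance the same monitor.
		void invoke(Integer value) {
			synchronized (VALUES) {
				VALUES.add(value);
			}
		}
	}

	// Runs `instances` sinks on separate threads and returns how many values were collected.
	static int collectConcurrently(int instances, int perInstance) {
		synchronized (CollectSink.VALUES) {
			CollectSink.VALUES.clear();
		}
		List<Thread> workers = new ArrayList<>();
		for (int t = 0; t < instances; t++) {
			CollectSink sink = new CollectSink(); // one instance per simulated subtask
			Thread worker = new Thread(() -> {
				for (int i = 0; i < perInstance; i++) {
					sink.invoke(i);
				}
			});
			workers.add(worker);
			worker.start();
		}
		try {
			for (Thread worker : workers) {
				worker.join();
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new AssertionError("interrupted while waiting for workers", e);
		}
		synchronized (CollectSink.VALUES) {
			return CollectSink.VALUES.size();
		}
	}

	public static void main(String[] args) {
		System.out.println(collectConcurrently(4, 10_000)); // 40000
	}
}
```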


---

[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6170
  
    env.execute() should block until the job has finished execution, so your analysis may be incorrect.
    
    I can take a deeper look at possible semantic changes, but there are some conceptual issues I'd like to clear up first.


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r197716928
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -512,12 +491,16 @@ public String select(Map<String, List<Event>> pattern) {
     			}
     		);
     
    -		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +		List<Either<String, String>> resultList = new ArrayList<>();
    +
    +		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
     
    -		// the expected sequences of matching event ids
    -		expected = "Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)";
    +		resultList.sort(Comparator.comparing(either -> either.toString()));
     
    -		env.execute();
    +		List<Either<String, String>> expected = Arrays.asList(Either.Left.of("1.0"), Either.Left.of("2.0"),
    +												Either.Left.of("2.0"), Either.Right.of("2.0,2.0,2.0"));
    --- End diff --
    
    indentation should only be 1 tab


---

[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

Posted by deepaks4077 <gi...@git.apache.org>.
Github user deepaks4077 commented on the issue:

    https://github.com/apache/flink/pull/6170
  
    @zentol , thanks for your comments. I'll take a look soon!


---

[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

Posted by deepaks4077 <gi...@git.apache.org>.
Github user deepaks4077 commented on the issue:

    https://github.com/apache/flink/pull/6170
  
    @zentol, thanks, will do!


---

[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

Posted by deepaks4077 <gi...@git.apache.org>.
Github user deepaks4077 commented on the issue:

    https://github.com/apache/flink/pull/6170
  
    @zentol, I've updated the tests and all tests pass now. Please take a look when you get the chance.


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r195824665
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -658,20 +668,26 @@ public String select(Map<String, List<Event>> pattern) {
     					StringBuilder builder = new StringBuilder();
     
     					builder.append(pattern.get("start").get(0).getId()).append(",")
    -						.append(pattern.get("middle").get(0).getId()).append(",")
    -						.append(pattern.get("end").get(0).getId());
    +							.append(pattern.get("middle").get(0).getId()).append(",")
    --- End diff --
    
    indentation


---

[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

Posted by deepaks4077 <gi...@git.apache.org>.
Github user deepaks4077 commented on the issue:

    https://github.com/apache/flink/pull/6170
  
    @zentol, thanks for reviewing. I've updated the PR, can you please take a look?


---

[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r197717352
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -666,12 +654,17 @@ public String select(Map<String, List<Event>> pattern) {
     			}
     		);
     
    -		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +		List<String> resultList = new ArrayList<>();
    +
    +		DataStreamUtils.collect(result).forEachRemaining(resultList::add);
    +
    +		List<String> expected = Arrays.asList("1,6,4\n1,5,4".split("\n"));
    --- End diff --
    
    same as above


---