Posted to issues@flink.apache.org by manuzhang <gi...@git.apache.org> on 2017/04/02 11:46:46 UTC

[GitHub] flink pull request #3661: [FLINK-4953] Allow access to "time" in ProcessWind...

GitHub user manuzhang opened a pull request:

    https://github.com/apache/flink/pull/3661

    [FLINK-4953] Allow access to "time" in ProcessWindowFunction.Context



You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manuzhang/flink process_window_function

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3661.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3661
    
----
commit 2f37836f462c217d627088f0266e8a3222b9d919
Author: manuzhang <ow...@gmail.com>
Date:   2017-04-02T11:27:14Z

    [FLINK-4953] Allow access to "time" in ProcessWindowFunction.Context

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    @aljoscha Yeah, that looks better. Thanks.



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    Not sure what happened here; the tests passed in most cases, so this looks unrelated to this PR.
    
    ```
    [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (default-test) on project flink-runtime-web_2.10: There are test failures.
    [ERROR] 
    [ERROR] Please refer to /home/travis/build/apache/flink/flink-runtime-web/target/surefire-reports for the individual test results.
    [ERROR] -> [Help 1]
    [ERROR] 
    [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
    [ERROR] Re-run Maven using the -X switch to enable full debug logging.
    [ERROR] 
    [ERROR] For more information about the errors and possible solutions, please read the following articles:
    [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
    [ERROR] 
    [ERROR] After correcting the problems, you can resume the build with the command
    [ERROR]   mvn <goals> -rf :flink-runtime-web_2.10
    Trying to KILL watchdog (1513).
    ./tools/travis_mvn_watchdog.sh: line 210:  1513 Terminated              watchdog
    MVN exited with EXIT CODE: 1.
    java.io.FileNotFoundException: build-target/lib/flink-dist-*.jar (No such file or directory)
    	at java.util.zip.ZipFile.open(Native Method)
    	at java.util.zip.ZipFile.<init>(ZipFile.java:215)
    	at java.util.zip.ZipFile.<init>(ZipFile.java:145)
    	at java.util.zip.ZipFile.<init>(ZipFile.java:116)
    	at sun.tools.jar.Main.list(Main.java:1004)
    	at sun.tools.jar.Main.run(Main.java:245)
    	at sun.tools.jar.Main.main(Main.java:1177)
    find: `./flink-yarn-tests/target/flink-yarn-tests*': No such file or directory
    PRODUCED build artifacts.
    1.log  2.log  build_info  mvn.out
    COMPRESSING build artifacts.
    1.log
    2.log
    build_info
    mvn.out
    Uploading to transfer.sh
    The command "./tools/travis_mvn_watchdog.sh 300" exited with 1.
    Done. Your build exited with 1.
    ```



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    Thanks, @manuzhang. I had another look at your changes. I would merge them now but simplify the tests and `WindowOperator` a little, if that's alright with you.
    
    In `WindowOperator` I would change `WindowContext` to query the `InternalTimerService` directly, as in:
    ```
    @Override
    public long currentProcessingTime() {
    	return internalTimerService.currentProcessingTime();
    }
    
    @Override
    public long currentWatermark() {
    	return internalTimerService.currentWatermark();
    }
    ```
    I would introduce a specific test, like this:
    ```
    	@Test
    	public void testEventTimeQuerying() throws Exception {
    		testCurrentTimeQuerying(new EventTimeAdaptor());
    	}
    
    	@Test
    	public void testProcessingTimeQuerying() throws Exception {
    		testCurrentTimeQuerying(new ProcessingTimeAdaptor());
    	}
    
    	public void testCurrentTimeQuerying(final TimeDomainAdaptor timeAdaptor) throws Exception {
    		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
    		timeAdaptor.setIsEventTime(mockAssigner);
    		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
    		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
    
    		final KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
    				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
    
    		testHarness.open();
    
    		shouldFireOnElement(mockTrigger);
    
    		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
    				.thenReturn(Arrays.asList(new TimeWindow(0, 20)));
    
    		doAnswer(new Answer<Object>() {
    			@Override
    			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
    				timeAdaptor.verifyCorrectTime(testHarness, context);
    				return null;
    			}
    		}).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
    
    		doAnswer(new Answer<Object>() {
    			@Override
    			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
    				timeAdaptor.verifyCorrectTime(testHarness, context);
    				return null;
    			}
    		}).when(mockWindowFunction).clear(anyTimeWindow(), anyInternalWindowContext());
    
    		timeAdaptor.advanceTime(testHarness, 10);
    
    		testHarness.processElement(new StreamRecord<>(0, 0L));
    
    		verify(mockWindowFunction, times(1)).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
    
    		timeAdaptor.advanceTime(testHarness, 100);
    
    		verify(mockWindowFunction, times(1)).clear(anyTimeWindow(), anyInternalWindowContext());
    	}
    ```
    
    What do you think? I would change your commit and commit as one thing.
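
The delegation proposed above can be illustrated outside Flink with a minimal sketch. Everything here is a hypothetical stand-in: `TimerService`, `ManualTimerService`, and this `WindowContext` are not Flink classes and only mirror the two accessors named in the comment. The sketch shows that a context which forwards each call to the timer service, instead of caching a value, always reports the current time.

```java
final class DelegatingContextSketch {

    // Hypothetical stand-in for the timer service; only the two accessors from the thread.
    interface TimerService {
        long currentProcessingTime();
        long currentWatermark();
    }

    // Hand-driven timer service so the example can advance time explicitly.
    static final class ManualTimerService implements TimerService {
        private long processingTime = 0L;
        private long watermark = Long.MIN_VALUE;

        void advanceProcessingTime(long time) { processingTime = time; }
        void advanceWatermark(long time) { watermark = time; }

        @Override public long currentProcessingTime() { return processingTime; }
        @Override public long currentWatermark() { return watermark; }
    }

    // Context that holds no time fields of its own and forwards every query.
    static final class WindowContext {
        private final TimerService timerService;

        WindowContext(TimerService timerService) { this.timerService = timerService; }

        long currentProcessingTime() { return timerService.currentProcessingTime(); }
        long currentWatermark() { return timerService.currentWatermark(); }
    }

    public static void main(String[] args) {
        ManualTimerService timers = new ManualTimerService();
        WindowContext context = new WindowContext(timers);

        timers.advanceWatermark(10L);
        System.out.println("watermark seen by context: " + context.currentWatermark());

        // The same context object reflects the later watermark without being rebuilt.
        timers.advanceWatermark(100L);
        System.out.println("watermark seen by context: " + context.currentWatermark());
    }
}
```

Because the context keeps no copy of the time, there is nothing to keep in sync when the operator advances processing time or the watermark.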



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    I'd like to keep it for now, since it might help catch some bugs and doesn't take long to run.
    
    What was the bug in the comparator?



[GitHub] flink pull request #3661: [FLINK-4953] Allow access to "time" in ProcessWind...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3661#discussion_r109821341
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
    @@ -2560,7 +2560,7 @@ public int compare(Object o1, Object o2) {
     					if (comparison != 0) {
     						return comparison;
     					}
    -					return (int) (sr0.getValue().f1 - sr1.getValue().f1);
    +					return (int) (sr0.getValue().f2 - sr1.getValue().f2);
    --- End diff --
    
    @aljoscha This line compares the second field again rather than the third field.
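
The effect of the one-character fix in the diff above can be reproduced with a small standalone sketch. `Result` is a hypothetical stand-in for the three-field test record, not Flink's `Tuple3`, and the first two comparison steps are assumed, since the diff only shows the final line. The sketch uses `Long.compare` instead of the subtract-and-cast form in the diff; that is a substitution made here, not what the patch does.

```java
import java.util.Comparator;

final class Tuple3ComparatorSketch {

    // Hypothetical stand-in for a three-field result record; not Flink's Tuple3.
    static final class Result {
        final String f0;
        final long f1;
        final long f2;

        Result(String f0, long f1, long f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // Buggy shape: the final tie-breaker repeats f1, so f2 never influences the order.
    static final Comparator<Result> BUGGY = (a, b) -> {
        int comparison = a.f0.compareTo(b.f0);
        if (comparison != 0) {
            return comparison;
        }
        comparison = Long.compare(a.f1, b.f1);
        if (comparison != 0) {
            return comparison;
        }
        return Long.compare(a.f1, b.f1);
    };

    // Fixed shape: the final tie-breaker uses the third field.
    static final Comparator<Result> FIXED = (a, b) -> {
        int comparison = a.f0.compareTo(b.f0);
        if (comparison != 0) {
            return comparison;
        }
        comparison = Long.compare(a.f1, b.f1);
        if (comparison != 0) {
            return comparison;
        }
        return Long.compare(a.f2, b.f2);
    };

    public static void main(String[] args) {
        // Two records that differ only in the third field.
        Result x = new Result("key", 1L, 9L);
        Result y = new Result("key", 1L, 3L);

        System.out.println("buggy comparator: " + BUGGY.compare(x, y));
        System.out.println("fixed comparator: " + FIXED.compare(x, y));
    }
}
```

With the buggy comparator, records that differ only in the third field compare as equal, so a sort leaves them in arbitrary relative order and an expected-versus-actual comparison of sorted output can pass or fail depending on input order.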



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    Thanks! 😃
    
    I'll let travis run and then I'm merging.



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    By the way, great conference and great content at http://sf.flink-forward.org/



[GitHub] flink pull request #3661: [FLINK-4953] Allow access to "time" in ProcessWind...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang closed the pull request at:

    https://github.com/apache/flink/pull/3661



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    @aljoscha Updated as you proposed, although I'm not sure I've got all the needed tests in there (properly).



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    It seems `WindowOperatorContractTest` is for `InternalWindowFunction`, while there are no existing tests for `ProcessWindowFunction`. Should I add them there?



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    Okay. Shall we open a JIRA to deprecate `WindowOperatorTest`? I found a minor bug in `WindowOperatorTest#Tuple3ResultSortComparator` (the same bug is in `WindowOperatorMigrationTest`). Do we want to fix it?



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    Thanks for opening this PR. 😃
    
    I had a few comments and I think it's easier to collect them here instead of commenting on the individual files:
     - I would prefer to not call `context.getCurrent*Time()` every time, even when it's not needed. This can be costly. Instead we can pass the `Context` and only invoke the methods when needed.
     - I regard `WindowOperatorTest` as legacy, new tests should be added to `WindowOperatorContractTest`. This is a base class with a lot of test cases that is extended by `RegularWindowOperatorTest` and `EvictingWindowOperatorTest`.
    
    Both of these are hard to know, and it's not obvious which test should go where. Sorry for the inconvenience.
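
The first point above, passing the `Context` through instead of reading the time on every call, can be sketched in plain Java. `TimeContext` and `CountingContext` are hypothetical stand-ins that only mirror the two accessors discussed in this thread; the returned values (42 and 17) are arbitrary. The counter makes the cost visible: the eager variant performs two lookups per invocation even when the caller never uses the result.

```java
final class LazyTimeAccessSketch {

    // Hypothetical minimal context; not Flink's ProcessWindowFunction.Context.
    interface TimeContext {
        long currentProcessingTime();
        long currentWatermark();
    }

    // Counts lookups so the difference between the two call styles is observable.
    static final class CountingContext implements TimeContext {
        int lookups = 0;

        @Override public long currentProcessingTime() { lookups++; return 42L; }
        @Override public long currentWatermark() { lookups++; return 17L; }
    }

    // Eager style: both times are fetched up front on every invocation.
    static long eager(TimeContext context, boolean needsTime) {
        long processingTime = context.currentProcessingTime();
        long watermark = context.currentWatermark();
        return needsTime ? Math.max(processingTime, watermark) : -1L;
    }

    // Lazy style: the context is handed through and queried only when needed.
    static long lazy(TimeContext context, boolean needsTime) {
        if (!needsTime) {
            return -1L;
        }
        return Math.max(context.currentProcessingTime(), context.currentWatermark());
    }

    public static void main(String[] args) {
        CountingContext eagerContext = new CountingContext();
        CountingContext lazyContext = new CountingContext();

        // Three invocations that never use the time.
        for (int i = 0; i < 3; i++) {
            eager(eagerContext, false);
            lazy(lazyContext, false);
        }

        System.out.println("eager lookups: " + eagerContext.lookups);
        System.out.println("lazy lookups:  " + lazyContext.lookups);
    }
}
```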



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    Merged. Thanks again for working on this! 😄
    
    Could you please close this PR?



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    I'm currently in SF for http://sf.flink-forward.org and quite busy. That's why my responses are quite slow, sorry for that.
    
    A quick answer: There is the (slightly hard to find) `InternalWindowFunctionTest`, which tests that the internal window functions correctly forward calls to a process window function and other (user-facing) window functions. We should test the internal logic of the new time access code for `InternalWindowFunction` in `WindowOperatorContractTest` (see, for example, `testPerWindowStateSetAndClearedOnEventTimePurge()`). We should also add more tests in `InternalWindowFunctionTest` that verify that per-window state and the new time access are correctly forwarded.
    
    What do you think?



[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/flink/pull/3661
  
    @aljoscha 

