Posted to issues@flink.apache.org by ramkrish86 <gi...@git.apache.org> on 2016/07/27 10:28:50 UTC

[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

GitHub user ramkrish86 opened a pull request:

    https://github.com/apache/flink/pull/2301

    FLINK-3674 Add an interface for EventTime aware User Function (Ram)

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ramkrish86/flink FLINK-3674

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2301.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2301
    
----
commit 7c9045b0eedcdab1cfd1e1650886ecaaab5dd746
Author: Ramkrishna <ra...@intel.com>
Date:   2016-07-27T10:26:46Z

    FLINK-3674 Add an interface for EventTime aware User Function (Ram)

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2301: FLINK-3674 Add an interface for EventTime aware User Func...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2301
  
    Can we go back to the JIRA issue for a step, and first decide how we actually want this feature to look?
    For API additions, it is crucial not to do "something fast", but to discuss and deeply understand what the feature should actually be.



[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2301#discussion_r72416804
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java ---
    @@ -83,4 +86,19 @@ public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig exec
     			((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
     		}
     	}
    +
    +	@Override
    +	public void onWatermark(Watermark watermark) {
    +		if(this.wrappedFunction instanceof EventTimeFunction) {
    --- End diff --
    
    missing space after if, again on L99



[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2301#discussion_r72416820
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java ---
    @@ -85,4 +88,19 @@ public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig exec
     			((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
     		}
     	}
    +
    +	@Override
    +	public void onWatermark(Watermark watermark) {
    +		if(this.wrappedFunction instanceof EventTimeFunction) {
    --- End diff --
    
    missing space after if, again on L101



[GitHub] flink issue #2301: FLINK-3674 Add an interface for EventTime aware User Func...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2301
  
    Created two interfaces, EventTimeFunction and WindowTimer. The WindowTimer interface lets users implement their own watermark timers and control how they are fired. The EventTimeFunction interface can be implemented by any user-defined function and is responsible for creating the WindowTimer. The WindowOperator checks whether the UDF is an EventTimeFunction and, if so, uses the WindowTimer it creates; otherwise it uses the DefaultTimer, as it does today. I ran mvn verify to ensure there are no checkstyle or compilation errors.
    @aljoscha
    Any feedback or suggestions? This now allows users to customize the timers.
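
A minimal sketch of the dispatch described in the comment above, written without any Flink dependencies. Only the names EventTimeFunction, WindowTimer, and DefaultTimer come from the comment; every method signature, the `SketchWindowOperator` class, and the `FireNothingFunction` example are assumptions made for illustration and are not the code in this pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for WindowTimer: decides which timers a watermark fires. */
interface WindowTimer {
    /** Assumed signature: returns the registered timer timestamps that should fire. */
    List<Long> fire(long watermark, List<Long> registeredTimers);
}

/** Hypothetical stand-in for EventTimeFunction: a UDF that supplies its own timer. */
interface EventTimeFunction {
    /** Assumed factory method; the real interface may look different. */
    WindowTimer createWindowTimer();
}

/** Assumed default behaviour: fire every timer at or before the watermark. */
class DefaultTimer implements WindowTimer {
    @Override
    public List<Long> fire(long watermark, List<Long> registeredTimers) {
        List<Long> fired = new ArrayList<>();
        for (long timestamp : registeredTimers) {
            if (timestamp <= watermark) {
                fired.add(timestamp);
            }
        }
        return fired;
    }
}

/** Simplified operator: chooses the timer once, based on the type of the user function. */
class SketchWindowOperator {
    private final WindowTimer timer;

    SketchWindowOperator(Object userFunction) {
        if (userFunction instanceof EventTimeFunction) {
            // The UDF opted in, so use the timer it creates.
            this.timer = ((EventTimeFunction) userFunction).createWindowTimer();
        } else {
            // Any other UDF keeps the default timer.
            this.timer = new DefaultTimer();
        }
    }

    List<Long> onWatermark(long watermark, List<Long> registeredTimers) {
        return timer.fire(watermark, registeredTimers);
    }
}

/** Illustrative UDF whose custom timer never fires anything. */
class FireNothingFunction implements EventTimeFunction {
    @Override
    public WindowTimer createWindowTimer() {
        return (watermark, registeredTimers) -> new ArrayList<>();
    }
}
```

In this sketch, `new SketchWindowOperator(new Object())` falls back to `DefaultTimer`, while `new SketchWindowOperator(new FireNothingFunction())` uses the timer supplied by the function. The sketch also makes the coupling visible: the operator has to know about the UDF-side interface to pick a timer.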



[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2301#discussion_r72417872
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---
    @@ -55,13 +56,18 @@
     	
     	/** the user function */
     	protected final F userFunction;
    +
    +	protected EventTimeFunction eventTimeFunction;
     	
     	/** Flag to prevent duplicate function.close() calls in close() and dispose() */
     	private transient boolean functionsClosed = false;
     	
     	
     	public AbstractUdfStreamOperator(F userFunction) {
     		this.userFunction = requireNonNull(userFunction);
    +		if(userFunction instanceof  EventTimeFunction) {
    --- End diff --
    
    I got that. It is Ctrl+Shift+L here.



[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2301#discussion_r72416957
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
    @@ -2003,6 +2048,53 @@ public void apply(String key,
     
     	}
     
    +	public static class RichSumReducerWithEventTime<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W>
    +		implements EventTimeFunction<String, W> {
    +		private static final long serialVersionUID = 1L;
    +
    +		private boolean openCalled = false;
    +		private static int count = 0;
    +		@Override
    +		public void open(Configuration parameters) throws Exception {
    +			super.open(parameters);
    +			openCalled = true;
    +		}
    +
    +		@Override
    +		public void close() throws Exception {
    +			super.close();
    +			closeCalled.incrementAndGet();
    +		}
    +
    +		@Override
    +		public void apply(String key,
    +						  W window,
    +						  Iterable<Tuple2<String, Integer>> input,
    --- End diff --
    
    this _should_ fail checkstyle, as the indentation is partly done with spaces. we only use tabs.
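
To illustrate the tabs-only rule mentioned in the comment above, here is a minimal sketch of a check that flags a line whose leading whitespace contains a space. The `IndentCheck` class and its method are hypothetical and are not the project's actual checkstyle configuration; the check is deliberately simplified and would also flag the conventional ` * ` continuation lines of block comments.

```java
/** Hypothetical helper, not part of Flink or checkstyle. */
final class IndentCheck {
    private IndentCheck() {
    }

    /** Returns true if a space appears before the first non-whitespace character. */
    static boolean hasSpaceInIndent(String line) {
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == ' ') {
                return true;   // a space inside the indentation
            }
            if (c != '\t') {
                return false;  // reached the content; indentation was tabs only
            }
        }
        return false;          // empty line or tabs only
    }
}
```

A continuation line aligned with tabs followed by spaces, such as `"\t\t  W window,"`, is flagged, while `"\t\tW window,"` is not.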



[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2301#discussion_r72417108
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---
    @@ -55,13 +56,18 @@
     	
     	/** the user function */
     	protected final F userFunction;
    +
    +	protected EventTimeFunction eventTimeFunction;
     	
     	/** Flag to prevent duplicate function.close() calls in close() and dispose() */
     	private transient boolean functionsClosed = false;
     	
     	
     	public AbstractUdfStreamOperator(F userFunction) {
     		this.userFunction = requireNonNull(userFunction);
    +		if(userFunction instanceof  EventTimeFunction) {
    --- End diff --
    
    Thanks for the super quick review. A quick question: how do I apply the formatter in IntelliJ IDEA? In Eclipse, Ctrl+Shift+F applies the configured formatter. What is the equivalent here? I can update the next commit based on that. Thank you.



[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 closed the pull request at:

    https://github.com/apache/flink/pull/2301



[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2301#discussion_r72416839
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java ---
    @@ -85,4 +88,19 @@ public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig exec
     			((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
     		}
     	}
    +
    +	@Override
    +	public void onWatermark(Watermark watermark) {
    +		if(this.wrappedFunction instanceof EventTimeFunction) {
    --- End diff --
    
    missing space after if, again on L101



[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2301#discussion_r72416769
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java ---
    @@ -83,4 +86,19 @@ public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig exec
     			((OutputTypeConfigurable<OUT>)this.wrappedFunction).setOutputType(outTypeInfo, executionConfig);
     		}
     	}
    +
    +	@Override
    +	public void onWatermark(Watermark watermark) {
    +		if(this.wrappedFunction instanceof EventTimeFunction) {
    --- End diff --
    
    missing space after if, and again on L99



[GitHub] flink issue #2301: FLINK-3674 Add an interface for EventTime aware User Func...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2301
  
    Closing this as discussed. I will wait for more feedback on what is expected here and then proceed.



[GitHub] flink issue #2301: FLINK-3674 Add an interface for EventTime aware User Func...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2301
  
    I would suggest closing this PR and starting with a proper design discussion.
    The approach taken here has a bunch of decisions/implications that I am not sure about, like
    
      - Mixing UDFs with "onWatermark" functionality with timers
      - Mixing windowed and non-windowed code
    
    In a clean design, this feature should not touch any window operator code.
    
    Sorry for that, but that is why we encourage people to share designs before going full on implementing - at least for more complex additions like this one.



[GitHub] flink pull request #2301: FLINK-3674 Add an interface for EventTime aware Us...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2301#discussion_r72416703
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---
    @@ -55,13 +56,18 @@
     	
     	/** the user function */
     	protected final F userFunction;
    +
    +	protected EventTimeFunction eventTimeFunction;
     	
     	/** Flag to prevent duplicate function.close() calls in close() and dispose() */
     	private transient boolean functionsClosed = false;
     	
     	
     	public AbstractUdfStreamOperator(F userFunction) {
     		this.userFunction = requireNonNull(userFunction);
    +		if(userFunction instanceof  EventTimeFunction) {
    --- End diff --
    
    missing space after if

