Posted to issues@flink.apache.org by HuangWHWHW <gi...@git.apache.org> on 2015/08/07 09:42:09 UTC

[GitHub] flink pull request: [FLINK-2495][fix]Add a null point check in API...

GitHub user HuangWHWHW opened a pull request:

    https://github.com/apache/flink/pull/999

    [FLINK-2495][fix]Add a null point check in API DataStream.union

    The API (public DataStream<OUT> union(DataStream<OUT>... streams)) is an external interface for users.
    A stream passed in "streams" may be null, and the call then throws a NullPointerException.
    
    The test below illustrates the problem:
    
    package org.apache.flink.streaming.api;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.junit.Test;
    
    /**
     * Created by HuangWHWHW on 2015/8/7.
     */
    public class test {
    
    	public static class sourceFunction extends RichParallelSourceFunction<String> {
    
    		public sourceFunction() {
    		}
    
    		@Override
    		public void run(SourceContext<String> sourceContext) throws Exception {
    			sourceContext.collect("a");
    		}
    
    		@Override
    		public void cancel() {
    
    		}
    	}
    
    	@Test
    	public void testUnion(){
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		env.setParallelism(1);
    		DataStream<String> source = env.addSource(new sourceFunction());
    		DataStream<String> temp1 = null;
    		DataStream<String> temp2 = source.map(new MapFunction<String, String>() {
    			@Override
    			public String map(String value) throws Exception {
    				// Compare string contents; == only compares object references.
    				if ("a".equals(value)) {
    					return "This is for test temp2.";
    				}
    				return null;
    			}
    		});
    		DataStream<String> sink = temp2.union(temp1);
    		sink.print();
    		try {
    			env.execute();
    		}catch (Exception e){
    			e.printStackTrace();
    		}
    	}
    
    }


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HuangWHWHW/flink FLINK-2495

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/999.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #999
    
----
commit 89b4aa6c94e4e00ec382746e41ae893d83b55d86
Author: HuangWHWHW <40...@qq.com>
Date:   2015-08-07T07:38:21Z

    [FLINK-2495][fix]Add a null point check in API DataStream.union

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2495][fix]Add a null point check in API...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/999#discussion_r36508794
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
    @@ -256,9 +256,11 @@ public ExecutionConfig getExecutionConfig() {
     		DataStream<OUT> returnStream = this.copy();
     
     		for (DataStream<OUT> stream : streams) {
    -			for (DataStream<OUT> ds : stream.unionedStreams) {
    -				validateUnion(ds.getId());
    -				returnStream.unionedStreams.add(ds.copy());
    +			if (stream != null) {
    --- End diff --
    
    I'm not sure if it is good practice to ignore `null` referenced streams. Why not let it fail here? We might hide other errors by skipping the problematic stream here.



[GitHub] flink pull request: [FLINK-2495][fix]Add a null point check in API...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/999#discussion_r36515671
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
    @@ -256,9 +256,11 @@ public ExecutionConfig getExecutionConfig() {
     		DataStream<OUT> returnStream = this.copy();
     
     		for (DataStream<OUT> stream : streams) {
    -			for (DataStream<OUT> ds : stream.unionedStreams) {
    -				validateUnion(ds.getId());
    -				returnStream.unionedStreams.add(ds.copy());
    +			if (stream != null) {
    --- End diff --
    
    Hi,
    please look at this problem using my test above.
    As you can see, if the null is ignored, "temp2.union(temp1)" simply returns a copy of the DataStream.
    This new DataStream is identical to temp2.
    So if temp2 is fine, the new DataStream is fine too,
    and this test, or any similar case, will execute successfully.
    Otherwise, there will just be an error.




[GitHub] flink pull request: [FLINK-2495][fix]Add a null point check in API...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW closed the pull request at:

    https://github.com/apache/flink/pull/999



[GitHub] flink pull request: [FLINK-2495][fix]Add a null point check in API...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/999#discussion_r36587360
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
    @@ -256,9 +256,11 @@ public ExecutionConfig getExecutionConfig() {
     		DataStream<OUT> returnStream = this.copy();
     
     		for (DataStream<OUT> stream : streams) {
    -			for (DataStream<OUT> ds : stream.unionedStreams) {
    -				validateUnion(ds.getId());
    -				returnStream.unionedStreams.add(ds.copy());
    +			if (stream != null) {
    --- End diff --
    
    Right, there will be an error, but the error is helpful because it indicates a problem with the user program.
    If the union is silently ignored, the program might behave differently from what the user expects and produce invalid results.
    I would argue that a `union(null)` is never intended and should be brought to the user's attention. Also, this error might reveal more problems in the user program.
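
To make the concern about silently ignored inputs concrete, here is a minimal sketch that uses plain Java lists as a stand-in for streams. It does not use Flink's API; `SilentUnionSketch` and `lenientUnion` are hypothetical names chosen for illustration. The lenient union skips a `null` input, so a forgotten initialization goes unnoticed and the result is simply missing data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: lists stand in for streams, this is not Flink code.
public class SilentUnionSketch {

    // Hypothetical "lenient" union that skips null inputs instead of failing.
    @SafeVarargs
    static List<String> lenientUnion(List<String> first, List<String>... others) {
        List<String> result = new ArrayList<>(first);
        for (List<String> other : others) {
            if (other != null) {
                result.addAll(other);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> clicks = Arrays.asList("click-1", "click-2");
        List<String> views = null; // bug in the user program: never initialized

        // No error is raised, so the missing "views" input goes unnoticed.
        List<String> all = lenientUnion(clicks, views);
        System.out.println(all.size()); // prints 2, although the user expected views as well
    }
}
```

A union that fails on the `null` input would stop the program at the call site and point directly at the uninitialized variable.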



[GitHub] flink pull request: [FLINK-2495][fix]Add a null point check in API...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/999#issuecomment-129357904
  
    Thanks for the contribution. I think both Max and Fabian have a valid point. This will lead to unexpected behaviour like `a.union(b)` working, but then `b.union(a)` throwing a `NullPointerException`.
    
    If you want, you can add an extra null check with a better error message in case of a `null`. Otherwise, I think we can close this PR.
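
The explicit null check with a better error message could look roughly like the sketch below. It is written against a hypothetical stand-in class, `MiniStream`, rather than Flink's actual `DataStream`, and the exception type and message wording are assumptions, not what the project adopted.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: MiniStream is a hypothetical stand-in, not Flink's DataStream.
public class UnionNullCheckSketch {

    static final class MiniStream {
        // Names of all inputs that were unioned into this stream.
        final List<String> inputs = new ArrayList<>();

        MiniStream(String name) {
            inputs.add(name);
        }

        private MiniStream(List<String> inputs) {
            this.inputs.addAll(inputs);
        }

        // Fails fast with a descriptive message instead of skipping null inputs.
        MiniStream union(MiniStream... streams) {
            if (streams == null) {
                throw new NullPointerException("The streams passed to union() must not be null.");
            }
            MiniStream result = new MiniStream(this.inputs);
            for (int i = 0; i < streams.length; i++) {
                if (streams[i] == null) {
                    throw new NullPointerException(
                            "The stream at position " + i + " passed to union() is null.");
                }
                result.inputs.addAll(streams[i].inputs);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        MiniStream a = new MiniStream("a");
        MiniStream b = new MiniStream("b");
        System.out.println(a.union(b).inputs);

        MiniStream missing = null;
        try {
            a.union(missing);
        } catch (NullPointerException e) {
            // The message names the offending argument instead of a bare NPE.
            System.out.println(e.getMessage());
        }
    }
}
```

With a check like this, `a.union(null)` fails just as `null.union(a)` does, so the behaviour stays symmetric and the user still learns which argument was wrong.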



[GitHub] flink pull request: [FLINK-2495][fix]Add a null point check in API...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/999#issuecomment-129452134
  
    Hi all,
    I think you are right.
    Thank you for the advice.
    I'll close the PR.

