Posted to issues@flink.apache.org by mjsax <gi...@git.apache.org> on 2015/11/19 18:19:15 UTC

[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

GitHub user mjsax opened a pull request:

    https://github.com/apache/flink/pull/1387

    [FLINK-2861] Fields grouping on split streams fails

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mjsax/flink flink-2861-fieldsGrouping-SplitStream

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1387.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1387
    
----
commit f31c895f38678a5b393d74fc52064d3c4844f5de
Author: mjsax <mj...@informatik.hu-berlin.de>
Date:   2015-11-06T10:51:31Z

    [FLINK-2861] Fields grouping on split streams fails

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1387#issuecomment-158416101
  
    I ran into that issue a while back. The problem was that you emit elements that have the name/tag of the split stream (to use the split selector) and over that, you run the regular partitioner.
    
    Breaking this into separate steps should work: split, then use a mapper that removes the "split tag", and only then do the field partitioning.
    
    It would be good to support this cleanly in Flink, but it is tricky: not all functions go through a collector, and not all collectors can split.
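
A minimal sketch of the split, strip-tag, then partition order described in the comment above. It deliberately uses plain Java collections rather than the Flink API; the class `SplitThenGroupSketch`, the `Tagged` type, and all method names are hypothetical and only illustrate why the partitioner should see the tuple without its stream tag.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration; none of these names come from Flink or the PR.
final class SplitThenGroupSketch {

    /** Stand-in for an element that still carries the name/tag of its split stream. */
    static final class Tagged {
        final String streamId;
        final List<Object> fields;

        Tagged(String streamId, List<Object> fields) {
            this.streamId = streamId;
            this.fields = fields;
        }
    }

    /** Step 1: the split selector keeps only elements emitted to the requested stream. */
    static List<Tagged> select(List<Tagged> input, String streamId) {
        return input.stream()
                .filter(t -> t.streamId.equals(streamId))
                .collect(Collectors.toList());
    }

    /** Step 2: a mapper drops the tag so downstream operators see only the tuple fields. */
    static List<List<Object>> stripTag(List<Tagged> selected) {
        return selected.stream()
                .map(t -> t.fields)
                .collect(Collectors.toList());
    }

    /** Step 3: field partitioning picks a partition from the grouping field alone. */
    static int partitionOf(List<Object> tuple, int keyIndex, int parallelism) {
        return Math.floorMod(tuple.get(keyIndex).hashCode(), parallelism);
    }

    public static void main(String[] args) {
        List<Tagged> emitted = Arrays.asList(
                new Tagged("even", Arrays.<Object>asList("a", 2)),
                new Tagged("odd", Arrays.<Object>asList("a", 3)),
                new Tagged("even", Arrays.<Object>asList("b", 4)));

        for (List<Object> tuple : stripTag(select(emitted, "even"))) {
            System.out.println(tuple + " -> partition " + partitionOf(tuple, 0, 4));
        }
    }
}
```

Because the tag is removed before step 3, the key index refers to a position in the actual tuple, not in a wrapper object that also holds the stream name.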



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1387#issuecomment-158538063
  
    I don't understand your comment. What you describe is exactly how it works (even before this PR). The problem was actually just a missing `TypeInformation`, which I added here. However, I introduced this `token` into the name of an operator because the `.select(...).map(...)` gets chained, which alters the operator name, and I need the original name later on in `WrapperSetupHelper` -- this part is "hacky". I am not sure if there is a better solution to it. Look for the new variable `FlinkTopologyBuilder.token`.



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1387#issuecomment-159227726
  
    Ah, okay, so I skipped a version by accident ;-) I think that original change could not quite have worked, as it would have propagated the wrong TypeInfos to the ArrayKeySelector and OutputSerializer. The one with `map()` looks good...



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1387#issuecomment-159224189
  
    @mjsax Okay, I probably misread the original code. I thought the `map()` that drops the tag was not there...



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1387#issuecomment-158126101
  
    In `FlinkTopologyBuilder` I had to introduce a *token* for operator names which is used in `WrapperSetupHelper` to get the original name of an operator (Flink changes operator names when chaining happens). This is quite hacky. If anybody has a better idea how to solve this, please let me know.
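
A rough sketch of the token idea, assuming the token is appended to the operator name so that whatever chaining adds afterwards can be cut off again. The value of `TOKEN`, the class `OperatorNameSketch`, and the truncation step are assumptions for illustration; this thread shows neither the actual value of `FlinkTopologyBuilder.token` nor the full body of the change in `WrapperSetupHelper`.

```java
// Hypothetical illustration; not the code from the pull request.
final class OperatorNameSketch {

    /** Assumed marker value; the real token is not shown in this thread. */
    static final String TOKEN = "#";

    /** Prefix that Flink sources add to the task name by default. */
    static final String SOURCE_PREFIX = "Source: ";

    /** Recovers the name the operator was given before Flink decorated it. */
    static String originalName(String taskName) {
        String name = taskName;
        if (name.startsWith(SOURCE_PREFIX)) {
            name = name.substring(SOURCE_PREFIX.length());
        }
        int idx = name.indexOf(TOKEN);
        if (idx != -1) {
            // Assumption: everything from the token onward was added by chaining.
            name = name.substring(0, idx);
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(originalName("Source: spout" + TOKEN + " -> Map"));
        System.out.println(originalName("counter"));
    }
}
```

The weak point of this approach is visible in the sketch: it only works if the token can never occur inside a user-chosen operator name.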



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1387#issuecomment-158765658
  
    Maybe a stupid question, but why don't you send the operator name along with the `BoltWrapper`? Then there is no need to extract it from the Flink task name.
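
The suggestion above could look roughly like the following, assuming the wrapper is constructed with the name it was registered under. `NamedWrapperSketch`, `Wrapper`, and `componentId` are hypothetical names; the real `BoltWrapper` constructor and fields are not shown in this thread.

```java
import java.util.Objects;

// Hypothetical illustration; not the actual BoltWrapper.
final class NamedWrapperSketch {

    /** Stand-in for a wrapper that is told its own operator name up front. */
    static final class Wrapper {
        private final String name;

        Wrapper(String name) {
            this.name = Objects.requireNonNull(name, "name");
        }

        /** Returns the stored name; the (possibly chained) Flink task name is ignored. */
        String componentId(String flinkTaskName) {
            return name;
        }
    }

    public static void main(String[] args) {
        Wrapper wrapper = new Wrapper("counter");
        System.out.println(wrapper.componentId("counter -> Map"));
    }
}
```

Carrying the name explicitly duplicates a little information, but it removes any dependence on how Flink formats task names.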



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1387#discussion_r45559955
  
    --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java ---
    @@ -118,6 +119,11 @@ static synchronized TopologyContext createTopologyContext(
     			// prefix "Source: " is inserted by Flink sources by default -- need to get rid of it here
     			operatorName = operatorName.substring(8);
     		}
    +		final int idx = operatorName.indexOf(FlinkTopologyBuilder.token);
    +		if (idx != -1) {
    +			// in case of operator chaining Flink alters the name -- need to get rir of it here
    --- End diff --
    
    Typo: "rid"



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1387#discussion_r45559371
  
    --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java ---
    @@ -209,12 +209,8 @@ public void open() throws Exception {
     		super.open();
     
     		this.flinkCollector = new TimestampedCollector<OUT>(output);
    -		OutputCollector stormCollector = null;
    -
    -		if (this.numberOfAttributes.size() > 0) {
    -			stormCollector = new OutputCollector(new BoltCollector<OUT>(
    -					this.numberOfAttributes, flinkCollector));
    -		}
    +		final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
    +				this.numberOfAttributes, flinkCollector));
    --- End diff --
    
    Yes, I fixed that in my code base as well...



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1387#issuecomment-158952162
  
    That's actually not a stupid question. I did not do it in the first place to avoid "redundant" code (I was not aware that Flink changes names). I just changed it to your suggestion. If Travis passes, I will merge this. Or are there any objections?



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1387



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1387#issuecomment-158538344
  
    Btw: Travis fails due to an unstable test -- I already created a JIRA for it.



[GitHub] flink pull request: [FLINK-2861] Fields grouping on split streams ...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1387#issuecomment-159226912
  
    @StephanEwen In the original code that was actually the case. The `BoltWrapper` receiving the data was able to extract the tuples by itself. This changed when I introduced proper generic types in `FlinkTopologyBuilder`.
    
    Merging this now...

