Posted to issues@flink.apache.org by gaborhermann <gi...@git.apache.org> on 2015/06/26 18:36:11 UTC

[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...

GitHub user gaborhermann opened a pull request:

    https://github.com/apache/flink/pull/872

    [FLINK-2138] Added custom partitioning to DataStream

    Custom partitioning added to DataStream in order to be more consistent with the batch API.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gaborhermann/flink FLINK-2138

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/872.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #872
    
----
commit 3352b0915725303193ab8dd0ebeb7bb38ad69ca3
Author: Gábor Hermann <re...@gmail.com>
Date:   2015-06-26T15:23:36Z

    [FLINK-2138] [streaming] Added custom partitioning to DataStream

commit 584f54c335fce61e4b8eea303619b943967f262a
Author: Gábor Hermann <re...@gmail.com>
Date:   2015-06-26T15:56:16Z

    [FLINK-2138] [streaming] Added custom partitioning to scala DataStream

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116745707
  
    The partitioner function in Scala was simply added as a mirror of the Java API.
    
    The batch API is stable, which means that at most we can add a Scala function and deprecate the partitioner.


---
Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-117544452
  
    -1
    Documentation is missing.
    
    http://flink.apache.org/coding-guidelines.html:
    > Documentation Updates. Many changes in the system will also affect the documentation (both JavaDocs and the user documentation in the docs/ directory.). Pull requests and patches are required to update the documentation accordingly, otherwise the change can not be accepted to the source code.
    
    Could you also add an IT case that ensures that the data is actually partitioned properly? The test you've added is only ensuring that the partitioning properties are set correctly in the `DataStream`.


---
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116710090
  
    I am confused now, what is it going to be?
      1. Overloading, so that both a Scala function and a Partitioner are accepted, at the cost of redundant APIs.
      2. Only partitioner (sync with batch API)
      3. Only Scala function (break with batch API)
    
    I am not a big fan of (1), as these redundant options bloat the APIs and confuse users.


---
Posted by gaborhermann <gi...@git.apache.org>.
Github user gaborhermann commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116755999
  
    Okay, then I will
    * deprecate the partitioner implementation in the batch API
    * add the function implementation to the batch API
    * add the function implementation to the streaming API and remove the partitioner implementation (so streaming will only have function implementation). As this PR is not merged yet we do not break the streaming API.
    
    Is it okay?
    I guess it's worth it. This way Scala users will be able to write more concise code and they will not get confused by the overloaded functions because the ones with the partitioner will be deprecated.


---
Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-120506018
  
    Looks good to merge. If no objections, I will merge it tomorrow. :+1: 


---
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/872


---
Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-115832961
  
    Wouldn't it make sense to implement custom partitioning in a way that allows returning an array of indexes, as in the ChannelSelector interface? Returning only one index limits the partitioning considerably.
    
    Maybe the users could implement a ChannelSelector and we would wrap that.
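
A minimal sketch of the wrapping idea, assuming a single-index partitioner and an array-returning selector shaped like the ones discussed here. The interfaces `Partitioner` and `MultiChannelSelector` below are local stand-ins defined only for illustration, not Flink's actual classes, and `wrap`, `subset` and `MODULO` are hypothetical names.

```java
import java.util.Arrays;

public class SelectorAdapterSketch {

    /** Local stand-in: returns exactly one target index per key. */
    public interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Local stand-in for an array-returning selector (hypothetical name). */
    public interface MultiChannelSelector<T> {
        int[] selectChannels(T record, int numChannels);
    }

    /** Wraps a single-index partitioner so that it returns a one-element array. */
    public static <T> MultiChannelSelector<T> wrap(Partitioner<T> partitioner) {
        return (record, numChannels) ->
                new int[] { partitioner.partition(record, numChannels) };
    }

    /** Something the single-index form cannot express: send to a fixed subset of channels. */
    public static <T> MultiChannelSelector<T> subset(int... targets) {
        return (record, numChannels) ->
                Arrays.stream(targets).filter(t -> t >= 0 && t < numChannels).toArray();
    }

    /** Example partitioner: non-negative modulo of the key. */
    public static final Partitioner<Integer> MODULO = (key, n) -> Math.floorMod(key, n);

    public static void main(String[] args) {
        // Single-index partitioner seen through the array-based interface.
        System.out.println(Arrays.toString(wrap(MODULO).selectChannels(7, 4)));
        // A record sent to channels 0 and 2 at once.
        System.out.println(Arrays.toString(
                SelectorAdapterSketch.<String>subset(0, 2).selectChannels("model", 4)));
    }
}
```

The adapter direction shown here (single index into array) is always possible; the reverse is not, which is the limitation the comment above points at.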


---
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-117117294
  
    How about we leave the batch API as it is for now and address that as a separate issue? There are quite a few subtleties in how the optimizer assesses equality of partitioning (based on partitioners) that would have to be changed (and should retain backwards compatibility).


---
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116528738
  
    I actually like this approach. We had the same discussion for the batch API and settled on this, because:
    
      - You can always chain a `FlatMapFunction` with a `partitionCustom()` request to solve all the above situations.
      - This interface allows easy Java8-lambda implementation and it works well with the type extraction.
      - It seems to cover the majority of cases more elegantly, as there is no need for array wrapping in the user code.
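
The first point can be sketched without any Flink dependency. The following is a plain-Java simulation under stated assumptions: a flat-map step emits one copy of a record per desired target, and a single-index partitioner then routes each copy by that target. `Routed`, `fanOut`, `partition` and `route` are hypothetical names, and the in-memory lists merely stand in for output channels.

```java
import java.util.ArrayList;
import java.util.List;

public class FanOutSketch {

    /** A value paired with the channel index it should be routed to. */
    public static final class Routed {
        public final int target;
        public final String value;

        Routed(int target, String value) {
            this.target = target;
            this.value = value;
        }
    }

    /** Flat-map step: emit one copy of the value per desired target channel. */
    public static List<Routed> fanOut(String value, int[] targets) {
        List<Routed> out = new ArrayList<>();
        for (int t : targets) {
            out.add(new Routed(t, value));
        }
        return out;
    }

    /** Single-index partitioner: the key already names the target channel. */
    public static int partition(int targetKey, int numPartitions) {
        return Math.floorMod(targetKey, numPartitions);
    }

    /** Simulates the two chained steps on in-memory "channels". */
    public static List<List<String>> route(String value, int[] targets, int numChannels) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        for (Routed r : fanOut(value, targets)) {
            channels.get(partition(r.target, numChannels)).add(r.value);
        }
        return channels;
    }

    public static void main(String[] args) {
        // Sends "model" to channels 0 and 2 out of 3.
        System.out.println(route("model", new int[] {0, 2}, 3)); // prints [[model], [], [model]]
    }
}
```

The cost of this pattern is that each record is duplicated before partitioning, so the user code carries the fan-out logic rather than the partitioner.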


---
Posted by gaborhermann <gi...@git.apache.org>.
Github user gaborhermann commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-117141916
  
    Okay then. These are effects of the change that I did not know about. Let's stick to (2) and later on, we might reconsider this.


---
Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116106372
  
    I think I could find several use cases if I wanted to :) For example, I would often like to broadcast some model information to many downstream operators at once (not exactly a broadcast, maybe only to a couple of them).
    
    Also, even this does not give full flexibility. Imagine a scenario where I have a self loop and want to send something to all the others (except myself). To do this, I would also need to know my own channel index...


---
Posted by gaborhermann <gi...@git.apache.org>.
Github user gaborhermann commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116103719
  
    I guess it is easier for the users to understand, and partitioning to multiple channels at a time is rarely needed. Is there a use case where it is needed?
    
    It should be consistent with the batch API in my opinion. Let's start a discussion about this if we would like to change the custom partitioning.


---
Posted by gaborhermann <gi...@git.apache.org>.
Github user gaborhermann commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-117644273
  
    Sorry.
    * updated the docs (custom partitioning was also missing in the Scala batch API docs)
    * added IT case tests (also for the other stream partitioning methods as they were missing too)


---
Posted by gaborhermann <gi...@git.apache.org>.
Github user gaborhermann commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116736041
  
    Sorry for not making myself clear.
    
    I would actually go for
    4. Only the Scala function (both in the streaming and batch API)
    
    I don't understand how changing from partitioner implementation to function implementation in the batch API would mess up determining the compatibility of the partitioning. By compatibility I mean the type of the key must be the same as the input of the partitioner.
    
    I suppose there was another reason (that I do not understand) for choosing the partitioner implementation for the Scala batch API, so if (4) is not an option, I would go for (2) (only partitioner, sync with batch API).


---
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116649358
  
    In the batch API, equality of the partitioners is used to determine compatibility of the partitioning. This may at some point become interesting for the streaming API as well.
    
    In any case, let's pick one of the two variants (function or partitioner implementation). Overloading the methods too much with equally powerful variants inevitably confuses users.
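
One way the choice between the two variants could interact with an equality-based compatibility check is sketched below. This is an illustration under assumptions, not a description of the optimizer: it assumes the comparison goes through `equals()`, and `Partitioner` is a local stand-in interface while `ModuloPartitioner`, `FunctionPartitioner` and `samePartitioning` are hypothetical names.

```java
import java.util.function.BiFunction;

public class PartitionerEqualitySketch {

    /** Local stand-in for a single-index partitioner interface. */
    public interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Named partitioner with value-based equality: all instances are interchangeable. */
    public static final class ModuloPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            return Math.floorMod(key, numPartitions);
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof ModuloPartitioner;
        }

        @Override
        public int hashCode() {
            return ModuloPartitioner.class.hashCode();
        }
    }

    /** Wrapper around a function; it inherits identity-based equals() from Object. */
    public static final class FunctionPartitioner<K> implements Partitioner<K> {
        private final BiFunction<K, Integer, Integer> fn;

        public FunctionPartitioner(BiFunction<K, Integer, Integer> fn) {
            this.fn = fn;
        }

        @Override
        public int partition(K key, int numPartitions) {
            return fn.apply(key, numPartitions);
        }
    }

    /** Assumed check: two partitionings are compatible if their partitioners are equal. */
    public static boolean samePartitioning(Partitioner<?> a, Partitioner<?> b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        BiFunction<Integer, Integer, Integer> mod = (k, n) -> Math.floorMod(k, n);

        // Two named partitioners compare equal.
        System.out.println(samePartitioning(new ModuloPartitioner(), new ModuloPartitioner()));

        // Two fresh wrappers around the same function do not, although they route identically.
        System.out.println(samePartitioning(
                new FunctionPartitioner<>(mod), new FunctionPartitioner<>(mod)));
    }
}
```

A wrapper could restore equality by delegating `equals()` to the wrapped function, but functions and lambdas rarely define meaningful equality themselves.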


---
Posted by gaborhermann <gi...@git.apache.org>.
Github user gaborhermann commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116671285
  
    I'd prefer the function implementation (like `(K, Int) => Int`), but it should stay consistent with the batch API. I don't see why the wrapping would affect the compatibility checking of the partitioning.
    
    Is it okay if I change it to the function implementation in both (Scala batch, Scala streaming) APIs? If not, then let's just stick with the partitioner implementation in the APIs.


---
Posted by gaborhermann <gi...@git.apache.org>.
Github user gaborhermann commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116647912
  
    By the way, in the Scala DataSet API the user has to specify the Java `Partitioner[K]` class. Wouldn't it be more convenient to wrap a function like `(K, Int) => Int` into a `Partitioner[K]`, similarly to the KeySelector?

