Posted to issues@flink.apache.org by anisnasir <gi...@git.apache.org> on 2015/08/27 13:01:02 UTC

[GitHub] flink pull request: New Partitioner for better load balancing for ...

GitHub user anisnasir opened a pull request:

    https://github.com/apache/flink/pull/1069

    New Partitioner for better load balancing for skewed data

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/anisnasir/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1069.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1069
    
----
commit 25753316971f80e5d65bfbc60b70e6d846e1d798
Author: System Administrator <root@heatsettle-lm.(none)>
Date:   2015-03-18T13:46:18Z

    [FLINK-1725]- new partitioner added for load balancing for skewed data

commit 7410cc7a69ffcdf505b0fb32fc3db7406dfe8972
Author: Anis Nasir <an...@hotmail.com>
Date:   2015-04-01T22:55:59Z

    Merge branch 'master' of https://github.com/apache/flink

commit d79038e8b61292f69c82fc6eaa95e5b26eebcbc3
Author: Anis Nasir <an...@hotmail.com>
Date:   2015-08-26T22:19:13Z

    fixed casting to int exception

commit c82631856ba25f02ac04f0cd4fdc4522cf45c0ea
Author: Anis Nasir <an...@hotmail.com>
Date:   2015-08-27T10:59:17Z

    test case for Partial Partitioner

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1069: [FLINK-1725]- New Partitioner for better load balancing f...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/1069
  
    Can we close this pull request and revisit the feature later?
    The partial grouping currently does not work for windows, rescaling, etc., and it would be quite involved to add this.



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by anisnasir <gi...@git.apache.org>.
Github user anisnasir commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-135905274
  
    @tillrohrmann you are absolutely right with your observation that high skews require more than two workers to process the most frequent keys. However, most of the real world datasets do not have high skews [1] and can be handled by just splitting keys into two components.
    
    I was planning to write a wordcount example with both HashPartitioner and PartialPartitioner. Can you explain a little more about how one could check that the skew of the input data decreases after the partitioning?
    
    [1]. https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-136617574
  
    @anisnasir, actually I hadn't thought about automatic adaptation. At first I thought about making it configurable. But if you could also do it automatically, that would be even better. I guess it's fine to start with the configurable solution, though.



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-136290092
  
    @anisnasir, good to know that most real world data sets can be handled by just splitting keys into two components. But what about the rest? Wouldn't it be nice to have a partitioner which works for all? How hard would it be to generalize your approach? We could set the default number of distributing channels to 2 to mimic your initial implementation.
    
    Concerning the test, you could for example create a `DataStream` which only contains a single key. Then you group on this key and then apply some other operation where you use the `PartialPartitioner`. In this latter operation you can assign the sub index of the task which processes the elements. Having this index, you should be able to calculate the distribution of the data. If you execute this test on 2 TMs with a single slot or a single TM with 2 slots, then you should get a 50/50 distribution if I'm not mistaken.
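The check described above can be sketched outside of Flink. The following is a minimal simulation, not the `PartialPartitioner` from this pull request: the class name `SingleKeySkewCheck` and its methods are hypothetical, and it assumes that the single key's two candidate channels are distinct (with two hash functions and only two channels, both hashes could also land on the same channel, in which case the split would be 100/0).

```java
import java.util.Arrays;

public class SingleKeySkewCheck {

    // Hypothetical stand-in for the routing step: send the record to whichever
    // of the two candidate channels has received fewer records so far.
    public static int selectChannel(long[] sent, int first, int second) {
        int chosen = sent[first] <= sent[second] ? first : second;
        sent[chosen]++;
        return chosen;
    }

    // Routes `records` records that all carry the same key and returns the
    // number of records each channel received.
    public static long[] route(int records, int channels, int first, int second) {
        long[] sent = new long[channels];
        for (int i = 0; i < records; i++) {
            selectChannel(sent, first, second);
        }
        return sent;
    }

    public static void main(String[] args) {
        // One key, two channels, candidates assumed to be channels 0 and 1.
        long[] sent = route(1000, 2, 0, 1);
        System.out.println(Arrays.toString(sent)); // [500, 500]
    }
}
```

In an actual Flink test the per-channel counts would come from the subtask index of the downstream operator, as suggested above, rather than from a local array.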



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-137156333
  
    @gdfm, could you please explain to me why 1 to 2 gives you an exponential gain (what kind of gain? reduction in data skew?) and anything further only a gain by a constant factor? I'm at a loss here.



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1069#discussion_r38091104
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/PartialPartitioner.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.partitioner;
    +
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.runtime.plugable.SerializationDelegate;
    +import org.apache.flink.streaming.api.streamrecord.StreamRecord;
    +import com.google.common.hash.HashFunction;
    +import com.google.common.hash.Hashing;
    +
    +/**
    + * Partitioner that map each key on any two channels using power of two choices.
    + *
    + * @param <T>
    + *            Type of the Tuple
    + */
    +public class PartialPartitioner<T> extends StreamPartitioner<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	private long[] targetTaskStats; // maintain past history of forwarded messages
    --- End diff --
    
    Maybe call it `targetChannelStats` for consistency.



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-137140076
  
    @gdfm, thanks for the great explanation. A good way to visualize what's happening.
    
    I'm just wondering why you shouldn't be able to connect more than 2 containers, let's say 3. In cases where you have an extremely high data skew, this might be helpful. Imagine that you have 10 containers and only 2 of them are full. Then in the best case you'll get 4 half filled containers after connecting two containers. But this still leaves 6 unused containers. Wouldn't it be better to connect for example 5 containers in this case? Then in the best case you would use all available containers. But of course this strongly depends on your actual data and therefore I'd vote to make the number of connected containers configurable.
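A configurable number of choices could look roughly like the sketch below. It is an illustration under stated assumptions, not code from this pull request: `DChoicesSketch`, `candidate`, and `select` are hypothetical names, and `Objects.hash(key, i)` is only a simple stand-in for a family of independent seeded hash functions such as the Murmur hashes used in the diff. With this stand-in the candidates of a key are consecutive channels, which keeps them distinct as long as the number of choices does not exceed the number of channels.

```java
import java.util.Arrays;
import java.util.Objects;

public class DChoicesSketch {
    private final long[] load;   // records sent to each channel so far
    private final int choices;   // number of candidate channels per key

    public DChoicesSketch(int channels, int choices) {
        this.load = new long[channels];
        this.choices = choices;
    }

    // i-th candidate channel of a key (stand-in for the i-th hash function).
    public int candidate(Object key, int i) {
        return Math.floorMod(Objects.hash(key, i), load.length);
    }

    // Sends one record to the least loaded candidate; ties go to the earlier candidate.
    public int select(Object key) {
        int best = candidate(key, 0);
        for (int i = 1; i < choices; i++) {
            int c = candidate(key, i);
            if (load[c] < load[best]) {
                best = c;
            }
        }
        load[best]++;
        return best;
    }

    public long[] load() {
        return load.clone();
    }

    public static void main(String[] args) {
        // The scenario from the comment above: 10 channels, 5 choices, one hot key.
        DChoicesSketch partitioner = new DChoicesSketch(10, 5);
        for (int i = 0; i < 1000; i++) {
            partitioner.select("hot");
        }
        System.out.println(Arrays.toString(partitioner.load()));
    }
}
```

With 1000 records of a single key, five of the ten channels end up with 200 records each; passing 2 as the second constructor argument reproduces the two-channel behaviour of the initial implementation.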



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-139478888
  
    Will do, once I've cleared the rest of my backlog.
    
    On Thu, Sep 10, 2015 at 4:26 PM, Fabian Hueske <no...@github.com>
    wrote:
    
    > @tillrohrmann <https://github.com/tillrohrmann>, @mbalassi
    > <https://github.com/mbalassi> can you have another look at this PR and
    > check if it is good to merge? Thanks!
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/flink/pull/1069#issuecomment-139256994>.
    >




[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by gdfm <gi...@git.apache.org>.
Github user gdfm commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-137163339
  
    @tillrohrmann here is a paper that describes the effect for a very similar setting:
    http://www.eecs.harvard.edu/~michaelm/postscripts/tpds2001.pdf
    The same arguments apply in this case.



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-135838438
  
    @anisnasir thanks for your contribution. Out of curiosity I was wondering why the `PartialPartitioner` distributes the data exactly between two channels. Wouldn't it also be conceivable to distribute it between an arbitrary number? Then one could adjust the `PartialPartitioner` depending on the actual data skew. I assume that there are situations where your data is still skewed even after distributing it onto two different consumers.
    
    It would be great if you could add another test which tests the functioning of the partitioner in a more applied scenario, if possible. Maybe one could check that the skew of the input data decreases after the partitioning.



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-139256994
  
    @tillrohrmann, @mbalassi can you have another look at this PR and check if it is good to merge? Thanks!



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by gdfm <gi...@git.apache.org>.
Github user gdfm commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-137096426
  
    @tillrohrmann the reason why 2 is a "magic number" is the following.
    Imagine channels as containers, and the load (the incoming data stream) as a liquid.
    When a key is split in two channels, it creates a "bridge" between these channels.
    You can imagine it as a pipe between the two specific containers that enables sharing the load, and brings the liquid to the same level (this happens thanks to the fact that the new messages are sent to the least loaded of the two containers).
    Now, if you have enough of these pipes between pairs of containers, you will effectively establish a network of load sharing among them. Any increase in pressure on one of the containers can be shared across the network effectively, which brings the load balance.
    The trick is to have "enough" keys to create enough pipes.
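The container picture can be tried out on a tiny example. The sketch below is an assumption-laden illustration rather than the partitioner from this pull request: the class `PipeNetworkSketch` and the fixed `CANDIDATES` table are hypothetical, and a real implementation would derive the two candidate channels of a key from two hash functions. Keys 0 to 3 connect the four channels in a ring (0-1, 1-2, 2-3, 3-0), and key 0 is the hot key.

```java
import java.util.Arrays;

public class PipeNetworkSketch {

    // Hypothetical candidate table: key k may go to CANDIDATES[k][0] or CANDIDATES[k][1].
    // Each row is one "pipe" between two containers.
    static final int[][] CANDIDATES = {{0, 1}, {1, 2}, {2, 3}, {3, 0}};

    // Plain key grouping: every key always goes to its first candidate.
    public static long[] singleChoice(int[] stream, int channels) {
        long[] load = new long[channels];
        for (int key : stream) {
            load[CANDIDATES[key][0]]++;
        }
        return load;
    }

    // Two choices: every record goes to the less loaded of the key's two candidates.
    public static long[] twoChoices(int[] stream, int channels) {
        long[] load = new long[channels];
        for (int key : stream) {
            int a = CANDIDATES[key][0];
            int b = CANDIDATES[key][1];
            load[load[a] <= load[b] ? a : b]++;
        }
        return load;
    }

    public static void main(String[] args) {
        // Key 0 carries half of the records, keys 1 to 3 share the rest.
        int[] stream = {0, 1, 0, 2, 0, 3, 0, 1, 0, 2, 0, 3};
        System.out.println(Arrays.toString(singleChoice(stream, 4))); // [6, 2, 2, 2]
        System.out.println(Arrays.toString(twoChoices(stream, 4)));   // [4, 3, 2, 3]
    }
}
```

The hot key spills into channel 1 through its pipe, and the other keys in turn move away from the channels the hot key occupies, so the maximum load drops from 6 to 4 in this example.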



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-135418178
  
    Thanks for the fast response on your side addressing my comments. :) 
    
    I generally love the idea described at [FLINK-1725](https://issues.apache.org/jira/browse/FLINK-1725), I have a couple of concerns though:
    
    1. Having the code in is nice, but we need a way to surface it in the `DataStream` API. The current implementation cannot do that in a straightforward way, as you need the number of output channels as a constructor parameter. That is information that we do not yet have when creating the partitioning strategy for the data stream (e.g. [here](https://github.com/apache/flink/blob/master/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L502-L504)).
    
    2. The comment you have added to `PartialPartitioner` did not give me a clear picture of the behaviour. I much prefer the one you wrote [here](https://issues.apache.org/jira/browse/FLINK-1725?focusedCommentId=14371032&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14371032). Some comment would be nice on the test side too, I only got it after reading the aforementioned JIRA issue.
    
    3. How sensitive are your algorithms fed by this partitioning to losing the state in `targetChannelTasks`? If a Flink operator were to go down, this information would not be recovered after the failure.
    
    4. I share the wish of @gdfm that it would be really nice if the implementation did not depend on `toString`.



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-137164902
  
    Thanks :-)
    
    On Wed, Sep 2, 2015 at 6:39 PM, Gianmarco De Francisci Morales <
    notifications@github.com> wrote:
    
    > @tillrohrmann <https://github.com/tillrohrmann> here a paper that
    > describes the effect for a very similar setting:
    > http://www.eecs.harvard.edu/~michaelm/postscripts/tpds2001.pdf
    > The same arguments apply in this case.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/flink/pull/1069#issuecomment-137163339>.
    >




[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1069#discussion_r38088713
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/PartialPartitioner.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.partitioner;
    +
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.runtime.plugable.SerializationDelegate;
    +import org.apache.flink.streaming.api.streamrecord.StreamRecord;
    +import com.google.common.hash.HashFunction;
    +import com.google.common.hash.Hashing;
    +
    +/**
    + * Partitioner that map each key on any two channels using power of two choices.
    + *
    + * @param <T>
    + *            Type of the Tuple
    + */
    +public class PartialPartitioner<T> extends StreamPartitioner<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	private long[] targetTaskStats; // maintain past history of forwarded messages
    +	private HashFunction h1 = Hashing.murmur3_128(13);
    +	private HashFunction h2 = Hashing.murmur3_128(17);
    +	KeySelector<T, ?> keySelector;
    +	private int[] returnArray = new int[1];
    +
    +	public PartialPartitioner(KeySelector<T, ?> keySelector, int numberOfOutputChannels) {
    +		super(PartitioningStrategy.PARTIAL);
    +		this.targetTaskStats = new long[numberOfOutputChannels];
    +		this.keySelector = keySelector;
    +	}
    +
    +	@Override
    +	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
    +			int numberOfOutputChannels) {
    +		String str = record.getInstance().getKey(keySelector).toString(); // assume key is the first field
    --- End diff --
    
    The comment is copy-pasted from the Storm [implementation](https://github.com/gdfm/partial-key-grouping/blob/master/src/main/java/com/yahoo/labs/slb/PartialKeyGrouping.java); please remove it, as it does not apply to Flink. :)



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by anisnasir <gi...@git.apache.org>.
Github user anisnasir commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-136496310
  
    @tillrohrmann Yes, we can design a system that adapts to the load imbalance and increases the number of partitions per key in real time. However, this raises a few other questions:
    1. When to increase the choices?
    2. By how much to increase the choices?
    3. Should we increase the choices for all the keys?
    A simple solution for the datasets that cannot be handled using our solution is to use ShufflePartitioner.



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by gdfm <gi...@git.apache.org>.
Github user gdfm commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-137144309
  
    Sure, you can connect multiple containers.
    But while the gain you have from going from 1 to 2 is exponential, the gain from 2 to 3 and forward is just a constant factor. Nevertheless, there might be datasets with extreme skew for which having more choices is necessary. So I agree to make it configurable with a default of 2.
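For reference, the "exponential" versus "constant factor" statement matches the standard balls-into-bins result for the power of two choices. The formulas below are for the idealized setting of n balls thrown into n bins, with each ball placed in the least loaded of d uniformly random bins; this is an assumption of the classical analysis and does not carry over verbatim to a skewed key stream. The maximum load holds with high probability.

```latex
% d = 1 (a single hash function):
\max\text{-load} = \Theta\!\left(\frac{\ln n}{\ln\ln n}\right)

% d >= 2 (least loaded of d choices):
\max\text{-load} = \frac{\ln\ln n}{\ln d} + \Theta(1)
```

Going from d = 1 to d = 2 replaces a term of order ln n / ln ln n by one of order ln ln n, which is the exponential improvement. Increasing d further only changes the 1 / ln d factor.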



[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

Posted by anisnasir <gi...@git.apache.org>.
Github user anisnasir commented on the pull request:

    https://github.com/apache/flink/pull/1069#issuecomment-135452679
  
    Thank you very much for the feedback.
    
    1. I have updated the constructor and removed the dependency on the number of channels. Now the constructs are similar to Field Partitioner.
    2. I have added new comments.
    3. The algorithm is not sensitive to failures and does not require recovery of lost states.
    4. I fixed this as well
    




[GitHub] flink issue #1069: [FLINK-1725]- New Partitioner for better load balancing f...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/flink/pull/1069
  
    
    [![Coverage Status](https://coveralls.io/builds/11790399/badge)](https://coveralls.io/builds/11790399)
    
    Changes Unknown when pulling **a2429551c2e498383ed61a7e3650224c12ec3933 on anisnasir:master** into ** on apache:master**.


