Posted to issues@flink.apache.org by s1ck <gi...@git.apache.org> on 2015/08/30 00:09:17 UTC

[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

GitHub user s1ck opened a pull request:

    https://github.com/apache/flink/pull/1075

    [FLINK-2590] fixing DataSetUtils.zipWithUniqueId()

    * modified algorithm as explained in the issue
    * updated method documentation

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/s1ck/flink FLINK-2590

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1075.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1075
    
----
commit ab362b5b5ae390449972cc03f398d75c0231cb3c
Author: Martin Junghanns <ma...@gmx.net>
Date:   2015-08-29T20:51:19Z

    [FLINK-2590] fixing DataSetUtils.zipWithUniqueId()
    
    * modified algorithm as explained in the issue
    * updated method documentation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136352327
  
    +1 for a test, otherwise this looks good!



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136545997
  
    Ah, thank you for the proof.
    And I didn't look at the `log2` method in detail before, sorry.



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1075



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by s1ck <gi...@git.apache.org>.
Github user s1ck commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136380943
  
    There is already a test case for zipWithUniqueId() in https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java#L66
    However, this test assumes that only one task is running, which is why it did not fail in the first place.
    If there are multiple tasks, the resulting unique id is not deterministic for a single dataset element. I would implement a test that creates a dataset, applies the `zipWithUniqueId` method, calls `distinct(0)` on the created ids, and checks the number of resulting elements (which must equal the size of the input dataset). Would this be sufficient?
    Furthermore, the current test cases for `DataSetUtils` compare the resulting dataset as a string after each test run. My proposed test would not fit into that scheme. Should I create a new test case class for this method?
    
    @StephanEwen I wanted to do this, but `static` doesn't work with anonymous classes. However, I can declare the UDF as a private inner class (I didn't want to change too much code).
    @HuangWHWHW the `log2` method already existed, and in the issue I proposed to rename it. Maybe `getBitSize(long value)`? As for the "proof": if each task id is smaller than the total number of parallel tasks t, its bit representation fits within the bit length of t. Thus, when we shift the counter by the number of bits of t, there cannot be a collision between different task ids.
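    The collision argument above can be sketched in plain Java. This is a standalone illustration, not the actual Flink code; `getBitSize` here is an assumed stand-in for the renamed `log2` helper.

```java
// Standalone sketch (not Flink code) of the collision argument above.
// getBitSize is an assumed stand-in for the renamed log2 helper.
import java.util.HashSet;
import java.util.Set;

public class UniqueIdSketch {

    // Number of bits needed to represent value, e.g. getBitSize(6) == 3.
    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    public static void main(String[] args) {
        int parallelism = 7;
        // Shift by the bit size of the highest possible task id (0-based).
        int shifter = getBitSize(parallelism - 1);

        Set<Long> ids = new HashSet<>();
        for (int taskId = 0; taskId < parallelism; taskId++) {
            for (long counter = 0; counter < 1000; counter++) {
                long id = (counter << shifter) + taskId;
                if (!ids.add(id)) {
                    throw new IllegalStateException("collision at id " + id);
                }
            }
        }
        System.out.println("no collisions among " + ids.size() + " ids");
    }
}
```

    Because every task id fits into the low `shifter` bits and the counter occupies only higher bits, ids from different tasks can never coincide.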



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136655355
  
    No problem @s1ck. It might be a bit redundant, but it tests that the forwarding is done correctly. Therefore, I fixed the test case.



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136129581
  
    Thanks a lot for the contribution.
    Can you add a test case for the method to make sure the issue is not re-introduced when somebody changes the code later?



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136427373
  
    There is an issue that tracks the `ConcurrentModificationException` problem. As per the discussion in that issue, can you use a `BroadcastVariableInitializer`? It saves redundant sorts.



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by s1ck <gi...@git.apache.org>.
Github user s1ck commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136426216
  
    @tillrohrmann While writing the new tests for both methods, I found that `zipWithIndex` is broken, too. It sometimes throws a `ConcurrentModificationException`. This is because each task sorts a broadcast list in the `open` method. This could not fail before because the parallelism was 1.
    I would fix this by creating a local copy of that list (which should be small in this specific case). Shall I fix this in the same issue, or do you want me to create a new issue for it?
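    The hazard and the proposed fix can be illustrated in isolation (plain Java, not Flink code; the shared list stands in for the broadcast variable):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Standalone illustration (not Flink code) of the hazard described above:
// several subtasks sorting the same shared broadcast list concurrently is
// unsafe. The fix is for each task to sort its own local copy.
public class BroadcastSortSketch {
    public static void main(String[] args) {
        // Stands in for the broadcast variable shared by all subtasks.
        List<Integer> shared = List.of(3, 1, 2);

        // Unsafe pattern: sorting `shared` in place would mutate state that
        // other subtasks may be iterating over at the same time.

        // Safe pattern: sort a task-local copy instead.
        List<Integer> localCopy = new ArrayList<>(shared);
        Collections.sort(localCopy);
        System.out.println(localCopy);
    }
}
```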



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136619441
  
    @s1ck, it's important to note that `1` will be subtracted from `getRuntimeContext().getNumberOfParallelSubtasks()`, not from the result of `getBitSize()`. The reason is that we have `0`-based indices for the subtasks. Thus, we only have to calculate the maximum number of bits needed for the highest index we can encounter, and this is `getRuntimeContext().getNumberOfParallelSubtasks() - 1`. So if `getNumberOfParallelSubtasks() == 7`, we would calculate `getBitSize(6) == 3`.



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136386406
  
    @s1ck, the `testZipWithUniqueId` test is bogus. You can remove this test case and replace it with the test you described. It would also be great if you could set the parallelism of `testZipWithIndex` to something greater than `1`. Here it would also make sense to use `collect` instead of writing to disk.
    
    +1 for renaming `log2` to `getBitSize(long value)`. When you rename the method, could you also change the line `shifter = getBitSize(getRuntimeContext().getNumberOfParallelSubtasks())` to `shifter = getBitSize(getRuntimeContext().getNumberOfParallelSubtasks() - 1)`? That way, we would also get the right unique ids in the case of `parallelism = 1`.
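    The effect of the suggested `- 1` for `parallelism = 1` can be checked with a tiny standalone snippet (illustrative only; this `getBitSize` is an assumed bit-length implementation, not the actual Flink code):

```java
// Standalone check (assumed bit-length helper, not the actual Flink code)
// of the effect of the suggested `- 1` for parallelism = 1.
public class ShifterDemo {

    // Number of bits needed to represent value.
    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    public static void main(String[] args) {
        int p = 1; // parallelism
        // Without the -1: shift by 1 bit, although the only task id is 0.
        System.out.println(getBitSize(p));
        // With the -1: shift by 0 bits, so the ids are simply 0, 1, 2, ...
        System.out.println(getBitSize(p - 1));
    }
}
```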



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by s1ck <gi...@git.apache.org>.
Github user s1ck commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136633442
  
    @tillrohrmann of course you are right, I got it wrong. It's committed.



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by s1ck <gi...@git.apache.org>.
Github user s1ck commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136431165
  
    @StephanEwen thanks for the hint, it works fine! Will clean up and commit now.



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136640020
  
    @s1ck, looks really good. Thanks for your contribution. Will merge it now.



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by s1ck <gi...@git.apache.org>.
Github user s1ck commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136658385
  
    Ok, thank you.



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1075#discussion_r38305177
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java ---
    @@ -121,6 +122,7 @@ public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) thr
     
     		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {
     
    +			long maxLength = log2(Long.MAX_VALUE);
    --- End diff --
    
    You can make this `static final`



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136245681
  
    @rmetzger +1. I think adding a test is helpful.
    Also, can you show us why `id = (counter << shifter) + taskId;` will never generate the same id in different tasks?
    And a minor thing in your issue description:
    Isn't log2(8) = 3, not 4?



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136382814
  
    @s1ck Good idea. You can also call `collect()`, add the IDs to a set, and make sure the set has the right cardinality. In general, avoiding temp files and string comparisons is a good idea.



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by s1ck <gi...@git.apache.org>.
Github user s1ck commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136445029
  
    @tillrohrmann I did not include the `shifter = getBitSize(getRuntimeContext().getNumberOfParallelSubtasks() - 1)` change, as your hint only applies to powers of 2. E.g., `getBitSize(7)` returns 3, and we need 3 bits to cover the range from 0 to 6.
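    The power-of-2 boundary can be seen with a bit-length helper (standalone sketch; this `getBitSize` is an assumed implementation, not the actual Flink code):

```java
// Standalone sketch (assumed bit-length helper) showing that the `- 1`
// only changes the result at powers of 2.
public class BitSizeBoundary {

    // Number of bits needed to represent value.
    static int getBitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    public static void main(String[] args) {
        System.out.println(getBitSize(7));     // task ids 0..6 fit in 3 bits
        System.out.println(getBitSize(8));     // a power of 2 needs one bit more
        System.out.println(getBitSize(8 - 1)); // with the -1, parallelism 8 also needs 3 bits
    }
}
```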



[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

Posted by s1ck <gi...@git.apache.org>.
Github user s1ck commented on the pull request:

    https://github.com/apache/flink/pull/1075#issuecomment-136654131
  
    Sorry, I did not see that there are also identical test cases in Scala, which now fail due to the `-1` change. As those Scala methods wrap the Java methods, is it necessary to run the same tests on them again?

