Posted to issues@flink.apache.org by GEOFBOT <gi...@git.apache.org> on 2016/06/20 13:38:11 UTC

[GitHub] flink pull request #2136: [FLINK-3626] [py] zipWithIndex in Python API

GitHub user GEOFBOT opened a pull request:

    https://github.com/apache/flink/pull/2136

    [FLINK-3626] [py] zipWithIndex in Python API

    Implement zip_with_index in the Python API along with relevant tests and documentation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GEOFBOT/flink FLINK-3626

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2136
    

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2136: [FLINK-3626] [py] zipWithIndex in Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2136#discussion_r67720318
  
    --- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py ---
    @@ -25,13 +25,13 @@ def __init__(self):
             self._keys1 = None
             self._keys2 = None
     
    -    def _configure(self, input_file, output_file, port, env, info):
    +    def _configure(self, input_file, output_file, port, env, info, task_id):
    --- End diff --
    
    this parameter should be called subtask_index



[GitHub] flink pull request #2136: [FLINK-3626] [py] zipWithIndex in Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2136#discussion_r67720631
  
    --- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py ---
    @@ -25,13 +25,13 @@ def __init__(self):
             self._keys1 = None
             self._keys2 = None
     
    -    def _configure(self, input_file, output_file, port, env, info):
    +    def _configure(self, input_file, output_file, port, env, info, task_id):
    --- End diff --
    
    this also applies to all other places where this parameter is used.



[GitHub] flink issue #2136: [FLINK-3626] [py] zipWithIndex in Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2136
  
    Very nice work, only had a few minor comments :)



[GitHub] flink issue #2136: [FLINK-3626] [py] zipWithIndex in Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2136
  
    merging




[GitHub] flink pull request #2136: [FLINK-3626] [py] zipWithIndex in Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2136#discussion_r67720542
  
    --- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
    @@ -572,6 +572,41 @@ def set_parallelism(self, parallelism):
             self._info.parallelism.value = parallelism
             return self
     
    +    def count_elements_per_partition(self):
    --- End diff --
    
    please add a docstring for both count_elements_per_partition and zip_with_index. You should be able to just copy the ones from the Java DataSetUtils class.
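As a rough illustration of the request above, a docstring on a counting helper might look like the following. This is only a sketch: the function is a standalone stand-in that takes a hypothetical list of partitions rather than a Flink DataSet, and the docstring wording is invented here, not copied from the Java DataSetUtils class.

```python
def count_elements_per_partition(partitions):
    """Count the elements in each partition.

    :param partitions: a list of iterables, one per parallel subtask
        (a stand-in for a partitioned DataSet; an assumption of this sketch).
    :return: a list of (subtask_index, element_count) tuples.
    """
    # Consume each partition once and pair its position with its size.
    return [(index, sum(1 for _ in part)) for index, part in enumerate(partitions)]
```

The docstring states what the inputs represent and what shape the result has, which is the information a caller cannot easily infer from the method name alone.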



[GitHub] flink pull request #2136: [FLINK-3626] [py] zipWithIndex in Python API

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2136



[GitHub] flink pull request #2136: [FLINK-3626] [py] zipWithIndex in Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2136#discussion_r67719598
  
    --- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
    @@ -572,6 +572,41 @@ def set_parallelism(self, parallelism):
             self._info.parallelism.value = parallelism
             return self
     
    +    def count_elements_per_partition(self):
    +        class CountElementsPerPartitionMapper(MapPartitionFunction):
    +
    +            def map_partition(self, iterator, collector):
    +                counter = 0
    +                for x in iterator:
    +                    counter += 1
    +
    +                collector.collect((self.context.get_index_of_this_subtask(), counter))
    +        return self.map_partition(CountElementsPerPartitionMapper())
    +
    +    def zip_with_index(self):
    +        element_count = self.count_elements_per_partition()
    +        class ZipWithIndexMapper(MapPartitionFunction):
    +            start = 0
    --- End diff --
    
    If we started at -1, we wouldn't have to decrement it when collecting the value.
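The idea behind this comment can be sketched in plain Python, outside of Flink. The sketch assumes partitions are modeled as a list of lists and that each partition's offset is the total size of all lower-numbered partitions; the names `zip_with_index`, `partition_counts`, and `start` are illustrative and are not taken from the merged code.

```python
def zip_with_index(partitions):
    """Pair every element with a unique, consecutive index across partitions."""
    # Per-partition element counts, as count_elements_per_partition would report.
    partition_counts = [len(part) for part in partitions]

    result = []
    for subtask_index, part in enumerate(partitions):
        # Offset = number of elements held by all lower-numbered partitions.
        # Starting one below the offset means the counter can be incremented
        # first and then emitted as-is, with no "start - 1" at collect time.
        start = sum(partition_counts[:subtask_index]) - 1
        indexed = []
        for value in part:
            start += 1
            indexed.append((start, value))
        result.append(indexed)
    return result
```

With three partitions `[["a", "b"], [], ["c"]]`, the first partition starts at -1 and emits indices 0 and 1, and the third starts at 1 and emits index 2, so the indices are consecutive even though the second partition is empty.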

