Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/20 21:58:59 UTC

[GitHub] [beam] robertwb opened a new pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

robertwb opened a new pull request #11766:
URL: https://github.com/apache/beam/pull/11766


   Also adds (naive) dataframe.agg() that uses this.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on a change in pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11766:
URL: https://github.com/apache/beam/pull/11766#discussion_r434214501



##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartition_of(self, other):
+    # type: (Partitioning) -> bool
+
+    """Returns whether self is a sub-partition of other.
+
+    Specifically, returns whether something partitioned by self is necissarily
+    also partitioned by other.
+    """
+    raise NotImplementedError
+
+  def partition_fn(self, df):
+    # type: (Frame) -> Iterable[Tuple[Any, Frame]]
+
+    """A callable that actually performs the partitioning of a Frame df.
+
+    This will be invoked via a FlatMap in conjunction with a GroupKey to
+    achieve the desired partitioning.
+    """
+    raise NotImplementedError
+
+
+class Index(Partitioning):
+  """A partitioning by index (either fully or partially).
+
+  If the set of "levels" of the index to consider is not specified, the entire
+  index is used.
+
+  These form a partial order, given by
+
+      Nothing() < Index([i]) < Index([i, j]) < ... < Index() < Singleton()
+  """
+
+  _INDEX_PARTITIONS = 100

Review comment:
       Oh, I was just testing things. I'll change it back. (It would be great to get rid of this altogether, as it limits parallelism, but that's not part of this change.)
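
To illustrate the parallelism concern raised here: hashing index values into a fixed number of buckets means a downstream grouping sees at most that many distinct keys, however large the input is. The following is a minimal pure-Python sketch, not the code from the PR. The `bucket_keys` helper and `NUM_PARTITIONS` are hypothetical names, and plain integers stand in for the pandas index that `partition_fn` hashes.

```python
NUM_PARTITIONS = 10  # assumption: stands in for Index._INDEX_PARTITIONS


def bucket_keys(index_values, num_partitions=NUM_PARTITIONS):
    """Group values by bucket, mimicking `hashes % N == key` from the diff."""
    buckets = {key: [] for key in range(num_partitions)}
    for value in index_values:
        # For small non-negative ints, hash(value) == value in CPython.
        buckets[hash(value) % num_partitions].append(value)
    return buckets


buckets = bucket_keys(range(1000))
distinct_keys = len(buckets)  # never exceeds NUM_PARTITIONS
largest_bucket = max(len(rows) for rows in buckets.values())
```

With 1000 rows and 10 buckets, each key carries 100 rows, so no more than 10 workers can process the grouped output concurrently.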







[GitHub] [beam] robertwb commented on a change in pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11766:
URL: https://github.com/apache/beam/pull/11766#discussion_r434217321



##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartition_of(self, other):

Review comment:
       Done.







[GitHub] [beam] robertwb merged pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #11766:
URL: https://github.com/apache/beam/pull/11766


   





[GitHub] [beam] TheNeuralBit commented on a change in pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11766:
URL: https://github.com/apache/beam/pull/11766#discussion_r435422511



##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -85,16 +87,10 @@ def evaluate_at(self, session):  # type: (Session) -> T
     """Returns the result of self with the bindings given in session."""
     raise NotImplementedError(type(self))
 
-  def requires_partition_by_index(self):  # type: () -> bool
-    """Whether this expression requires its argument(s) to be partitioned
-    by index."""
-    # TODO: It might be necessary to support partitioning by part of the index,
-    # for some args, which would require returning more than a boolean here.
+  def requires_partition_by(self):  # type: () -> Partitioning
     raise NotImplementedError(type(self))
 
-  def preserves_partition_by_index(self):  # type: () -> bool
-    """Whether the result of this expression will be partitioned by index
-    whenever all of its inputs are partitioned by index."""
+  def preserves_partition_by(self):  # type: () -> Partitioning

Review comment:
       Ah, that makes sense. And I guess the name "preserves" is actually intuitive now that I understand it's setting an upper bound on the output partitioning.
   
   I think the complexity is worth it, unless there's a chance those operations will never materialize. Can you just add a docstring indicating that "preserves" sets an upper bound on the output partitioning (or any other language to make sure readers can grok it)? A similar comment about requires would be good too.







[GitHub] [beam] robertwb commented on a change in pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11766:
URL: https://github.com/apache/beam/pull/11766#discussion_r434215127



##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -85,16 +87,10 @@ def evaluate_at(self, session):  # type: (Session) -> T
     """Returns the result of self with the bindings given in session."""
     raise NotImplementedError(type(self))
 
-  def requires_partition_by_index(self):  # type: () -> bool
-    """Whether this expression requires its argument(s) to be partitioned
-    by index."""
-    # TODO: It might be necessary to support partitioning by part of the index,
-    # for some args, which would require returning more than a boolean here.
+  def requires_partition_by(self):  # type: () -> Partitioning
     raise NotImplementedError(type(self))
 
-  def preserves_partition_by_index(self):  # type: () -> bool
-    """Whether the result of this expression will be partitioned by index
-    whenever all of its inputs are partitioned by index."""
+  def preserves_partition_by(self):  # type: () -> Partitioning

Review comment:
       Yes, it's a function of both the input and the operation. E.g. an elementwise operation preserves all existing partitioning, but does not guarantee any. 
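
One way to picture "preserves all existing partitioning, but does not guarantee any" is to treat the partitionings as ranks on a chain and take a minimum. This is a simplified sketch under stated assumptions: the integer ranks and the `output_partitioning` helper are hypothetical, the chain follows the ordering written in the `Index` docstring of this PR, and the real relation is only a partial order.

```python
# Toy chain Nothing < Index([0]) < Index() < Singleton, encoded as ranks.
NOTHING, INDEX_LEVEL_0, INDEX_ALL, SINGLETON = range(4)


def output_partitioning(preserves, input_partitioning):
    """The output is bounded by what the op preserves and what the input had."""
    return min(preserves, input_partitioning)


# Assumption: an elementwise op is modeled as preserving up to the top rank.
elementwise_preserves = SINGLETON

# On index-partitioned input, the index partitioning survives ...
on_indexed_input = output_partitioning(elementwise_preserves, INDEX_ALL)
# ... but on unpartitioned input, nothing is guaranteed.
on_arbitrary_input = output_partitioning(elementwise_preserves, NOTHING)
```

Here `on_indexed_input` equals `INDEX_ALL` and `on_arbitrary_input` equals `NOTHING`: the operation adds no partitioning of its own.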

##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartition_of(self, other):
+    # type: (Partitioning) -> bool
+
+    """Returns whether self is a sub-partition of other.
+
+    Specifically, returns whether something partitioned by self is necissarily
+    also partitioned by other.
+    """
+    raise NotImplementedError
+
+  def partition_fn(self, df):
+    # type: (Frame) -> Iterable[Tuple[Any, Frame]]
+
+    """A callable that actually performs the partitioning of a Frame df.
+
+    This will be invoked via a FlatMap in conjunction with a GroupKey to
+    achieve the desired partitioning.
+    """
+    raise NotImplementedError
+
+
+class Index(Partitioning):
+  """A partitioning by index (either fully or partially).
+
+  If the set of "levels" of the index to consider is not specified, the entire
+  index is used.
+
+  These form a partial order, given by
+
+      Nothing() < Index([i]) < Index([i, j]) < ... < Index() < Singleton()

Review comment:
       Clarified. 

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -23,6 +23,7 @@
 
 from apache_beam.dataframe import expressions
 from apache_beam.dataframe import frame_base
+from apache_beam.dataframe import frames  # pylint: disable=unused-import

Review comment:
       This ensures the deferred dataframe subclasses are registered for wrapping.

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -23,6 +23,7 @@
 
 from apache_beam.dataframe import expressions
 from apache_beam.dataframe import frame_base
+from apache_beam.dataframe import frames  # pylint: disable=unused-import

Review comment:
       It makes sure the wrapper code is populated with the various types of frames. 







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11766:
URL: https://github.com/apache/beam/pull/11766#discussion_r435426414



##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartitioning_of(self, other):
+    # type: (Partitioning) -> bool
+
+    """Returns whether self is a sub-partition of other.
+
+    Specifically, returns whether something partitioned by self is necissarily
+    also partitioned by other.
+    """
+    raise NotImplementedError
+
+  def partition_fn(self, df):
+    # type: (Frame) -> Iterable[Tuple[Any, Frame]]
+
+    """A callable that actually performs the partitioning of a Frame df.
+
+    This will be invoked via a FlatMap in conjunction with a GroupKey to
+    achieve the desired partitioning.
+    """
+    raise NotImplementedError
+
+
+class Index(Partitioning):
+  """A partitioning by index (either fully or partially).
+
+  If the set of "levels" of the index to consider is not specified, the entire
+  index is used.
+
+  These form a partial order, given by
+
+      Nothing() < Index([i]) < Index([i, j]) < ... < Index() < Singleton()
+
+  The ordering is implemented via the is_subpartitioning_of method, where the
+  examples on the right are subpartitionings of the examples on the left above.
+  """
+
+  _INDEX_PARTITIONS = 10
+
+  def __init__(self, levels=None):
+    self._levels = levels
+
+  def __eq__(self, other):
+    return type(self) == type(other) and self._levels == other._levels
+
+  def __hash__(self):
+    if self._levels:
+      return hash(tuple(sorted(self._levels)))
+    else:
+      return hash(type(self))
+
+  def is_subpartitioning_of(self, other):
+    if isinstance(other, Nothing):
+      return True
+    elif isinstance(other, Index):
+      if self._levels is None:
+        return True
+      elif other._levels is None:
+        return False
+      else:
+        return all(level in other._levels for level in self._levels)
+    else:
+      return False
+
+  def partition_fn(self, df):
+    if self._levels is None:
+      levels = list(range(df.index.nlevels))
+    else:
+      levels = self._levels
+    hashes = sum(
+        pd.util.hash_array(df.index.get_level_values(level))
+        for level in levels)
+    for key in range(self._INDEX_PARTITIONS):
+      yield key, df[hashes % self._INDEX_PARTITIONS == key]
+
+
+class Singleton(Partitioning):
+  """A partitioning co-locating all data to a singleton partition.
+  """
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
+  def is_subpartitioning_of(self, other):
+    return True
+
+  def partition_fn(self, df):
+    yield None, df
+
+
+class Nothing(Partitioning):
+  """A partitioning imposing no constraints on the actual partitioning.
+  """
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
+  def is_subpartitioning_of(self, other):
+    return not other
+
+  def __bool__(self):
+    return False
+
+  __nonzero__ = __bool__

Review comment:
       I think that making Nothing falsy and relying on that in logic elsewhere harms readability. What do you think about dropping this and just explicitly checking for Nothing when needed?
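
A rough sketch of what the suggested alternative could look like: `Nothing` without a `__bool__` override, and call sites that test for it explicitly. The `Index` stub and the `has_constraint` helper are hypothetical and only serve the illustration; they are not part of the PR.

```python
class Partitioning(object):
    """Stand-in base class for this sketch."""


class Nothing(Partitioning):
    """No __bool__ override, so instances are truthy like any other object."""
    def __eq__(self, other):
        return type(self) == type(other)

    def __hash__(self):
        return hash(type(self))


class Index(Partitioning):
    """Stub of the Index partitioning, without levels."""


def has_constraint(partitioning):
    """Hypothetical helper replacing `if not partitioning:` at call sites."""
    return not isinstance(partitioning, Nothing)


nothing_is_constrained = has_constraint(Nothing())
index_is_constrained = has_constraint(Index())
```

The trade-off is a slightly longer condition at each call site in exchange for not having to remember that one particular `Partitioning` subclass evaluates as false.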

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -138,36 +140,35 @@ def evaluate(partition, stage=self.stage):
     class Stage(object):
       """Used to build up a set of operations that can be fused together.
       """
-      def __init__(self, inputs, is_grouping):
+      def __init__(self, inputs, partitioning):
         self.inputs = set(inputs)
-        self.is_grouping = is_grouping or len(self.inputs) > 1
+        if len(self.inputs) > 1 and not partitioning:
+          # We have to shuffle to co-locate, might as well partition.
+          self.partitioning = partitionings.Index()
+        else:
+          self.partitioning = partitioning
         self.ops = []
         self.outputs = set()
 
     # First define some helper functions.
-    def output_is_partitioned_by_index(expr, stage):
-      if expr in stage.inputs:
-        return stage.is_grouping
-      elif expr.preserves_partition_by_index():
-        if expr.requires_partition_by_index():
+    def output_is_partitioned_by(expr, stage, partitioning):
+      if not partitioning:
+        return True
+      elif stage.partitioning == partitionings.Singleton():
+        # Within a stage, the singleton partitioning is trivially preserved.
+        return True
+      elif expr in stage.inputs:
+        return stage.partitioning.is_subpartitioning_of(partitioning)
+      elif expr.preserves_partition_by().is_subpartitioning_of(partitioning):
+        if expr.requires_partition_by().is_subpartitioning_of(partitioning):

Review comment:
       I had trouble justifying this logic to myself, so I ended up writing a little proof, which I'm a bit embarrassed to share with a Math PhD:
   ```
   output partitioning of expr = min(expr.preserves, input partitioning)
   input partitioning >= expr.requires
   
   thus if expr.requires >= required output AND expr.preserves >= required output
   then output partitioning of expr >= required output
   
   Otherwise we need to go up the tree of inputs to figure out their partitionings
   ```
   
   This may be the least concise way to express it, so I don't know if it's worth putting in a comment verbatim, but something to that effect would be helpful (assuming I've got it right).
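
The argument above can be checked by brute force on a small model. The sketch below assumes the partitionings form a simple chain encoded as integer ranks, which is a simplification of the real partial order, and the `implication_holds` helper is hypothetical. It enumerates every combination of `requires`, `preserves`, input partitioning, and required output, and confirms that the stated condition is sufficient.

```python
from itertools import product

# Toy chain: 0=Nothing, 1=Index([0]), 2=Index(), 3=Singleton (assumed ranks).
RANKS = range(4)


def implication_holds(requires, preserves, input_part, required_output):
    """Check the claim from the comment for one combination of ranks."""
    if input_part < requires:
        # Premise violated: the input does not satisfy expr.requires.
        return True
    output = min(preserves, input_part)
    if requires >= required_output and preserves >= required_output:
        # The condition tested in output_is_partitioned_by.
        return output >= required_output
    # Otherwise the claim says nothing; the inputs must be inspected.
    return True


all_hold = all(
    implication_holds(*combo) for combo in product(RANKS, repeat=4))
```

The condition is sufficient but not necessary: with `requires=0`, `preserves=3`, and an input at rank 2, the output is still at rank 2 even though `requires` is below it, which is the case where the inputs have to be examined.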

##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartitioning_of(self, other):
+    # type: (Partitioning) -> bool
+
+    """Returns whether self is a sub-partition of other.
+
+    Specifically, returns whether something partitioned by self is necissarily
+    also partitioned by other.
+    """
+    raise NotImplementedError
+
+  def partition_fn(self, df):
+    # type: (Frame) -> Iterable[Tuple[Any, Frame]]
+
+    """A callable that actually performs the partitioning of a Frame df.
+
+    This will be invoked via a FlatMap in conjunction with a GroupKey to
+    achieve the desired partitioning.
+    """
+    raise NotImplementedError
+
+
+class Index(Partitioning):
+  """A partitioning by index (either fully or partially).
+
+  If the set of "levels" of the index to consider is not specified, the entire
+  index is used.
+
+  These form a partial order, given by
+
+      Nothing() < Index([i]) < Index([i, j]) < ... < Index() < Singleton()
+
+  The ordering is implemented via the is_subpartitioning_of method, where the
+  examples on the right are subpartitionings of the examples on the left above.
+  """
+
+  _INDEX_PARTITIONS = 10
+
+  def __init__(self, levels=None):
+    self._levels = levels
+
+  def __eq__(self, other):
+    return type(self) == type(other) and self._levels == other._levels
+
+  def __hash__(self):
+    if self._levels:
+      return hash(tuple(sorted(self._levels)))
+    else:
+      return hash(type(self))
+
+  def is_subpartitioning_of(self, other):
+    if isinstance(other, Nothing):
+      return True
+    elif isinstance(other, Index):
+      if self._levels is None:
+        return True
+      elif other._levels is None:
+        return False
+      else:
+        return all(level in other._levels for level in self._levels)
+    else:
+      return False
+
+  def partition_fn(self, df):
+    if self._levels is None:
+      levels = list(range(df.index.nlevels))
+    else:
+      levels = self._levels
+    hashes = sum(
+        pd.util.hash_array(df.index.get_level_values(level))
+        for level in levels)
+    for key in range(self._INDEX_PARTITIONS):
+      yield key, df[hashes % self._INDEX_PARTITIONS == key]
+
+
+class Singleton(Partitioning):
+  """A partitioning co-locating all data to a singleton partition.

Review comment:
       Isn't `Singleton` completely partitioning the data into one element per partition? This description doesn't seem consistent to me; maybe I'm misunderstanding.
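
Reading `partition_fn` in the diff above suggests the opposite interpretation: all rows go to a single key. The sketch below copies only that method from the diff and applies it to a plain list, which is a hypothetical stand-in for a pandas DataFrame, to show how many partitions come out.

```python
class Singleton(object):
    """Minimal copy of the class from the diff, keeping only partition_fn."""
    def partition_fn(self, df):
        # One key (None) paired with the entire input.
        yield None, df


rows = ['a', 'b', 'c']  # assumption: a list standing in for a DataFrame
partitions = list(Singleton().partition_fn(rows))
```

`partitions` holds exactly one `(None, rows)` pair, so under this reading "singleton" refers to the number of partitions, not the number of elements in each.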




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on a change in pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11766:
URL: https://github.com/apache/beam/pull/11766#discussion_r435632933



##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -85,16 +87,10 @@ def evaluate_at(self, session):  # type: (Session) -> T
     """Returns the result of self with the bindings given in session."""
     raise NotImplementedError(type(self))
 
-  def requires_partition_by_index(self):  # type: () -> bool
-    """Whether this expression requires its argument(s) to be partitioned
-    by index."""
-    # TODO: It might be necessary to support partitioning by part of the index,
-    # for some args, which would require returning more than a boolean here.
+  def requires_partition_by(self):  # type: () -> Partitioning
     raise NotImplementedError(type(self))
 
-  def preserves_partition_by_index(self):  # type: () -> bool
-    """Whether the result of this expression will be partitioned by index
-    whenever all of its inputs are partitioned by index."""
+  def preserves_partition_by(self):  # type: () -> Partitioning

Review comment:
       Docstring comments added. 

##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartitioning_of(self, other):
+    # type: (Partitioning) -> bool
+
+    """Returns whether self is a sub-partition of other.
+
+    Specifically, returns whether something partitioned by self is necessarily
+    also partitioned by other.
+    """
+    raise NotImplementedError
+
+  def partition_fn(self, df):
+    # type: (Frame) -> Iterable[Tuple[Any, Frame]]
+
+    """A callable that actually performs the partitioning of a Frame df.
+
+    This will be invoked via a FlatMap in conjunction with a GroupKey to
+    achieve the desired partitioning.
+    """
+    raise NotImplementedError
+
+
+class Index(Partitioning):
+  """A partitioning by index (either fully or partially).
+
+  If the set of "levels" of the index to consider is not specified, the entire
+  index is used.
+
+  These form a partial order, given by
+
+      Nothing() < Index([i]) < Index([i, j]) < ... < Index() < Singleton()
+
+  The ordering is implemented via the is_subpartitioning_of method, where the
+  examples on the right are subpartitionings of the examples on the left above.
+  """
+
+  _INDEX_PARTITIONS = 10
+
+  def __init__(self, levels=None):
+    self._levels = levels
+
+  def __eq__(self, other):
+    return type(self) == type(other) and self._levels == other._levels
+
+  def __hash__(self):
+    if self._levels:
+      return hash(tuple(sorted(self._levels)))
+    else:
+      return hash(type(self))
+
+  def is_subpartitioning_of(self, other):
+    if isinstance(other, Nothing):
+      return True
+    elif isinstance(other, Index):
+      if self._levels is None:
+        return True
+      elif other._levels is None:
+        return False
+      else:
+        return all(level in other._levels for level in self._levels)
+    else:
+      return False
+
+  def partition_fn(self, df):
+    if self._levels is None:
+      levels = list(range(df.index.nlevels))
+    else:
+      levels = self._levels
+    hashes = sum(
+        pd.util.hash_array(df.index.get_level_values(level))
+        for level in levels)
+    for key in range(self._INDEX_PARTITIONS):
+      yield key, df[hashes % self._INDEX_PARTITIONS == key]
+
+
+class Singleton(Partitioning):
+  """A partitioning co-locating all data to a singleton partition.

Review comment:
       Reworded. 

##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartitioning_of(self, other):
+    # type: (Partitioning) -> bool
+
+    """Returns whether self is a sub-partition of other.
+
+    Specifically, returns whether something partitioned by self is necessarily
+    also partitioned by other.
+    """
+    raise NotImplementedError
+
+  def partition_fn(self, df):
+    # type: (Frame) -> Iterable[Tuple[Any, Frame]]
+
+    """A callable that actually performs the partitioning of a Frame df.
+
+    This will be invoked via a FlatMap in conjunction with a GroupKey to
+    achieve the desired partitioning.
+    """
+    raise NotImplementedError
+
+
+class Index(Partitioning):
+  """A partitioning by index (either fully or partially).
+
+  If the set of "levels" of the index to consider is not specified, the entire
+  index is used.
+
+  These form a partial order, given by
+
+      Nothing() < Index([i]) < Index([i, j]) < ... < Index() < Singleton()
+
+  The ordering is implemented via the is_subpartitioning_of method, where the
+  examples on the right are subpartitionings of the examples on the left above.
+  """
+
+  _INDEX_PARTITIONS = 10
+
+  def __init__(self, levels=None):
+    self._levels = levels
+
+  def __eq__(self, other):
+    return type(self) == type(other) and self._levels == other._levels
+
+  def __hash__(self):
+    if self._levels:
+      return hash(tuple(sorted(self._levels)))
+    else:
+      return hash(type(self))
+
+  def is_subpartitioning_of(self, other):
+    if isinstance(other, Nothing):
+      return True
+    elif isinstance(other, Index):
+      if self._levels is None:
+        return True
+      elif other._levels is None:
+        return False
+      else:
+        return all(level in other._levels for level in self._levels)
+    else:
+      return False
+
+  def partition_fn(self, df):
+    if self._levels is None:
+      levels = list(range(df.index.nlevels))
+    else:
+      levels = self._levels
+    hashes = sum(
+        pd.util.hash_array(df.index.get_level_values(level))
+        for level in levels)
+    for key in range(self._INDEX_PARTITIONS):
+      yield key, df[hashes % self._INDEX_PARTITIONS == key]
+
+
+class Singleton(Partitioning):
+  """A partitioning co-locating all data to a singleton partition.
+  """
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
+  def is_subpartitioning_of(self, other):
+    return True
+
+  def partition_fn(self, df):
+    yield None, df
+
+
+class Nothing(Partitioning):
+  """A partitioning imposing no constraints on the actual partitioning.
+  """
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
+  def is_subpartitioning_of(self, other):
+    return not other
+
+  def __bool__(self):
+    return False
+
+  __nonzero__ = __bool__

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -138,36 +140,35 @@ def evaluate(partition, stage=self.stage):
     class Stage(object):
       """Used to build up a set of operations that can be fused together.
       """
-      def __init__(self, inputs, is_grouping):
+      def __init__(self, inputs, partitioning):
         self.inputs = set(inputs)
-        self.is_grouping = is_grouping or len(self.inputs) > 1
+        if len(self.inputs) > 1 and not partitioning:
+          # We have to shuffle to co-locate, might as well partition.
+          self.partitioning = partitionings.Index()
+        else:
+          self.partitioning = partitioning
         self.ops = []
         self.outputs = set()
 
     # First define some helper functions.
-    def output_is_partitioned_by_index(expr, stage):
-      if expr in stage.inputs:
-        return stage.is_grouping
-      elif expr.preserves_partition_by_index():
-        if expr.requires_partition_by_index():
+    def output_is_partitioned_by(expr, stage, partitioning):
+      if not partitioning:
+        return True
+      elif stage.partitioning == partitionings.Singleton():
+        # Within a stage, the singleton partitioning is trivially preserved.
+        return True
+      elif expr in stage.inputs:
+        return stage.partitioning.is_subpartitioning_of(partitioning)
+      elif expr.preserves_partition_by().is_subpartitioning_of(partitioning):
+        if expr.requires_partition_by().is_subpartitioning_of(partitioning):

Review comment:
       Yeah, fleshing this out with more comments.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11766:
URL: https://github.com/apache/beam/pull/11766#discussion_r434916844



##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -85,16 +87,10 @@ def evaluate_at(self, session):  # type: (Session) -> T
     """Returns the result of self with the bindings given in session."""
     raise NotImplementedError(type(self))
 
-  def requires_partition_by_index(self):  # type: () -> bool
-    """Whether this expression requires its argument(s) to be partitioned
-    by index."""
-    # TODO: It might be necessary to support partitioning by part of the index,
-    # for some args, which would require returning more than a boolean here.
+  def requires_partition_by(self):  # type: () -> Partitioning
     raise NotImplementedError(type(self))
 
-  def preserves_partition_by_index(self):  # type: () -> bool
-    """Whether the result of this expression will be partitioned by index
-    whenever all of its inputs are partitioned by index."""
+  def preserves_partition_by(self):  # type: () -> Partitioning

Review comment:
       Ah makes sense. So perhaps "preserves" could be thought of as an upper bound on the partitioning of the output (similar to how "requires" is a lower bound on the partitioning of the input).
   
   It looks like every current expression has preserves set to either Nothing or Singleton. Wouldn't it be simpler to just keep preserves as a boolean? Or maybe you have some other expression in mind where a boolean won't be sufficient?







[GitHub] [beam] robertwb commented on a change in pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11766:
URL: https://github.com/apache/beam/pull/11766#discussion_r435373732



##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -85,16 +87,10 @@ def evaluate_at(self, session):  # type: (Session) -> T
     """Returns the result of self with the bindings given in session."""
     raise NotImplementedError(type(self))
 
-  def requires_partition_by_index(self):  # type: () -> bool
-    """Whether this expression requires its argument(s) to be partitioned
-    by index."""
-    # TODO: It might be necessary to support partitioning by part of the index,
-    # for some args, which would require returning more than a boolean here.
+  def requires_partition_by(self):  # type: () -> Partitioning
     raise NotImplementedError(type(self))
 
-  def preserves_partition_by_index(self):  # type: () -> bool
-    """Whether the result of this expression will be partitioned by index
-    whenever all of its inputs are partitioned by index."""
+  def preserves_partition_by(self):  # type: () -> Partitioning

Review comment:
       There are operations, such as setting a column to be an additional level of the index, that would do partial preservation. But perhaps that's not worth the additional complexity. I can change this to a boolean if you'd rather. 
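
A rough sketch of the "partial preservation" case mentioned above, in plain Python rather than pandas. Rows are `(index_tuple, value)` pairs, and `partition_by_levels`, `colocated_by`, and the "promote a column to an index level" step are all hypothetical names introduced for illustration. The point is only that data partitioned on level 0 is still partitioned on level 0 after a second level is added, which a single boolean cannot express:

```python
from collections import defaultdict


def partition_by_levels(rows, levels, num_partitions=4):
    """Buckets (index_tuple, value) rows by the chosen index levels."""
    parts = defaultdict(list)
    for index, value in rows:
        key = sum(hash(index[level]) for level in levels) % num_partitions
        parts[key].append((index, value))
    return dict(parts)


def colocated_by(parts, levels):
    """True if rows sharing the given level values never span two partitions."""
    home = {}
    for key, part in parts.items():
        for index, _ in part:
            group = tuple(index[level] for level in levels)
            if home.setdefault(group, key) != key:
                return False
    return True


# Rows with a one-level index, partitioned on that level.
rows = [((i % 5,), 'v%d' % i) for i in range(20)]
parts = partition_by_levels(rows, levels=[0])

# Hypothetical "promote the value column to a second index level",
# applied independently within each partition (no shuffle).
promoted = {
    key: [((index[0], value), value) for index, value in part]
    for key, part in parts.items()
}

still_by_level_0 = colocated_by(promoted, [0])
```

Here the promoted frames keep their level-0 placement. Placement by a hash over both levels would, in general, differ, so an operation like this would plausibly report something like `Index([0])` rather than "preserves everything" or "preserves nothing".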







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11766: [BEAM-10036] More flexible dataframes partitioning.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #11766:
URL: https://github.com/apache/beam/pull/11766#discussion_r433537957



##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartition_of(self, other):
+    # type: (Partitioning) -> bool
+
+    """Returns whether self is a sub-partition of other.
+
+    Specifically, returns whether something partitioned by self is necessarily
+    also partitioned by other.
+    """
+    raise NotImplementedError
+
+  def partition_fn(self, df):
+    # type: (Frame) -> Iterable[Tuple[Any, Frame]]
+
+    """A callable that actually performs the partitioning of a Frame df.
+
+    This will be invoked via a FlatMap in conjunction with a GroupKey to
+    achieve the desired partitioning.
+    """
+    raise NotImplementedError
+
+
+class Index(Partitioning):
+  """A partitioning by index (either fully or partially).
+
+  If the set of "levels" of the index to consider is not specified, the entire
+  index is used.
+
+  These form a partial order, given by
+
+      Nothing() < Index([i]) < Index([i, j]) < ... < Index() < Singleton()

Review comment:
       This ordering is determined by `is_subpartition_of`, correct? I wonder if there's a way to say that clearly in this docstring.
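
One way to make the link between the chain and the method concrete is sketched below. The class bodies follow the comparison logic quoted in later revisions of this PR, where the method is named `is_subpartitioning_of`; `__eq__`, `__hash__`, and `partition_fn` are omitted, and `less_or_equal` is a hypothetical helper that is not part of the PR:

```python
class Partitioning(object):
    def is_subpartitioning_of(self, other):
        raise NotImplementedError


class Nothing(Partitioning):
    def is_subpartitioning_of(self, other):
        # Nothing is falsy, so this is True only when other is also Nothing.
        return not other

    def __bool__(self):
        return False

    __nonzero__ = __bool__


class Index(Partitioning):
    def __init__(self, levels=None):
        self._levels = levels

    def is_subpartitioning_of(self, other):
        if isinstance(other, Nothing):
            return True
        elif isinstance(other, Index):
            if self._levels is None:
                return True
            elif other._levels is None:
                return False
            else:
                return all(level in other._levels for level in self._levels)
        else:
            return False


class Singleton(Partitioning):
    def is_subpartitioning_of(self, other):
        return True


def less_or_equal(left, right):
    """Hypothetical helper: left <= right in the docstring's chain."""
    return right.is_subpartitioning_of(left)


chain_ends_hold = (
    less_or_equal(Nothing(), Index([0])) and
    less_or_equal(Index([0]), Index()) and
    less_or_equal(Index(), Singleton()) and
    not less_or_equal(Singleton(), Index()) and
    not less_or_equal(Index([0]), Nothing()))
```

The partial-level comparison (`Index([i])` versus `Index([i, j])`) is deliberately left out of the check. With the `all(level in other._levels for level in self._levels)` test as quoted, it may be worth confirming that the direction matches the chain in the docstring.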

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -23,6 +23,7 @@
 
 from apache_beam.dataframe import expressions
 from apache_beam.dataframe import frame_base
+from apache_beam.dataframe import frames  # pylint: disable=unused-import

Review comment:
       What is this for?

##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartition_of(self, other):
+    # type: (Partitioning) -> bool
+
+    """Returns whether self is a sub-partition of other.
+
+    Specifically, returns whether something partitioned by self is necessarily
+    also partitioned by other.
+    """
+    raise NotImplementedError
+
+  def partition_fn(self, df):
+    # type: (Frame) -> Iterable[Tuple[Any, Frame]]
+
+    """A callable that actually performs the partitioning of a Frame df.
+
+    This will be invoked via a FlatMap in conjunction with a GroupKey to
+    achieve the desired partitioning.
+    """
+    raise NotImplementedError
+
+
+class Index(Partitioning):
+  """A partitioning by index (either fully or partially).
+
+  If the set of "levels" of the index to consider is not specified, the entire
+  index is used.
+
+  These form a partial order, given by
+
+      Nothing() < Index([i]) < Index([i, j]) < ... < Index() < Singleton()
+  """
+
+  _INDEX_PARTITIONS = 100

Review comment:
       Previously this was 10, right (in `partitioned_by_index`)? I'm assuming this is intentional, but I just wanted to double-check it's not a typo.
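
To illustrate why the value matters, here is a small sketch of the bucketing pattern used by `Index.partition_fn`, written in plain Python. `zlib.crc32` over the `repr` of the index is an assumed stand-in for `pd.util.hash_array`, and lists of `(index, value)` rows stand in for frames. As in the quoted code, one `(key, slice)` pair is emitted per bucket, whether or not the slice is empty:

```python
import zlib

NUM_PARTITIONS = 10  # plays the role of Index._INDEX_PARTITIONS


def stable_hash(value):
    # Assumption: crc32 of the repr stands in for pd.util.hash_array.
    return zlib.crc32(repr(value).encode('utf-8'))


def partition_fn(rows, num_partitions=NUM_PARTITIONS):
    """Yields one (key, slice) pair per bucket, including empty slices."""
    hashes = [stable_hash(index) for index, _ in rows]
    for key in range(num_partitions):
        yield key, [
            row for row, h in zip(rows, hashes) if h % num_partitions == key
        ]


rows = [(i, i * i) for i in range(25)]
parts_10 = list(partition_fn(rows, 10))
parts_100 = list(partition_fn(rows, 100))
```

With 25 rows, the 100-bucket variant emits 100 pairs, at least 75 of them empty, so the constant trades finer-grained keys against more (and smaller) emitted slices per input frame.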

##########
File path: sdks/python/apache_beam/dataframe/partitionings.py
##########
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from typing import Any
+from typing import Iterable
+from typing import TypeVar
+
+import pandas as pd
+
+Frame = TypeVar('Frame', bound=pd.core.generic.NDFrame)
+
+
+class Partitioning(object):
+  """A class representing a (consistent) partitioning of dataframe objects.
+  """
+  def is_subpartition_of(self, other):

Review comment:
       nit: I think I'd prefer `is_subpartitioning_of`

##########
File path: sdks/python/apache_beam/dataframe/expressions.py
##########
@@ -85,16 +87,10 @@ def evaluate_at(self, session):  # type: (Session) -> T
     """Returns the result of self with the bindings given in session."""
     raise NotImplementedError(type(self))
 
-  def requires_partition_by_index(self):  # type: () -> bool
-    """Whether this expression requires its argument(s) to be partitioned
-    by index."""
-    # TODO: It might be necessary to support partitioning by part of the index,
-    # for some args, which would require returning more than a boolean here.
+  def requires_partition_by(self):  # type: () -> Partitioning
     raise NotImplementedError(type(self))
 
-  def preserves_partition_by_index(self):  # type: () -> bool
-    """Whether the result of this expression will be partitioned by index
-    whenever all of its inputs are partitioned by index."""
+  def preserves_partition_by(self):  # type: () -> Partitioning

Review comment:
       The meaning of this function is a little confusing now, since it implies some connection to the input partitioning but also has its own partitioning.  Would renaming it to `outputs_..` or `produces_..` still be accurate, or is the output partitioning actually a function of both "preserves" and the input?
   
   I also think we should consider changing `.._partition_by` to `.._partitioning` for clarity.
   
   



