Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/09 23:01:56 UTC

[GitHub] [beam] TheNeuralBit opened a new pull request #13066: [BEAM-11052] Memoize to_pcollection

TheNeuralBit opened a new pull request #13066:
URL: https://github.com/apache/beam/pull/13066


   R: @robertwb 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r503509711



##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -138,21 +142,47 @@ def extract_input(placeholder):
 
   placeholders = frozenset.union(
       frozenset(), *[df._expr.placeholders() for df in dataframes])
-  results = {p: extract_input(p)
-             for p in placeholders
-             } | label >> transforms._DataframeExpressionsTransform(
-                 dict((ix, df._expr) for ix, df in enumerate(
-                     dataframes)))  # type: Dict[Any, pvalue.PCollection]
+
+  # Exclude any dataframes that have already been converted to PCollections.
+  # We only want to convert each DF expression once, then re-use.
+  new_dataframes = [
+      df for df in dataframes if df._expr not in TO_PCOLLECTION_CACHE
+  ]
+  new_results = {p: extract_input(p)
+                 for p in placeholders
+                 } | label >> transforms._DataframeExpressionsTransform(
+                     dict(
+                         (ix, df._expr) for ix, df in enumerate(new_dataframes))
+                 )  # type: Dict[Any, pvalue.PCollection]
+
+  TO_PCOLLECTION_CACHE.update(
+      {new_dataframes[ix]._expr: pc
+       for ix, pc in new_results.items()})
+
+  raw_results = {
+      ix: TO_PCOLLECTION_CACHE[df._expr]
+      for ix, df in enumerate(dataframes)
+  }
 
   if yield_elements == "schemas":
     results = {
-        key: pc
-        | "Unbatch '%s'" % dataframes[key]._expr._id >> schemas.UnbatchPandas(
-            dataframes[key]._expr.proxy(), include_indexes=include_indexes)
-        for (key, pc) in results.items()
+        ix: _make_unbatched_pcoll(pc, dataframes[ix]._expr, include_indexes)
+        for ix,

Review comment:
       Done, thanks







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r512994848



##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -138,19 +161,36 @@ def extract_input(placeholder):
 
   placeholders = frozenset.union(
       frozenset(), *[df._expr.placeholders() for df in dataframes])
-  results = {p: extract_input(p)
-             for p in placeholders
-             } | label >> transforms._DataframeExpressionsTransform(
-                 dict((ix, df._expr) for ix, df in enumerate(
-                     dataframes)))  # type: Dict[Any, pvalue.PCollection]
+
+  # Exclude any dataframes that have already been converted to PCollections.
+  # We only want to convert each DF expression once, then re-use.
+  new_dataframes = [
+      df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE
+  ]
+  new_results = {p: extract_input(p)
+                 for p in placeholders
+                 } | label >> transforms._DataframeExpressionsTransform(
+                     dict(

Review comment:
       Done







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r502717312



##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -67,6 +68,9 @@ def to_dataframe(
       expressions.PlaceholderExpression(proxy, pcoll))
 
 
+TO_PCOLLECTION_CACHE = {}

Review comment:
       Yeah, agreed, it could be a problem in production as well. `WeakKeyDictionary` is an interesting idea; I hadn't seen the `weakref` library before. I'll give that a shot.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r502706227



##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -67,6 +68,9 @@ def to_dataframe(
       expressions.PlaceholderExpression(proxy, pcoll))
 
 
+TO_PCOLLECTION_CACHE = {}

Review comment:
       Note that this uses a global cache, which can be problematic in testing. I don't think we need to worry about PCollections being shared across pipelines, though, since the dataframe expressions should have a reference to the pipeline through the `to_dataframe` roots.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r508667777



##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -67,7 +69,28 @@ def to_dataframe(
       expressions.PlaceholderExpression(proxy, pcoll))
 
 
+# PCollections generated by to_pcollection are memoized.
+# WeakValueDictionary is used so the caches are cleaned up with the parent
+# pipelines

Review comment:
       Good idea, added that language







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r503508554



##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -67,6 +68,9 @@ def to_dataframe(
       expressions.PlaceholderExpression(proxy, pcoll))
 
 
+TO_PCOLLECTION_CACHE = {}

Review comment:
       Updated this to use `WeakValueDictionary` for the caches, PTAL
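
A minimal sketch of the `WeakValueDictionary` idea, not the code from this PR: `FakePCollection`, `convert`, and the string keys are hypothetical stand-ins. It assumes the cache is keyed by something like an expression id and that the cached values are ordinary objects that support weak references.

```python
import gc
import weakref


class FakePCollection:
  """Hypothetical stand-in for a PCollection; plain objects support weakrefs."""
  def __init__(self, name):
    self.name = name


# Keys are ordinary strings (like an expression id); values are held weakly.
cache = weakref.WeakValueDictionary()


def convert(expr_id):
  """Return the cached object for expr_id, creating it on first use."""
  pc = cache.get(expr_id)
  if pc is None:
    pc = FakePCollection(expr_id)
    cache[expr_id] = pc
  return pc


first = convert('expr_1')
second = convert('expr_1')
same_object = first is second  # memoized while a strong reference exists
size_while_alive = len(cache)

# Once nothing else references the value, the entry disappears by itself.
del first, second
gc.collect()  # only needed on interpreters without reference counting
size_after_release = len(cache)
```

Because the values are weak, the cache never keeps an object alive on its own; an entry lasts only as long as something else still holds the cached value.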







[GitHub] [beam] TheNeuralBit merged pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #13066:
URL: https://github.com/apache/beam/pull/13066


   





[GitHub] [beam] TheNeuralBit commented on pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13066:
URL: https://github.com/apache/beam/pull/13066#issuecomment-717506312


   @robertwb I think I've addressed all your comments, PTAL





[GitHub] [beam] TheNeuralBit commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r508657178



##########
File path: sdks/python/apache_beam/dataframe/convert_test.py
##########
@@ -85,6 +85,94 @@ def test_convert(self):
       assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
       assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
 
+  def test_convert_memoization(self):
+    with beam.Pipeline() as p:
+      a = pd.Series([1, 2, 3])
+      b = pd.Series([100, 200, 300])
+
+      pc_a = p | 'A' >> beam.Create([a])
+      pc_b = p | 'B' >> beam.Create([b])
+
+      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
+      df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+
+      df_2a = 2 * df_a
+      df_3a = 3 * df_a
+      df_ab = df_a * df_b
+
+      # Converting multiple results at a time can be more efficient.
+      pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab)
+      # Converting the same expressions should yield the same pcolls
+      pc_3a, pc_2a, pc_ab = convert.to_pcollection(df_3a, df_2a, df_ab)
+
+      self.assertEqual(id(pc_2a), id(pc_2a_))
+      self.assertEqual(id(pc_ab), id(pc_ab_))
+
+      assert_that(pc_2a, equal_to(list(2 * a)), label='Check2a')
+      assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
+      assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
+
+  def test_convert_memoization_yield_pandas(self):
+    with beam.Pipeline() as p:
+      a = pd.Series([1, 2, 3])
+      b = pd.Series([100, 200, 300])
+
+      pc_a = p | 'A' >> beam.Create([a])
+      pc_b = p | 'B' >> beam.Create([b])
+
+      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
+      df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+
+      df_2a = 2 * df_a
+      df_3a = 3 * df_a
+      df_ab = df_a * df_b
+
+      # Converting multiple results at a time can be more efficient.
+      pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab,

Review comment:
       Done

##########
File path: sdks/python/apache_beam/dataframe/convert_test.py
##########
@@ -85,6 +85,94 @@ def test_convert(self):
       assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
       assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
 
+  def test_convert_memoization(self):
+    with beam.Pipeline() as p:
+      a = pd.Series([1, 2, 3])
+      b = pd.Series([100, 200, 300])
+
+      pc_a = p | 'A' >> beam.Create([a])
+      pc_b = p | 'B' >> beam.Create([b])
+
+      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
+      df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+
+      df_2a = 2 * df_a
+      df_3a = 3 * df_a
+      df_ab = df_a * df_b
+
+      # Converting multiple results at a time can be more efficient.
+      pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab)
+      # Converting the same expressions should yield the same pcolls
+      pc_3a, pc_2a, pc_ab = convert.to_pcollection(df_3a, df_2a, df_ab)
+
+      self.assertEqual(id(pc_2a), id(pc_2a_))

Review comment:
       thanks, done

##########
File path: sdks/python/apache_beam/dataframe/convert_test.py
##########
@@ -85,6 +85,94 @@ def test_convert(self):
       assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
       assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
 
+  def test_convert_memoization(self):

Review comment:
       Done







[GitHub] [beam] robertwb commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r502713670



##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -138,21 +142,47 @@ def extract_input(placeholder):
 
   placeholders = frozenset.union(
       frozenset(), *[df._expr.placeholders() for df in dataframes])
-  results = {p: extract_input(p)
-             for p in placeholders
-             } | label >> transforms._DataframeExpressionsTransform(
-                 dict((ix, df._expr) for ix, df in enumerate(
-                     dataframes)))  # type: Dict[Any, pvalue.PCollection]
+
+  # Exclude any dataframes that have already been converted to PCollections.
+  # We only want to convert each DF expression once, then re-use.
+  new_dataframes = [
+      df for df in dataframes if df._expr not in TO_PCOLLECTION_CACHE
+  ]
+  new_results = {p: extract_input(p)
+                 for p in placeholders
+                 } | label >> transforms._DataframeExpressionsTransform(
+                     dict(
+                         (ix, df._expr) for ix, df in enumerate(new_dataframes))
+                 )  # type: Dict[Any, pvalue.PCollection]
+
+  TO_PCOLLECTION_CACHE.update(
+      {new_dataframes[ix]._expr: pc
+       for ix, pc in new_results.items()})
+
+  raw_results = {
+      ix: TO_PCOLLECTION_CACHE[df._expr]
+      for ix, df in enumerate(dataframes)
+  }
 
   if yield_elements == "schemas":
     results = {
-        key: pc
-        | "Unbatch '%s'" % dataframes[key]._expr._id >> schemas.UnbatchPandas(
-            dataframes[key]._expr.proxy(), include_indexes=include_indexes)
-        for (key, pc) in results.items()
+        ix: _make_unbatched_pcoll(pc, dataframes[ix]._expr, include_indexes)
+        for ix,
+        pc in raw_results.items()
     }
+  else:
+    results = raw_results
 
   if len(results) == 1 and not always_return_tuple:
     return results[0]
   else:
     return tuple(value for key, value in sorted(results.items()))
+
+
+memoize = functools.lru_cache(maxsize=None)
+
+
+@memoize

Review comment:
       Again, this grows without bound.
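
To illustrate the concern in general terms (a standalone sketch, not code from this PR; `unbounded` and `bounded` are hypothetical functions): `functools.lru_cache(maxsize=None)` never evicts, so every distinct argument stays cached for the life of the process, while a finite `maxsize` caps the number of entries.

```python
import functools


@functools.lru_cache(maxsize=None)
def unbounded(key):
  # Every distinct key adds an entry that is never evicted.
  return object()


@functools.lru_cache(maxsize=2)
def bounded(key):
  # Only the two most recently used keys are retained.
  return object()


for i in range(100):
  unbounded(i)
  bounded(i)

unbounded_size = unbounded.cache_info().currsize
bounded_size = bounded.cache_info().currsize
```

After the loop, `unbounded_size` is 100 and `bounded_size` is 2. The unbounded cache also holds strong references to both its arguments and its results, so none of them can be garbage collected.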

##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -138,21 +142,47 @@ def extract_input(placeholder):
 
   placeholders = frozenset.union(
       frozenset(), *[df._expr.placeholders() for df in dataframes])
-  results = {p: extract_input(p)
-             for p in placeholders
-             } | label >> transforms._DataframeExpressionsTransform(
-                 dict((ix, df._expr) for ix, df in enumerate(
-                     dataframes)))  # type: Dict[Any, pvalue.PCollection]
+
+  # Exclude any dataframes that have already been converted to PCollections.
+  # We only want to convert each DF expression once, then re-use.
+  new_dataframes = [
+      df for df in dataframes if df._expr not in TO_PCOLLECTION_CACHE
+  ]
+  new_results = {p: extract_input(p)
+                 for p in placeholders
+                 } | label >> transforms._DataframeExpressionsTransform(
+                     dict(
+                         (ix, df._expr) for ix, df in enumerate(new_dataframes))
+                 )  # type: Dict[Any, pvalue.PCollection]
+
+  TO_PCOLLECTION_CACHE.update(
+      {new_dataframes[ix]._expr: pc
+       for ix, pc in new_results.items()})
+
+  raw_results = {
+      ix: TO_PCOLLECTION_CACHE[df._expr]
+      for ix, df in enumerate(dataframes)
+  }
 
   if yield_elements == "schemas":
     results = {
-        key: pc
-        | "Unbatch '%s'" % dataframes[key]._expr._id >> schemas.UnbatchPandas(
-            dataframes[key]._expr.proxy(), include_indexes=include_indexes)
-        for (key, pc) in results.items()
+        ix: _make_unbatched_pcoll(pc, dataframes[ix]._expr, include_indexes)
+        for ix,

Review comment:
       Put parentheses around `ix, pc` for better formatting.
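
A small sketch of the suggested form, using placeholder string values instead of real PCollections (the data here is hypothetical). Parenthesizing the loop target is purely cosmetic in Python; the assumption is that it keeps an auto-formatter from splitting `ix,` and `pc` onto separate lines.

```python
# Hypothetical stand-in for the mapping of index -> PCollection.
raw_results = {0: 'pc_a', 1: 'pc_b'}

results = {
    ix: pc.upper()
    for (ix, pc) in raw_results.items()  # target kept together as one tuple
}
```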

##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -67,6 +68,9 @@ def to_dataframe(
       expressions.PlaceholderExpression(proxy, pcoll))
 
 
+TO_PCOLLECTION_CACHE = {}

Review comment:
       I'm actually more worried about global caches in production than in testing--the expressions themselves should not collide between pipelines. 
   
   Maybe we could use a `weakref.WeakKeyDictionary`.
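
For reference, a minimal sketch of how a `weakref.WeakKeyDictionary` behaves. `FakeExpression` and the cached string are hypothetical; the sketch assumes the key objects are hashable and support weak references.

```python
import gc
import weakref


class FakeExpression:
  """Hypothetical stand-in for a dataframe expression."""
  def __init__(self, name):
    self.name = name


# Keys are held weakly: an entry lives only as long as its key does.
cache = weakref.WeakKeyDictionary()

expr = FakeExpression('2 * a')
cache[expr] = 'pcollection for 2 * a'
size_while_alive = len(cache)

del expr
gc.collect()  # only needed on interpreters without reference counting
size_after_release = len(cache)
```

One general caveat: if a cached value holds a strong reference back to its own key, the key can never be collected and the entry stays in the cache.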







[GitHub] [beam] TheNeuralBit commented on pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13066:
URL: https://github.com/apache/beam/pull/13066#issuecomment-720725073


   Windows failures are due to unrelated flake BEAM-10921. I'll go ahead and merge.





[GitHub] [beam] TheNeuralBit commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r503508259



##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -138,21 +142,47 @@ def extract_input(placeholder):
 
   placeholders = frozenset.union(
       frozenset(), *[df._expr.placeholders() for df in dataframes])
-  results = {p: extract_input(p)
-             for p in placeholders
-             } | label >> transforms._DataframeExpressionsTransform(
-                 dict((ix, df._expr) for ix, df in enumerate(
-                     dataframes)))  # type: Dict[Any, pvalue.PCollection]
+
+  # Exclude any dataframes that have already been converted to PCollections.
+  # We only want to convert each DF expression once, then re-use.
+  new_dataframes = [
+      df for df in dataframes if df._expr not in TO_PCOLLECTION_CACHE
+  ]
+  new_results = {p: extract_input(p)
+                 for p in placeholders
+                 } | label >> transforms._DataframeExpressionsTransform(
+                     dict(
+                         (ix, df._expr) for ix, df in enumerate(new_dataframes))
+                 )  # type: Dict[Any, pvalue.PCollection]
+
+  TO_PCOLLECTION_CACHE.update(
+      {new_dataframes[ix]._expr: pc
+       for ix, pc in new_results.items()})
+
+  raw_results = {
+      ix: TO_PCOLLECTION_CACHE[df._expr]
+      for ix, df in enumerate(dataframes)
+  }
 
   if yield_elements == "schemas":
     results = {
-        key: pc
-        | "Unbatch '%s'" % dataframes[key]._expr._id >> schemas.UnbatchPandas(
-            dataframes[key]._expr.proxy(), include_indexes=include_indexes)
-        for (key, pc) in results.items()
+        ix: _make_unbatched_pcoll(pc, dataframes[ix]._expr, include_indexes)
+        for ix,
+        pc in raw_results.items()
     }
+  else:
+    results = raw_results
 
   if len(results) == 1 and not always_return_tuple:
     return results[0]
   else:
     return tuple(value for key, value in sorted(results.items()))
+
+
+memoize = functools.lru_cache(maxsize=None)
+
+
+@memoize

Review comment:
       Right, I used the same approach for both.







[GitHub] [beam] robertwb commented on a change in pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #13066:
URL: https://github.com/apache/beam/pull/13066#discussion_r506744373



##########
File path: sdks/python/apache_beam/dataframe/convert_test.py
##########
@@ -85,6 +85,94 @@ def test_convert(self):
       assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
       assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
 
+  def test_convert_memoization(self):

Review comment:
       This test (and the one below) seems to be largely a copy of the test above, but we don't even need to run the pipeline to check what it is testing. For clarity, perhaps it would be better to limit it to what we're trying to test, i.e. that the ids are the same.
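
One way such a narrowed test could look, sketched against a hypothetical memoized function rather than the real `convert.to_pcollection` (`Result`, `_CACHE`, and `to_result` are illustrative names, and no pipeline is involved). It asserts object identity only.

```python
import unittest
import weakref


class Result:
  """Hypothetical stand-in for a converted PCollection."""


_CACHE = weakref.WeakValueDictionary()


def to_result(expr_id):
  """Hypothetical memoized conversion keyed by an expression id."""
  result = _CACHE.get(expr_id)
  if result is None:
    result = Result()
    _CACHE[expr_id] = result
  return result


class MemoizationTest(unittest.TestCase):
  def test_same_expression_yields_same_object(self):
    first = to_result('2a')
    second = to_result('2a')
    # Identity is the property under test; no values are computed.
    self.assertIs(first, second)

  def test_different_expressions_yield_different_objects(self):
    self.assertIsNot(to_result('2a'), to_result('3a'))


suite = unittest.defaultTestLoader.loadTestsFromTestCase(MemoizationTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

`assertIs(a, b)` checks the same thing as comparing `id(a)` with `id(b)` and gives a clearer failure message.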

##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -138,19 +161,36 @@ def extract_input(placeholder):
 
   placeholders = frozenset.union(
       frozenset(), *[df._expr.placeholders() for df in dataframes])
-  results = {p: extract_input(p)
-             for p in placeholders
-             } | label >> transforms._DataframeExpressionsTransform(
-                 dict((ix, df._expr) for ix, df in enumerate(
-                     dataframes)))  # type: Dict[Any, pvalue.PCollection]
+
+  # Exclude any dataframes that have already been converted to PCollections.
+  # We only want to convert each DF expression once, then re-use.
+  new_dataframes = [
+      df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE
+  ]
+  new_results = {p: extract_input(p)
+                 for p in placeholders
+                 } | label >> transforms._DataframeExpressionsTransform(
+                     dict(

Review comment:
       Dict comprehension (for consistency)? 
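
For reference, a minimal sketch of the two spellings under discussion. `FakeFrame` is a hypothetical stand-in for a deferred dataframe and is not a Beam type; only the shape of the expression is taken from the diff above.

```python
class FakeFrame:
  """Hypothetical stand-in for an object that carries an `_expr` attribute."""
  def __init__(self, expr):
    self._expr = expr


new_dataframes = [FakeFrame('expr_a'), FakeFrame('expr_b')]

# Generator expression passed to dict(), as written in the diff.
via_dict_call = dict((ix, df._expr) for ix, df in enumerate(new_dataframes))

# Equivalent dict comprehension, matching the style used elsewhere.
via_comprehension = {ix: df._expr for ix, df in enumerate(new_dataframes)}
```

Both forms build the same mapping, so the choice is purely one of consistency.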

##########
File path: sdks/python/apache_beam/dataframe/convert_test.py
##########
@@ -85,6 +85,94 @@ def test_convert(self):
       assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
       assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
 
+  def test_convert_memoization(self):
+    with beam.Pipeline() as p:
+      a = pd.Series([1, 2, 3])
+      b = pd.Series([100, 200, 300])
+
+      pc_a = p | 'A' >> beam.Create([a])
+      pc_b = p | 'B' >> beam.Create([b])
+
+      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
+      df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+
+      df_2a = 2 * df_a
+      df_3a = 3 * df_a
+      df_ab = df_a * df_b
+
+      # Converting multiple results at a time can be more efficient.
+      pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab)
+      # Converting the same expressions should yield the same pcolls
+      pc_3a, pc_2a, pc_ab = convert.to_pcollection(df_3a, df_2a, df_ab)
+
+      self.assertEqual(id(pc_2a), id(pc_2a_))
+      self.assertEqual(id(pc_ab), id(pc_ab_))
+
+      assert_that(pc_2a, equal_to(list(2 * a)), label='Check2a')
+      assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
+      assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
+
+  def test_convert_memoization_yield_pandas(self):
+    with beam.Pipeline() as p:
+      a = pd.Series([1, 2, 3])
+      b = pd.Series([100, 200, 300])
+
+      pc_a = p | 'A' >> beam.Create([a])
+      pc_b = p | 'B' >> beam.Create([b])
+
+      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
+      df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+
+      df_2a = 2 * df_a
+      df_3a = 3 * df_a
+      df_ab = df_a * df_b
+
+      # Converting multiple results at a time can be more efficient.
+      pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab,

Review comment:
       Fold this test into the previous one, so you can also assert that `to_pcollection(x, yield_elements='schema') != to_pcollection(x, yield_elements='pandas')` (i.e. no accidental cross-cache contamination).
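
A rough sketch of the property being asked for, using only the standard library. `FakePColl`, `fake_to_pcollection`, and the single cache keyed on `(expr_id, yield_elements)` are all assumptions made for illustration; the PR may organize its caches differently.

```python
import weakref


class FakePColl:
  """Hypothetical stand-in for a PCollection (plain classes are weak-referenceable)."""
  def __init__(self, expr_id, yield_elements):
    self.expr_id = expr_id
    self.yield_elements = yield_elements


# Assumed layout: one cache whose key includes the output mode, so results
# for different modes can never be mistaken for one another.
_CACHE = weakref.WeakValueDictionary()


def fake_to_pcollection(expr_id, yield_elements='schema'):
  key = (expr_id, yield_elements)
  result = _CACHE.get(key)
  if result is None:
    result = FakePColl(expr_id, yield_elements)
    _CACHE[key] = result
  return result


x_schema = fake_to_pcollection('x', yield_elements='schema')
x_pandas = fake_to_pcollection('x', yield_elements='pandas')
x_schema_again = fake_to_pcollection('x', yield_elements='schema')
```

Here `x_schema_again` is the same object as `x_schema`, while `x_pandas` is a distinct object, which is the pair of assertions a combined test could make.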

##########
File path: sdks/python/apache_beam/dataframe/convert_test.py
##########
@@ -85,6 +85,94 @@ def test_convert(self):
       assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
       assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')
 
+  def test_convert_memoization(self):
+    with beam.Pipeline() as p:
+      a = pd.Series([1, 2, 3])
+      b = pd.Series([100, 200, 300])
+
+      pc_a = p | 'A' >> beam.Create([a])
+      pc_b = p | 'B' >> beam.Create([b])
+
+      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
+      df_b = convert.to_dataframe(pc_b, proxy=b[:0])
+
+      df_2a = 2 * df_a
+      df_3a = 3 * df_a
+      df_ab = df_a * df_b
+
+      # Converting multiple results at a time can be more efficient.
+      pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab)
+      # Converting the same expressions should yield the same pcolls
+      pc_3a, pc_2a, pc_ab = convert.to_pcollection(df_3a, df_2a, df_ab)
+
+      self.assertEqual(id(pc_2a), id(pc_2a_))

Review comment:
       assertIs?
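
As a small illustration of the suggestion, `unittest.TestCase.assertIs` checks object identity directly, whereas comparing `id()` values reports two opaque integers on failure. The test class below is hypothetical and does not involve Beam.

```python
import unittest


class IdentityAssertionExample(unittest.TestCase):
  def test_same_object(self):
    first = object()
    second = first
    # Works, but a failure would only show two integers.
    self.assertEqual(id(first), id(second))
    # Checks `first is second` and reports the objects themselves on failure.
    self.assertIs(first, second)

  def test_equal_but_distinct(self):
    left = [1, 2, 3]
    right = [1, 2, 3]
    # Equal values, different objects.
    self.assertEqual(left, right)
    self.assertIsNot(left, right)
```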

##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -67,7 +69,28 @@ def to_dataframe(
       expressions.PlaceholderExpression(proxy, pcoll))
 
 
+# PCollections generated by to_pcollection are memoized.
+# WeakValueDictionary is used so the caches are cleaned up with the parent
+# pipelines

Review comment:
       Nice. Perhaps it's worth noting that the pipeline (indirectly) holds references to the transforms, which keep both the collections and the expressions alive. (Keeping the expressions alive is important to ensure their ids never get accidentally re-used.)
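
The lifetime argument above can be sketched with the standard library alone. Everything here is a stand-in: `FakeExpr`, `FakePColl`, and `pipeline_refs` are hypothetical, and deriving `_id` from `id(self)` is an assumption made to show why a dead expression's id could be handed out again.

```python
import gc
import weakref


class FakeExpr:
  """Hypothetical expression whose id is derived from id(self) (an assumption)."""
  def __init__(self):
    self._id = 'expr_%s' % id(self)


class FakePColl:
  """Hypothetical collection that strongly references its expression."""
  def __init__(self, expr):
    self.expr = expr


cache = weakref.WeakValueDictionary()
pipeline_refs = []  # stand-in for the pipeline indirectly holding its outputs

expr = FakeExpr()
pcoll = FakePColl(expr)
key = expr._id
cache[key] = pcoll
pipeline_refs.append(pcoll)

# Drop the local names; the "pipeline" still keeps pcoll, and thus expr, alive.
del expr, pcoll
alive_while_referenced = key in cache

# Once the "pipeline" lets go, the weak cache entry disappears with it.
pipeline_refs.clear()
gc.collect()
alive_after_release = key in cache
```

While the entry exists, the expression it was keyed on is also alive, so no new expression can receive the same `id()`; the entry and the risk of a stale hit go away together.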







[GitHub] [beam] TheNeuralBit commented on pull request #13066: [BEAM-11052] Memoize to_pcollection

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13066:
URL: https://github.com/apache/beam/pull/13066#issuecomment-718251075


   Rebased to resolve merge conflicts

