You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/31 20:45:23 UTC

[GitHub] [beam] rohdesamuel commented on a change in pull request #14368: [BEAM-10708] Read/Write Intermediate PCollections

rohdesamuel commented on a change in pull request #14368:
URL: https://github.com/apache/beam/pull/14368#discussion_r605160116



##########
File path: sdks/python/apache_beam/runners/interactive/caching/read_cache.py
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to read cache of computed PCollections.
+
+For internal use only; no backward-compatibility guarantees.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.pipeline_context import PipelineContext
+from apache_beam.transforms.ptransform import PTransform
+
+
+class ReadCache:
+  """Class that facilitates reading cache of computed PCollections.
+  """
+  def __init__(
+      self,
+      pipeline: beam_runner_api_pb2.Pipeline,
+      context: PipelineContext,
+      cache_manager: cache.CacheManager,
+      cacheable: Cacheable):
+    self._pipeline = pipeline
+    self._context = context
+    self._cache_manager = cache_manager
+    self._cacheable = cacheable
+    self._key = repr(cacheable.to_key())
+    self._label = '{}{}'.format('_cache_', self._key)
+
+  def read_cache(self) -> Tuple[str, str]:
+    """Reads cache of the cacheable PCollection and wires the cache into the
+    pipeline proto. Returns the pipeline-scoped ids of the cacheable PCollection
+    and the cache reading output PCollection that replaces it.
+    """
+    template, read_output = self._build_runner_api_template()
+    output_id = self._context.pcollections.get_id(read_output)
+    source_id = self._context.pcollections.get_id(self._cacheable.pcoll)
+    # Copy cache reading subgraph from the template to the pipeline proto.
+    for pcoll_id in template.components.pcollections:
+      if pcoll_id in self._pipeline.components.pcollections:
+        continue
+      self._pipeline.components.pcollections[pcoll_id].CopyFrom(
+          template.components.pcollections[pcoll_id])
+    for coder_id in template.components.coders:
+      if coder_id in self._pipeline.components.coders:
+        continue
+      self._pipeline.components.coders[coder_id].CopyFrom(
+          template.components.coders[coder_id])
+    for windowing_strategy_id in template.components.windowing_strategies:
+      if windowing_strategy_id in \
+          self._pipeline.components.windowing_strategies:

Review comment:
       Prefer wrapping the condition in parentheses instead of using a backslash line continuation.

##########
File path: sdks/python/apache_beam/runners/interactive/caching/read_cache.py
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to read cache of computed PCollections.
+
+For internal use only; no backward-compatibility guarantees.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.pipeline_context import PipelineContext
+from apache_beam.transforms.ptransform import PTransform
+
+
+class ReadCache:
+  """Class that facilitates reading cache of computed PCollections.
+  """
+  def __init__(
+      self,
+      pipeline: beam_runner_api_pb2.Pipeline,
+      context: PipelineContext,
+      cache_manager: cache.CacheManager,
+      cacheable: Cacheable):
+    self._pipeline = pipeline
+    self._context = context
+    self._cache_manager = cache_manager
+    self._cacheable = cacheable
+    self._key = repr(cacheable.to_key())
+    self._label = '{}{}'.format('_cache_', self._key)
+
+  def read_cache(self) -> Tuple[str, str]:

Review comment:
       Can you please add more comments here explaining what these proto transformations do and why?

##########
File path: sdks/python/apache_beam/runners/interactive/caching/write_cache.py
##########
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to write cache for PCollections being computed.
+
+For internal use only; no backward-compatibility guarantees.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.pipeline_context import PipelineContext
+from apache_beam.testing import test_stream
+from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms.window import WindowedValue
+
+
+class WriteCache:
+  """Class that facilitates writing cache for PCollections being computed.
+  """
+  def __init__(
+      self,
+      pipeline: beam_runner_api_pb2.Pipeline,
+      context: PipelineContext,
+      cache_manager: cache.CacheManager,
+      cacheable: Cacheable):
+    self._pipeline = pipeline
+    self._context = context
+    self._cache_manager = cache_manager
+    self._cacheable = cacheable
+    self._key = repr(cacheable.to_key())
+    self._label = '{}{}'.format('_cache_', self._key)
+
+  def write_cache(self) -> None:

Review comment:
       More comments in this method too, please.

##########
File path: sdks/python/apache_beam/runners/interactive/caching/write_cache.py
##########
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to write cache for PCollections being computed.
+
+For internal use only; no backward-compatibility guarantees.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.pipeline_context import PipelineContext
+from apache_beam.testing import test_stream
+from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms.window import WindowedValue
+
+
+class WriteCache:
+  """Class that facilitates writing cache for PCollections being computed.
+  """
+  def __init__(
+      self,
+      pipeline: beam_runner_api_pb2.Pipeline,
+      context: PipelineContext,
+      cache_manager: cache.CacheManager,
+      cacheable: Cacheable):
+    self._pipeline = pipeline
+    self._context = context
+    self._cache_manager = cache_manager
+    self._cacheable = cacheable
+    self._key = repr(cacheable.to_key())
+    self._label = '{}{}'.format('_cache_', self._key)
+
+  def write_cache(self) -> None:
+    """Writes cache for the cacheable PCollection that is being computed.
+    """
+    template, write_input_placeholder = self._build_runner_api_template()
+    input_placeholder_id = self._context.pcollections.get_id(
+        write_input_placeholder.placeholder_pcoll)
+    input_id = self._context.pcollections.get_id(self._cacheable.pcoll)
+
+    # Copy cache writing subgraph from the template to the pipeline proto.
+    for pcoll_id in template.components.pcollections:

Review comment:
       I'm not sure this code is correct. Component ids (PCollection ids, coder ids, etc.) are scoped to a specific pipeline. The same id in two different pipelines is not guaranteed to refer to the same component.
   
   Instead of relying on id equality across pipelines, it might be better to recursively walk the temporary (template) pipeline and copy its components over to _pipeline. Be careful, though: the pipeline proto has implicit ordering requirements. For example, transforms must be listed in topological order.

##########
File path: sdks/python/apache_beam/runners/interactive/augmented_pipeline.py
##########
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to augment interactive flavor into the given pipeline.
+
+For internal use only; no backward-compatibility guarantees.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import copy
+from typing import Dict
+from typing import Optional
+from typing import Set
+
+import apache_beam as beam
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import background_caching_job
+from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.interactive.caching.read_cache import ReadCache
+from apache_beam.runners.interactive.caching.write_cache import WriteCache
+
+
+class AugmentedPipeline:
+  """A pipeline with augmented interactive flavor that caches intermediate
+  PCollections defined by the user, reads computed PCollections as source and
+  prunes unnecessary pipeline parts for fast computation.
+  """
+  def __init__(
+      self,
+      user_pipeline: beam.Pipeline,
+      pcolls: Optional[Set[beam.pvalue.PCollection]] = None):
+    """
+    Initializes a pipelilne for augmenting interactive flavor.
+
+    Args:
+      user_pipeline: a beam.Pipeline instance defined by the user.
+      pcolls: cacheable pcolls to be computed/retrieved. If the set is
+        empty, all intermediate pcolls assigned to variables are applicable.
+    """
+    assert not pcolls or all([pcoll.pipeline is user_pipeline for pcoll in
+      pcolls]), 'All %s need to belong to %s' % (pcolls, user_pipeline)
+    self._user_pipeline = user_pipeline
+    self._pcolls = pcolls
+    self._cache_manager = ie.current_env().get_cache_manager(
+        self._user_pipeline, create_if_absent=True)
+    if background_caching_job.has_source_to_cache(self._user_pipeline):
+      self._cache_manager = ie.current_env().get_cache_manager(
+          self._user_pipeline)
+    self._pipeline, self._context = self._user_pipeline.to_runner_api(
+        return_context=True)
+    self._context.component_id_map = copy.copy(
+        self._user_pipeline.component_id_map)
+    self._cacheables = self.cacheables()
+    self._augmented = False
+
+  @property
+  def augmented_pipeline(self) -> beam_runner_api_pb2.Pipeline:
+    assert self._augmented, 'Call augment() before retrieving the value.'
+    return self._pipeline
+
+  # TODO(BEAM-10708): Support generating a background recording job that
+  # contains unbound source recording transforms only.
+  @property
+  def background_recording_pipeline(self) -> beam_runner_api_pb2.Pipeline:
+    raise NotImplementedError
+
+  def cacheables(self) -> Dict[beam.pvalue.PCollection, Cacheable]:
+    """Finds all the cacheable intermediate PCollections in the pipeline with
+    their metadata.
+    """
+    c = {}
+    for watching in ie.current_env().watching():
+      for key, val in watching:
+        if (isinstance(val, beam.pvalue.PCollection) and
+            val.pipeline is self._user_pipeline and
+            (not self._pcolls or val in self._pcolls)):
+          pcoll_id = self._context.pcollections.get_id(val)
+          c[val] = Cacheable(
+              pcoll_id=pcoll_id,
+              var=key,
+              pcoll=val,
+              version=str(id(val)),
+              producer_version=str(id(val.producer)))
+    return c
+
+  def augment(self) -> 'AugmentedPipeline':
+    """Augments the pipeline with cache. Idempotent and returns self.
+
+    For a cacheable PCollection, if cache exists, read cache; else, write cache.
+    """
+    if self._augmented:
+      return self
+    self._augmented = True

Review comment:
       Instead of keeping internal state that tracks whether the pipeline has already been augmented, a simpler API would be for this method to always augment the user pipeline. The client can then cache the result if needed.

##########
File path: sdks/python/apache_beam/runners/interactive/augmented_pipeline.py
##########
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to augment interactive flavor into the given pipeline.
+
+For internal use only; no backward-compatibility guarantees.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import copy
+from typing import Dict
+from typing import Optional
+from typing import Set
+
+import apache_beam as beam
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import background_caching_job
+from apache_beam.runners.interactive.caching.cacheable import Cacheable
+from apache_beam.runners.interactive.caching.read_cache import ReadCache
+from apache_beam.runners.interactive.caching.write_cache import WriteCache
+
+
+class AugmentedPipeline:
+  """A pipeline with augmented interactive flavor that caches intermediate
+  PCollections defined by the user, reads computed PCollections as source and
+  prunes unnecessary pipeline parts for fast computation.
+  """
+  def __init__(
+      self,
+      user_pipeline: beam.Pipeline,
+      pcolls: Optional[Set[beam.pvalue.PCollection]] = None):
+    """
+    Initializes a pipelilne for augmenting interactive flavor.
+
+    Args:
+      user_pipeline: a beam.Pipeline instance defined by the user.
+      pcolls: cacheable pcolls to be computed/retrieved. If the set is
+        empty, all intermediate pcolls assigned to variables are applicable.
+    """
+    assert not pcolls or all([pcoll.pipeline is user_pipeline for pcoll in
+      pcolls]), 'All %s need to belong to %s' % (pcolls, user_pipeline)
+    self._user_pipeline = user_pipeline
+    self._pcolls = pcolls
+    self._cache_manager = ie.current_env().get_cache_manager(
+        self._user_pipeline, create_if_absent=True)
+    if background_caching_job.has_source_to_cache(self._user_pipeline):
+      self._cache_manager = ie.current_env().get_cache_manager(
+          self._user_pipeline)
+    self._pipeline, self._context = self._user_pipeline.to_runner_api(
+        return_context=True)
+    self._context.component_id_map = copy.copy(
+        self._user_pipeline.component_id_map)
+    self._cacheables = self.cacheables()
+    self._augmented = False
+
+  @property
+  def augmented_pipeline(self) -> beam_runner_api_pb2.Pipeline:
+    assert self._augmented, 'Call augment() before retrieving the value.'
+    return self._pipeline
+
+  # TODO(BEAM-10708): Support generating a background recording job that
+  # contains unbound source recording transforms only.
+  @property
+  def background_recording_pipeline(self) -> beam_runner_api_pb2.Pipeline:
+    raise NotImplementedError
+
+  def cacheables(self) -> Dict[beam.pvalue.PCollection, Cacheable]:
+    """Finds all the cacheable intermediate PCollections in the pipeline with
+    their metadata.
+    """
+    c = {}
+    for watching in ie.current_env().watching():
+      for key, val in watching:
+        if (isinstance(val, beam.pvalue.PCollection) and
+            val.pipeline is self._user_pipeline and
+            (not self._pcolls or val in self._pcolls)):
+          pcoll_id = self._context.pcollections.get_id(val)
+          c[val] = Cacheable(
+              pcoll_id=pcoll_id,
+              var=key,
+              pcoll=val,
+              version=str(id(val)),
+              producer_version=str(id(val.producer)))
+    return c
+
+  def augment(self) -> 'AugmentedPipeline':
+    """Augments the pipeline with cache. Idempotent and returns self.
+
+    For a cacheable PCollection, if cache exists, read cache; else, write cache.
+    """
+    if self._augmented:
+      return self
+    self._augmented = True
+
+    # Find pcolls eligible for reading or writing cache.
+    readcache_pcolls = set()
+    for pcoll, cacheable in self._cacheables.items():
+      key = repr(cacheable.to_key())
+      if (self._cache_manager.exists('full', key) and
+          pcoll in ie.current_env().computed_pcollections):

Review comment:
       Is there ever a case where a PCollection is marked as computed but its cache doesn't exist? If not, one of these two checks may be redundant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org