Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/10/01 17:58:00 UTC

[jira] [Work logged] (BEAM-7760) Interactive Beam Caching PCollections bound to user defined vars in notebook

     [ https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=321402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321402 ]

ASF GitHub Bot logged work on BEAM-7760:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Oct/19 17:57
            Start Date: 01/Oct/19 17:57
    Worklog Time Spent: 10m 
      Work Description: KevinGG commented on pull request #9619: [BEAM-7760] Added pipeline_instrument module
URL: https://github.com/apache/beam/pull/9619#discussion_r330193612
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
 ##########
 @@ -0,0 +1,470 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to instrument interactivity to the given pipeline.
+
+For internal use only; no backwards-compatibility guarantees.
+This module accesses current interactive environment and analyzes given pipeline
+to transform original pipeline into a one-shot pipeline with interactivity.
+"""
+from __future__ import absolute_import
+
+import logging
+
+import apache_beam as beam
+from apache_beam.pipeline import PipelineVisitor
+from apache_beam.runners.interactive import cache_manager as cache
+from apache_beam.runners.interactive import interactive_environment as ie
+
+READ_CACHE = "_ReadCache_"
+WRITE_CACHE = "_WriteCache_"
+
+
+class PipelineInstrument(object):
+  """A pipeline instrument for pipeline to be executed by interactive runner.
+
+  This module should never depend on underlying runner that interactive runner
+  delegates. It instruments the original instance of pipeline directly by
+  appending or replacing transforms with help of cache. It provides
+  interfaces to recover states of original pipeline. It's the interactive
+  runner's responsibility to coordinate supported underlying runners to run
+  the pipeline instrumented and recover the original pipeline states if needed.
+  """
+
+  def __init__(self, pipeline, options=None):
+    self._pipeline = pipeline
+    # The cache manager should be initiated outside of this module and outside
+    # of run_pipeline() from interactive runner so that its lifespan could cover
+    # multiple runs in the interactive environment. Owned by
+    # interactive_environment module. Not owned by this module.
+    # TODO(BEAM-7760): change the scope of cache to be owned by runner or
+    # pipeline result instances because a pipeline is not 1:1 correlated to a
+    # running job. Only complete and read-only cache is valid across multiple
+    # jobs. Other cache instances should have their own scopes. Some design
+    # change should support only runner.run(pipeline) pattern rather than
+    # pipeline.run([runner]) and a runner can only run at most one pipeline at a
+    # time. Otherwise, result returned by run() is the only 1:1 anchor.
+    self._cache_manager = ie.current_env().cache_manager()
 
 Review comment:
   Alexey is working on decoupling caching from a centralized cache manager instance, so a cache manager might not exist in the future. The cache manager is obtained from the current environment here to avoid exposing it in the constructor.
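
A minimal sketch of the design choice described in the comment, assuming toy stand-ins for the real classes: the constructor signature never mentions a cache manager, so callers would not need to change if caching were later decoupled from a central instance. Only the names `PipelineInstrument`, `current_env()` and `cache_manager()` come from the diff above; the `CacheManager` and `InteractiveEnvironment` classes, their `read`/`write` methods, and the module-level `_env` singleton are hypothetical and do not reflect the actual Beam implementation.

```python
class CacheManager(object):
  """Hypothetical in-memory cache standing in for the real cache manager."""

  def __init__(self):
    self._store = {}

  def write(self, key, values):
    self._store[key] = list(values)

  def read(self, key):
    return self._store.get(key, [])


class InteractiveEnvironment(object):
  """Hypothetical environment that owns the cache manager."""

  def __init__(self):
    self._cache_manager = CacheManager()

  def cache_manager(self):
    return self._cache_manager


_env = None  # Assumed module-level singleton, created lazily.


def current_env():
  global _env
  if _env is None:
    _env = InteractiveEnvironment()
  return _env


class PipelineInstrument(object):
  """Toy instrument: the cache manager is looked up, not passed in."""

  def __init__(self, pipeline, options=None):
    self._pipeline = pipeline
    # Not exposed in the constructor; owned by the environment.
    self._cache_manager = current_env().cache_manager()


# Two instruments created for two runs see the same cache.
first = PipelineInstrument(pipeline="run-1")
second = PipelineInstrument(pipeline="run-2")
first._cache_manager.write("messages", [1, 2, 3])
shared = second._cache_manager.read("messages")
```

Because both instruments resolve the cache through the environment, data written during one run is visible to the next, which is the lifespan requirement stated in the code comment under review.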
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 321402)
    Time Spent: 11.5h  (was: 11h 20m)

> Interactive Beam Caching PCollections bound to user defined vars in notebook
> ----------------------------------------------------------------------------
>
>                 Key: BEAM-7760
>                 URL: https://issues.apache.org/jira/browse/BEAM-7760
>             Project: Beam
>          Issue Type: New Feature
>          Components: examples-python
>            Reporter: Ning Kang
>            Assignee: Ning Kang
>            Priority: Major
>          Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> Cache only PCollections bound to user-defined variables in a pipeline when running the pipeline with the interactive runner in Jupyter notebooks.
> [Interactive Beam|https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive] has been caching, and reusing the caches of, "leaf" PCollections for interactive execution in Jupyter notebooks.
> Interactive execution is currently supported so that, when new transforms are appended to an existing pipeline for a new run, the already-executed part of the pipeline doesn't need to be re-executed.
> A PCollection is a "leaf" when it is never used as input to any PTransform in the pipeline.
> The problem with building the caches and the pipeline to execute around "leaf" PCollections is that when a PCollection is consumed by a sink with no output, the pipeline built for execution misses the subgraph that generates and consumes that PCollection.
> For example, "ReadFromPubSub --> WriteToPubSub" results in an empty pipeline.
> Caching around PCollections bound to user-defined variables, and replacing transforms with sources and sinks of caches, could build the pipeline to execute properly in the interactive execution scenario. In addition, a cached PCollection can then be traced back to user code and used for data visualization if the user wants.
> E.g.,
> {code:python}
> # ...
> p = beam.Pipeline(interactive_runner.InteractiveRunner(),
>                   options=pipeline_options)
> messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
> messages | "Write" >> beam.io.WriteToPubSub(topic_path)
> result = p.run()
> # ...
> visualize(messages){code}
> The interactive runner automatically figures out that the PCollection {{messages}} created by {{p | "Read" >> beam.io.ReadFromPubSub(subscription='...')}} should be cached and reused if the notebook user appends more transforms.
> Once the pipeline has been executed, the user can use any visualize(PCollection) module to visualize the data statically (batch) or dynamically (streaming).
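
The "leaf" problem described in the issue can be illustrated with a small, self-contained sketch. It models the pipeline as a plain list of transforms rather than using the Beam API; the `Transform` tuple, both helper functions, and the `messages_pcoll` identifier are hypothetical names chosen for illustration, and the sink is assumed to produce no output, as the issue describes.

```python
from collections import namedtuple

# Toy stand-in for a pipeline graph; this is not the Beam API.
Transform = namedtuple("Transform", ["label", "inputs", "outputs"])


def leaf_pcollections(transforms):
  """Returns PCollections that no transform uses as input."""
  produced = {pc for t in transforms for pc in t.outputs}
  consumed = {pc for t in transforms for pc in t.inputs}
  return produced - consumed


def var_bound_pcollections(transforms, user_vars):
  """Returns PCollections in the graph that a user variable refers to."""
  produced = {pc for t in transforms for pc in t.outputs}
  return {pc for pc in user_vars.values() if pc in produced}


# "Read" produces one PCollection; "Write" consumes it and outputs nothing.
pipeline = [
    Transform("Read", inputs=(), outputs=("messages_pcoll",)),
    Transform("Write", inputs=("messages_pcoll",), outputs=()),
]
# The notebook user bound the output of "Read" to the variable `messages`.
user_vars = {"messages": "messages_pcoll"}

leaves = leaf_pcollections(pipeline)
bound = var_bound_pcollections(pipeline, user_vars)
print(leaves)  # set()
print(bound)   # {'messages_pcoll'}
```

With leaf-based caching there is nothing to build the pipeline around, because the only PCollection is consumed by the sink. Selecting by user-defined variable still finds it, so it can be cached and later visualized.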


