You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/12/10 00:46:18 UTC
[beam] branch master updated: [BEAM-7926] Data-centric Interactive Part1
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b26000a [BEAM-7926] Data-centric Interactive Part1
new 024c385 Merge pull request #10276 from KevinGG/BEAM-7926-part1
b26000a is described below
commit b26000a0b76590ebe4a7299ae0680ed421fa2b34
Author: Ning Kang <ni...@ningk-macbookpro.roam.corp.google.com>
AuthorDate: Tue Dec 3 14:26:30 2019 -0800
[BEAM-7926] Data-centric Interactive Part1
1. Changed the Beam Python SDK's pipeline transform apply() logic to
implicitly prefix the label of each top-level transform being applied with
IPython prompt metadata (the cell number, e.g. "Cell 2: Init") when code is
executed in IPython during construction of a user-defined pipeline.
2. Users can see which pieces of code are in effect by looking at the labels
of applied PTransforms. This helps them avoid broken in-memory state caused by
notebook cell re-execution, cell deletion, or hidden state.
3. Allow duplicate PTransform labels in different cells when the user
constructs a pipeline in an IPython/notebook environment, because the
cell-number prefix added to each label de-duplicates them.
4. Created a common testing package and modules: a mock IPython kernel for
testing, and pipeline assertions moved into a common testing module.
5. Updated tests for IPython-related scenarios to run in a mocked IPython
environment.
6. Updated the example notebook.
---
sdks/python/apache_beam/pipeline.py | 7 +
.../runners/dataflow/dataflow_runner.py | 5 +-
.../interactive/display/pcoll_visualization.py | 9 +-
.../interactive/display/pipeline_graph_test.py | 47 ++++
.../examples/Interactive Beam Example.ipynb | 256 ++++++++++++---------
.../runners/interactive/interactive_environment.py | 38 ++-
.../runners/interactive/pipeline_instrument.py | 20 +-
.../interactive/pipeline_instrument_test.py | 64 +-----
.../runners/interactive/testing/__init__.py | 16 ++
.../runners/interactive/testing/mock_ipython.py | 67 ++++++
.../interactive/testing/pipeline_assertion.py | 86 +++++++
sdks/python/apache_beam/runners/utils.py | 47 ----
sdks/python/apache_beam/utils/interactive_utils.py | 100 ++++++++
13 files changed, 532 insertions(+), 230 deletions(-)
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 61673fe..bc52e72 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -74,6 +74,7 @@ from apache_beam.transforms import ptransform
from apache_beam.typehints import TypeCheckError
from apache_beam.typehints import typehints
from apache_beam.utils.annotations import deprecated
+from apache_beam.utils.interactive_utils import alter_label_if_ipython
__all__ = ['Pipeline', 'PTransformOverride']
@@ -491,6 +492,12 @@ class Pipeline(object):
finally:
transform.label = old_label
+ # Attempts to alter the label of the transform to be applied only when it's
+ # a top-level transform so that the cell number will not be prepended to
+ # every child transform in a composite.
+ if self._current_transform() is self._root_transform():
+ alter_label_if_ipython(transform, pvalueish)
+
full_label = '/'.join([self._current_transform().full_label,
label or transform.label]).lstrip('/')
if full_label in self.applied_labels:
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1fae45d..75aedd1 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -58,11 +58,11 @@ from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import PValueCache
-from apache_beam.runners.utils import is_interactive
from apache_beam.transforms import window
from apache_beam.transforms.display import DisplayData
from apache_beam.typehints import typehints
from apache_beam.utils import proto_utils
+from apache_beam.utils.interactive_utils import is_in_notebook
from apache_beam.utils.plugin import BeamPlugin
try: # Python 3
@@ -376,8 +376,7 @@ class DataflowRunner(PipelineRunner):
def run_pipeline(self, pipeline, options):
"""Remotely executes entire pipeline or parts reachable from node."""
# Label goog-dataflow-notebook if job is started from notebook.
- _, is_in_notebook = is_interactive()
- if is_in_notebook:
+ if is_in_notebook():
notebook_version = ('goog-dataflow-notebook=' +
beam.version.__version__.replace('.', '_'))
if options.view_as(GoogleCloudOptions).labels:
diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
index 1f0e925..bdd34ca 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
@@ -74,11 +74,14 @@ _OVERVIEW_HTML_TEMPLATE = """
</script>"""
_DATAFRAME_PAGINATION_TEMPLATE = """
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.2/jquery.min.js"></script>
- <script src="https://cdn.datatables.net/1.10.16/js/jquery.dataTables.js"></script>
- <link rel="stylesheet" href="https://cdn.datatables.net/1.10.16/css/jquery.dataTables.css">
+ <script src="https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js"></script>
+ <link rel="stylesheet" href="https://cdn.datatables.net/1.10.20/css/jquery.dataTables.min.css">
{dataframe_html}
<script>
- $("#{table_id}").DataTable();
+ $(document).ready(
+ function() {{
+ $("#{table_id}").DataTable();
+ }});
</script>"""
diff --git a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph_test.py b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph_test.py
index 12cc3ef..e73dbd6 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph_test.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph_test.py
@@ -25,6 +25,14 @@ from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import interactive_runner as ir
from apache_beam.runners.interactive.display import pipeline_graph
+from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+
+# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
+# unittest.mock module.
+try:
+ from unittest.mock import patch
+except ImportError:
+ from mock import patch
# pylint: disable=range-builtin-not-iterating,unused-variable,possibly-unused-variable
# Reason:
@@ -83,6 +91,45 @@ class PipelineGraphTest(unittest.TestCase):
'}\n'),
pipeline_graph.PipelineGraph(p).get_dot())
+ @patch('IPython.get_ipython', mock_get_ipython)
+ def test_get_dot_within_notebook(self):
+ # Assume a mocked ipython kernel and notebook frontend have been set up.
+ ie.current_env()._is_in_ipython = True
+ ie.current_env()._is_in_notebook = True
+ with mock_get_ipython(): # Cell 1
+ p = beam.Pipeline(ir.InteractiveRunner())
+ # Immediately track this local pipeline so that ipython prompts when
+ # applying transforms will be tracked and used for labels.
+ ib.watch(locals())
+
+ with mock_get_ipython(): # Cell 2
+ init_pcoll = p | 'Init' >> beam.Create(range(10))
+
+ with mock_get_ipython(): # Cell 3
+ squares = init_pcoll | 'Square' >> beam.Map(lambda x: x * x)
+
+ with mock_get_ipython(): # Cell 4
+ cubes = init_pcoll | 'Cube' >> beam.Map(lambda x: x ** 3)
+
+ # Tracks all PCollections defined so far.
+ ib.watch(locals())
+ self.assertEqual(
+ ('digraph G {\n'
+ 'node [color=blue, fontcolor=blue, shape=box];\n'
+ '"Cell 2: Init";\n'
+ 'init_pcoll [shape=circle];\n'
+ '"Cell 3: Square";\n'
+ 'squares [shape=circle];\n'
+ '"Cell 4: Cube";\n'
+ 'cubes [shape=circle];\n'
+ '"Cell 2: Init" -> init_pcoll;\n'
+ 'init_pcoll -> "Cell 3: Square";\n'
+ 'init_pcoll -> "Cell 4: Cube";\n'
+ '"Cell 3: Square" -> squares;\n'
+ '"Cell 4: Cube" -> cubes;\n'
+ '}\n'),
+ pipeline_graph.PipelineGraph(p).get_dot())
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/examples/Interactive Beam Example.ipynb b/sdks/python/apache_beam/runners/interactive/examples/Interactive Beam Example.ipynb
index a4486ee..b461a26 100644
--- a/sdks/python/apache_beam/runners/interactive/examples/Interactive Beam Example.ipynb
+++ b/sdks/python/apache_beam/runners/interactive/examples/Interactive Beam Example.ipynb
@@ -50,67 +50,76 @@
"<!-- Generated by graphviz version 2.43.0 (0)\n",
" -->\n",
"<!-- Title: G Pages: 1 -->\n",
- "<svg width=\"190pt\" height=\"253pt\"\n",
- " viewBox=\"0.00 0.00 190.09 253.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
- "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 249)\">\n",
+ "<svg width=\"208pt\" height=\"349pt\"\n",
+ " viewBox=\"0.00 0.00 208.43 349.26\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
+ "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 345.26)\">\n",
"<title>G</title>\n",
- "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-249 186.09,-249 186.09,4 -4,4\"/>\n",
- "<!-- Create -->\n",
+ "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-345.26 204.43,-345.26 204.43,4 -4,4\"/>\n",
+ "<!-- Cell 2: Create -->\n",
"<g id=\"node1\" class=\"node\">\n",
- "<title>Create</title>\n",
- "<ellipse fill=\"none\" stroke=\"blue\" cx=\"86.05\" cy=\"-227\" rx=\"33.37\" ry=\"18\"/>\n",
- "<text text-anchor=\"middle\" x=\"86.05\" y=\"-222.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Create</text>\n",
+ "<title>Cell 2: Create</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"148.8,-341.26 55.3,-341.26 55.3,-305.26 148.8,-305.26 148.8,-341.26\"/>\n",
+ "<text text-anchor=\"middle\" x=\"102.05\" y=\"-319.06\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Create</text>\n",
"</g>\n",
- "<!-- diverge6871 -->\n",
+ "<!-- init_pcoll -->\n",
"<g id=\"node2\" class=\"node\">\n",
- "<title>diverge6871</title>\n",
- "<ellipse fill=\"blue\" stroke=\"blue\" cx=\"86.05\" cy=\"-159\" rx=\"0\" ry=\"0\"/>\n",
+ "<title>init_pcoll</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"102.05\" cy=\"-225.26\" rx=\"44.01\" ry=\"44.01\"/>\n",
+ "<text text-anchor=\"middle\" x=\"102.05\" y=\"-221.06\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">init_pcoll</text>\n",
"</g>\n",
- "<!-- Create->diverge6871 -->\n",
+ "<!-- Cell 2: Create->init_pcoll -->\n",
"<g id=\"edge1\" class=\"edge\">\n",
- "<title>Create->diverge6871</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M86.05,-208.66C86.05,-189.72 86.05,-161.81 86.05,-160.08\"/>\n",
- "<text text-anchor=\"middle\" x=\"112.88\" y=\"-179.8\" font-family=\"Times,serif\" font-size=\"14.00\">init_pcoll</text>\n",
+ "<title>Cell 2: Create->init_pcoll</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M102.05,-305.1C102.05,-297.81 102.05,-288.88 102.05,-279.68\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"105.55,-279.65 102.05,-269.65 98.55,-279.65 105.55,-279.65\"/>\n",
"</g>\n",
- "<!-- Square -->\n",
+ "<!-- Cell 2: Square -->\n",
"<g id=\"node3\" class=\"node\">\n",
- "<title>Square</title>\n",
- "<ellipse fill=\"none\" stroke=\"blue\" cx=\"46.05\" cy=\"-104\" rx=\"34.83\" ry=\"18\"/>\n",
- "<text text-anchor=\"middle\" x=\"46.05\" y=\"-99.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Square</text>\n",
+ "<title>Cell 2: Square</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"96.15,-145.26 -0.05,-145.26 -0.05,-109.26 96.15,-109.26 96.15,-145.26\"/>\n",
+ "<text text-anchor=\"middle\" x=\"48.05\" y=\"-123.06\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Square</text>\n",
"</g>\n",
- "<!-- diverge6871->Square -->\n",
+ "<!-- init_pcoll->Cell 2: Square -->\n",
"<g id=\"edge2\" class=\"edge\">\n",
- "<title>diverge6871->Square</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M85.93,-157.85C84.72,-156.24 74.07,-142.14 64.09,-128.91\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"66.87,-126.78 58.05,-120.9 61.28,-130.99 66.87,-126.78\"/>\n",
+ "<title>init_pcoll->Cell 2: Square</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M80.86,-186.59C74.77,-175.77 68.27,-164.2 62.67,-154.25\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"65.57,-152.28 57.62,-145.28 59.47,-155.71 65.57,-152.28\"/>\n",
"</g>\n",
- "<!-- Cube -->\n",
+ "<!-- Cell 2: Cube -->\n",
"<g id=\"node5\" class=\"node\">\n",
- "<title>Cube</title>\n",
- "<ellipse fill=\"none\" stroke=\"blue\" cx=\"136.05\" cy=\"-104\" rx=\"28.55\" ry=\"18\"/>\n",
- "<text text-anchor=\"middle\" x=\"136.05\" y=\"-99.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cube</text>\n",
+ "<title>Cell 2: Cube</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"200.32,-145.26 113.78,-145.26 113.78,-109.26 200.32,-109.26 200.32,-145.26\"/>\n",
+ "<text text-anchor=\"middle\" x=\"157.05\" y=\"-123.06\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Cube</text>\n",
"</g>\n",
- "<!-- diverge6871->Cube -->\n",
+ "<!-- init_pcoll->Cell 2: Cube -->\n",
"<g id=\"edge3\" class=\"edge\">\n",
- "<title>diverge6871->Cube</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M86.19,-157.85C87.77,-156.17 102.21,-140.86 115.15,-127.15\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"117.77,-129.47 122.08,-119.8 112.68,-124.67 117.77,-129.47\"/>\n",
+ "<title>init_pcoll->Cell 2: Cube</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M123.63,-186.59C129.83,-175.77 136.46,-164.2 142.16,-154.25\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"145.36,-155.69 147.3,-145.28 139.29,-152.21 145.36,-155.69\"/>\n",
"</g>\n",
- "<!-- leaf6244 -->\n",
- "<!-- Square->leaf6244 -->\n",
+ "<!-- squares -->\n",
+ "<g id=\"node4\" class=\"node\">\n",
+ "<title>squares</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"48.05\" cy=\"-36.63\" rx=\"36.76\" ry=\"36.76\"/>\n",
+ "<text text-anchor=\"middle\" x=\"48.05\" y=\"-32.43\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">squares</text>\n",
+ "</g>\n",
+ "<!-- Cell 2: Square->squares -->\n",
"<g id=\"edge4\" class=\"edge\">\n",
- "<title>Square->leaf6244</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M45.03,-86C44.36,-74.69 43.46,-59.58 42.69,-46.57\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"46.16,-46.04 42.07,-36.26 39.17,-46.45 46.16,-46.04\"/>\n",
- "<text text-anchor=\"middle\" x=\"65.04\" y=\"-56.8\" font-family=\"Times,serif\" font-size=\"14.00\">squares</text>\n",
+ "<title>Cell 2: Square->squares</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M48.05,-109.17C48.05,-101.79 48.05,-92.76 48.05,-83.63\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"51.55,-83.37 48.05,-73.37 44.55,-83.37 51.55,-83.37\"/>\n",
+ "</g>\n",
+ "<!-- cubes -->\n",
+ "<g id=\"node6\" class=\"node\">\n",
+ "<title>cubes</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"157.05\" cy=\"-36.63\" rx=\"30.49\" ry=\"30.49\"/>\n",
+ "<text text-anchor=\"middle\" x=\"157.05\" y=\"-32.43\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">cubes</text>\n",
"</g>\n",
- "<!-- leaf6690 -->\n",
- "<!-- Cube->leaf6690 -->\n",
+ "<!-- Cell 2: Cube->cubes -->\n",
"<g id=\"edge5\" class=\"edge\">\n",
- "<title>Cube->leaf6690</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M137.06,-86C137.73,-74.69 138.63,-59.58 139.41,-46.57\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"142.92,-46.45 140.02,-36.26 135.93,-46.04 142.92,-46.45\"/>\n",
- "<text text-anchor=\"middle\" x=\"154.98\" y=\"-56.8\" font-family=\"Times,serif\" font-size=\"14.00\">cubes</text>\n",
+ "<title>Cell 2: Cube->cubes</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M157.05,-109.17C157.05,-100 157.05,-88.3 157.05,-77.03\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"160.55,-76.88 157.05,-66.88 153.55,-76.88 160.55,-76.88\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
@@ -142,11 +151,11 @@
"output_type": "stream",
"text": [
"Requirement already satisfied: matplotlib in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (3.1.1)\n",
- "Requirement already satisfied: pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (2.4.2)\n",
"Requirement already satisfied: cycler>=0.10 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (0.10.0)\n",
+ "Requirement already satisfied: kiwisolver>=1.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (1.1.0)\n",
+ "Requirement already satisfied: pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (2.4.2)\n",
"Requirement already satisfied: python-dateutil>=2.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (2.8.0)\n",
"Requirement already satisfied: numpy>=1.11 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (1.17.3)\n",
- "Requirement already satisfied: kiwisolver>=1.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (1.1.0)\n",
"Requirement already satisfied: six in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from cycler>=0.10->matplotlib) (1.12.0)\n",
"Requirement already satisfied: setuptools in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from kiwisolver>=1.0.1->matplotlib) (41.6.0)\n"
]
@@ -231,93 +240,124 @@
"<!-- Generated by graphviz version 2.43.0 (0)\n",
" -->\n",
"<!-- Title: G Pages: 1 -->\n",
- "<svg width=\"295pt\" height=\"339pt\"\n",
- " viewBox=\"0.00 0.00 295.12 339.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
- "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 335)\">\n",
+ "<svg width=\"308pt\" height=\"587pt\"\n",
+ " viewBox=\"0.00 0.00 307.55 587.46\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
+ "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 583.46)\">\n",
"<title>G</title>\n",
- "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-335 291.12,-335 291.12,4 -4,4\"/>\n",
- "<!-- Create -->\n",
+ "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-583.46 303.55,-583.46 303.55,4 -4,4\"/>\n",
+ "<!-- Cell 2: Create -->\n",
"<g id=\"node1\" class=\"node\">\n",
- "<title>Create</title>\n",
- "<ellipse fill=\"none\" stroke=\"blue\" cx=\"153.17\" cy=\"-313\" rx=\"33.37\" ry=\"18\"/>\n",
- "<text text-anchor=\"middle\" x=\"153.17\" y=\"-308.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Create</text>\n",
+ "<title>Cell 2: Create</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"210.36,-579.46 116.85,-579.46 116.85,-543.46 210.36,-543.46 210.36,-579.46\"/>\n",
+ "<text text-anchor=\"middle\" x=\"163.61\" y=\"-557.26\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Create</text>\n",
"</g>\n",
- "<!-- diverge6871 -->\n",
+ "<!-- init_pcoll -->\n",
"<g id=\"node2\" class=\"node\">\n",
- "<title>diverge6871</title>\n",
- "<ellipse fill=\"blue\" stroke=\"blue\" cx=\"153.17\" cy=\"-245\" rx=\"0\" ry=\"0\"/>\n",
+ "<title>init_pcoll</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"163.61\" cy=\"-463.46\" rx=\"44.01\" ry=\"44.01\"/>\n",
+ "<text text-anchor=\"middle\" x=\"163.61\" y=\"-459.26\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">init_pcoll</text>\n",
"</g>\n",
- "<!-- Create->diverge6871 -->\n",
+ "<!-- Cell 2: Create->init_pcoll -->\n",
"<g id=\"edge1\" class=\"edge\">\n",
- "<title>Create->diverge6871</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M153.17,-294.66C153.17,-275.72 153.17,-247.81 153.17,-246.08\"/>\n",
- "<text text-anchor=\"middle\" x=\"180\" y=\"-265.8\" font-family=\"Times,serif\" font-size=\"14.00\">init_pcoll</text>\n",
+ "<title>Cell 2: Create->init_pcoll</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M163.61,-543.3C163.61,-536.01 163.61,-527.08 163.61,-517.88\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"167.11,-517.85 163.61,-507.85 160.11,-517.85 167.11,-517.85\"/>\n",
"</g>\n",
- "<!-- Square -->\n",
+ "<!-- Cell 2: Square -->\n",
"<g id=\"node3\" class=\"node\">\n",
- "<title>Square</title>\n",
- "<ellipse fill=\"none\" stroke=\"blue\" cx=\"113.17\" cy=\"-190\" rx=\"34.83\" ry=\"18\"/>\n",
- "<text text-anchor=\"middle\" x=\"113.17\" y=\"-185.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Square</text>\n",
+ "<title>Cell 2: Square</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"157.7,-383.45 61.51,-383.45 61.51,-347.45 157.7,-347.45 157.7,-383.45\"/>\n",
+ "<text text-anchor=\"middle\" x=\"109.61\" y=\"-361.25\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Square</text>\n",
"</g>\n",
- "<!-- diverge6871->Square -->\n",
+ "<!-- init_pcoll->Cell 2: Square -->\n",
"<g id=\"edge2\" class=\"edge\">\n",
- "<title>diverge6871->Square</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M153.06,-243.85C151.84,-242.24 141.2,-228.14 131.22,-214.91\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"133.99,-212.78 125.17,-206.9 128.4,-216.99 133.99,-212.78\"/>\n",
+ "<title>init_pcoll->Cell 2: Square</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M142.42,-424.79C136.33,-413.96 129.82,-402.4 124.23,-392.44\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"127.13,-390.47 119.18,-383.47 121.03,-393.9 127.13,-390.47\"/>\n",
"</g>\n",
- "<!-- Cube -->\n",
- "<g id=\"node4\" class=\"node\">\n",
- "<title>Cube</title>\n",
- "<ellipse fill=\"none\" stroke=\"blue\" cx=\"202.17\" cy=\"-190\" rx=\"28.55\" ry=\"18\"/>\n",
- "<text text-anchor=\"middle\" x=\"202.17\" y=\"-185.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cube</text>\n",
+ "<!-- Cell 2: Cube -->\n",
+ "<g id=\"node5\" class=\"node\">\n",
+ "<title>Cell 2: Cube</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"267.88,-383.45 181.33,-383.45 181.33,-347.45 267.88,-347.45 267.88,-383.45\"/>\n",
+ "<text text-anchor=\"middle\" x=\"224.61\" y=\"-361.25\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Cube</text>\n",
"</g>\n",
- "<!-- diverge6871->Cube -->\n",
+ "<!-- init_pcoll->Cell 2: Cube -->\n",
"<g id=\"edge3\" class=\"edge\">\n",
- "<title>diverge6871->Cube</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M153.31,-243.85C154.84,-242.19 168.59,-227.33 181.11,-213.78\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"184.02,-215.79 188.24,-206.07 178.88,-211.04 184.02,-215.79\"/>\n",
+ "<title>init_pcoll->Cell 2: Cube</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M186.84,-425.89C193.98,-414.66 201.68,-402.54 208.25,-392.2\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"211.28,-393.95 213.69,-383.63 205.38,-390.19 211.28,-393.95\"/>\n",
"</g>\n",
- "<!-- Average Square -->\n",
- "<g id=\"node5\" class=\"node\">\n",
- "<title>Average Square</title>\n",
- "<ellipse fill=\"none\" stroke=\"blue\" cx=\"66.17\" cy=\"-104\" rx=\"66.35\" ry=\"18\"/>\n",
- "<text text-anchor=\"middle\" x=\"66.17\" y=\"-99.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Average Square</text>\n",
+ "<!-- squares -->\n",
+ "<g id=\"node4\" class=\"node\">\n",
+ "<title>squares</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"97.61\" cy=\"-274.82\" rx=\"36.76\" ry=\"36.76\"/>\n",
+ "<text text-anchor=\"middle\" x=\"97.61\" y=\"-270.62\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">squares</text>\n",
"</g>\n",
- "<!-- Square->Average Square -->\n",
+ "<!-- Cell 2: Square->squares -->\n",
"<g id=\"edge4\" class=\"edge\">\n",
- "<title>Square->Average Square</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M103.89,-172.4C97.24,-160.52 88.15,-144.28 80.56,-130.72\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"83.54,-128.87 75.6,-121.85 77.43,-132.28 83.54,-128.87\"/>\n",
- "<text text-anchor=\"middle\" x=\"114.17\" y=\"-142.8\" font-family=\"Times,serif\" font-size=\"14.00\">squares</text>\n",
+ "<title>Cell 2: Square->squares</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M107.29,-347.36C106.27,-339.83 105.02,-330.58 103.76,-321.26\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"107.21,-320.64 102.4,-311.2 100.27,-321.58 107.21,-320.64\"/>\n",
"</g>\n",
- "<!-- Average Cube -->\n",
+ "<!-- Cell 5: Average Square -->\n",
"<g id=\"node7\" class=\"node\">\n",
- "<title>Average Cube</title>\n",
- "<ellipse fill=\"none\" stroke=\"blue\" cx=\"210.17\" cy=\"-104\" rx=\"60.07\" ry=\"18\"/>\n",
- "<text text-anchor=\"middle\" x=\"210.17\" y=\"-99.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Average Cube</text>\n",
+ "<title>Cell 5: Average Square</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"145.32,-202.2 -0.11,-202.2 -0.11,-166.2 145.32,-166.2 145.32,-202.2\"/>\n",
+ "<text text-anchor=\"middle\" x=\"72.61\" y=\"-180\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 5: Average Square</text>\n",
"</g>\n",
- "<!-- Cube->Average Cube -->\n",
+ "<!-- squares->Cell 5: Average Square -->\n",
"<g id=\"edge5\" class=\"edge\">\n",
- "<title>Cube->Average Cube</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M203.79,-172C204.87,-160.69 206.31,-145.58 207.55,-132.57\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"211.07,-132.55 208.53,-122.26 204.1,-131.88 211.07,-132.55\"/>\n",
- "<text text-anchor=\"middle\" x=\"223.11\" y=\"-142.8\" font-family=\"Times,serif\" font-size=\"14.00\">cubes</text>\n",
+ "<title>squares->Cell 5: Average Square</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M87.87,-239.3C85.33,-230.32 82.64,-220.77 80.23,-212.23\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"83.55,-211.11 77.47,-202.43 76.81,-213.01 83.55,-211.11\"/>\n",
"</g>\n",
- "<!-- leaf1333 -->\n",
- "<!-- Average Square->leaf1333 -->\n",
+ "<!-- cubes -->\n",
+ "<g id=\"node6\" class=\"node\">\n",
+ "<title>cubes</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"227.61\" cy=\"-274.82\" rx=\"30.49\" ry=\"30.49\"/>\n",
+ "<text text-anchor=\"middle\" x=\"227.61\" y=\"-270.62\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">cubes</text>\n",
+ "</g>\n",
+ "<!-- Cell 2: Cube->cubes -->\n",
"<g id=\"edge6\" class=\"edge\">\n",
- "<title>Average Square->leaf1333</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M66.17,-85.6C66.17,-74.26 66.17,-59.23 66.17,-46.32\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"69.67,-46.1 66.17,-36.1 62.67,-46.1 69.67,-46.1\"/>\n",
- "<text text-anchor=\"middle\" x=\"109.7\" y=\"-56.8\" font-family=\"Times,serif\" font-size=\"14.00\">average_square</text>\n",
+ "<title>Cell 2: Cube->cubes</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M225.18,-347.36C225.5,-338.2 225.89,-326.5 226.27,-315.22\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"229.78,-315.19 226.62,-305.07 222.78,-314.95 229.78,-315.19\"/>\n",
+ "</g>\n",
+ "<!-- Cell 5: Average Cube -->\n",
+ "<g id=\"node9\" class=\"node\">\n",
+ "<title>Cell 5: Average Cube</title>\n",
+ "<polygon fill=\"none\" stroke=\"blue\" points=\"299.5,-202.2 163.72,-202.2 163.72,-166.2 299.5,-166.2 299.5,-202.2\"/>\n",
+ "<text text-anchor=\"middle\" x=\"231.61\" y=\"-180\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 5: Average Cube</text>\n",
"</g>\n",
- "<!-- leaf6554 -->\n",
- "<!-- Average Cube->leaf6554 -->\n",
+ "<!-- cubes->Cell 5: Average Cube -->\n",
"<g id=\"edge7\" class=\"edge\">\n",
- "<title>Average Cube->leaf6554</title>\n",
- "<path fill=\"none\" stroke=\"black\" d=\"M210.17,-85.6C210.17,-74.26 210.17,-59.23 210.17,-46.32\"/>\n",
- "<polygon fill=\"black\" stroke=\"black\" points=\"213.67,-46.1 210.17,-36.1 206.67,-46.1 213.67,-46.1\"/>\n",
- "<text text-anchor=\"middle\" x=\"248.65\" y=\"-56.8\" font-family=\"Times,serif\" font-size=\"14.00\">average_cube</text>\n",
+ "<title>cubes->Cell 5: Average Cube</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M228.94,-244.31C229.4,-234.11 229.91,-222.72 230.37,-212.68\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"233.87,-212.59 230.83,-202.44 226.88,-212.27 233.87,-212.59\"/>\n",
+ "</g>\n",
+ "<!-- average_square -->\n",
+ "<g id=\"node8\" class=\"node\">\n",
+ "<title>average_square</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"72.61\" cy=\"-65.1\" rx=\"65.2\" ry=\"65.2\"/>\n",
+ "<text text-anchor=\"middle\" x=\"72.61\" y=\"-60.9\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">average_square</text>\n",
+ "</g>\n",
+ "<!-- Cell 5: Average Square->average_square -->\n",
+ "<g id=\"edge8\" class=\"edge\">\n",
+ "<title>Cell 5: Average Square->average_square</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M72.61,-166.17C72.61,-158.97 72.61,-150.04 72.61,-140.5\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"76.11,-140.39 72.61,-130.39 69.11,-140.39 76.11,-140.39\"/>\n",
+ "</g>\n",
+ "<!-- average_cube -->\n",
+ "<g id=\"node10\" class=\"node\">\n",
+ "<title>average_cube</title>\n",
+ "<ellipse fill=\"none\" stroke=\"blue\" cx=\"231.61\" cy=\"-65.1\" rx=\"58.92\" ry=\"58.92\"/>\n",
+ "<text text-anchor=\"middle\" x=\"231.61\" y=\"-60.9\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">average_cube</text>\n",
+ "</g>\n",
+ "<!-- Cell 5: Average Cube->average_cube -->\n",
+ "<g id=\"edge9\" class=\"edge\">\n",
+ "<title>Cell 5: Average Cube->average_cube</title>\n",
+ "<path fill=\"none\" stroke=\"black\" d=\"M231.61,-166.17C231.61,-157.35 231.61,-145.94 231.61,-133.99\"/>\n",
+ "<polygon fill=\"black\" stroke=\"black\" points=\"235.11,-133.88 231.61,-123.88 228.11,-133.88 235.11,-133.88\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index d0cd661..39d1445 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -31,14 +31,14 @@ import sys
import apache_beam as beam
from apache_beam.runners import runner
-from apache_beam.runners.utils import is_interactive
+from apache_beam.utils.interactive_utils import is_in_ipython
+from apache_beam.utils.interactive_utils import is_in_notebook
# Interactive Beam user flow is data-centric rather than pipeline-centric, so
# there is only one global interactive environment instance that manages
# implementation that enables interactivity.
_interactive_beam_env = None
-
_LOGGER = logging.getLogger(__name__)
@@ -84,6 +84,7 @@ class InteractiveEnvironment(object):
# InteractiveRunner is responsible for populating this dictionary
# implicitly.
self._pipeline_results = {}
+ self._tracked_user_pipelines = set()
# Always watch __main__ module.
self.watch('__main__')
# Do a warning level logging if current python version is below 3.6.
@@ -106,7 +107,8 @@ class InteractiveEnvironment(object):
'install apache-beam[interactive]` to install necessary '
'dependencies to enable all data visualization features.')
- self._is_in_ipython, self._is_in_notebook = is_interactive()
+ self._is_in_ipython = is_in_ipython()
+ self._is_in_notebook = is_in_notebook()
if not self._is_in_ipython:
_LOGGER.warning('You cannot use Interactive Beam features when you are '
'not in an interactive environment such as a Jupyter '
@@ -221,3 +223,33 @@ class InteractiveEnvironment(object):
if result:
return runner.PipelineState.is_terminal(result.state)
return True
+
+  def track_user_pipelines(self):
+    """Refreshes references to the user-defined pipeline instances watched in
+    the current environment.
+
+    The static global singleton interactive environment holds references to
+    the pipeline instances defined by the user in watched scopes. Interactive
+    Beam features can use them to tell whether a given pipeline was defined by
+    the user or implicitly created by the Beam SDK or a runner, and then handle
+    the two cases differently.
+
+    This is invoked every time a PTransform is to be applied while the code is
+    executed under ipython, because any user-defined pipeline can be
+    re-evaluated through a notebook cell re-execution at any time.
+
+    Each invocation rebuilds the tracked set from scratch, dropping pipeline
+    instances that are no longer referenced from a watched scope. For example,
+    after a cell that creates a pipeline is re-executed, the instance created
+    by the previous execution is no longer in the watched scope.
+    """
+    # Keep only the Pipeline values found in the watched scopes.
+    self._tracked_user_pipelines = {
+        val for watching in self.watching()
+        for _, val in watching
+        if isinstance(val, beam.pipeline.Pipeline)}
+
+  @property
+  def tracked_user_pipelines(self):
+    """Pipelines from the latest track_user_pipelines() call (empty before)."""
+    return self._tracked_user_pipelines
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index cd06b42..b9255e4 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -54,17 +54,10 @@ class PipelineInstrument(object):
def __init__(self, pipeline, options=None):
self._pipeline = pipeline
- # The cache manager should be initiated outside of this module and outside
- # of run_pipeline() from interactive runner so that its lifespan could cover
- # multiple runs in the interactive environment. Owned by
- # interactive_environment module. Not owned by this module.
- # TODO(BEAM-7760): change the scope of cache to be owned by runner or
- # pipeline result instances because a pipeline is not 1:1 correlated to a
- # running job. Only complete and read-only cache is valid across multiple
- # jobs. Other cache instances should have their own scopes. Some design
- # change should support only runner.run(pipeline) pattern rather than
- # pipeline.run([runner]) and a runner can only run at most one pipeline at a
- # time. Otherwise, result returned by run() is the only 1:1 anchor.
+    # The global cache manager is lazily initialized outside of this module by
+    # any interactive runner so that its lifespan can cover multiple runs in
+    # the interactive environment. It is owned by the interactive_environment
+    # module, not by this module.
self._cache_manager = ie.current_env().cache_manager()
# Invoke a round trip through the runner API. This makes sure the Pipeline
@@ -444,12 +437,15 @@ class PipelineInstrument(object):
cache. It doesn't mean that there would definitely be such cache already.
Also, the pcoll can come from the original user defined pipeline object or
an equivalent pcoll from a transformed copy of the original pipeline.
+
+    The 'pcoll_id' of a cacheable is not stable, so it is not included in the
+    cache key. The combination of 'var', 'version' and 'producer_version' is
+    sufficient to identify a cached PCollection.
"""
cacheable = self.cacheables.get(self._cacheable_key(pcoll), None)
if cacheable:
return '_'.join((cacheable['var'],
cacheable['version'],
- cacheable['pcoll_id'],
cacheable['producer_version']))
return ''
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index c45b8e3..09b646e 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -31,6 +31,8 @@ from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import pipeline_instrument as instr
from apache_beam.runners.interactive import interactive_runner
+from apache_beam.runners.interactive.testing.pipeline_assertion import assert_pipeline_equal
+from apache_beam.runners.interactive.testing.pipeline_assertion import assert_pipeline_proto_equal
# Work around nose tests using Python2 without unittest.mock module.
try:
@@ -44,47 +46,6 @@ class PipelineInstrumentTest(unittest.TestCase):
def setUp(self):
ie.new_env(cache_manager=cache.FileBasedCacheManager())
- def assertPipelineProtoEqual(self, actual_pipeline_proto,
- expected_pipeline_proto):
- components1 = actual_pipeline_proto.components
- components2 = expected_pipeline_proto.components
- self.assertEqual(len(components1.transforms), len(components2.transforms))
- self.assertEqual(len(components1.pcollections),
- len(components2.pcollections))
-
- # GreatEqual instead of Equal because the pipeline_proto_to_execute could
- # include more windowing_stratagies and coders than necessary.
- self.assertGreaterEqual(len(components1.windowing_strategies),
- len(components2.windowing_strategies))
- self.assertGreaterEqual(len(components1.coders), len(components2.coders))
- self.assertTransformEqual(actual_pipeline_proto,
- actual_pipeline_proto.root_transform_ids[0],
- expected_pipeline_proto,
- expected_pipeline_proto.root_transform_ids[0])
-
- def assertPipelineEqual(self, actual_pipeline, expected_pipeline):
- actual_pipeline_proto = actual_pipeline.to_runner_api(use_fake_coders=True)
- expected_pipeline_proto = expected_pipeline.to_runner_api(
- use_fake_coders=True)
- self.assertPipelineProtoEqual(actual_pipeline_proto,
- expected_pipeline_proto)
-
- def assertTransformEqual(self, actual_pipeline_proto, actual_transform_id,
- expected_pipeline_proto, expected_transform_id):
- transform_proto1 = actual_pipeline_proto.components.transforms[
- actual_transform_id]
- transform_proto2 = expected_pipeline_proto.components.transforms[
- expected_transform_id]
- self.assertEqual(transform_proto1.spec.urn, transform_proto2.spec.urn)
- # Skipping payload checking because PTransforms of the same functionality
- # could generate different payloads.
- self.assertEqual(len(transform_proto1.subtransforms),
- len(transform_proto2.subtransforms))
- self.assertSetEqual(set(transform_proto1.inputs),
- set(transform_proto2.inputs))
- self.assertSetEqual(set(transform_proto1.outputs),
- set(transform_proto2.outputs))
-
def test_pcolls_to_pcoll_id(self):
p = beam.Pipeline(interactive_runner.InteractiveRunner())
# pylint: disable=range-builtin-not-iterating
@@ -137,14 +98,11 @@ class PipelineInstrumentTest(unittest.TestCase):
pin = instr.pin(p)
self.assertEqual(pin.cache_key(init_pcoll), 'init_pcoll_' + str(
- id(init_pcoll)) + '_ref_PCollection_PCollection_10_' + str(id(
- init_pcoll.producer)))
+ id(init_pcoll)) + '_' + str(id(init_pcoll.producer)))
self.assertEqual(pin.cache_key(squares), 'squares_' + str(
- id(squares)) + '_ref_PCollection_PCollection_11_' + str(id(
- squares.producer)))
+ id(squares)) + '_' + str(id(squares.producer)))
self.assertEqual(pin.cache_key(cubes), 'cubes_' + str(
- id(cubes)) + '_ref_PCollection_PCollection_12_' + str(id(
- cubes.producer)))
+ id(cubes)) + '_' + str(id(cubes.producer)))
def test_cacheables(self):
p = beam.Pipeline(interactive_runner.InteractiveRunner())
@@ -224,7 +182,7 @@ class PipelineInstrumentTest(unittest.TestCase):
expected_pipeline = p.to_runner_api(return_context=False,
use_fake_coders=True)
- self.assertPipelineProtoEqual(actual_pipeline, expected_pipeline)
+ assert_pipeline_proto_equal(self, expected_pipeline, actual_pipeline)
def _example_pipeline(self, watch=True):
p = beam.Pipeline(interactive_runner.InteractiveRunner())
@@ -275,7 +233,7 @@ class PipelineInstrumentTest(unittest.TestCase):
('_WriteCache_' + second_pcoll_cache_key) >> cache.WriteCache(
ie.current_env().cache_manager(), second_pcoll_cache_key))
# The 2 pipelines should be the same now.
- self.assertPipelineEqual(p_copy, p_origin)
+ assert_pipeline_equal(self, p_copy, p_origin)
def test_instrument_example_pipeline_to_read_cache(self):
p_origin, init_pcoll, second_pcoll = self._example_pipeline()
@@ -283,12 +241,10 @@ class PipelineInstrumentTest(unittest.TestCase):
# Mock as if cacheable PCollections are cached.
init_pcoll_cache_key = 'init_pcoll_' + str(
- id(init_pcoll)) + '_ref_PCollection_PCollection_10_' + str(id(
- init_pcoll.producer))
+ id(init_pcoll)) + '_' + str(id(init_pcoll.producer))
self._mock_write_cache(init_pcoll, init_pcoll_cache_key)
second_pcoll_cache_key = 'second_pcoll_' + str(
- id(second_pcoll)) + '_ref_PCollection_PCollection_11_' + str(id(
- second_pcoll.producer))
+ id(second_pcoll)) + '_' + str(id(second_pcoll.producer))
self._mock_write_cache(second_pcoll, second_pcoll_cache_key)
ie.current_env().cache_manager().exists = MagicMock(return_value=True)
instr.pin(p_copy)
@@ -315,7 +271,7 @@ class PipelineInstrumentTest(unittest.TestCase):
v = TestReadCacheWireVisitor()
p_origin.visit(v)
- self.assertPipelineEqual(p_origin, p_copy)
+ assert_pipeline_equal(self, p_origin, p_copy)
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/interactive/testing/__init__.py b/sdks/python/apache_beam/runners/interactive/testing/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/testing/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py b/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py
new file mode 100644
index 0000000..ee3acd8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Mocked object returned by invoking get_ipython() in an ipython environment.
+_mocked_get_ipython = None
+
+
+def mock_get_ipython():
+  """Mock an ipython environment w/o setting up real ipython kernel.
+
+  Returns a module-level singleton. Each time a `with` block is entered on it,
+  the prompt (execution_count) increases by one, so `with` blocks can be used
+  to group arbitrary python code into separate cells.
+
+  Examples::
+
+    # Usage, decorate each test function with:
+    @patch('IPython.get_ipython', mock_get_ipython)
+
+    # Group lines of code into a cell:
+    with mock_get_ipython():
+      # arbitrary python code
+
+    # Next cell with prompt increased by one:
+    with mock_get_ipython():  # Auto-incremental
+      # arbitrary python code
+  """
+
+  class MockedGetIpython(object):
+
+    def __init__(self):
+      self._execution_count = 0
+      # No 'IPKernelApp' section: a notebook check on `config` reports False
+      # instead of raising AttributeError on a missing attribute.
+      self.config = {}
+
+    @property
+    def execution_count(self):
+      """0 before any cell; starts from 1 and is constant within a cell."""
+      return self._execution_count
+
+    def __enter__(self):
+      """Marks entering of a cell/prompt."""
+      self._execution_count += 1
+      return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+      """Marks exiting of a cell/prompt; exceptions are not suppressed."""
+
+  global _mocked_get_ipython
+  if not _mocked_get_ipython:
+    _mocked_get_ipython = MockedGetIpython()
+  return _mocked_get_ipython
diff --git a/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py b/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
new file mode 100644
index 0000000..a7762a4
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to verify implicit cache transforms applied by Interactive Beam.
+
+For internal use only; no backwards-compatibility guarantees.
+This utility should only be used by Interactive Beam tests. For example, it can
+be used to verify if the implicit cache transforms are applied as expected when
+running a pipeline with the InteractiveRunner. It can also be used to verify if
+a pipeline fragment has pruned unnecessary transforms. It shouldn't be used to
+verify equivalence between pipelines if the code to be tested depends on or
+mutates user code within transforms in pipelines.
+"""
+
+
+def assert_pipeline_equal(test_case, expected_pipeline, actual_pipeline):
+  """Asserts that two apache_beam.Pipeline instances are equivalent.
+
+  Args:
+    test_case: (unittest.TestCase) the test case used to issue assertions.
+    expected_pipeline: (Pipeline) the pipeline instance the test expects.
+    actual_pipeline: (Pipeline) the pipeline instance under test.
+  """
+  # Serialize both pipelines the same way, then compare their protos.
+  expected_proto, actual_proto = (
+      pipeline.to_runner_api(use_fake_coders=True)
+      for pipeline in (expected_pipeline, actual_pipeline))
+  assert_pipeline_proto_equal(test_case, expected_proto, actual_proto)
+
+
+def assert_pipeline_proto_equal(test_case,
+                                expected_pipeline_proto,
+                                actual_pipeline_proto):
+  """Asserts the equivalence between two pipeline proto representations."""
+  expected = expected_pipeline_proto.components
+  actual = actual_pipeline_proto.components
+  test_case.assertEqual(len(expected.transforms), len(actual.transforms))
+  test_case.assertEqual(len(expected.pcollections), len(actual.pcollections))
+  # The actual pipeline may include more windowing strategies and coders than
+  # necessary, hence the LessEqual checks.
+  # TODO(BEAM-7926): Update tests and make below 2 assertions assertEqual.
+  test_case.assertLessEqual(len(expected.windowing_strategies),
+                            len(actual.windowing_strategies))
+  test_case.assertLessEqual(len(expected.coders), len(actual.coders))
+  # Pass the protos in the (expected, actual) order that
+  # _assert_transform_equal declares.
+  _assert_transform_equal(test_case,
+                          expected_pipeline_proto,
+                          expected_pipeline_proto.root_transform_ids[0],
+                          actual_pipeline_proto,
+                          actual_pipeline_proto.root_transform_ids[0])
+
+
+def _assert_transform_equal(test_case,
+                            expected_pipeline_proto,
+                            expected_transform_id,
+                            actual_pipeline_proto,
+                            actual_transform_id):
+  """Asserts two transforms, looked up by id in their pipelines, match.
+
+  Compares the urn, the subtransform count and the input/output keys only.
+  """
+  expected = expected_pipeline_proto.components.transforms[
+      expected_transform_id]
+  actual = actual_pipeline_proto.components.transforms[actual_transform_id]
+  test_case.assertEqual(expected.spec.urn, actual.spec.urn)
+  # Payloads are deliberately skipped: PTransforms of the same functionality
+  # could generate different payloads.
+  test_case.assertEqual(len(expected.subtransforms), len(actual.subtransforms))
+  for field in ('inputs', 'outputs'):
+    test_case.assertSetEqual(set(getattr(expected, field)),
+                             set(getattr(actual, field)))
diff --git a/sdks/python/apache_beam/runners/utils.py b/sdks/python/apache_beam/runners/utils.py
deleted file mode 100644
index 8952423..0000000
--- a/sdks/python/apache_beam/runners/utils.py
+++ /dev/null
@@ -1,47 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Common utility module shared by runners.
-
-For internal use only; no backwards-compatibility guarantees.
-"""
-from __future__ import absolute_import
-
-
-def is_interactive():
- """Determines if current code execution is in interactive environment.
-
- Returns:
- is_in_ipython: (bool) tells if current code is executed within an ipython
- session.
- is_in_notebook: (bool) tells if current code is executed from an ipython
- notebook.
-
- If is_in_notebook is True, then is_in_ipython must also be True.
- """
- is_in_ipython = False
- is_in_notebook = False
- # Check if the runtime is within an interactive environment, i.e., ipython.
- try:
- from IPython import get_ipython # pylint: disable=import-error
- if get_ipython():
- is_in_ipython = True
- if 'IPKernelApp' in get_ipython().config:
- is_in_notebook = True
- except ImportError:
- pass # If dependencies are not available, then not interactive for sure.
- return is_in_ipython, is_in_notebook
diff --git a/sdks/python/apache_beam/utils/interactive_utils.py b/sdks/python/apache_beam/utils/interactive_utils.py
new file mode 100644
index 0000000..ac4e5d8
--- /dev/null
+++ b/sdks/python/apache_beam/utils/interactive_utils.py
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Common interactive utility module.
+
+For experimental usage only; no backwards-compatibility guarantees.
+"""
+from __future__ import absolute_import
+
+import logging
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def is_in_ipython():
+  """Determines if current code is executed within an ipython session.
+
+  Returns False when IPython is not installed or no ipython shell is running.
+  """
+  try:
+    from IPython import get_ipython  # pylint: disable=import-error
+    # get_ipython() returns None when no ipython shell is running.
+    return bool(get_ipython())
+  except ImportError:
+    return False  # If dependencies are not available, not interactive.
+
+
+def is_in_notebook():
+  """Determines if current code is executed from an ipython notebook.
+
+  If is_in_notebook() is True, then is_in_ipython() must also be True: the
+  notebook check below only runs inside an ipython session.
+  """
+  if not is_in_ipython():
+    return False
+  # Importing is safe here since is_in_ipython() has just imported IPython.
+  from IPython import get_ipython
+  # Notebook sessions are detected by an 'IPKernelApp' section in the config.
+  return 'IPKernelApp' in get_ipython().config
+
+
+def alter_label_if_ipython(transform, pvalueish):
+  """Alters the label to an interactive label with ipython prompt metadata
+  prefixed for the given transform if the given pvalueish belongs to a
+  user-defined pipeline and current code execution is within an ipython kernel.
+  Otherwise, noop.
+
+  A label is either a user-defined or auto-generated str name of a PTransform
+  that is unique within a pipeline. If current environment is_in_ipython(), Beam
+  can implicitly create interactive labels to replace labels of top-level
+  PTransforms to be applied. The label is formatted as:
+  `Cell {prompt}: {original_label}`.
+  """
+  if not is_in_ipython():
+    return
+  from apache_beam.runners.interactive import interactive_environment as ie
+  # Track user-defined pipelines in watched scopes so that only transforms
+  # applied to them, not to pipelines the Beam SDK creates, are relabeled.
+  ie.current_env().track_user_pipelines()
+  from IPython import get_ipython
+  prompt = get_ipython().execution_count
+  pipeline = _extract_pipeline_of_pvalueish(pvalueish)
+  if not pipeline:
+    # pvalueish is passed as a logging argument: %-formatting it eagerly raises
+    # TypeError when it is a tuple of several pvalues.
+    _LOGGER.warning('Failed to alter the label of a transform with the '
+                    'ipython prompt metadata. Cannot figure out the pipeline '
+                    'that the given pvalueish %s belongs to. Thus noop.',
+                    pvalueish)
+  elif pipeline in ie.current_env().tracked_user_pipelines:
+    # Only transforms applied to user-defined pipelines at pipeline
+    # construction time get the interactive label.
+    transform.label = 'Cell {}: {}'.format(prompt, transform.label)
+
+
+def _extract_pipeline_of_pvalueish(pvalueish):
+  """Returns the first pipeline found among the given pvalueish, or None."""
+  if isinstance(pvalueish, tuple):
+    candidates = pvalueish
+  elif isinstance(pvalueish, dict):
+    candidates = pvalueish.values()
+  else:
+    candidates = (pvalueish,)
+  # Empty tuples/dicts hold no pvalue, so they yield None instead of raising.
+  pipelines = (getattr(c, 'pipeline', None) for c in candidates)
+  return next((p for p in pipelines if p is not None), None)