Posted to commits@beam.apache.org by al...@apache.org on 2019/12/10 00:46:18 UTC

[beam] branch master updated: [BEAM-7926] Data-centric Interactive Part1

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b26000a  [BEAM-7926] Data-centric Interactive Part1
     new 024c385  Merge pull request #10276 from KevinGG/BEAM-7926-part1
b26000a is described below

commit b26000a0b76590ebe4a7299ae0680ed421fa2b34
Author: Ning Kang <ni...@ningk-macbookpro.roam.corp.google.com>
AuthorDate: Tue Dec 3 14:26:30 2019 -0800

    [BEAM-7926] Data-centric Interactive Part1
    
    1. Changed the Beam Python SDK's pipeline transform apply() logic to
    implicitly prefix the label of a top-level transform with ipython-prompt
    metadata when the code executes in ipython during user-defined pipeline
    construction (see the sketch below).
    2. Users can see which pieces of code are in effect by looking at the
    labels of applied PTransforms, and thus avoid broken in-memory state
    caused by notebook cell re-execution, cell deletion, or hidden state.
    3. Allowed duplicate PTransform labels across different cells when the
    user constructs a pipeline in an ipython/notebook environment, since the
    label alteration de-duplicates the labels.
    4. Created a common testing package and modules: a mock ipython kernel
    for testing, and pipeline assertion as a shared testing module.
    5. Updated tests for ipython-related scenarios to run in a mocked ipython
    environment.
    6. Updated the example notebook.
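
    For illustration, a minimal sketch of the resulting behavior when a
    pipeline is constructed across notebook cells (cell numbers, variable
    names and the runner choice below are illustrative; the actual prefix
    depends on the kernel's execution count at apply time):

        import apache_beam as beam
        from apache_beam.runners.interactive import interactive_runner

        # Cell 2 (In [2]): construct the pipeline and apply a source transform.
        p = beam.Pipeline(interactive_runner.InteractiveRunner())
        init_pcoll = p | 'Init' >> beam.Create(range(10))
        # The applied top-level transform is now labeled 'Cell 2: Init'
        # instead of just 'Init'.

        # Cell 3 (In [3]): apply another transform from a later cell.
        squares = init_pcoll | 'Square' >> beam.Map(lambda x: x * x)
        # Labeled 'Cell 3: Square'; re-executing the cell later yields
        # 'Cell N: Square' for the new prompt N, so labels do not collide.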
---
 sdks/python/apache_beam/pipeline.py                |   7 +
 .../runners/dataflow/dataflow_runner.py            |   5 +-
 .../interactive/display/pcoll_visualization.py     |   9 +-
 .../interactive/display/pipeline_graph_test.py     |  47 ++++
 .../examples/Interactive Beam Example.ipynb        | 256 ++++++++++++---------
 .../runners/interactive/interactive_environment.py |  38 ++-
 .../runners/interactive/pipeline_instrument.py     |  20 +-
 .../interactive/pipeline_instrument_test.py        |  64 +-----
 .../runners/interactive/testing/__init__.py        |  16 ++
 .../runners/interactive/testing/mock_ipython.py    |  67 ++++++
 .../interactive/testing/pipeline_assertion.py      |  86 +++++++
 sdks/python/apache_beam/runners/utils.py           |  47 ----
 sdks/python/apache_beam/utils/interactive_utils.py | 100 ++++++++
 13 files changed, 532 insertions(+), 230 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 61673fe..bc52e72 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -74,6 +74,7 @@ from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import typehints
 from apache_beam.utils.annotations import deprecated
+from apache_beam.utils.interactive_utils import alter_label_if_ipython
 
 __all__ = ['Pipeline', 'PTransformOverride']
 
@@ -491,6 +492,12 @@ class Pipeline(object):
       finally:
         transform.label = old_label
 
+    # Attempts to alter the label of the transform to be applied only when it's
+    # a top-level transform so that the cell number will not be prepended to
+    # every child transform in a composite.
+    if self._current_transform() is self._root_transform():
+      alter_label_if_ipython(transform, pvalueish)
+
     full_label = '/'.join([self._current_transform().full_label,
                            label or transform.label]).lstrip('/')
     if full_label in self.applied_labels:
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1fae45d..75aedd1 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -58,11 +58,11 @@ from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
-from apache_beam.runners.utils import is_interactive
 from apache_beam.transforms import window
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
 from apache_beam.utils import proto_utils
+from apache_beam.utils.interactive_utils import is_in_notebook
 from apache_beam.utils.plugin import BeamPlugin
 
 try:                    # Python 3
@@ -376,8 +376,7 @@ class DataflowRunner(PipelineRunner):
   def run_pipeline(self, pipeline, options):
     """Remotely executes entire pipeline or parts reachable from node."""
     # Label goog-dataflow-notebook if job is started from notebook.
-    _, is_in_notebook = is_interactive()
-    if is_in_notebook:
+    if is_in_notebook():
       notebook_version = ('goog-dataflow-notebook=' +
                           beam.version.__version__.replace('.', '_'))
       if options.view_as(GoogleCloudOptions).labels:
diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
index 1f0e925..bdd34ca 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
@@ -74,11 +74,14 @@ _OVERVIEW_HTML_TEMPLATE = """
             </script>"""
 _DATAFRAME_PAGINATION_TEMPLATE = """
             <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.2/jquery.min.js"></script>
-            <script src="https://cdn.datatables.net/1.10.16/js/jquery.dataTables.js"></script>
-            <link rel="stylesheet" href="https://cdn.datatables.net/1.10.16/css/jquery.dataTables.css">
+            <script src="https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js"></script>
+            <link rel="stylesheet" href="https://cdn.datatables.net/1.10.20/css/jquery.dataTables.min.css">
             {dataframe_html}
             <script>
-              $("#{table_id}").DataTable();
+              $(document).ready(
+                function() {{
+                  $("#{table_id}").DataTable();
+                }});
             </script>"""
 
 
diff --git a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph_test.py b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph_test.py
index 12cc3ef..e73dbd6 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph_test.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph_test.py
@@ -25,6 +25,14 @@ from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner as ir
 from apache_beam.runners.interactive.display import pipeline_graph
+from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+
+# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
+# unittest.mock module.
+try:
+  from unittest.mock import patch
+except ImportError:
+  from mock import patch
 
 # pylint: disable=range-builtin-not-iterating,unused-variable,possibly-unused-variable
 # Reason:
@@ -83,6 +91,45 @@ class PipelineGraphTest(unittest.TestCase):
          '}\n'),
         pipeline_graph.PipelineGraph(p).get_dot())
 
+  @patch('IPython.get_ipython', mock_get_ipython)
+  def test_get_dot_within_notebook(self):
+    # Assume a mocked ipython kernel and notebook frontend have been set up.
+    ie.current_env()._is_in_ipython = True
+    ie.current_env()._is_in_notebook = True
+    with mock_get_ipython():  # Cell 1
+      p = beam.Pipeline(ir.InteractiveRunner())
+      # Immediately track this local pipeline so that ipython prompts when
+      # applying transforms will be tracked and used for labels.
+      ib.watch(locals())
+
+    with mock_get_ipython():  # Cell 2
+      init_pcoll = p | 'Init' >> beam.Create(range(10))
+
+    with mock_get_ipython():  # Cell 3
+      squares = init_pcoll | 'Square' >> beam.Map(lambda x: x * x)
+
+    with mock_get_ipython():  # Cell 4
+      cubes = init_pcoll | 'Cube' >> beam.Map(lambda x: x ** 3)
+
+    # Tracks all PCollections defined so far.
+    ib.watch(locals())
+    self.assertEqual(
+        ('digraph G {\n'
+         'node [color=blue, fontcolor=blue, shape=box];\n'
+         '"Cell 2: Init";\n'
+         'init_pcoll [shape=circle];\n'
+         '"Cell 3: Square";\n'
+         'squares [shape=circle];\n'
+         '"Cell 4: Cube";\n'
+         'cubes [shape=circle];\n'
+         '"Cell 2: Init" -> init_pcoll;\n'
+         'init_pcoll -> "Cell 3: Square";\n'
+         'init_pcoll -> "Cell 4: Cube";\n'
+         '"Cell 3: Square" -> squares;\n'
+         '"Cell 4: Cube" -> cubes;\n'
+         '}\n'),
+        pipeline_graph.PipelineGraph(p).get_dot())
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/examples/Interactive Beam Example.ipynb b/sdks/python/apache_beam/runners/interactive/examples/Interactive Beam Example.ipynb
index a4486ee..b461a26 100644
--- a/sdks/python/apache_beam/runners/interactive/examples/Interactive Beam Example.ipynb	
+++ b/sdks/python/apache_beam/runners/interactive/examples/Interactive Beam Example.ipynb	
@@ -50,67 +50,76 @@
        "<!-- Generated by graphviz version 2.43.0 (0)\n",
        " -->\n",
        "<!-- Title: G Pages: 1 -->\n",
-       "<svg width=\"190pt\" height=\"253pt\"\n",
-       " viewBox=\"0.00 0.00 190.09 253.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
-       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 249)\">\n",
+       "<svg width=\"208pt\" height=\"349pt\"\n",
+       " viewBox=\"0.00 0.00 208.43 349.26\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
+       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 345.26)\">\n",
        "<title>G</title>\n",
-       "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-249 186.09,-249 186.09,4 -4,4\"/>\n",
-       "<!-- Create -->\n",
+       "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-345.26 204.43,-345.26 204.43,4 -4,4\"/>\n",
+       "<!-- Cell 2: Create -->\n",
        "<g id=\"node1\" class=\"node\">\n",
-       "<title>Create</title>\n",
-       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"86.05\" cy=\"-227\" rx=\"33.37\" ry=\"18\"/>\n",
-       "<text text-anchor=\"middle\" x=\"86.05\" y=\"-222.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Create</text>\n",
+       "<title>Cell 2: Create</title>\n",
+       "<polygon fill=\"none\" stroke=\"blue\" points=\"148.8,-341.26 55.3,-341.26 55.3,-305.26 148.8,-305.26 148.8,-341.26\"/>\n",
+       "<text text-anchor=\"middle\" x=\"102.05\" y=\"-319.06\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Create</text>\n",
        "</g>\n",
-       "<!-- diverge6871 -->\n",
+       "<!-- init_pcoll -->\n",
        "<g id=\"node2\" class=\"node\">\n",
-       "<title>diverge6871</title>\n",
-       "<ellipse fill=\"blue\" stroke=\"blue\" cx=\"86.05\" cy=\"-159\" rx=\"0\" ry=\"0\"/>\n",
+       "<title>init_pcoll</title>\n",
+       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"102.05\" cy=\"-225.26\" rx=\"44.01\" ry=\"44.01\"/>\n",
+       "<text text-anchor=\"middle\" x=\"102.05\" y=\"-221.06\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">init_pcoll</text>\n",
        "</g>\n",
-       "<!-- Create&#45;&gt;diverge6871 -->\n",
+       "<!-- Cell 2: Create&#45;&gt;init_pcoll -->\n",
        "<g id=\"edge1\" class=\"edge\">\n",
-       "<title>Create&#45;&gt;diverge6871</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M86.05,-208.66C86.05,-189.72 86.05,-161.81 86.05,-160.08\"/>\n",
-       "<text text-anchor=\"middle\" x=\"112.88\" y=\"-179.8\" font-family=\"Times,serif\" font-size=\"14.00\">init_pcoll</text>\n",
+       "<title>Cell 2: Create&#45;&gt;init_pcoll</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M102.05,-305.1C102.05,-297.81 102.05,-288.88 102.05,-279.68\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"105.55,-279.65 102.05,-269.65 98.55,-279.65 105.55,-279.65\"/>\n",
        "</g>\n",
-       "<!-- Square -->\n",
+       "<!-- Cell 2: Square -->\n",
        "<g id=\"node3\" class=\"node\">\n",
-       "<title>Square</title>\n",
-       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"46.05\" cy=\"-104\" rx=\"34.83\" ry=\"18\"/>\n",
-       "<text text-anchor=\"middle\" x=\"46.05\" y=\"-99.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Square</text>\n",
+       "<title>Cell 2: Square</title>\n",
+       "<polygon fill=\"none\" stroke=\"blue\" points=\"96.15,-145.26 -0.05,-145.26 -0.05,-109.26 96.15,-109.26 96.15,-145.26\"/>\n",
+       "<text text-anchor=\"middle\" x=\"48.05\" y=\"-123.06\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Square</text>\n",
        "</g>\n",
-       "<!-- diverge6871&#45;&gt;Square -->\n",
+       "<!-- init_pcoll&#45;&gt;Cell 2: Square -->\n",
        "<g id=\"edge2\" class=\"edge\">\n",
-       "<title>diverge6871&#45;&gt;Square</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M85.93,-157.85C84.72,-156.24 74.07,-142.14 64.09,-128.91\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"66.87,-126.78 58.05,-120.9 61.28,-130.99 66.87,-126.78\"/>\n",
+       "<title>init_pcoll&#45;&gt;Cell 2: Square</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M80.86,-186.59C74.77,-175.77 68.27,-164.2 62.67,-154.25\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"65.57,-152.28 57.62,-145.28 59.47,-155.71 65.57,-152.28\"/>\n",
        "</g>\n",
-       "<!-- Cube -->\n",
+       "<!-- Cell 2: Cube -->\n",
        "<g id=\"node5\" class=\"node\">\n",
-       "<title>Cube</title>\n",
-       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"136.05\" cy=\"-104\" rx=\"28.55\" ry=\"18\"/>\n",
-       "<text text-anchor=\"middle\" x=\"136.05\" y=\"-99.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cube</text>\n",
+       "<title>Cell 2: Cube</title>\n",
+       "<polygon fill=\"none\" stroke=\"blue\" points=\"200.32,-145.26 113.78,-145.26 113.78,-109.26 200.32,-109.26 200.32,-145.26\"/>\n",
+       "<text text-anchor=\"middle\" x=\"157.05\" y=\"-123.06\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Cube</text>\n",
        "</g>\n",
-       "<!-- diverge6871&#45;&gt;Cube -->\n",
+       "<!-- init_pcoll&#45;&gt;Cell 2: Cube -->\n",
        "<g id=\"edge3\" class=\"edge\">\n",
-       "<title>diverge6871&#45;&gt;Cube</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M86.19,-157.85C87.77,-156.17 102.21,-140.86 115.15,-127.15\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"117.77,-129.47 122.08,-119.8 112.68,-124.67 117.77,-129.47\"/>\n",
+       "<title>init_pcoll&#45;&gt;Cell 2: Cube</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M123.63,-186.59C129.83,-175.77 136.46,-164.2 142.16,-154.25\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"145.36,-155.69 147.3,-145.28 139.29,-152.21 145.36,-155.69\"/>\n",
        "</g>\n",
-       "<!-- leaf6244 -->\n",
-       "<!-- Square&#45;&gt;leaf6244 -->\n",
+       "<!-- squares -->\n",
+       "<g id=\"node4\" class=\"node\">\n",
+       "<title>squares</title>\n",
+       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"48.05\" cy=\"-36.63\" rx=\"36.76\" ry=\"36.76\"/>\n",
+       "<text text-anchor=\"middle\" x=\"48.05\" y=\"-32.43\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">squares</text>\n",
+       "</g>\n",
+       "<!-- Cell 2: Square&#45;&gt;squares -->\n",
        "<g id=\"edge4\" class=\"edge\">\n",
-       "<title>Square&#45;&gt;leaf6244</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M45.03,-86C44.36,-74.69 43.46,-59.58 42.69,-46.57\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"46.16,-46.04 42.07,-36.26 39.17,-46.45 46.16,-46.04\"/>\n",
-       "<text text-anchor=\"middle\" x=\"65.04\" y=\"-56.8\" font-family=\"Times,serif\" font-size=\"14.00\">squares</text>\n",
+       "<title>Cell 2: Square&#45;&gt;squares</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M48.05,-109.17C48.05,-101.79 48.05,-92.76 48.05,-83.63\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"51.55,-83.37 48.05,-73.37 44.55,-83.37 51.55,-83.37\"/>\n",
+       "</g>\n",
+       "<!-- cubes -->\n",
+       "<g id=\"node6\" class=\"node\">\n",
+       "<title>cubes</title>\n",
+       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"157.05\" cy=\"-36.63\" rx=\"30.49\" ry=\"30.49\"/>\n",
+       "<text text-anchor=\"middle\" x=\"157.05\" y=\"-32.43\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">cubes</text>\n",
        "</g>\n",
-       "<!-- leaf6690 -->\n",
-       "<!-- Cube&#45;&gt;leaf6690 -->\n",
+       "<!-- Cell 2: Cube&#45;&gt;cubes -->\n",
        "<g id=\"edge5\" class=\"edge\">\n",
-       "<title>Cube&#45;&gt;leaf6690</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M137.06,-86C137.73,-74.69 138.63,-59.58 139.41,-46.57\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"142.92,-46.45 140.02,-36.26 135.93,-46.04 142.92,-46.45\"/>\n",
-       "<text text-anchor=\"middle\" x=\"154.98\" y=\"-56.8\" font-family=\"Times,serif\" font-size=\"14.00\">cubes</text>\n",
+       "<title>Cell 2: Cube&#45;&gt;cubes</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M157.05,-109.17C157.05,-100 157.05,-88.3 157.05,-77.03\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"160.55,-76.88 157.05,-66.88 153.55,-76.88 160.55,-76.88\"/>\n",
        "</g>\n",
        "</g>\n",
        "</svg>\n"
@@ -142,11 +151,11 @@
      "output_type": "stream",
      "text": [
       "Requirement already satisfied: matplotlib in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (3.1.1)\n",
-      "Requirement already satisfied: pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (2.4.2)\n",
       "Requirement already satisfied: cycler>=0.10 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (0.10.0)\n",
+      "Requirement already satisfied: kiwisolver>=1.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (1.1.0)\n",
+      "Requirement already satisfied: pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (2.4.2)\n",
       "Requirement already satisfied: python-dateutil>=2.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (2.8.0)\n",
       "Requirement already satisfied: numpy>=1.11 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (1.17.3)\n",
-      "Requirement already satisfied: kiwisolver>=1.0.1 in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from matplotlib) (1.1.0)\n",
       "Requirement already satisfied: six in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from cycler>=0.10->matplotlib) (1.12.0)\n",
       "Requirement already satisfied: setuptools in /Users/ningk/workspace/p3_ib_venv/lib/python3.7/site-packages (from kiwisolver>=1.0.1->matplotlib) (41.6.0)\n"
      ]
@@ -231,93 +240,124 @@
        "<!-- Generated by graphviz version 2.43.0 (0)\n",
        " -->\n",
        "<!-- Title: G Pages: 1 -->\n",
-       "<svg width=\"295pt\" height=\"339pt\"\n",
-       " viewBox=\"0.00 0.00 295.12 339.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
-       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 335)\">\n",
+       "<svg width=\"308pt\" height=\"587pt\"\n",
+       " viewBox=\"0.00 0.00 307.55 587.46\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
+       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 583.46)\">\n",
        "<title>G</title>\n",
-       "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-335 291.12,-335 291.12,4 -4,4\"/>\n",
-       "<!-- Create -->\n",
+       "<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-583.46 303.55,-583.46 303.55,4 -4,4\"/>\n",
+       "<!-- Cell 2: Create -->\n",
        "<g id=\"node1\" class=\"node\">\n",
-       "<title>Create</title>\n",
-       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"153.17\" cy=\"-313\" rx=\"33.37\" ry=\"18\"/>\n",
-       "<text text-anchor=\"middle\" x=\"153.17\" y=\"-308.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Create</text>\n",
+       "<title>Cell 2: Create</title>\n",
+       "<polygon fill=\"none\" stroke=\"blue\" points=\"210.36,-579.46 116.85,-579.46 116.85,-543.46 210.36,-543.46 210.36,-579.46\"/>\n",
+       "<text text-anchor=\"middle\" x=\"163.61\" y=\"-557.26\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Create</text>\n",
        "</g>\n",
-       "<!-- diverge6871 -->\n",
+       "<!-- init_pcoll -->\n",
        "<g id=\"node2\" class=\"node\">\n",
-       "<title>diverge6871</title>\n",
-       "<ellipse fill=\"blue\" stroke=\"blue\" cx=\"153.17\" cy=\"-245\" rx=\"0\" ry=\"0\"/>\n",
+       "<title>init_pcoll</title>\n",
+       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"163.61\" cy=\"-463.46\" rx=\"44.01\" ry=\"44.01\"/>\n",
+       "<text text-anchor=\"middle\" x=\"163.61\" y=\"-459.26\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">init_pcoll</text>\n",
        "</g>\n",
-       "<!-- Create&#45;&gt;diverge6871 -->\n",
+       "<!-- Cell 2: Create&#45;&gt;init_pcoll -->\n",
        "<g id=\"edge1\" class=\"edge\">\n",
-       "<title>Create&#45;&gt;diverge6871</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M153.17,-294.66C153.17,-275.72 153.17,-247.81 153.17,-246.08\"/>\n",
-       "<text text-anchor=\"middle\" x=\"180\" y=\"-265.8\" font-family=\"Times,serif\" font-size=\"14.00\">init_pcoll</text>\n",
+       "<title>Cell 2: Create&#45;&gt;init_pcoll</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M163.61,-543.3C163.61,-536.01 163.61,-527.08 163.61,-517.88\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"167.11,-517.85 163.61,-507.85 160.11,-517.85 167.11,-517.85\"/>\n",
        "</g>\n",
-       "<!-- Square -->\n",
+       "<!-- Cell 2: Square -->\n",
        "<g id=\"node3\" class=\"node\">\n",
-       "<title>Square</title>\n",
-       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"113.17\" cy=\"-190\" rx=\"34.83\" ry=\"18\"/>\n",
-       "<text text-anchor=\"middle\" x=\"113.17\" y=\"-185.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Square</text>\n",
+       "<title>Cell 2: Square</title>\n",
+       "<polygon fill=\"none\" stroke=\"blue\" points=\"157.7,-383.45 61.51,-383.45 61.51,-347.45 157.7,-347.45 157.7,-383.45\"/>\n",
+       "<text text-anchor=\"middle\" x=\"109.61\" y=\"-361.25\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Square</text>\n",
        "</g>\n",
-       "<!-- diverge6871&#45;&gt;Square -->\n",
+       "<!-- init_pcoll&#45;&gt;Cell 2: Square -->\n",
        "<g id=\"edge2\" class=\"edge\">\n",
-       "<title>diverge6871&#45;&gt;Square</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M153.06,-243.85C151.84,-242.24 141.2,-228.14 131.22,-214.91\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"133.99,-212.78 125.17,-206.9 128.4,-216.99 133.99,-212.78\"/>\n",
+       "<title>init_pcoll&#45;&gt;Cell 2: Square</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M142.42,-424.79C136.33,-413.96 129.82,-402.4 124.23,-392.44\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"127.13,-390.47 119.18,-383.47 121.03,-393.9 127.13,-390.47\"/>\n",
        "</g>\n",
-       "<!-- Cube -->\n",
-       "<g id=\"node4\" class=\"node\">\n",
-       "<title>Cube</title>\n",
-       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"202.17\" cy=\"-190\" rx=\"28.55\" ry=\"18\"/>\n",
-       "<text text-anchor=\"middle\" x=\"202.17\" y=\"-185.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cube</text>\n",
+       "<!-- Cell 2: Cube -->\n",
+       "<g id=\"node5\" class=\"node\">\n",
+       "<title>Cell 2: Cube</title>\n",
+       "<polygon fill=\"none\" stroke=\"blue\" points=\"267.88,-383.45 181.33,-383.45 181.33,-347.45 267.88,-347.45 267.88,-383.45\"/>\n",
+       "<text text-anchor=\"middle\" x=\"224.61\" y=\"-361.25\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 2: Cube</text>\n",
        "</g>\n",
-       "<!-- diverge6871&#45;&gt;Cube -->\n",
+       "<!-- init_pcoll&#45;&gt;Cell 2: Cube -->\n",
        "<g id=\"edge3\" class=\"edge\">\n",
-       "<title>diverge6871&#45;&gt;Cube</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M153.31,-243.85C154.84,-242.19 168.59,-227.33 181.11,-213.78\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"184.02,-215.79 188.24,-206.07 178.88,-211.04 184.02,-215.79\"/>\n",
+       "<title>init_pcoll&#45;&gt;Cell 2: Cube</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M186.84,-425.89C193.98,-414.66 201.68,-402.54 208.25,-392.2\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"211.28,-393.95 213.69,-383.63 205.38,-390.19 211.28,-393.95\"/>\n",
        "</g>\n",
-       "<!-- Average Square -->\n",
-       "<g id=\"node5\" class=\"node\">\n",
-       "<title>Average Square</title>\n",
-       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"66.17\" cy=\"-104\" rx=\"66.35\" ry=\"18\"/>\n",
-       "<text text-anchor=\"middle\" x=\"66.17\" y=\"-99.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Average Square</text>\n",
+       "<!-- squares -->\n",
+       "<g id=\"node4\" class=\"node\">\n",
+       "<title>squares</title>\n",
+       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"97.61\" cy=\"-274.82\" rx=\"36.76\" ry=\"36.76\"/>\n",
+       "<text text-anchor=\"middle\" x=\"97.61\" y=\"-270.62\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">squares</text>\n",
        "</g>\n",
-       "<!-- Square&#45;&gt;Average Square -->\n",
+       "<!-- Cell 2: Square&#45;&gt;squares -->\n",
        "<g id=\"edge4\" class=\"edge\">\n",
-       "<title>Square&#45;&gt;Average Square</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M103.89,-172.4C97.24,-160.52 88.15,-144.28 80.56,-130.72\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"83.54,-128.87 75.6,-121.85 77.43,-132.28 83.54,-128.87\"/>\n",
-       "<text text-anchor=\"middle\" x=\"114.17\" y=\"-142.8\" font-family=\"Times,serif\" font-size=\"14.00\">squares</text>\n",
+       "<title>Cell 2: Square&#45;&gt;squares</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M107.29,-347.36C106.27,-339.83 105.02,-330.58 103.76,-321.26\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"107.21,-320.64 102.4,-311.2 100.27,-321.58 107.21,-320.64\"/>\n",
        "</g>\n",
-       "<!-- Average Cube -->\n",
+       "<!-- Cell 5: Average Square -->\n",
        "<g id=\"node7\" class=\"node\">\n",
-       "<title>Average Cube</title>\n",
-       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"210.17\" cy=\"-104\" rx=\"60.07\" ry=\"18\"/>\n",
-       "<text text-anchor=\"middle\" x=\"210.17\" y=\"-99.8\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Average Cube</text>\n",
+       "<title>Cell 5: Average Square</title>\n",
+       "<polygon fill=\"none\" stroke=\"blue\" points=\"145.32,-202.2 -0.11,-202.2 -0.11,-166.2 145.32,-166.2 145.32,-202.2\"/>\n",
+       "<text text-anchor=\"middle\" x=\"72.61\" y=\"-180\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 5: Average Square</text>\n",
        "</g>\n",
-       "<!-- Cube&#45;&gt;Average Cube -->\n",
+       "<!-- squares&#45;&gt;Cell 5: Average Square -->\n",
        "<g id=\"edge5\" class=\"edge\">\n",
-       "<title>Cube&#45;&gt;Average Cube</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M203.79,-172C204.87,-160.69 206.31,-145.58 207.55,-132.57\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"211.07,-132.55 208.53,-122.26 204.1,-131.88 211.07,-132.55\"/>\n",
-       "<text text-anchor=\"middle\" x=\"223.11\" y=\"-142.8\" font-family=\"Times,serif\" font-size=\"14.00\">cubes</text>\n",
+       "<title>squares&#45;&gt;Cell 5: Average Square</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M87.87,-239.3C85.33,-230.32 82.64,-220.77 80.23,-212.23\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"83.55,-211.11 77.47,-202.43 76.81,-213.01 83.55,-211.11\"/>\n",
        "</g>\n",
-       "<!-- leaf1333 -->\n",
-       "<!-- Average Square&#45;&gt;leaf1333 -->\n",
+       "<!-- cubes -->\n",
+       "<g id=\"node6\" class=\"node\">\n",
+       "<title>cubes</title>\n",
+       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"227.61\" cy=\"-274.82\" rx=\"30.49\" ry=\"30.49\"/>\n",
+       "<text text-anchor=\"middle\" x=\"227.61\" y=\"-270.62\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">cubes</text>\n",
+       "</g>\n",
+       "<!-- Cell 2: Cube&#45;&gt;cubes -->\n",
        "<g id=\"edge6\" class=\"edge\">\n",
-       "<title>Average Square&#45;&gt;leaf1333</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M66.17,-85.6C66.17,-74.26 66.17,-59.23 66.17,-46.32\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"69.67,-46.1 66.17,-36.1 62.67,-46.1 69.67,-46.1\"/>\n",
-       "<text text-anchor=\"middle\" x=\"109.7\" y=\"-56.8\" font-family=\"Times,serif\" font-size=\"14.00\">average_square</text>\n",
+       "<title>Cell 2: Cube&#45;&gt;cubes</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M225.18,-347.36C225.5,-338.2 225.89,-326.5 226.27,-315.22\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"229.78,-315.19 226.62,-305.07 222.78,-314.95 229.78,-315.19\"/>\n",
+       "</g>\n",
+       "<!-- Cell 5: Average Cube -->\n",
+       "<g id=\"node9\" class=\"node\">\n",
+       "<title>Cell 5: Average Cube</title>\n",
+       "<polygon fill=\"none\" stroke=\"blue\" points=\"299.5,-202.2 163.72,-202.2 163.72,-166.2 299.5,-166.2 299.5,-202.2\"/>\n",
+       "<text text-anchor=\"middle\" x=\"231.61\" y=\"-180\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cell 5: Average Cube</text>\n",
        "</g>\n",
-       "<!-- leaf6554 -->\n",
-       "<!-- Average Cube&#45;&gt;leaf6554 -->\n",
+       "<!-- cubes&#45;&gt;Cell 5: Average Cube -->\n",
        "<g id=\"edge7\" class=\"edge\">\n",
-       "<title>Average Cube&#45;&gt;leaf6554</title>\n",
-       "<path fill=\"none\" stroke=\"black\" d=\"M210.17,-85.6C210.17,-74.26 210.17,-59.23 210.17,-46.32\"/>\n",
-       "<polygon fill=\"black\" stroke=\"black\" points=\"213.67,-46.1 210.17,-36.1 206.67,-46.1 213.67,-46.1\"/>\n",
-       "<text text-anchor=\"middle\" x=\"248.65\" y=\"-56.8\" font-family=\"Times,serif\" font-size=\"14.00\">average_cube</text>\n",
+       "<title>cubes&#45;&gt;Cell 5: Average Cube</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M228.94,-244.31C229.4,-234.11 229.91,-222.72 230.37,-212.68\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"233.87,-212.59 230.83,-202.44 226.88,-212.27 233.87,-212.59\"/>\n",
+       "</g>\n",
+       "<!-- average_square -->\n",
+       "<g id=\"node8\" class=\"node\">\n",
+       "<title>average_square</title>\n",
+       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"72.61\" cy=\"-65.1\" rx=\"65.2\" ry=\"65.2\"/>\n",
+       "<text text-anchor=\"middle\" x=\"72.61\" y=\"-60.9\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">average_square</text>\n",
+       "</g>\n",
+       "<!-- Cell 5: Average Square&#45;&gt;average_square -->\n",
+       "<g id=\"edge8\" class=\"edge\">\n",
+       "<title>Cell 5: Average Square&#45;&gt;average_square</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M72.61,-166.17C72.61,-158.97 72.61,-150.04 72.61,-140.5\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"76.11,-140.39 72.61,-130.39 69.11,-140.39 76.11,-140.39\"/>\n",
+       "</g>\n",
+       "<!-- average_cube -->\n",
+       "<g id=\"node10\" class=\"node\">\n",
+       "<title>average_cube</title>\n",
+       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"231.61\" cy=\"-65.1\" rx=\"58.92\" ry=\"58.92\"/>\n",
+       "<text text-anchor=\"middle\" x=\"231.61\" y=\"-60.9\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">average_cube</text>\n",
+       "</g>\n",
+       "<!-- Cell 5: Average Cube&#45;&gt;average_cube -->\n",
+       "<g id=\"edge9\" class=\"edge\">\n",
+       "<title>Cell 5: Average Cube&#45;&gt;average_cube</title>\n",
+       "<path fill=\"none\" stroke=\"black\" d=\"M231.61,-166.17C231.61,-157.35 231.61,-145.94 231.61,-133.99\"/>\n",
+       "<polygon fill=\"black\" stroke=\"black\" points=\"235.11,-133.88 231.61,-123.88 228.11,-133.88 235.11,-133.88\"/>\n",
        "</g>\n",
        "</g>\n",
        "</svg>\n"
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index d0cd661..39d1445 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -31,14 +31,14 @@ import sys
 
 import apache_beam as beam
 from apache_beam.runners import runner
-from apache_beam.runners.utils import is_interactive
+from apache_beam.utils.interactive_utils import is_in_ipython
+from apache_beam.utils.interactive_utils import is_in_notebook
 
 # Interactive Beam user flow is data-centric rather than pipeline-centric, so
 # there is only one global interactive environment instance that manages
 # implementation that enables interactivity.
 _interactive_beam_env = None
 
-
 _LOGGER = logging.getLogger(__name__)
 
 
@@ -84,6 +84,7 @@ class InteractiveEnvironment(object):
     # InteractiveRunner is responsible for populating this dictionary
     # implicitly.
     self._pipeline_results = {}
+    self._tracked_user_pipelines = set()
     # Always watch __main__ module.
     self.watch('__main__')
     # Do a warning level logging if current python version is below 3.6.
@@ -106,7 +107,8 @@ class InteractiveEnvironment(object):
                       'install apache-beam[interactive]` to install necessary '
                       'dependencies to enable all data visualization features.')
 
-    self._is_in_ipython, self._is_in_notebook = is_interactive()
+    self._is_in_ipython = is_in_ipython()
+    self._is_in_notebook = is_in_notebook()
     if not self._is_in_ipython:
       _LOGGER.warning('You cannot use Interactive Beam features when you are '
                       'not in an interactive environment such as a Jupyter '
@@ -221,3 +223,33 @@ class InteractiveEnvironment(object):
     if result:
       return runner.PipelineState.is_terminal(result.state)
     return True
+
+  def track_user_pipelines(self):
+    """Record references to all user-defined pipeline instances watched in
+    current environment.
+
+    Current static global singleton interactive environment holds references to
+    a set of pipeline instances defined by the user in the watched scope.
+    Interactive Beam features could use the references to determine if a given
+    pipeline is defined by user or implicitly created by Beam SDK or runners,
+    then handle them differently.
+
+    This is invoked every time a PTransform is to be applied if the current
+    code execution is under ipython due to the possibility that any user-defined
+    pipeline can be re-evaluated through notebook cell re-execution at any time.
+
+    Each time this is invoked, the tracked user pipelines are refreshed to
+    remove any pipeline instances that are no longer in watched scope. For
+    example, after a notebook cell re-execution re-evaluating a pipeline
+    creation, the last pipeline reference created by last evaluation will not be
+    in watched scope anymore.
+    """
+    self._tracked_user_pipelines = set()
+    for watching in self.watching():
+      for _, val in watching:
+        if isinstance(val, beam.pipeline.Pipeline):
+          self._tracked_user_pipelines.add(val)
+
+  @property
+  def tracked_user_pipelines(self):
+    return self._tracked_user_pipelines
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index cd06b42..b9255e4 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -54,17 +54,10 @@ class PipelineInstrument(object):
 
   def __init__(self, pipeline, options=None):
     self._pipeline = pipeline
-    # The cache manager should be initiated outside of this module and outside
-    # of run_pipeline() from interactive runner so that its lifespan could cover
-    # multiple runs in the interactive environment. Owned by
-    # interactive_environment module. Not owned by this module.
-    # TODO(BEAM-7760): change the scope of cache to be owned by runner or
-    # pipeline result instances because a pipeline is not 1:1 correlated to a
-    # running job. Only complete and read-only cache is valid across multiple
-    # jobs. Other cache instances should have their own scopes. Some design
-    # change should support only runner.run(pipeline) pattern rather than
-    # pipeline.run([runner]) and a runner can only run at most one pipeline at a
-    # time. Otherwise, result returned by run() is the only 1:1 anchor.
+    # The global cache manager is lazily initiated outside of this module by any
+    # interactive runner so that its lifespan could cover multiple runs in
+    # the interactive environment. Owned by interactive_environment module. Not
+    # owned by this module.
     self._cache_manager = ie.current_env().cache_manager()
 
     # Invoke a round trip through the runner API. This makes sure the Pipeline
@@ -444,12 +437,15 @@ class PipelineInstrument(object):
     cache. It doesn't mean that there would definitely be such cache already.
     Also, the pcoll can come from the original user defined pipeline object or
     an equivalent pcoll from a transformed copy of the original pipeline.
+
+    'pcoll_id' of cacheable is not stable for cache_key, thus not included in
+    cache key. A combination of 'var', 'version' and 'producer_version' is
+    sufficient to identify a cached PCollection.
     """
     cacheable = self.cacheables.get(self._cacheable_key(pcoll), None)
     if cacheable:
       return '_'.join((cacheable['var'],
                        cacheable['version'],
-                       cacheable['pcoll_id'],
                        cacheable['producer_version']))
     return ''
 
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index c45b8e3..09b646e 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -31,6 +31,8 @@ from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import pipeline_instrument as instr
 from apache_beam.runners.interactive import interactive_runner
+from apache_beam.runners.interactive.testing.pipeline_assertion import assert_pipeline_equal
+from apache_beam.runners.interactive.testing.pipeline_assertion import assert_pipeline_proto_equal
 
 # Work around nose tests using Python2 without unittest.mock module.
 try:
@@ -44,47 +46,6 @@ class PipelineInstrumentTest(unittest.TestCase):
   def setUp(self):
     ie.new_env(cache_manager=cache.FileBasedCacheManager())
 
-  def assertPipelineProtoEqual(self, actual_pipeline_proto,
-                               expected_pipeline_proto):
-    components1 = actual_pipeline_proto.components
-    components2 = expected_pipeline_proto.components
-    self.assertEqual(len(components1.transforms), len(components2.transforms))
-    self.assertEqual(len(components1.pcollections),
-                     len(components2.pcollections))
-
-    # GreatEqual instead of Equal because the pipeline_proto_to_execute could
-    # include more windowing_stratagies and coders than necessary.
-    self.assertGreaterEqual(len(components1.windowing_strategies),
-                            len(components2.windowing_strategies))
-    self.assertGreaterEqual(len(components1.coders), len(components2.coders))
-    self.assertTransformEqual(actual_pipeline_proto,
-                              actual_pipeline_proto.root_transform_ids[0],
-                              expected_pipeline_proto,
-                              expected_pipeline_proto.root_transform_ids[0])
-
-  def assertPipelineEqual(self, actual_pipeline, expected_pipeline):
-    actual_pipeline_proto = actual_pipeline.to_runner_api(use_fake_coders=True)
-    expected_pipeline_proto = expected_pipeline.to_runner_api(
-        use_fake_coders=True)
-    self.assertPipelineProtoEqual(actual_pipeline_proto,
-                                  expected_pipeline_proto)
-
-  def assertTransformEqual(self, actual_pipeline_proto, actual_transform_id,
-                           expected_pipeline_proto, expected_transform_id):
-    transform_proto1 = actual_pipeline_proto.components.transforms[
-        actual_transform_id]
-    transform_proto2 = expected_pipeline_proto.components.transforms[
-        expected_transform_id]
-    self.assertEqual(transform_proto1.spec.urn, transform_proto2.spec.urn)
-    # Skipping payload checking because PTransforms of the same functionality
-    # could generate different payloads.
-    self.assertEqual(len(transform_proto1.subtransforms),
-                     len(transform_proto2.subtransforms))
-    self.assertSetEqual(set(transform_proto1.inputs),
-                        set(transform_proto2.inputs))
-    self.assertSetEqual(set(transform_proto1.outputs),
-                        set(transform_proto2.outputs))
-
   def test_pcolls_to_pcoll_id(self):
     p = beam.Pipeline(interactive_runner.InteractiveRunner())
     # pylint: disable=range-builtin-not-iterating
@@ -137,14 +98,11 @@ class PipelineInstrumentTest(unittest.TestCase):
 
     pin = instr.pin(p)
     self.assertEqual(pin.cache_key(init_pcoll), 'init_pcoll_' + str(
-        id(init_pcoll)) + '_ref_PCollection_PCollection_10_' + str(id(
-            init_pcoll.producer)))
+        id(init_pcoll)) + '_' + str(id(init_pcoll.producer)))
     self.assertEqual(pin.cache_key(squares), 'squares_' + str(
-        id(squares)) + '_ref_PCollection_PCollection_11_' + str(id(
-            squares.producer)))
+        id(squares)) + '_' + str(id(squares.producer)))
     self.assertEqual(pin.cache_key(cubes), 'cubes_' + str(
-        id(cubes)) + '_ref_PCollection_PCollection_12_' + str(id(
-            cubes.producer)))
+        id(cubes)) + '_' + str(id(cubes.producer)))
 
   def test_cacheables(self):
     p = beam.Pipeline(interactive_runner.InteractiveRunner())
@@ -224,7 +182,7 @@ class PipelineInstrumentTest(unittest.TestCase):
     expected_pipeline = p.to_runner_api(return_context=False,
                                         use_fake_coders=True)
 
-    self.assertPipelineProtoEqual(actual_pipeline, expected_pipeline)
+    assert_pipeline_proto_equal(self, expected_pipeline, actual_pipeline)
 
   def _example_pipeline(self, watch=True):
     p = beam.Pipeline(interactive_runner.InteractiveRunner())
@@ -275,7 +233,7 @@ class PipelineInstrumentTest(unittest.TestCase):
         ('_WriteCache_' + second_pcoll_cache_key) >> cache.WriteCache(
             ie.current_env().cache_manager(), second_pcoll_cache_key))
     # The 2 pipelines should be the same now.
-    self.assertPipelineEqual(p_copy, p_origin)
+    assert_pipeline_equal(self, p_copy, p_origin)
 
   def test_instrument_example_pipeline_to_read_cache(self):
     p_origin, init_pcoll, second_pcoll = self._example_pipeline()
@@ -283,12 +241,10 @@ class PipelineInstrumentTest(unittest.TestCase):
 
     # Mock as if cacheable PCollections are cached.
     init_pcoll_cache_key = 'init_pcoll_' + str(
-        id(init_pcoll)) + '_ref_PCollection_PCollection_10_' + str(id(
-            init_pcoll.producer))
+        id(init_pcoll)) + '_' + str(id(init_pcoll.producer))
     self._mock_write_cache(init_pcoll, init_pcoll_cache_key)
     second_pcoll_cache_key = 'second_pcoll_' + str(
-        id(second_pcoll)) + '_ref_PCollection_PCollection_11_' + str(id(
-            second_pcoll.producer))
+        id(second_pcoll)) + '_' + str(id(second_pcoll.producer))
     self._mock_write_cache(second_pcoll, second_pcoll_cache_key)
     ie.current_env().cache_manager().exists = MagicMock(return_value=True)
     instr.pin(p_copy)
@@ -315,7 +271,7 @@ class PipelineInstrumentTest(unittest.TestCase):
 
     v = TestReadCacheWireVisitor()
     p_origin.visit(v)
-    self.assertPipelineEqual(p_origin, p_copy)
+    assert_pipeline_equal(self, p_origin, p_copy)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/interactive/testing/__init__.py b/sdks/python/apache_beam/runners/interactive/testing/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/testing/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py b/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py
new file mode 100644
index 0000000..ee3acd8
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Mocked object returned by invoking get_ipython() in an ipython environment.
+_mocked_get_ipython = None
+
+
+def mock_get_ipython():
+  """Mock an ipython environment w/o setting up real ipython kernel.
+
+  Each entering of get_ipython() invocation will have the prompt increased by
+  one. Grouping arbitrary python code into separate cells using `with` clause.
+
+  Examples::
+
+    # Usage, before each test function, append:
+    @patch('IPython.get_ipython', mock_get_ipython)
+
+    # Group lines of code into a cell:
+    with mock_get_ipython():
+      # arbitrary python code
+      # ...
+      # arbitrary python code
+
+    # Next cell with prompt increased by one:
+    with mock_get_ipython():  # Auto-incremental
+      # arbitrary python code
+      # ...
+      # arbitrary python code
+  """
+
+  class MockedGetIpython(object):
+
+    def __init__(self):
+      self._execution_count = 0
+
+    @property
+    def execution_count(self):
+      """Execution count always starts from 1 and is constant within a cell."""
+      return self._execution_count
+
+    def __enter__(self):
+      """Marks entering of a cell/prompt."""
+      self._execution_count = self._execution_count + 1
+
+    def __exit__(self, exc_type, exc_value, traceback):
+      """Marks exiting of a cell/prompt."""
+      pass
+
+  global _mocked_get_ipython
+  if not _mocked_get_ipython:
+    _mocked_get_ipython = MockedGetIpython()
+  return _mocked_get_ipython
diff --git a/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py b/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
new file mode 100644
index 0000000..a7762a4
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to verify implicit cache transforms applied by Interactive Beam.
+
+For internal use only; no backwards-compatibility guarantees.
+This utility should only be used by Interactive Beam tests. For example, it can
+be used to verify if the implicit cache transforms are applied as expected when
+running a pipeline with the InteractiveRunner. It can also be used to verify if
+a pipeline fragment has pruned unnecessary transforms. It shouldn't be used to
+verify equivalence between pipelines if the code to be tested depends on or
+mutates user code within transforms in pipelines.
+"""
+
+
+def assert_pipeline_equal(test_case, expected_pipeline, actual_pipeline):
+  """Asserts the equivalence between two given apache_beam.Pipeline instances.
+
+  Args:
+    test_case: (unittest.TestCase) the unittest testcase where it asserts.
+    expected_pipeline: (Pipeline) the pipeline instance expected.
+    actual_pipeline: (Pipeline) the actual pipeline instance to be asserted.
+  """
+  expected_pipeline_proto = expected_pipeline.to_runner_api(
+      use_fake_coders=True)
+  actual_pipeline_proto = actual_pipeline.to_runner_api(use_fake_coders=True)
+  assert_pipeline_proto_equal(test_case, expected_pipeline_proto,
+                              actual_pipeline_proto)
+
+
+def assert_pipeline_proto_equal(test_case,
+                                expected_pipeline_proto,
+                                actual_pipeline_proto):
+  """Asserts the equivalence between two pipeline proto representations."""
+  components1 = expected_pipeline_proto.components
+  components2 = actual_pipeline_proto.components
+  test_case.assertEqual(len(components1.transforms),
+                        len(components2.transforms))
+  test_case.assertEqual(len(components1.pcollections),
+                        len(components2.pcollections))
+
+  # TODO(BEAM-7926): Update tests and make below 2 assertions assertEqual.
+  test_case.assertLessEqual(len(components1.windowing_strategies),
+                            len(components2.windowing_strategies))
+  test_case.assertLessEqual(len(components1.coders), len(components2.coders))
+
+  _assert_transform_equal(test_case,
+                          actual_pipeline_proto,
+                          actual_pipeline_proto.root_transform_ids[0],
+                          expected_pipeline_proto,
+                          expected_pipeline_proto.root_transform_ids[0])
+
+
+def _assert_transform_equal(test_case,
+                            expected_pipeline_proto,
+                            expected_transform_id,
+                            actual_pipeline_proto,
+                            actual_transform_id):
+  """Asserts the equivalence between transforms from two given pipelines. """
+  transform_proto1 = expected_pipeline_proto.components.transforms[
+      expected_transform_id]
+  transform_proto2 = actual_pipeline_proto.components.transforms[
+      actual_transform_id]
+  test_case.assertEqual(transform_proto1.spec.urn, transform_proto2.spec.urn)
+  # Skipping payload checking because PTransforms of the same functionality
+  # could generate different payloads.
+  test_case.assertEqual(len(transform_proto1.subtransforms),
+                        len(transform_proto2.subtransforms))
+  test_case.assertSetEqual(set(transform_proto1.inputs),
+                           set(transform_proto2.inputs))
+  test_case.assertSetEqual(set(transform_proto1.outputs),
+                           set(transform_proto2.outputs))
diff --git a/sdks/python/apache_beam/runners/utils.py b/sdks/python/apache_beam/runners/utils.py
deleted file mode 100644
index 8952423..0000000
--- a/sdks/python/apache_beam/runners/utils.py
+++ /dev/null
@@ -1,47 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Common utility module shared by runners.
-
-For internal use only; no backwards-compatibility guarantees.
-"""
-from __future__ import absolute_import
-
-
-def is_interactive():
-  """Determines if current code execution is in interactive environment.
-
-  Returns:
-    is_in_ipython: (bool) tells if current code is executed within an ipython
-        session.
-    is_in_notebook: (bool) tells if current code is executed from an ipython
-        notebook.
-
-  If is_in_notebook is True, then is_in_ipython must also be True.
-  """
-  is_in_ipython = False
-  is_in_notebook = False
-  # Check if the runtime is within an interactive environment, i.e., ipython.
-  try:
-    from IPython import get_ipython  # pylint: disable=import-error
-    if get_ipython():
-      is_in_ipython = True
-      if 'IPKernelApp' in get_ipython().config:
-        is_in_notebook = True
-  except ImportError:
-    pass  # If dependencies are not available, then not interactive for sure.
-  return is_in_ipython, is_in_notebook
diff --git a/sdks/python/apache_beam/utils/interactive_utils.py b/sdks/python/apache_beam/utils/interactive_utils.py
new file mode 100644
index 0000000..ac4e5d8
--- /dev/null
+++ b/sdks/python/apache_beam/utils/interactive_utils.py
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Common interactive utility module.
+
+For experimental usage only; no backwards-compatibility guarantees.
+"""
+from __future__ import absolute_import
+
+import logging
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def is_in_ipython():
+  """Determines if current code is executed within an ipython session."""
+  is_in_ipython = False
+  # Check if the runtime is within an interactive environment, i.e., ipython.
+  try:
+    from IPython import get_ipython  # pylint: disable=import-error
+    if get_ipython():
+      is_in_ipython = True
+  except ImportError:
+    pass  # If dependencies are not available, then not interactive for sure.
+  return is_in_ipython
+
+
+def is_in_notebook():
+  """Determines if current code is executed from an ipython notebook.
+
+  If is_in_notebook() is True, then is_in_ipython() must also be True.
+  """
+  is_in_notebook = False
+  if is_in_ipython():
+    # The import and usage must be valid under the execution path.
+    from IPython import get_ipython
+    if 'IPKernelApp' in get_ipython().config:
+      is_in_notebook = True
+  return is_in_notebook
+
+
+def alter_label_if_ipython(transform, pvalueish):
+  """Alters the label to an interactive label with ipython prompt metadata
+  prefixed for the given transform if the given pvalueish belongs to a
+  user-defined pipeline and current code execution is within an ipython kernel.
+  Otherwise, noop.
+
+  A label is either a user-defined or auto-generated str name of a PTransform
+  that is unique within a pipeline. If current environment is_in_ipython(), Beam
+  can implicitly create interactive labels to replace labels of top-level
+  PTransforms to be applied. The label is formatted as:
+  `Cell {prompt}: {original_label}`.
+  """
+  if is_in_ipython():
+    from apache_beam.runners.interactive import interactive_environment as ie
+    # Tracks user defined pipeline instances in watched scopes so that we only
+    # alter labels for any transform to pvalueish belonging to those pipeline
+    # instances, excluding any transform to be applied in other pipeline
+    # instances the Beam SDK creates implicitly.
+    ie.current_env().track_user_pipelines()
+    from IPython import get_ipython
+    prompt = get_ipython().execution_count
+    pipeline = _extract_pipeline_of_pvalueish(pvalueish)
+    if not pipeline:
+      _LOGGER.warning('Failed to alter the label of a transform with the '
+                      'ipython prompt metadata. Cannot figure out the pipeline '
+                      'that the given pvalueish %s belongs to. Thus noop.'
+                      % pvalueish)
+    if (pipeline
+        # We only alter for transforms to be applied to user-defined pipelines
+        # at pipeline construction time.
+        and pipeline in ie.current_env().tracked_user_pipelines):
+      transform.label = 'Cell {}: {}'.format(prompt, transform.label)
+
+
+def _extract_pipeline_of_pvalueish(pvalueish):
+  """Extracts the pipeline that the given pvalueish belongs to."""
+  if isinstance(pvalueish, tuple):
+    pvalue = pvalueish[0]
+  elif isinstance(pvalueish, dict):
+    pvalue = next(iter(pvalueish.values()))
+  else:
+    pvalue = pvalueish
+  if hasattr(pvalue, 'pipeline'):
+    return pvalue.pipeline
+  return None
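
A minimal sketch of how the new testing modules are intended to be combined in
a unit test, mirroring the pipeline_graph_test.py change above (the test class
name and the final assertion are illustrative, not part of this commit):

    import unittest

    import apache_beam as beam
    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive import interactive_environment as ie
    from apache_beam.runners.interactive import interactive_runner as ir
    from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython

    try:
      from unittest.mock import patch
    except ImportError:
      from mock import patch


    class LabelAlteringTest(unittest.TestCase):

      @patch('IPython.get_ipython', mock_get_ipython)
      def test_labels_are_prefixed_per_cell(self):
        # Pretend a notebook frontend backs the mocked ipython kernel.
        ie.current_env()._is_in_ipython = True
        ie.current_env()._is_in_notebook = True

        with mock_get_ipython():  # Cell 1
          p = beam.Pipeline(ir.InteractiveRunner())
          ib.watch(locals())  # Track the user pipeline for label alteration.

        with mock_get_ipython():  # Cell 2
          _ = p | 'Init' >> beam.Create(range(10))

        # The top-level transform applied in cell 2 carries prompt metadata.
        self.assertIn('Cell 2: Init', p.applied_labels)


    if __name__ == '__main__':
      unittest.main()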