You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ni...@apache.org on 2022/05/06 17:45:49 UTC

[beam] branch master updated: [BEAM-14332] Refactored cluster management for Flink on Dataproc (#17402)

This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 30c3ea22d96 [BEAM-14332] Refactored cluster management for Flink on Dataproc (#17402)
30c3ea22d96 is described below

commit 30c3ea22d96aeec6b05d80b27b57ac1f8320d934
Author: Ning Kang <ni...@gmail.com>
AuthorDate: Fri May 6 10:45:43 2022 -0700

    [BEAM-14332] Refactored cluster management for Flink on Dataproc (#17402)
    
    1. Used ClusterMetadata to replace MasterURLIdentifier. Instead of
       MasterURLIdentifier, now Union[str, beam.Pipeline, ClusterMetadata]
       functions as ClusterIdentifier.
    2. Made DataprocClusterManager 1:1 with real clusters instead of 1:1
       with pipelines.
    3. Deprecated bidict and many unnecessary mappings in Clusters class.
    4. Added a create factory in Clusters class to create cluster managers.
    5. Used default cluster metadata to replace the default cluster name.
       Now each cluster created has a distinct cluster name.
    6. Applied mocks to isolate singleton environment/clusters instance in
       tests to avoid test failures due to shared states.
---
 .../dataproc/dataproc_cluster_manager.py           | 139 +++----
 .../dataproc/dataproc_cluster_manager_test.py      |  49 ++-
 .../runners/interactive/dataproc/types.py          |  25 +-
 .../runners/interactive/interactive_beam.py        | 341 ++++++++++------
 .../runners/interactive/interactive_beam_test.py   | 435 +++++++++++++--------
 .../runners/interactive/interactive_environment.py |  14 +-
 .../interactive/interactive_environment_test.py    |  19 +-
 .../runners/interactive/interactive_runner.py      | 124 ++----
 .../runners/interactive/interactive_runner_test.py | 183 +++++----
 .../messaging/interactive_environment_inspector.py |  28 +-
 .../interactive_environment_inspector_test.py      |  69 ++--
 .../runners/interactive/recording_manager.py       |  11 +-
 .../runners/interactive/testing/mock_env.py        |  90 +++++
 .../runners/interactive/testing/mock_ipython.py    |   2 +
 .../apache_beam/runners/interactive/utils.py       |  55 +--
 .../apache_beam/runners/interactive/utils_test.py  |  32 +-
 16 files changed, 890 insertions(+), 726 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
index f636d162daa..2e2007abc79 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
@@ -23,9 +23,9 @@ import time
 from typing import Optional
 from typing import Tuple
 
+from apache_beam import version as beam_version
 from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.runners.interactive import interactive_environment as ie
-from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
 from apache_beam.runners.interactive.utils import progress_indicated
 
 try:
@@ -40,45 +40,28 @@ except ImportError:
 
 _LOGGER = logging.getLogger(__name__)
 
+# Name of the log file auto-generated by Dataproc. We use it to locate the
+# startup output of the Flink daemon to retrieve master url and dashboard
+# information.
+DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
+
 
 class DataprocClusterManager:
-  """The DataprocClusterManager object simplifies the operations
-  required for creating and deleting Dataproc clusters for use
-  under Interactive Beam.
+  """Self-contained cluster manager that controls the lifecycle of a Dataproc
+  cluster connected by one or more pipelines under Interactive Beam.
   """
-  def __init__(self, cluster_metadata: MasterURLIdentifier) -> None:
+  def __init__(self, cluster_metadata: ClusterMetadata) -> None:
     """Initializes the DataprocClusterManager with properties required
     to interface with the Dataproc ClusterControllerClient.
     """
     self.cluster_metadata = cluster_metadata
-    if self.cluster_metadata.region == 'global':
-      # The global region is unsupported as it will be eventually deprecated.
-      raise ValueError('Clusters in the global region are not supported.')
-    elif not self.cluster_metadata.region:
-      _LOGGER.warning(
-          'No region information was detected, defaulting Dataproc cluster '
-          'region to: us-central1.')
-      self.cluster_metadata.region = 'us-central1'
-
-    if not self.cluster_metadata.cluster_name:
-      self.cluster_metadata.cluster_name = ie.current_env(
-      ).clusters.default_cluster_name
-
+    # Pipelines whose jobs are executed on the cluster.
+    self.pipelines = set()
     self._cluster_client = dataproc_v1.ClusterControllerClient(
         client_options={
             'api_endpoint': \
             f'{self.cluster_metadata.region}-dataproc.googleapis.com:443'
         })
-
-    if self.cluster_metadata in ie.current_env().clusters.master_urls.inverse:
-      self.master_url = ie.current_env().clusters.master_urls.inverse[
-          self.cluster_metadata]
-      self.dashboard = ie.current_env().clusters.master_urls_to_dashboards[
-          self.master_url]
-    else:
-      self.master_url = None
-      self.dashboard = None
-
     self._fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
     self._staging_directory = None
 
@@ -92,7 +75,7 @@ class DataprocClusterManager:
           schema for clusters here:
           https://cloud.google.com/python/docs/reference/dataproc/latest/google.cloud.dataproc_v1.types.Cluster
     """
-    if self.master_url:
+    if self.cluster_metadata.master_url:
       return
     try:
       self._cluster_client.create_cluster(
@@ -101,18 +84,11 @@ class DataprocClusterManager:
               'region': self.cluster_metadata.region,
               'cluster': cluster
           })
-      _LOGGER.info(
-          'Cluster created successfully: %s',
-          self.cluster_metadata.cluster_name)
-      self._staging_directory = self.get_staging_location(self.cluster_metadata)
-      self.master_url, self.dashboard = self.get_master_url_and_dashboard(
-        self.cluster_metadata,
-        self._staging_directory)
     except Exception as e:
       if e.code == 409:
         _LOGGER.info(
             'Cluster %s already exists. Continuing...',
-            ie.current_env().clusters.default_cluster_name)
+            self.cluster_metadata.cluster_name)
       elif e.code == 403:
         _LOGGER.error(
             'Due to insufficient project permissions, '
@@ -130,6 +106,14 @@ class DataprocClusterManager:
         _LOGGER.error(
             'Unable to create cluster: %s', self.cluster_metadata.cluster_name)
         raise e
+    else:
+      _LOGGER.info(
+          'Cluster created successfully: %s',
+          self.cluster_metadata.cluster_name)
+      self._staging_directory = self.get_staging_location()
+      master_url, dashboard = self.get_master_url_and_dashboard()
+      self.cluster_metadata.master_url = master_url
+      self.cluster_metadata.dashboard = dashboard
 
   def create_flink_cluster(self) -> None:
     """Calls _create_cluster with a configuration that enables FlinkRunner."""
@@ -155,6 +139,10 @@ class DataprocClusterManager:
             'endpoint_config': {
                 'enable_http_port_access': True
             }
+        },
+        'labels': {
+            'goog-dataflow-notebook': beam_version.__version__.replace(
+                '.', '_')
         }
     }
     self.create_cluster(cluster)
@@ -163,14 +151,13 @@ class DataprocClusterManager:
     """Deletes the cluster that uses the attributes initialized
     with the DataprocClusterManager instance."""
     try:
-      if self._staging_directory:
-        self.cleanup_staging_files(self._staging_directory)
       self._cluster_client.delete_cluster(
           request={
               'project_id': self.cluster_metadata.project_id,
               'region': self.cluster_metadata.region,
               'cluster_name': self.cluster_metadata.cluster_name,
           })
+      self.cleanup_staging_files()
     except Exception as e:
       if e.code == 403:
         _LOGGER.error(
@@ -191,72 +178,60 @@ class DataprocClusterManager:
             'Failed to delete cluster: %s', self.cluster_metadata.cluster_name)
         raise e
 
-  def describe(self) -> None:
-    """Returns a dictionary describing the cluster."""
-    return {
-        'cluster_metadata': self.cluster_metadata,
-        'master_url': self.master_url,
-        'dashboard': self.dashboard
-    }
-
-  def get_cluster_details(
-      self, cluster_metadata: MasterURLIdentifier) -> dataproc_v1.Cluster:
+  def get_cluster_details(self) -> dataproc_v1.Cluster:
     """Gets the Dataproc_v1 Cluster object for the current cluster manager."""
     try:
       return self._cluster_client.get_cluster(
           request={
-              'project_id': cluster_metadata.project_id,
-              'region': cluster_metadata.region,
-              'cluster_name': cluster_metadata.cluster_name
+              'project_id': self.cluster_metadata.project_id,
+              'region': self.cluster_metadata.region,
+              'cluster_name': self.cluster_metadata.cluster_name
           })
     except Exception as e:
       if e.code == 403:
         _LOGGER.error(
             'Due to insufficient project permissions, '
             'unable to retrieve information for cluster: %s',
-            cluster_metadata.cluster_name)
+            self.cluster_metadata.cluster_name)
         raise ValueError(
             'You cannot view clusters in project: {}'.format(
-                cluster_metadata.project_id))
+                self.cluster_metadata.project_id))
       elif e.code == 404:
         _LOGGER.error(
-            'Cluster does not exist: %s', cluster_metadata.cluster_name)
+            'Cluster does not exist: %s', self.cluster_metadata.cluster_name)
         raise ValueError(
-            'Cluster was not found: {}'.format(cluster_metadata.cluster_name))
+            'Cluster was not found: {}'.format(
+                self.cluster_metadata.cluster_name))
       else:
         _LOGGER.error(
             'Failed to get information for cluster: %s',
-            cluster_metadata.cluster_name)
+            self.cluster_metadata.cluster_name)
         raise e
 
-  def wait_for_cluster_to_provision(
-      self, cluster_metadata: MasterURLIdentifier) -> None:
-    while self.get_cluster_details(
-        cluster_metadata).status.state.name == 'CREATING':
+  def wait_for_cluster_to_provision(self) -> None:
+    while self.get_cluster_details().status.state.name == 'CREATING':
       time.sleep(15)
 
-  def get_staging_location(self, cluster_metadata: MasterURLIdentifier) -> str:
+  def get_staging_location(self) -> str:
     """Gets the staging bucket of an existing Dataproc cluster."""
     try:
-      self.wait_for_cluster_to_provision(cluster_metadata)
-      cluster_details = self.get_cluster_details(cluster_metadata)
+      self.wait_for_cluster_to_provision()
+      cluster_details = self.get_cluster_details()
       bucket_name = cluster_details.config.config_bucket
       gcs_path = 'gs://' + bucket_name + '/google-cloud-dataproc-metainfo/'
       for file in self._fs._list(gcs_path):
-        if cluster_metadata.cluster_name in file.path:
+        if self.cluster_metadata.cluster_name in file.path:
           # this file path split will look something like:
           # ['gs://.../google-cloud-dataproc-metainfo/{staging_dir}/',
           # '-{node-type}/dataproc-startup-script_output']
-          return file.path.split(cluster_metadata.cluster_name)[0]
+          return file.path.split(self.cluster_metadata.cluster_name)[0]
     except Exception as e:
       _LOGGER.error(
           'Failed to get %s cluster staging bucket.',
-          cluster_metadata.cluster_name)
+          self.cluster_metadata.cluster_name)
       raise e
 
-  def parse_master_url_and_dashboard(
-      self, cluster_metadata: MasterURLIdentifier,
-      line: str) -> Tuple[str, str]:
+  def parse_master_url_and_dashboard(self, line: str) -> Tuple[str, str]:
     """Parses the master_url and YARN application_id of the Flink process from
     an input line. The line containing both the master_url and application id
     is always formatted as such:
@@ -270,7 +245,7 @@ class DataprocClusterManager:
     'application_123456789000_0001'.
 
     Returns the flink_master_url and dashboard link as a tuple."""
-    cluster_details = self.get_cluster_details(cluster_metadata)
+    cluster_details = self.get_cluster_details()
     yarn_endpoint = cluster_details.config.endpoint_config.http_ports[
         'YARN ResourceManager']
     segment = line.split('Found Web Interface ')[1].split(' of application ')
@@ -282,13 +257,11 @@ class DataprocClusterManager:
         yarn_endpoint)
     return master_url, dashboard
 
-  def get_master_url_and_dashboard(
-      self, cluster_metadata: MasterURLIdentifier,
-      staging_bucket) -> Tuple[Optional[str], Optional[str]]:
+  def get_master_url_and_dashboard(self) -> Tuple[Optional[str], Optional[str]]:
     """Returns the master_url of the current cluster."""
     startup_logs = []
-    for file in self._fs._list(staging_bucket):
-      if ie.current_env().clusters.DATAPROC_STAGING_LOG_NAME in file.path:
+    for file in self._fs._list(self._staging_directory):
+      if DATAPROC_STAGING_LOG_NAME in file.path:
         startup_logs.append(file.path)
 
     for log in startup_logs:
@@ -296,10 +269,12 @@ class DataprocClusterManager:
       for line in content.readlines():
         decoded_line = line.decode()
         if 'Found Web Interface' in decoded_line:
-          return self.parse_master_url_and_dashboard(
-              cluster_metadata, decoded_line)
+          return self.parse_master_url_and_dashboard(decoded_line)
     return None, None
 
-  def cleanup_staging_files(self, staging_directory: str) -> None:
-    staging_files = [file.path for file in self._fs._list(staging_directory)]
-    self._fs.delete(staging_files)
+  def cleanup_staging_files(self) -> None:
+    if self._staging_directory:
+      staging_files = [
+          file.path for file in self._fs._list(self._staging_directory)
+      ]
+      self._fs.delete(staging_files)
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
index fc826e34cbc..2ba832f46e7 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
@@ -24,7 +24,7 @@ from unittest.mock import patch
 
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
-from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
 
 try:
   from google.cloud import dataproc_v1  # pylint: disable=unused-import
@@ -87,7 +87,7 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that no exception is thrown when a cluster already exists,
     but is using ie.current_env().clusters.default_cluster_name.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
@@ -103,7 +103,7 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when a user is trying to write to
     a project while having insufficient permissions.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
@@ -121,7 +121,7 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when a user specifies a region
     that does not exist.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
@@ -137,7 +137,7 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when the exception is not handled by
     any other case under _create_cluster.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
@@ -157,7 +157,7 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when a user is trying to delete
     a project that they have insufficient permissions for.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
@@ -179,7 +179,7 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when cleanup attempts to delete
     a cluster that does not exist.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
@@ -199,7 +199,7 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when the exception is not handled by
     any other case under cleanup.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
@@ -223,13 +223,13 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Test to receive a mock staging location successfully under
     get_staging_location.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project',
         region='test-region',
         cluster_name='test-cluster')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     self.assertEqual(
-        cluster_manager.get_staging_location(cluster_metadata),
+        cluster_manager.get_staging_location(),
         'gs://test-bucket/google-cloud-dataproc-metainfo/')
 
   @patch(
@@ -239,13 +239,13 @@ class DataprocClusterManagerTest(unittest.TestCase):
     """
     Test to catch when an error is raised inside get_staging_location.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project',
         region='test-region',
         cluster_name='test-cluster')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     with self.assertRaises(MockException):
-      cluster_manager.get_staging_location(cluster_metadata)
+      cluster_manager.get_staging_location()
 
   @patch(
       'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
@@ -262,13 +262,12 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that parse_master_url_and_dashboard properly parses the input
     string and produces a mock master_url and mock dashboard link.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     line = 'test-line Found Web Interface test-master-url' \
     ' of application \'test-app-id\'.\n'
-    master_url, dashboard = cluster_manager.parse_master_url_and_dashboard(
-      cluster_metadata, line)
+    master_url, dashboard = cluster_manager.parse_master_url_and_dashboard(line)
     self.assertEqual('test-master-url', master_url)
     self.assertEqual(
         'test-resource-manager/gateway/default/yarn/proxy/test-app-id/',
@@ -282,14 +281,14 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when a user is trying to get information
     for a project without sufficient permissions to do so.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(
         _LOGGER,
         level='ERROR') as context_manager, self.assertRaises(ValueError):
-      cluster_manager.get_cluster_details(cluster_metadata)
+      cluster_manager.get_cluster_details()
       self.assertTrue(
           'Due to insufficient project permissions' in
           context_manager.output[0])
@@ -302,14 +301,14 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when cleanup attempts to get information
     for a cluster that does not exist.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(
         _LOGGER,
         level='ERROR') as context_manager, self.assertRaises(ValueError):
-      cluster_manager.get_cluster_details(cluster_metadata)
+      cluster_manager.get_cluster_details()
       self.assertTrue('Cluster does not exist' in context_manager.output[0])
 
   @patch(
@@ -320,14 +319,14 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when the exception is not handled by
     any other case under get_cluster_details.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(
         _LOGGER,
         level='ERROR') as context_manager, self.assertRaises(MockException):
-      cluster_manager.get_cluster_details(cluster_metadata)
+      cluster_manager.get_cluster_details()
       self.assertTrue(
           'Failed to get information for cluster' in context_manager.output[0])
 
@@ -341,14 +340,12 @@ class DataprocClusterManagerTest(unittest.TestCase):
     unique substring which identifies the location of the master_url and
     application id of the Flink master.
     """
-    cluster_metadata = MasterURLIdentifier(
+    cluster_metadata = ClusterMetadata(
         project_id='test-project', region='test-region')
     cluster_manager = DataprocClusterManager(cluster_metadata)
     cluster_manager._fs = MockFileSystem()
-    master_url, dashboard = cluster_manager.get_master_url_and_dashboard(
-        cluster_metadata,
-        'test-staging-bucket'
-    )
+    cluster_metadata._staging_directory = 'test-staging-bucket'
+    master_url, dashboard = cluster_manager.get_master_url_and_dashboard()
     self.assertEqual(master_url, 'test-master-url')
     self.assertEqual(dashboard, 'test-dashboard-link')
 
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/types.py b/sdks/python/apache_beam/runners/interactive/dataproc/types.py
index 5d0b578e2c2..ed2400e48f1 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/types.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/types.py
@@ -17,15 +17,27 @@
 
 # pytype: skip-file
 
+import uuid
 from dataclasses import dataclass
+from dataclasses import field
 from typing import Optional
+from typing import Union
+
+from apache_beam.pipeline import Pipeline
+
+
+def _default_cluster_name():
+  return f'interactive-beam-{uuid.uuid4().hex}'
 
 
 @dataclass
-class MasterURLIdentifier:
+class ClusterMetadata:
   project_id: Optional[str] = None
   region: Optional[str] = None
-  cluster_name: Optional[str] = None
+  cluster_name: Optional[str] = field(default_factory=_default_cluster_name)
+  # Derivative fields do not affect hash or comparison.
+  master_url: Optional[str] = None
+  dashboard: Optional[str] = None
 
   def __key(self):
     return (self.project_id, self.region, self.cluster_name)
@@ -34,8 +46,9 @@ class MasterURLIdentifier:
     return hash(self.__key())
 
   def __eq__(self, other):
-    if isinstance(other, MasterURLIdentifier):
+    if isinstance(other, ClusterMetadata):
       return self.__key() == other.__key()
-    raise NotImplementedError(
-        'Comparisons are only supported between '
-        'instances of MasterURLIdentifier.')
+    return False
+
+
+ClusterIdentifier = Union[str, Pipeline, ClusterMetadata]
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index c6b111c03a4..7f9cf232b6a 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -25,6 +25,9 @@ bounded dataset. In the meantime, it hides the interactivity implementation
 from users so that users can focus on developing Beam pipeline without worrying
 about how hidden states in the interactive session are managed.
 
+A convention to import this module:
+  from apache_beam.runners.interactive import interactive_beam as ib
+
 Note: If you want backward-compatibility, only invoke interfaces provided by
 this module in your notebook or application code.
 """
@@ -32,9 +35,8 @@ this module in your notebook or application code.
 # pytype: skip-file
 
 import logging
-from collections import defaultdict
+import warnings
 from datetime import timedelta
-from typing import DefaultDict
 from typing import Dict
 from typing import List
 from typing import Optional
@@ -46,12 +48,13 @@ import apache_beam as beam
 from apache_beam.dataframe.frame_base import DeferredBase
 from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.runners.interactive import interactive_environment as ie
-from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
+from apache_beam.runners.interactive.dataproc.types import ClusterIdentifier
+from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.display.pcoll_visualization import visualize
 from apache_beam.runners.interactive.display.pcoll_visualization import visualize_computed_pcoll
 from apache_beam.runners.interactive.options import interactive_options
-from apache_beam.runners.interactive.utils import bidict
 from apache_beam.runners.interactive.utils import deferred_df_to_pcollection
 from apache_beam.runners.interactive.utils import elements_to_df
 from apache_beam.runners.interactive.utils import find_pcoll_name
@@ -339,151 +342,241 @@ class Recordings():
 
 
 class Clusters:
-  """An interface for users to modify the pipelines that are being run by the
-  Interactive Environment.
+  """An interface to manage clusters running workers that are connected with
+  the current interactive environment.
 
-  Methods of the Interactive Beam Clusters class can be accessed via:
-    from apache_beam.runners.interactive import interactive_beam as ib
-    ib.clusters
+  This module is experimental. No backwards-compatibility guarantees.
 
-  Example of calling the Interactive Beam clusters describe method::
-    ib.clusters.describe()
+  Interactive Beam automatically creates/reuses existing worker clusters to
+  execute pipelines when it detects the need from configurations.
+  Currently, the only supported cluster implementation is Flink running on
+  Cloud Dataproc.
+
+  To configure a pipeline to run on Cloud Dataproc with Flink, set the
+  underlying runner of the InteractiveRunner to FlinkRunner and the pipeline
+  options to indicate where on Cloud the FlinkRunner should be deployed to.
+
+    An example to enable automatic Dataproc cluster creation/reuse::
+
+      options = PipelineOptions([
+          '--project=my-project',
+          '--region=my-region',
+          '--environment_type=DOCKER'])
+      pipeline = beam.Pipeline(InteractiveRunner(
+          underlying_runner=FlinkRunner()), options=options)
+
+  Reusing the same pipeline options in another pipeline configures Interactive
+  Beam to reuse the same Dataproc cluster implicitly managed by the current
+  interactive environment.
+  If a flink_master is identified as a known cluster, the corresponding cluster
+  is also reused.
+  Furthermore, if a cluster is explicitly created by using a pipeline as an
+  identifier to a known cluster, the cluster is reused.
+
+    An example::
+
+      # If pipeline runs on a known cluster, below code reuses the cluster
+      # manager without creating a new one.
+      dcm = ib.clusters.create(pipeline)
+
+  To configure a pipeline to run on an existing FlinkRunner deployed elsewhere,
+  set the flink_master explicitly so no cluster will be created/reused.
+
+    An example pipeline options to skip automatic Dataproc cluster usage::
+
+      options = PipelineOptions([
+          '--flink_master=some.self.hosted.flink:port',
+          '--environment_type=DOCKER'])
+
+  To configure a pipeline to run on a local FlinkRunner, explicitly set the
+  default cluster metadata to None: ib.clusters.set_default_cluster(None).
   """
   # Explicitly set the Flink version here to ensure compatibility with 2.0
   # Dataproc images:
   # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
   DATAPROC_FLINK_VERSION = '1.12'
+
   # TODO(BEAM-14142): Fix the Dataproc image version after a released image
   # contains all missing dependencies for Flink to run.
   # DATAPROC_IMAGE_VERSION = '2.0.XX-debian10'
-  DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
 
   def __init__(self) -> None:
-    """Instantiates default values for Dataproc cluster interactions.
-    """
-    # Set the default_cluster_name that will be used when creating Dataproc
-    # clusters.
-    self.default_cluster_name = 'interactive-beam-cluster'
-    # Bidirectional 1-1 mapping between master_urls (str) to cluster metadata
-    # (MasterURLIdentifier), where self.master_urls.inverse is a mapping from
-    # MasterURLIdentifier -> str.
-    self.master_urls = bidict()
-    # self.dataproc_cluster_managers map string pipeline ids to instances of
-    # DataprocClusterManager.
-    self.dataproc_cluster_managers = {}
-    # self.master_urls_to_pipelines map string master_urls to lists of
-    # pipelines that use the corresponding master_url.
-    self.master_urls_to_pipelines: DefaultDict[
-        str, List[beam.Pipeline]] = defaultdict(list)
-    # self.master_urls_to_dashboards map string master_urls to the
-    # corresponding Apache Flink dashboards.
-    self.master_urls_to_dashboards: Dict[str, str] = {}
-    # self.default_cluster_metadata for creating a DataprocClusterManager when
-    # a pipeline has its cluster deleted from the clusters Jupyterlab
-    # extension.
-    self.default_cluster_metadata = None
-
-  def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
-    """Returns a description of the cluster associated to the given pipeline.
-
-    If no pipeline is given then this returns a dictionary of descriptions for
-    all pipelines, mapped to by id.
+    self.dataproc_cluster_managers: Dict[ClusterMetadata,
+                                         DataprocClusterManager] = {}
+    self.master_urls: Dict[str, ClusterMetadata] = {}
+    self.pipelines: Dict[beam.Pipeline, DataprocClusterManager] = {}
+    self.default_cluster_metadata: Optional[ClusterMetadata] = None
+
+  def create(
+      self, cluster_identifier: ClusterIdentifier) -> DataprocClusterManager:
+    """Creates a Dataproc cluster manager provisioned for the cluster
+    identified. If the cluster is known, returns an existing cluster manager.
     """
-    description = {
-        pid: dcm.describe()
-        for pid,
-        dcm in self.dataproc_cluster_managers.items()
-    }
-    if pipeline:
-      return description.get(str(id(pipeline)), None)
-    return description
+    # Try to get some not-None cluster metadata.
+    cluster_metadata = self.cluster_metadata(cluster_identifier)
+    if not cluster_metadata:
+      raise ValueError(
+          'Unknown cluster identifier: %s. Cannot create or reuse '
+          'a Dataproc cluster.' % cluster_identifier)
+    elif cluster_metadata.region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif not cluster_metadata.region:
+      _LOGGER.info(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      cluster_metadata.region = 'us-central1'
+    # else use the provided region.
+    known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None)
+    if known_dcm:
+      return known_dcm
+    dcm = DataprocClusterManager(cluster_metadata)
+    dcm.create_flink_cluster()
+    # ClusterMetadata with derivative fields populated by the dcm.
+    derived_meta = dcm.cluster_metadata
+    self.dataproc_cluster_managers[derived_meta] = dcm
+    self.master_urls[derived_meta.master_url] = derived_meta
+    # Update the default cluster metadata to the one just created.
+    self.set_default_cluster(derived_meta)
+    return dcm
 
   def cleanup(
-      self, pipeline: Optional[beam.Pipeline] = None, force=False) -> None:
-    """Attempt to cleanup the Dataproc Cluster corresponding to the given
-    pipeline.
-
-    If the cluster is not managed by interactive_beam, a corresponding cluster
-    manager is not detected, and deletion is skipped.
-
-    For clusters managed by Interactive Beam, by default, deletion is skipped
-    if any other pipelines are using the cluster.
-
-    Optionally, the cleanup for a cluster managed by Interactive Beam can be
-    forced, by setting the 'force' parameter to True.
-
-    Example usage of default cleanup::
-      interactive_beam.clusters.cleanup(p)
-
-    Example usage of forceful cleanup::
-      interactive_beam.clusters.cleanup(p, force=True)
+      self,
+      cluster_identifier: Optional[ClusterIdentifier] = None,
+      force: bool = False) -> None:
+    """Cleans up the cluster associated with the given cluster_identifier.
+
+    When None cluster_identifier is provided: if force is True, cleans up for
+    all clusters; otherwise, do a dry run and NOOP.
+    If a beam.Pipeline is given as the ClusterIdentifier while multiple
+    pipelines share the same cluster, it only cleans up the association between
+    the pipeline and the cluster identified.
+    If the cluster_identifier is unknown, NOOP.
     """
-    if pipeline:
-      cluster_manager = self.dataproc_cluster_managers.get(
-          str(id(pipeline)), None)
-      if cluster_manager:
-        master_url = cluster_manager.master_url
-        if len(self.master_urls_to_pipelines[master_url]) > 1:
-          if force:
-            _LOGGER.warning(
-                'Cluster is currently being used by another cluster, but '
-                'will be forcefully cleaned up.')
-            cluster_manager.cleanup()
-          else:
-            _LOGGER.warning(
-                'Cluster is currently being used, skipping deletion.')
-          self.master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
-        else:
-          cluster_manager.cleanup()
-          self.master_urls.pop(master_url, None)
-          self.master_urls_to_pipelines.pop(master_url, None)
-          self.master_urls_to_dashboards.pop(master_url, None)
-          self.dataproc_cluster_managers.pop(str(id(pipeline)), None)
+    if not cluster_identifier:
+      dcm_to_cleanup = set(self.dataproc_cluster_managers.values())
+      if force:
+        for dcm in dcm_to_cleanup:
+          self._cleanup(dcm)
+        self.default_cluster_metadata = None
+      else:
+        _LOGGER.warning(
+            'No cluster_identifier provided. If you intend to '
+            'clean up all clusters, invoke ib.clusters.cleanup(force=True). '
+            'Current clusters are %s.',
+            self.describe())
+    elif isinstance(cluster_identifier, beam.Pipeline):
+      p = cluster_identifier
+      dcm = self.pipelines.pop(p, None)
+      if dcm:
+        dcm.pipelines.remove(p)
+        warnings.filterwarnings(
+            'ignore',
+            'options is deprecated since First stable release. References to '
+            '<pipeline>.options will not be supported',
+            category=DeprecationWarning)
+        p.options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
+        # Only cleans up when there is no pipeline using the cluster.
+        if not dcm.pipelines:
+          self._cleanup(dcm)
     else:
-      cluster_manager_identifiers = set()
-      for cluster_manager in self.dataproc_cluster_managers.values():
-        if cluster_manager.cluster_metadata not in cluster_manager_identifiers:
-          cluster_manager_identifiers.add(cluster_manager.cluster_metadata)
-          cluster_manager.cleanup()
-      self.dataproc_cluster_managers.clear()
-      self.master_urls.clear()
-      self.master_urls_to_pipelines.clear()
-      self.master_urls_to_dashboards.clear()
-
-  def delete_cluster(self, master: Union[str, MasterURLIdentifier]) -> None:
-    """Deletes the cluster with the given obfuscated identifier from the
-    Interactive Environment, as well as from Dataproc. Additionally, unassigns
-    the 'flink_master' pipeline option for all impacted pipelines.
+      if isinstance(cluster_identifier, str):
+        meta = self.master_urls.get(cluster_identifier, None)
+      else:
+        meta = cluster_identifier
+      dcm = self.dataproc_cluster_managers.get(meta, None)
+      if dcm:
+        self._cleanup(dcm)
+
+  def describe(
+      self,
+      cluster_identifier: Optional[ClusterIdentifier] = None
+  ) -> Union[ClusterMetadata, List[ClusterMetadata]]:
+    """Describes the ClusterMetadata by a ClusterIdentifier.
+
+    If no cluster_identifier is given or if the cluster_identifier is unknown,
+    it returns descriptions for all known clusters.
+
+    Example usage:
+    # Describe the cluster executing work for a pipeline.
+    ib.clusters.describe(pipeline)
+    # Describe the cluster with the flink master url.
+    ib.clusters.describe(master_url)
+    # Describe all existing clusters.
+    ib.clusters.describe()
     """
-    if isinstance(master, MasterURLIdentifier):
-      master_url = self.master_urls.inverse[master]
-    else:
-      master_url = master
-
-    pipelines = [
-        ie.current_env().pipeline_id_to_pipeline(pid)
-        for pid in self.master_urls_to_pipelines[master_url]
-    ]
-    for p in pipelines:
-      ie.current_env().clusters.cleanup(p)
-      p.options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
+    if cluster_identifier:
+      meta = self._cluster_metadata(cluster_identifier)
+      if meta in self.dataproc_cluster_managers:
+        return meta
+    return list(self.dataproc_cluster_managers.keys())
 
   def set_default_cluster(
-      self, master: Union[str, MasterURLIdentifier]) -> None:
-    """Given an obfuscated identifier for a cluster, set the
-    default_cluster_metadata to be the MasterURLIdentifier that represents the
-    cluster."""
-    if isinstance(master, MasterURLIdentifier):
-      master_url = self.master_urls.inverse[master]
+      self, cluster_identifier: Optional[ClusterIdentifier] = None) -> None:
+    """Temporarily sets the default metadata for creating or reusing a
+    DataprocClusterManager. It is always updated to the most recently created
+    cluster.
+
+    NOOP if no known ClusterMetadata can be identified by the ClusterIdentifier.
+    If None is set, the next time Flink is in use and no cluster is explicitly
+    configured by a pipeline, the job runs locally.
+    """
+    if cluster_identifier:
+      self.default_cluster_metadata = self.cluster_metadata(cluster_identifier)
     else:
-      master_url = master
-
-    self.default_cluster_metadata = self.master_urls[master_url]
+      self.default_cluster_metadata = None
+
+  def cluster_metadata(
+      self,
+      cluster_identifier: Optional[ClusterIdentifier] = None
+  ) -> Optional[ClusterMetadata]:
+    """Fetches the ClusterMetadata by a ClusterIdentifier that could be a
+    URL string, a Beam pipeline, or an equivalent of a known ClusterMetadata.
+
+    If the given cluster_identifier is a URL or a pipeline that is unknown to
+    the current environment, the default cluster metadata (could be None) is
+    returned.
+    If the given cluster_identifier is a ClusterMetadata but unknown to the
+    current environment, passes it through (NOOP).
+    """
+    meta = self._cluster_metadata(cluster_identifier)
+    return meta if meta else self.default_cluster_metadata
+
+  def _cluster_metadata(
+      self,
+      cluster_identifier: Optional[ClusterIdentifier] = None
+  ) -> Optional[ClusterMetadata]:
+    meta = None
+    if cluster_identifier:
+      if isinstance(cluster_identifier, str):
+        meta = self.master_urls.get(cluster_identifier, None)
+      elif isinstance(cluster_identifier, beam.Pipeline):
+        dcm = self.pipelines.get(cluster_identifier, None)
+        if dcm:
+          meta = dcm.cluster_metadata
+      elif isinstance(cluster_identifier, ClusterMetadata):
+        meta = cluster_identifier
+        if meta in self.dataproc_cluster_managers:
+          meta = self.dataproc_cluster_managers[meta].cluster_metadata
+      else:
+        raise TypeError(
+            'A cluster_identifier should be Optional[Union[str, '
+            'beam.Pipeline, ClusterMetadata]], instead %s was given.' %
+            type(cluster_identifier))
+    return meta
+
+  def _cleanup(self, dcm: DataprocClusterManager) -> None:
+    dcm.cleanup()
+    self.dataproc_cluster_managers.pop(dcm.cluster_metadata, None)
+    self.master_urls.pop(dcm.cluster_metadata.master_url, None)
+    for p in dcm.pipelines:
+      self.pipelines.pop(p, None)
+    if dcm.cluster_metadata == self.default_cluster_metadata:
+      self.default_cluster_metadata = None
 
 
 # Users can set options to guide how Interactive Beam works.
 # Examples:
-# from apache_beam.runners.interactive import interactive_beam as ib
 # ib.options.enable_recording_replay = False/True
 # ib.options.recording_duration = '1m'
 # ib.options.recordable_sources.add(SourceClass)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index 4512753fa72..c5dd0ea99e0 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -28,20 +28,21 @@ from unittest.mock import patch
 
 import apache_beam as beam
 from apache_beam import dataframe as frames
+from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner as ir
 from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
-from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
 from apache_beam.runners.interactive.options.capture_limiters import Limiter
-from apache_beam.runners.interactive.utils import obfuscate
+from apache_beam.runners.interactive.testing.mock_env import isolated_env
 from apache_beam.runners.runner import PipelineState
 from apache_beam.testing.test_stream import TestStream
 
 
 @dataclasses.dataclass
-class MockMasterURLIdentifier:
+class MockClusterMetadata:
   master_url = 'mock_url'
 
 
@@ -64,10 +65,10 @@ def _get_watched_pcollections_with_variable_names():
   return watched_pcollections
 
 
+@isolated_env
 class InteractiveBeamTest(unittest.TestCase):
   def setUp(self):
     self._var_in_class_instance = 'a var in class instance, not directly used'
-    ie.new_env()
 
   def tearDown(self):
     ib.options.capture_control.set_limiters_for_test([])
@@ -296,173 +297,271 @@ class InteractiveBeamTest(unittest.TestCase):
 @unittest.skipIf(
     not ie.current_env().is_interactive_ready,
     '[interactive] dependency is not installed.')
+@isolated_env
 class InteractiveBeamClustersTest(unittest.TestCase):
   def setUp(self):
-    ie.new_env()
-
-  def test_clusters_describe(self):
-    clusters = ib.Clusters()
-    project = 'test-project'
-    region = 'test-region'
-    p = beam.Pipeline(
-        options=PipelineOptions(
-            project=project,
-            region=region,
-        ))
-    cluster_metadata = MasterURLIdentifier(project_id=project, region=region)
-    clusters.dataproc_cluster_managers[str(
-        id(p))] = DataprocClusterManager(cluster_metadata)
-    self.assertEqual(
-        'test-project',
-        clusters.describe()[str(id(p))]['cluster_metadata'].project_id)
-
-  @patch(
-      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
-      'DataprocClusterManager.get_master_url_and_dashboard',
-      return_value=('test-master-url', None))
-  @patch(
-      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
-      'DataprocClusterManager.cleanup',
-      return_value=None)
-  def test_clusters_cleanup_forcefully(self, mock_cleanup, mock_master_url):
-    clusters = ib.Clusters()
-    project = 'test-project'
-    region = 'test-region'
-    p1 = beam.Pipeline(
-        options=PipelineOptions(
-            project=project,
-            region=region,
-        ))
-    p2 = beam.Pipeline(
-        options=PipelineOptions(
-            project=project,
-            region=region,
-        ))
-    cluster_metadata_1 = MasterURLIdentifier(project_id=project, region=region)
-    clusters.dataproc_cluster_managers[str(
-        id(p1))] = DataprocClusterManager(cluster_metadata_1)
-    clusters.dataproc_cluster_managers[str(id(p1))].master_url = 'test_url'
-    clusters.master_urls_to_pipelines['test_url'].append(str(id(p1)))
-    cluster_metadata_2 = MasterURLIdentifier(project_id=project, region=region)
-    clusters.dataproc_cluster_managers[str(
-        id(p1))] = DataprocClusterManager(cluster_metadata_2)
-    clusters.dataproc_cluster_managers[str(id(p1))].master_url = 'test_url'
-    clusters.master_urls_to_pipelines['test_url'].append(str(id(p2)))
-    from apache_beam.runners.interactive.interactive_beam import _LOGGER
-    with self.assertLogs(_LOGGER, level='WARNING') as context_manager:
-      clusters.cleanup(p1, force=True)
-      self.assertTrue('forcefully cleaned up' in context_manager.output[0])
-
-  @patch(
-      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
-      'DataprocClusterManager.get_master_url_and_dashboard',
-      return_value=('test-master-url', None))
-  def test_clusters_cleanup_skip_on_duplicate(self, mock_master_url):
-    clusters = ib.Clusters()
-    project = 'test-project'
-    region = 'test-region'
-    p1 = beam.Pipeline(
-        options=PipelineOptions(
-            project=project,
-            region=region,
-        ))
-    p2 = beam.Pipeline(
-        options=PipelineOptions(
-            project=project,
-            region=region,
-        ))
-    cluster_metadata_1 = MasterURLIdentifier(project_id=project, region=region)
-    clusters.dataproc_cluster_managers[str(
-        id(p1))] = DataprocClusterManager(cluster_metadata_1)
-    clusters.dataproc_cluster_managers[str(id(p1))].master_url = 'test_url'
-    clusters.master_urls_to_pipelines['test_url'].append(str(id(p1)))
-    cluster_metadata_2 = MasterURLIdentifier(project_id=project, region=region)
-    clusters.dataproc_cluster_managers[str(
-        id(p1))] = DataprocClusterManager(cluster_metadata_2)
-    clusters.dataproc_cluster_managers[str(id(p1))].master_url = 'test_url'
-    clusters.master_urls_to_pipelines['test_url'].append(str(id(p2)))
-    from apache_beam.runners.interactive.interactive_beam import _LOGGER
-    with self.assertLogs(_LOGGER, level='WARNING') as context_manager:
-      clusters.cleanup(p1)
-      self.assertTrue('skipping deletion' in context_manager.output[0])
-
-  @patch(
-      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
-      'DataprocClusterManager.cleanup',
-      return_value=None)
-  def test_clusters_cleanup_otherwise(self, mock_cleanup):
-    clusters = ie.current_env().clusters
-    project = 'test-project'
-    region = 'test-region'
-    p = beam.Pipeline(
-        options=PipelineOptions(
-            project=project,
-            region=region,
-        ))
-    cluster_metadata = MasterURLIdentifier(project_id=project, region=region)
-    clusters.dataproc_cluster_managers[str(
-        id(p))] = DataprocClusterManager(cluster_metadata)
-    clusters.dataproc_cluster_managers[str(id(p))].master_url = 'test_url'
-    clusters.cleanup(p)
-
-  def test_delete_cluster(self):
-    clusters = ie.current_env().clusters
-
-    class MockClusterManager:
-      master_url = 'test-url'
-
-      def cleanup(self):
-        pass
-
-    master_url = 'test-url'
-    cluster_name = 'test-cluster'
-    project = 'test-project'
-    region = 'test-region'
-    metadata = MasterURLIdentifier(project, region, cluster_name)
-
-    p = beam.Pipeline(ir.InteractiveRunner())
-    ie.current_env()._tracked_user_pipelines.add_user_pipeline(p)
-    clusters.master_urls[master_url] = metadata
-    clusters.master_urls_to_dashboards[master_url] = 'test-dashboard'
-    clusters.dataproc_cluster_managers[str(id(p))] = MockClusterManager()
-    clusters.master_urls_to_pipelines[master_url] = [str(id(p))]
-
-    cluster_id = obfuscate(project, region, cluster_name)
-    ie.current_env().inspector._clusters[cluster_id] = {
-        'master_url': master_url, 'pipelines': [str(id(p))]
-    }
-    clusters.delete_cluster(
-        ie.current_env().inspector.get_cluster_master_url(cluster_id))
-    self.assertEqual(clusters.master_urls, {})
-    self.assertEqual(clusters.master_urls_to_pipelines, {})
-
-  def test_set_default_cluster(self):
-    clusters = ie.current_env().clusters
-    master_url = 'test-url'
-    cluster_name = 'test-cluster'
-    project = 'test-project'
-    region = 'test-region'
-    pipelines = ['pid']
-    dashboard = 'test-dashboard'
-
-    cluster_id = obfuscate(project, region, cluster_name)
-    ie.current_env().inspector._clusters = {
-        cluster_id: {
-            'cluster_name': cluster_name,
-            'project': project,
-            'region': region,
-            'master_url': master_url,
-            'dashboard': dashboard,
-            'pipelines': pipelines
-        }
-    }
-    clusters.master_urls[master_url] = MasterURLIdentifier(
-        project, region, cluster_name)
-    clusters.set_default_cluster(
-        ie.current_env().inspector.get_cluster_master_url(cluster_id))
+    self.clusters = self.current_env.clusters
+
+  def test_cluster_metadata_pass_through_metadata(self):
+    cid = ClusterMetadata(project_id='test-project')
+    meta = self.clusters.cluster_metadata(cid)
+    self.assertIs(meta, cid)
+
+  def test_cluster_metadata_identifies_pipeline(self):
+    cid = beam.Pipeline()
+    known_meta = ClusterMetadata(project_id='test-project')
+    dcm = DataprocClusterManager(known_meta)
+    self.clusters.pipelines[cid] = dcm
+
+    meta = self.clusters.cluster_metadata(cid)
+    self.assertIs(meta, known_meta)
+
+  def test_cluster_metadata_identifies_master_url(self):
+    cid = 'test-url'
+    known_meta = ClusterMetadata(project_id='test-project')
+    _ = DataprocClusterManager(known_meta)
+    self.clusters.master_urls[cid] = known_meta
+
+    meta = self.clusters.cluster_metadata(cid)
+    self.assertIs(meta, known_meta)
+
+  def test_cluster_metadata_default_value(self):
+    cid_none = None
+    cid_unknown_p = beam.Pipeline()
+    cid_unknown_master_url = 'test-url'
+    default_meta = ClusterMetadata(project_id='test-project')
+    self.clusters.set_default_cluster(default_meta)
+
+    self.assertIs(default_meta, self.clusters.cluster_metadata(cid_none))
+    self.assertIs(default_meta, self.clusters.cluster_metadata(cid_unknown_p))
+    self.assertIs(
+        default_meta, self.clusters.cluster_metadata(cid_unknown_master_url))
+
+  def test_create_a_new_cluster(self):
+    meta = ClusterMetadata(project_id='test-project')
+    _ = self.clusters.create(meta)
+
+    # Derived fields are populated.
+    self.assertTrue(meta.master_url.startswith('test-url'))
+    self.assertEqual(meta.dashboard, 'test-dashboard')
+    # The cluster is known.
+    self.assertIn(meta, self.clusters.dataproc_cluster_managers)
+    self.assertIn(meta.master_url, self.clusters.master_urls)
+    # The default cluster is updated to the created cluster.
+    self.assertIs(meta, self.clusters.default_cluster_metadata)
+
+  def test_create_but_reuse_a_known_cluster(self):
+    known_meta = ClusterMetadata(
+        project_id='test-project', region='test-region')
+    known_dcm = DataprocClusterManager(known_meta)
+    known_meta.master_url = 'test-url'
+    self.clusters.set_default_cluster(known_meta)
+    self.clusters.dataproc_cluster_managers[known_meta] = known_dcm
+    self.clusters.master_urls[known_meta.master_url] = known_meta
+
+    # Use an equivalent meta as the identifier to create a cluster.
+    cid_meta = ClusterMetadata(
+        project_id=known_meta.project_id,
+        region=known_meta.region,
+        cluster_name=known_meta.cluster_name)
+    dcm = self.clusters.create(cid_meta)
+    # The known cluster manager is returned.
+    self.assertIs(dcm, known_dcm)
+
+    # Then use an equivalent master_url as the identifier.
+    cid_master_url = known_meta.master_url
+    dcm = self.clusters.create(cid_master_url)
+    self.assertIs(dcm, known_dcm)
+
+  def test_cleanup_by_a_pipeline(self):
+    meta = ClusterMetadata(project_id='test-project')
+    dcm = self.clusters.create(meta)
+
+    # Set up the association between a pipeline and a cluster.
+    # In real code, it's set by the runner the 1st time a pipeline is executed.
+    options = PipelineOptions()
+    options.view_as(FlinkRunnerOptions).flink_master = meta.master_url
+    p = beam.Pipeline(options=options)
+    self.clusters.pipelines[p] = dcm
+    dcm.pipelines.add(p)
+
+    self.clusters.cleanup(p)
+    # Delete the cluster.
+    self.m_delete_cluster.assert_called_once()
+    # Pipeline association is cleaned up.
+    self.assertNotIn(p, self.clusters.pipelines)
+    self.assertNotIn(p, dcm.pipelines)
+    self.assertEqual(options.view_as(FlinkRunnerOptions).flink_master, '[auto]')
+    # The cluster is unknown now.
+    self.assertNotIn(meta, self.clusters.dataproc_cluster_managers)
+    self.assertNotIn(meta.master_url, self.clusters.master_urls)
+    # The cleaned up cluster is also the default cluster. Clean the default.
+    self.assertIsNone(self.clusters.default_cluster_metadata)
+
+  def test_not_cleanup_if_multiple_pipelines_share_a_manager(self):
+    meta = ClusterMetadata(project_id='test-project')
+    dcm = self.clusters.create(meta)
+
+    options = PipelineOptions()
+    options.view_as(FlinkRunnerOptions).flink_master = meta.master_url
+    options2 = PipelineOptions()
+    options2.view_as(FlinkRunnerOptions).flink_master = meta.master_url
+    p = beam.Pipeline(options=options)
+    p2 = beam.Pipeline(options=options2)
+    self.clusters.pipelines[p] = dcm
+    self.clusters.pipelines[p2] = dcm
+    dcm.pipelines.add(p)
+    dcm.pipelines.add(p2)
+
+    self.clusters.cleanup(p)
+    # No cluster deleted.
+    self.m_delete_cluster.assert_not_called()
+    # Pipeline association of p is cleaned up.
+    self.assertNotIn(p, self.clusters.pipelines)
+    self.assertNotIn(p, dcm.pipelines)
+    self.assertEqual(options.view_as(FlinkRunnerOptions).flink_master, '[auto]')
+    # Pipeline association of p2 still presents.
+    self.assertIn(p2, self.clusters.pipelines)
+    self.assertIn(p2, dcm.pipelines)
     self.assertEqual(
-        MasterURLIdentifier(project, region, cluster_name),
-        clusters.default_cluster_metadata)
+        options2.view_as(FlinkRunnerOptions).flink_master, meta.master_url)
+    # The cluster is still known.
+    self.assertIn(meta, self.clusters.dataproc_cluster_managers)
+    self.assertIn(meta.master_url, self.clusters.master_urls)
+    # The default cluster still presents.
+    self.assertIs(meta, self.clusters.default_cluster_metadata)
+
+  def test_cleanup_by_a_master_url(self):
+    meta = ClusterMetadata(project_id='test-project')
+    _ = self.clusters.create(meta)
+
+    self.clusters.cleanup(meta.master_url)
+    self.m_delete_cluster.assert_called_once()
+    self.assertNotIn(meta, self.clusters.dataproc_cluster_managers)
+    self.assertNotIn(meta.master_url, self.clusters.master_urls)
+    self.assertIsNone(self.clusters.default_cluster_metadata)
+
+  def test_cleanup_by_meta(self):
+    known_meta = ClusterMetadata(
+        project_id='test-project', region='test-region')
+    _ = self.clusters.create(known_meta)
+
+    meta = ClusterMetadata(
+        project_id=known_meta.project_id,
+        region=known_meta.region,
+        cluster_name=known_meta.cluster_name)
+    self.clusters.cleanup(meta)
+    self.m_delete_cluster.assert_called_once()
+    self.assertNotIn(known_meta, self.clusters.dataproc_cluster_managers)
+    self.assertNotIn(known_meta.master_url, self.clusters.master_urls)
+    self.assertIsNone(self.clusters.default_cluster_metadata)
+
+  def test_force_cleanup_everything(self):
+    meta = ClusterMetadata(project_id='test-project')
+    meta2 = ClusterMetadata(project_id='test-project-2')
+    _ = self.clusters.create(meta)
+    _ = self.clusters.create(meta2)
+
+    self.clusters.cleanup(force=True)
+    self.assertEqual(self.m_delete_cluster.call_count, 2)
+    self.assertNotIn(meta, self.clusters.dataproc_cluster_managers)
+    self.assertNotIn(meta2, self.clusters.dataproc_cluster_managers)
+    self.assertIsNone(self.clusters.default_cluster_metadata)
+
+  def test_cleanup_noop_for_no_cluster_identifier(self):
+    meta = ClusterMetadata(project_id='test-project')
+    _ = self.clusters.create(meta)
+
+    self.clusters.cleanup()
+    self.m_delete_cluster.assert_not_called()
+
+  def test_cleanup_noop_unknown_cluster(self):
+    meta = ClusterMetadata(project_id='test-project')
+    dcm = self.clusters.create(meta)
+    p = beam.Pipeline()
+    self.clusters.pipelines[p] = dcm
+    dcm.pipelines.add(p)
+
+    cid_pipeline = beam.Pipeline()
+    self.clusters.cleanup(cid_pipeline)
+    self.m_delete_cluster.assert_not_called()
+
+    cid_master_url = 'some-random-url'
+    self.clusters.cleanup(cid_master_url)
+    self.m_delete_cluster.assert_not_called()
+
+    cid_meta = ClusterMetadata(project_id='random-project')
+    self.clusters.cleanup(cid_meta)
+    self.m_delete_cluster.assert_not_called()
+
+    self.assertIn(meta, self.clusters.dataproc_cluster_managers)
+    self.assertIn(meta.master_url, self.clusters.master_urls)
+    self.assertIs(meta, self.clusters.default_cluster_metadata)
+    self.assertIn(p, self.clusters.pipelines)
+    self.assertIn(p, dcm.pipelines)
+
+  def test_describe_everything(self):
+    meta = ClusterMetadata(project_id='test-project')
+    meta2 = ClusterMetadata(
+        project_id='test-project', region='some-other-region')
+    _ = self.clusters.create(meta)
+    _ = self.clusters.create(meta2)
+
+    meta_list = self.clusters.describe()
+    self.assertEqual([meta, meta2], meta_list)
+
+  def test_describe_by_cluster_identifier(self):
+    known_meta = ClusterMetadata(project_id='test-project')
+    known_meta2 = ClusterMetadata(
+        project_id='test-project', region='some-other-region')
+    dcm = self.clusters.create(known_meta)
+    dcm2 = self.clusters.create(known_meta2)
+    p = beam.Pipeline()
+    p2 = beam.Pipeline()
+    self.clusters.pipelines[p] = dcm
+    dcm.pipelines.add(p)
+    self.clusters.pipelines[p2] = dcm2
+    dcm2.pipelines.add(p2)
+
+    cid_pipeline = p
+    meta = self.clusters.describe(cid_pipeline)
+    self.assertIs(meta, known_meta)
+
+    cid_master_url = known_meta.master_url
+    meta = self.clusters.describe(cid_master_url)
+    self.assertIs(meta, known_meta)
+
+    cid_meta = ClusterMetadata(
+        project_id=known_meta.project_id,
+        region=known_meta.region,
+        cluster_name=known_meta.cluster_name)
+    meta = self.clusters.describe(cid_meta)
+    self.assertIs(meta, known_meta)
+
+  def test_describe_everything_when_cluster_identifer_unknown(self):
+    known_meta = ClusterMetadata(project_id='test-project')
+    known_meta2 = ClusterMetadata(
+        project_id='test-project', region='some-other-region')
+    dcm = self.clusters.create(known_meta)
+    dcm2 = self.clusters.create(known_meta2)
+    p = beam.Pipeline()
+    p2 = beam.Pipeline()
+    self.clusters.pipelines[p] = dcm
+    dcm.pipelines.add(p)
+    self.clusters.pipelines[p2] = dcm2
+    dcm2.pipelines.add(p2)
+
+    cid_pipeline = beam.Pipeline()
+    meta_list = self.clusters.describe(cid_pipeline)
+    self.assertEqual([known_meta, known_meta2], meta_list)
+
+    cid_master_url = 'some-random-url'
+    meta_list = self.clusters.describe(cid_master_url)
+    self.assertEqual([known_meta, known_meta2], meta_list)
+
+    cid_meta = ClusterMetadata(project_id='some-random-project')
+    meta_list = self.clusters.describe(cid_meta)
+    self.assertEqual([known_meta, known_meta2], meta_list)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 5f3552f6770..94c9a442cbe 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -284,8 +284,7 @@ class InteractiveEnvironment(object):
       # we don't need to clean it up here.
       if cache_manager and pipeline_id not in self._recording_managers:
         cache_manager.cleanup()
-    # TODO(BEAM-14330): uncomment this once tests are refactored.
-    # self.clusters.cleanup()
+    self.clusters.cleanup(force=True)
 
   def cleanup(self, pipeline=None):
     """Cleans up cached states for the given pipeline. Noop if the given
@@ -375,12 +374,11 @@ class InteractiveEnvironment(object):
     given pipeline. If the pipeline is absent from the environment while
     create_if_absent is True, creates and returns a new file based cache
     manager for the pipeline."""
-    if self._is_in_ipython:
-      warnings.filterwarnings(
-          'ignore',
-          'options is deprecated since First stable release. References to '
-          '<pipeline>.options will not be supported',
-          category=DeprecationWarning)
+    warnings.filterwarnings(
+        'ignore',
+        'options is deprecated since First stable release. References to '
+        '<pipeline>.options will not be supported',
+        category=DeprecationWarning)
 
     cache_manager = self._cache_managers.get(str(id(pipeline)), None)
     pipeline_runner = detect_pipeline_runner(pipeline)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
index a2dd24b57ef..4d5f3f36ce6 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -28,16 +28,17 @@ from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive.recording_manager import RecordingManager
 from apache_beam.runners.interactive.sql.sql_chain import SqlNode
+from apache_beam.runners.interactive.testing.mock_env import isolated_env
 
 # The module name is also a variable in module.
 _module_name = 'apache_beam.runners.interactive.interactive_environment_test'
 
 
+@isolated_env
 class InteractiveEnvironmentTest(unittest.TestCase):
   def setUp(self):
     self._p = beam.Pipeline()
     self._var_in_class_instance = 'a var in class instance'
-    ie.new_env()
 
   def assertVariableWatched(self, variable_name, variable_val):
     self.assertTrue(self._is_variable_watched(variable_name, variable_val))
@@ -164,8 +165,7 @@ class InteractiveEnvironmentTest(unittest.TestCase):
       mocked_atexit.assert_called_once()
 
   def test_cleanup_invoked_when_new_env_replace_not_none_env(self):
-    ie._interactive_beam_env = None
-    ie.new_env()
+    ie._interactive_beam_env = self.current_env
     with patch('apache_beam.runners.interactive.interactive_environment'
                '.InteractiveEnvironment.cleanup') as mocked_cleanup:
       ie.new_env()
@@ -211,7 +211,6 @@ class InteractiveEnvironmentTest(unittest.TestCase):
         env.get_cache_manager(dummy_pipeline, create_if_absent=True))
 
   def test_track_user_pipeline_cleanup_non_inspectable_pipeline(self):
-    ie.new_env()
     dummy_pipeline_1 = beam.Pipeline()
     dummy_pipeline_2 = beam.Pipeline()
     dummy_pipeline_3 = beam.Pipeline()
@@ -259,36 +258,24 @@ class InteractiveEnvironmentTest(unittest.TestCase):
     self.assertSetEqual(ie.current_env().computed_pcollections, {not_evicted})
 
   def test_set_get_recording_manager(self):
-    ie._interactive_beam_env = None
-    ie.new_env()
-
     p = beam.Pipeline()
     rm = RecordingManager(p)
     ie.current_env().set_recording_manager(rm, p)
     self.assertIs(rm, ie.current_env().get_recording_manager(p))
 
   def test_recording_manager_create_if_absent(self):
-    ie._interactive_beam_env = None
-    ie.new_env()
-
     p = beam.Pipeline()
     self.assertFalse(ie.current_env().get_recording_manager(p))
     self.assertTrue(
         ie.current_env().get_recording_manager(p, create_if_absent=True))
 
   def test_evict_recording_manager(self):
-    ie._interactive_beam_env = None
-    ie.new_env()
-
     p = beam.Pipeline()
     self.assertFalse(ie.current_env().get_recording_manager(p))
     self.assertTrue(
         ie.current_env().get_recording_manager(p, create_if_absent=True))
 
   def test_describe_all_recordings(self):
-    ie._interactive_beam_env = None
-    ie.new_env()
-
     self.assertFalse(ie.current_env().describe_all_recordings())
 
     p1 = beam.Pipeline()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 3c356f1331f..b3909df769a 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -23,18 +23,18 @@ This module is experimental. No backwards-compatibility guarantees.
 # pytype: skip-file
 
 import logging
-import warnings
 
 import apache_beam as beam
 from apache_beam import runners
 from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import pipeline_instrument as inst
 from apache_beam.runners.interactive import background_caching_job
-from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.options import capture_control
 from apache_beam.runners.interactive.utils import to_element_list
@@ -137,17 +137,11 @@ class InteractiveRunner(runners.PipelineRunner):
     watch_sources(pipeline)
 
     user_pipeline = ie.current_env().user_pipeline(pipeline)
-    if user_pipeline:
-      # When the underlying_runner is a FlinkRunner instance, create a
-      # corresponding DataprocClusterManager for it if no flink_master_url
-      # is provided.
-      master_url = self._get_dataproc_cluster_master_url_if_applicable(
-          user_pipeline)
-      if master_url:
-        flink_options = options.view_as(FlinkRunnerOptions)
-        flink_options.flink_master = master_url
-        flink_options.flink_version = ie.current_env(
-        ).clusters.DATAPROC_FLINK_VERSION
+
+    from apache_beam.runners.portability.flink_runner import FlinkRunner
+    if isinstance(self._underlying_runner, FlinkRunner):
+      self.configure_for_flink(user_pipeline, options)
+
     pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
 
     # The user_pipeline analyzed might be None if the pipeline given has nothing
@@ -224,77 +218,45 @@ class InteractiveRunner(runners.PipelineRunner):
 
     return main_job_result
 
-  # TODO(victorhc): Move this method somewhere else if performance is impacted
-  # by generating a cluster during runtime.
-  def _get_dataproc_cluster_master_url_if_applicable(
-      self, user_pipeline: beam.Pipeline) -> str:
-    """ Creates a Dataproc cluster if the provided user_pipeline is running
-    FlinkRunner and no flink_master_url was provided as an option. A cluster
-    is not created when a flink_master_url is detected.
-
-    Example pipeline options to enable automatic Dataproc cluster creation:
-      options = PipelineOptions([
-      '--runner=FlinkRunner',
-      '--project=my-project',
-      '--region=my-region',
-      '--environment_type=DOCKER'
-      ])
-
-    Example pipeline options to skip automatic Dataproc cluster creation:
-      options = PipelineOptions([
-      '--runner=FlinkRunner',
-      '--flink_master=example.internal:41979',
-      '--environment_type=DOCKER'
-      ])
+  def configure_for_flink(
+      self, user_pipeline: beam.Pipeline, options: PipelineOptions) -> None:
+    """Tunes the pipeline options for the setup of running a job with Flink.
     """
-    from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
-    from apache_beam.runners.portability.flink_runner import FlinkRunner
-    flink_master = user_pipeline.options.view_as(
-        FlinkRunnerOptions).flink_master
     clusters = ie.current_env().clusters
-    # Only consider this logic when both below 2 conditions apply.
-    if isinstance(self._underlying_runner,
-                  FlinkRunner) and clusters.dataproc_cluster_managers.get(
-                      str(id(user_pipeline)), None) is None:
-      if flink_master == '[auto]':
-        # The above condition is True when the user has not provided a
-        # flink_master.
-        if ie.current_env()._is_in_ipython:
-          warnings.filterwarnings(
-              'ignore',
-              'options is deprecated since First stable release. References to '
-              '<pipeline>.options will not be supported',
-              category=DeprecationWarning)
-        project_id = (user_pipeline.options.view_as(GoogleCloudOptions).project)
-        region = (user_pipeline.options.view_as(GoogleCloudOptions).region)
-        if not project_id:
-          # When a Google Cloud project is not specified, we try to set the
-          # cluster_metadata to be the default value set from the
-          # 'Manage Clusters' JupyterLab extension. If a value has not been
-          # specified, this value defaults to None.
-          cluster_metadata = ie.current_env().clusters.default_cluster_metadata
+    if clusters.pipelines.get(user_pipeline, None):
+      # Noop for a known pipeline using a known Dataproc cluster.
+      return
+    flink_master = options.view_as(FlinkRunnerOptions).flink_master
+    cluster_metadata = clusters.default_cluster_metadata
+    if flink_master == '[auto]':
+      # Try to create/reuse a cluster when no flink_master is given.
+      project_id = options.view_as(GoogleCloudOptions).project
+      region = options.view_as(GoogleCloudOptions).region
+      if project_id:
+        if clusters.default_cluster_metadata:
+          # Reuse the cluster name from default in case of a known cluster.
+          cluster_metadata = ClusterMetadata(
+              project_id=project_id,
+              region=region,
+              cluster_name=clusters.default_cluster_metadata.cluster_name)
         else:
-          cluster_name = ie.current_env().clusters.default_cluster_name
-          cluster_metadata = MasterURLIdentifier(
-              project_id=project_id, region=region, cluster_name=cluster_name)
-      else:
-        cluster_metadata = clusters.master_urls.get(flink_master, None)
-      # else noop, no need to log anything because we allow a master_url
-      # (not managed by us) provided by the user.
-      if cluster_metadata:
-        # create the cluster_manager and populate dicts in the clusters
-        # instance if the pipeline is not already mapped to an existing
-        # cluster_manager.
-        cluster_manager = DataprocClusterManager(cluster_metadata)
-        cluster_manager.create_flink_cluster()
-        clusters.master_urls[cluster_manager.master_url] = cluster_metadata
-        clusters.dataproc_cluster_managers[str(
-            id(user_pipeline))] = cluster_manager
-        clusters.master_urls_to_pipelines[cluster_manager.master_url].append(
-            str(id(user_pipeline)))
-        clusters.master_urls_to_dashboards[
-            cluster_manager.master_url] = cluster_manager.dashboard
-        return cluster_manager.master_url
+          # Generate the metadata with a new unique cluster name.
+          cluster_metadata = ClusterMetadata(
+              project_id=project_id, region=region)
+      # else use the default cluster metadata.
+    elif flink_master in clusters.master_urls:
+      cluster_metadata = clusters.cluster_metadata(flink_master)
+    else:  # Noop if a self-hosted Flink is in use.
+      return
+    if not cluster_metadata:
+      return  # Not even a default cluster to create/reuse, run Flink locally.
+    dcm = clusters.create(cluster_metadata)
+    # Side effects associated with the user_pipeline.
+    clusters.pipelines[user_pipeline] = dcm
+    dcm.pipelines.add(user_pipeline)
+    flink_options = options.view_as(FlinkRunnerOptions)
+    flink_options.flink_master = dcm.cluster_metadata.master_url
+    flink_options.flink_version = clusters.DATAPROC_FLINK_VERSION
 
 
 class PipelineResult(beam.runners.runner.PipelineResult):
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index ff954f0dae2..d6ead00b692 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -25,20 +25,22 @@ This module is experimental. No backwards-compatibility guarantees.
 import sys
 import unittest
 from typing import NamedTuple
-from unittest.mock import patch
 
 import pandas as pd
 
 import apache_beam as beam
 from apache_beam.dataframe.convert import to_dataframe
+from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner
-from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
-from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
+from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
+from apache_beam.runners.interactive.testing.mock_env import isolated_env
+from apache_beam.runners.portability.flink_runner import FlinkRunner
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import IntervalWindow
@@ -62,6 +64,7 @@ class Record(NamedTuple):
   height: int
 
 
+@isolated_env
 class InteractiveRunnerTest(unittest.TestCase):
   @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
   def test_basic(self):
@@ -262,17 +265,16 @@ class InteractiveRunnerTest(unittest.TestCase):
   @unittest.skipIf(
       not ie.current_env().is_interactive_ready,
       '[interactive] dependency is not installed.')
-  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
-  def test_mark_pcollection_completed_after_successful_run(self, cell):
-    with cell:  # Cell 1
+  def test_mark_pcollection_completed_after_successful_run(self):
+    with self.cell:  # Cell 1
       p = beam.Pipeline(interactive_runner.InteractiveRunner())
       ib.watch({'p': p})
 
-    with cell:  # Cell 2
+    with self.cell:  # Cell 2
       # pylint: disable=bad-option-value
       init = p | 'Init' >> beam.Create(range(5))
 
-    with cell:  # Cell 3
+    with self.cell:  # Cell 3
       square = init | 'Square' >> beam.Map(lambda x: x * x)
       cube = init | 'Cube' >> beam.Map(lambda x: x**3)
 
@@ -414,16 +416,14 @@ class InteractiveRunnerTest(unittest.TestCase):
       not ie.current_env().is_interactive_ready,
       '[interactive] dependency is not installed.')
   @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
-  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
-  def test_dataframe_caching(self, cell):
-
+  def test_dataframe_caching(self):
     # Create a pipeline that exercises the DataFrame API. This will also use
     # caching in the background.
-    with cell:  # Cell 1
+    with self.cell:  # Cell 1
       p = beam.Pipeline(interactive_runner.InteractiveRunner())
       ib.watch({'p': p})
 
-    with cell:  # Cell 2
+    with self.cell:  # Cell 2
       data = p | beam.Create([
           1, 2, 3
       ]) | beam.Map(lambda x: beam.Row(square=x * x, cube=x * x * x))
@@ -433,11 +433,11 @@ class InteractiveRunnerTest(unittest.TestCase):
 
       ib.collect(df)
 
-    with cell:  # Cell 3
+    with self.cell:  # Cell 3
       df['output'] = df['square'] * df['cube']
       ib.collect(df)
 
-    with cell:  # Cell 4
+    with self.cell:  # Cell 4
       df['output'] = 0
       ib.collect(df)
 
@@ -485,74 +485,105 @@ class InteractiveRunnerTest(unittest.TestCase):
       self.assertEqual(producer, prev_producer, trace_string)
       prev_producer = consumer
 
-  @unittest.skipIf(
-      not ie.current_env().is_interactive_ready,
-      '[interactive] dependency is not installed.')
-  @patch(
-      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
-      'DataprocClusterManager.create_flink_cluster',
-      return_value=None)
-  @patch('apache_beam.runners.interactive.interactive_environment.current_env')
-  def test_get_master_url_no_flink_master_or_provided_master_url(
-      self, m_env, mock_create_cluster):
-    clusters = ib.Clusters()
-    m_env().clusters = clusters
-
-    from apache_beam.runners.portability.flink_runner import FlinkRunner
+
+@unittest.skipIf(
+    not ie.current_env().is_interactive_ready,
+    '[interactive] dependency is not installed.')
+@isolated_env
+class TuneForFlinkTest(unittest.TestCase):
+  def test_create_a_new_cluster_for_a_new_pipeline(self):
+    clusters = self.current_env.clusters
     runner = interactive_runner.InteractiveRunner(
         underlying_runner=FlinkRunner())
-    p = beam.Pipeline(
-        options=PipelineOptions(
-            project='test-project',
-            region='test-region',
-        ))
-    runner._get_dataproc_cluster_master_url_if_applicable(p)
+    options = PipelineOptions(project='test-project', region='test-region')
+    p = beam.Pipeline(runner=runner, options=options)
+    runner.configure_for_flink(p, options)
+
+    # Fetch the metadata and assert all side effects.
+    meta = clusters.cluster_metadata(p)
+    # The metadata should have all fields populated.
+    self.assertEqual(meta.project_id, 'test-project')
+    self.assertEqual(meta.region, 'test-region')
+    self.assertTrue(meta.cluster_name.startswith('interactive-beam-'))
+    self.assertTrue(meta.master_url.startswith('test-url'))
+    self.assertEqual(meta.dashboard, 'test-dashboard')
+    # The cluster is known now.
+    self.assertIn(meta, clusters.dataproc_cluster_managers)
+    self.assertIn(meta.master_url, clusters.master_urls)
+    self.assertIn(p, clusters.pipelines)
+    # The default cluster is updated to the created cluster.
+    self.assertIs(meta, clusters.default_cluster_metadata)
+    # The pipeline options are tuned for execution on the cluster.
+    flink_options = options.view_as(FlinkRunnerOptions)
+    self.assertEqual(flink_options.flink_master, meta.master_url)
     self.assertEqual(
-        clusters.describe(p)['cluster_metadata'].project_id, 'test-project')
+        flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION)
 
-  @unittest.skipIf(
-      not ie.current_env().is_interactive_ready,
-      '[interactive] dependency is not installed.')
-  @patch('apache_beam.runners.interactive.interactive_environment.current_env')
-  def test_get_master_url_no_flink_master_and_master_url_exists(self, m_env):
-    clusters = ib.Clusters()
-    m_env().clusters = clusters
-
-    from apache_beam.runners.portability.flink_runner import FlinkRunner
+  def test_reuse_a_cluster_for_a_known_pipeline(self):
+    clusters = self.current_env.clusters
     runner = interactive_runner.InteractiveRunner(
         underlying_runner=FlinkRunner())
-    p = beam.Pipeline(
-        options=PipelineOptions(
-            project='test-project',
-            region='test-region',
-        ))
-    cluster_name = clusters.default_cluster_name
-    cluster_metadata = MasterURLIdentifier(
-        project_id='test-project',
-        region='test-region',
-        cluster_name=cluster_name)
-    clusters.master_urls['test-url'] = cluster_metadata
-    clusters.master_urls_to_dashboards['test-url'] = 'test-dashboard'
-    flink_master = runner._get_dataproc_cluster_master_url_if_applicable(p)
+    options = PipelineOptions(project='test-project', region='test-region')
+    p = beam.Pipeline(runner=runner, options=options)
+    meta = ClusterMetadata(project_id='test-project', region='test-region')
+    dcm = DataprocClusterManager(meta)
+    # Configure the clusters so that the pipeline is known.
+    clusters.pipelines[p] = dcm
+    runner.configure_for_flink(p, options)
+
+    # A known cluster is reused.
+    tuned_meta = clusters.cluster_metadata(p)
+    self.assertIs(tuned_meta, meta)
+
+  def test_reuse_a_known_cluster_for_unknown_pipeline(self):
+    clusters = self.current_env.clusters
+    runner = interactive_runner.InteractiveRunner(
+        underlying_runner=FlinkRunner())
+    options = PipelineOptions(project='test-project', region='test-region')
+    p = beam.Pipeline(runner=runner, options=options)
+    meta = ClusterMetadata(project_id='test-project', region='test-region')
+    dcm = DataprocClusterManager(meta)
+    # Configure the clusters so that the cluster is known.
+    clusters.dataproc_cluster_managers[meta] = dcm
+    clusters.set_default_cluster(meta)
+    runner.configure_for_flink(p, options)
+
+    # A known cluster is reused.
+    tuned_meta = clusters.cluster_metadata(p)
+    self.assertIs(tuned_meta, meta)
+    # The pipeline is known.
+    self.assertIn(p, clusters.pipelines)
+    registered_dcm = clusters.pipelines[p]
+    self.assertIn(p, registered_dcm.pipelines)
+
+  def test_reuse_default_cluster_if_not_configured(self):
+    clusters = self.current_env.clusters
+    runner = interactive_runner.InteractiveRunner(
+        underlying_runner=FlinkRunner())
+    options = PipelineOptions()
+    # Pipeline is not configured to run on Cloud.
+    p = beam.Pipeline(runner=runner, options=options)
+    meta = ClusterMetadata(project_id='test-project', region='test-region')
+    meta.master_url = 'test-url'
+    meta.dashboard = 'test-dashboard'
+    dcm = DataprocClusterManager(meta)
+    # Configure the clusters so that a default cluster is known.
+    clusters.dataproc_cluster_managers[meta] = dcm
+    clusters.set_default_cluster(meta)
+    runner.configure_for_flink(p, options)
+
+    # The default cluster is used.
+    tuned_meta = clusters.cluster_metadata(p)
+    self.assertIs(tuned_meta, clusters.default_cluster_metadata)
+    # The pipeline is known.
+    self.assertIn(p, clusters.pipelines)
+    registered_dcm = clusters.pipelines[p]
+    self.assertIn(p, registered_dcm.pipelines)
+    # The pipeline options are tuned for execution on the cluster.
+    flink_options = options.view_as(FlinkRunnerOptions)
+    self.assertEqual(flink_options.flink_master, tuned_meta.master_url)
     self.assertEqual(
-        clusters.describe(p)['cluster_metadata'].project_id, 'test-project')
-    self.assertEqual(flink_master, clusters.describe(p)['master_url'])
-
-  @unittest.skipIf(
-      not ie.current_env().is_interactive_ready,
-      '[interactive] dependency is not installed.')
-  @patch('apache_beam.runners.interactive.interactive_environment.current_env')
-  def test_get_master_url_flink_master_provided(self, m_env):
-    clusters = ib.Clusters()
-    m_env().clusters = clusters
-
-    runner = interactive_runner.InteractiveRunner()
-    from apache_beam.runners.portability.flink_runner import FlinkRunner
-    p = beam.Pipeline(
-        interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()),
-        options=PipelineOptions(flink_master='--flink_master=test.internal:1'))
-    runner._get_dataproc_cluster_master_url_if_applicable(p)
-    self.assertEqual(clusters.describe(), {})
+        flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
index 351beaa30d7..dd7ee947daa 100644
--- a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
+++ b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
@@ -146,28 +146,24 @@ class InteractiveEnvironmentInspector(object):
     and pipelines. Furthermore, copies the mapping to self._clusters.
     """
     from apache_beam.runners.interactive import interactive_environment as ie
+
     clusters = ie.current_env().clusters
     all_cluster_data = {}
-    for master_url in clusters.master_urls:
-      cluster_metadata = clusters.master_urls[master_url]
-      project = cluster_metadata.project_id
-      region = cluster_metadata.region
-      name = cluster_metadata.cluster_name
-
-      all_cluster_data[obfuscate(project, region, name)] = {
-          'cluster_name': name,
-          'project': project,
-          'region': region,
-          'master_url': master_url,
-          'dashboard': clusters.master_urls_to_dashboards[master_url],
-          'pipelines': clusters.master_urls_to_pipelines[master_url]
+    for meta, dcm in clusters.dataproc_cluster_managers.items():
+      all_cluster_data[obfuscate(meta)] = {
+          'cluster_name': meta.cluster_name,
+          'project': meta.project_id,
+          'region': meta.region,
+          'master_url': meta.master_url,
+          'dashboard': meta.dashboard,
+          'pipelines': [str(id(p)) for p in dcm.pipelines]
       }
     self._clusters = all_cluster_data
     return all_cluster_data
 
-  def get_cluster_master_url(self, id: str) -> str:
-    """Returns the master_url corresponding to the provided cluster id."""
-    return self._clusters[id]['master_url']  # The id is guaranteed to exist.
+  def get_cluster_master_url(self, identifier: str) -> str:
+    """Returns the master_url corresponding to the obfuscated identifier."""
+    return self._clusters[identifier]['master_url']  # Guaranteed to exist.
 
 
 def inspect(ignore_synthetic=True):
diff --git a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py
index fa39d61c623..816a0e86110 100644
--- a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py
+++ b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py
@@ -21,15 +21,14 @@
 import json
 import sys
 import unittest
-from unittest.mock import patch
 
 import apache_beam as beam
 import apache_beam.runners.interactive.messaging.interactive_environment_inspector as inspector
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner as ir
-from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
-from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+from apache_beam.runners.interactive.dataproc.types import ClusterMetadata
+from apache_beam.runners.interactive.testing.mock_env import isolated_env
 from apache_beam.runners.interactive.utils import obfuscate
 
 
@@ -38,20 +37,17 @@ from apache_beam.runners.interactive.utils import obfuscate
     '[interactive] dependency is not installed.')
 @unittest.skipIf(
     sys.version_info < (3, 7), 'The tests require at least Python 3.7 to work.')
+@isolated_env
 class InteractiveEnvironmentInspectorTest(unittest.TestCase):
-  def setUp(self):
-    ie.new_env()
-
-  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
-  def test_inspect(self, cell):
-    with cell:  # Cell 1
+  def test_inspect(self):
+    with self.cell:  # Cell 1
       pipeline = beam.Pipeline(ir.InteractiveRunner())
       # Early watch the pipeline so that cell re-execution can be handled.
       ib.watch({'pipeline': pipeline})
       # pylint: disable=bad-option-value
       pcoll = pipeline | 'Create' >> beam.Create(range(10))
 
-    with cell:  # Cell 2
+    with self.cell:  # Cell 2
       # Re-executes the line that created the pcoll causing the original
       # pcoll no longer inspectable.
       # pylint: disable=bad-option-value
@@ -71,13 +67,12 @@ class InteractiveEnvironmentInspectorTest(unittest.TestCase):
     for inspectable in inspector.inspect().items():
       self.assertTrue(inspectable in expected_inspectables.items())
 
-  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
-  def test_inspect_pipelines(self, cell):
-    with cell:  # Cell 1
+  def test_inspect_pipelines(self):
+    with self.cell:  # Cell 1
       pipeline_1 = beam.Pipeline(ir.InteractiveRunner())
       pipeline_2 = beam.Pipeline(ir.InteractiveRunner())
 
-    with cell:  # Cell 2
+    with self.cell:  # Cell 2
       # Re-executes the line that created pipeline_1 causing the original
       # pipeline_1 no longer inspectable.
       pipeline_1 = beam.Pipeline(ir.InteractiveRunner())
@@ -103,15 +98,14 @@ class InteractiveEnvironmentInspectorTest(unittest.TestCase):
     for inspectable in inspector.inspect().items():
       self.assertTrue(inspectable in expected_inspectables.items())
 
-  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
-  def test_list_inspectables(self, cell):
-    with cell:  # Cell 1
+  def test_list_inspectables(self):
+    with self.cell:  # Cell 1
       pipeline = beam.Pipeline(ir.InteractiveRunner())
       # pylint: disable=bad-option-value
       pcoll_1 = pipeline | 'Create' >> beam.Create(range(10))
       pcoll_2 = pcoll_1 | 'Square' >> beam.Map(lambda x: x * x)
 
-    with cell:  # Cell 2
+    with self.cell:  # Cell 2
       # Re-executes the line that created pipeline causing the original
       # pipeline become an anonymous pipeline that is still inspectable because
       # its pcoll_1 and pcoll_2 are still inspectable.
@@ -141,14 +135,13 @@ class InteractiveEnvironmentInspectorTest(unittest.TestCase):
     actual_listings = ins.list_inspectables()
     self.assertEqual(actual_listings, json.dumps(expected_inspectable_list))
 
-  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
-  def test_get_val(self, cell):
-    with cell:  # Cell 1
+  def test_get_val(self):
+    with self.cell:  # Cell 1
       pipeline = beam.Pipeline(ir.InteractiveRunner())
       # pylint: disable=bad-option-value
       pcoll = pipeline | 'Create' >> beam.Create(range(10))
 
-    with cell:  # Cell 2
+    with self.cell:  # Cell 2
       # Re-executes the line that created pipeline causing the original
       # pipeline become an anonymous pipeline that is still inspectable because
       # its pcoll is still inspectable.
@@ -190,29 +183,23 @@ class InteractiveEnvironmentInspectorTest(unittest.TestCase):
         actual_counts_with_window_info, expected_counts_with_window_info)
 
   def test_list_clusters(self):
-    master_url = 'test-url'
-    cluster_name = 'test-cluster'
-    project = 'test-project'
-    region = 'test-region'
-    pipelines = ['pid']
-    dashboard = 'test-dashboard'
-    ie.current_env().clusters.master_urls[master_url] = MasterURLIdentifier(
-        project, region, cluster_name)
-    ie.current_env().clusters.master_urls_to_pipelines[master_url] = pipelines
-    ie.current_env().clusters.master_urls_to_dashboards[master_url] = dashboard
-    ins = inspector.InteractiveEnvironmentInspector()
-    cluster_id = obfuscate(project, region, cluster_name)
+    meta = ClusterMetadata(project_id='project')
+    dcm = self.current_env.clusters.create(meta)
+    p = beam.Pipeline()
+    dcm.pipelines.add(p)
+    self.current_env.clusters.pipelines[p] = dcm
+    cluster_id = obfuscate(meta)
     self.assertEqual({
         cluster_id: {
-            'cluster_name': cluster_name,
-            'project': project,
-            'region': region,
-            'master_url': master_url,
-            'dashboard': dashboard,
-            'pipelines': pipelines
+            'cluster_name': meta.cluster_name,
+            'project': meta.project_id,
+            'region': meta.region,
+            'master_url': meta.master_url,
+            'dashboard': meta.dashboard,
+            'pipelines': [str(id(p)) for p in dcm.pipelines]
         }
     },
-                     json.loads(ins.list_clusters()))
+                     json.loads(self.current_env.inspector.list_clusters()))
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager.py b/sdks/python/apache_beam/runners/interactive/recording_manager.py
index 1328f596108..2d272edaee2 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager.py
@@ -398,12 +398,11 @@ class RecordingManager:
     utils.watch_sources(self.user_pipeline)
 
     # Attempt to run background caching job to record any sources.
-    if ie.current_env().is_in_ipython:
-      warnings.filterwarnings(
-          'ignore',
-          'options is deprecated since First stable release. References to '
-          '<pipeline>.options will not be supported',
-          category=DeprecationWarning)
+    warnings.filterwarnings(
+        'ignore',
+        'options is deprecated since First stable release. References to '
+        '<pipeline>.options will not be supported',
+        category=DeprecationWarning)
     if bcj.attempt_to_run_background_caching_job(
         runner,
         self.user_pipeline,
diff --git a/sdks/python/apache_beam/runners/interactive/testing/mock_env.py b/sdks/python/apache_beam/runners/interactive/testing/mock_env.py
new file mode 100644
index 00000000000..11a4964de71
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/testing/mock_env.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module of mocks to isolate the test environment for each Interactive Beam
+test.
+"""
+
+import unittest
+import uuid
+from unittest.mock import patch
+
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
+from apache_beam.runners.interactive.interactive_environment import InteractiveEnvironment
+from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+
+
+def isolated_env(cls: unittest.TestCase):
+  """A class decorator for unittest.TestCase to set up an isolated test
+  environment for Interactive Beam."""
+  class IsolatedInteractiveEnvironmentTest(cls):
+    def setUp(self):
+      self.env_patchers = []
+      interactive_path = 'apache_beam.runners.interactive'
+
+      if ie.current_env().is_interactive_ready:
+        # Group lines of code into a notebook/IPython cell:
+        # with self.cell:
+        #   arbitrary python code
+        #   ...
+        #   arbitrary python code
+        self.ipython_patcher = patch(
+            'IPython.get_ipython', new_callable=mock_get_ipython)
+        self.cell = self.ipython_patcher.start()
+        self.env_patchers.append(self.ipython_patcher)
+
+      # self.current_env IS interactive_environment.current_env() in tests.
+      self.ie_patcher = patch(
+          f'{interactive_path}.interactive_environment.current_env')
+      self.m_current_env = self.ie_patcher.start()
+      self.current_env = InteractiveEnvironment()
+      self.m_current_env.return_value = self.current_env
+      self.env_patchers.append(self.ie_patcher)
+
+      # Patches dataproc cluster creation and deletion.
+      self.create_cluster_patcher = patch.object(
+          DataprocClusterManager, 'create_flink_cluster', mock_create_cluster)
+      self.alt_create_cluster = self.create_cluster_patcher.start()
+      self.env_patchers.append(self.create_cluster_patcher)
+      self.delete_cluster_patcher = patch.object(
+          DataprocClusterManager, 'cleanup')
+      self.m_delete_cluster = self.delete_cluster_patcher.start()
+      self.env_patchers.append(self.delete_cluster_patcher)
+
+      super().setUp()
+
+    def tearDown(self):
+      super().tearDown()
+      # Explicitly calls the cleanup instead of letting it be called at exit
+      # when the mocks and isolated env instance are out of scope.
+      self.current_env.cleanup()
+      for patcher in reversed(self.env_patchers):
+        patcher.stop()
+
+  return IsolatedInteractiveEnvironmentTest
+
+
+def mock_create_cluster(self):
+  """Mocks a cluster creation and populates derived fields."""
+  self.cluster_metadata.master_url = 'test-url-' + uuid.uuid4().hex
+  self.cluster_metadata.dashboard = 'test-dashboard'
+
+
+# This file contains no tests. The lines below exist purely to pass lint.
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py b/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py
index bc3f0685da0..e8eb4c4108c 100644
--- a/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py
+++ b/sdks/python/apache_beam/runners/interactive/testing/mock_ipython.py
@@ -45,6 +45,8 @@ def mock_get_ipython():
   class MockedGetIpython(object):
     def __init__(self):
       self._execution_count = 0
+      # Mock as if the kernel is connected to a notebook frontend.
+      self.config = {'IPKernelApp': 'mock'}
 
     def __call__(self):
       return self
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index 71305a5d976..cfc2a1a8637 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -55,47 +55,6 @@ _INTERACTIVE_LOG_STYLE = """
 """
 
 
-class bidict(dict):
-  """ Forces a 1:1 bidirectional mapping between key-value pairs.
-
-  Deletion is automatically handled both ways.
-
-  Example setting usage:
-    bd = bidict()
-    bd['foo'] = 'bar'
-
-    In this case, bd will contain the following values:
-      bd = {'foo': 'bar'}
-      bd.inverse = {'bar': 'foo'}
-
-  Example deletion usage:
-    bd = bidict()
-    bd['foo'] = 'bar'
-    del bd['foo']
-
-    In this case, bd and bd.inverse will both be {}.
-  """
-  def __init__(self):
-    self.inverse = {}
-
-  def __setitem__(self, key, value):
-    super().__setitem__(key, value)
-    self.inverse.setdefault(value, key)
-
-  def __delitem__(self, key):
-    if self[key] in self.inverse:
-      del self.inverse[self[key]]
-    super().__delitem__(key)
-
-  def clear(self):
-    super().clear()
-    self.inverse.clear()
-
-  def pop(self, key, default_value=None):
-    value = super().pop(key, default_value)
-    inverse_value = self.inverse.pop(value, default_value)
-    return value, inverse_value
-
 def to_element_list(
     reader,  # type: Generator[Union[beam_runner_api_pb2.TestStreamPayload.Event, WindowedValueHolder]]
     coder,  # type: Coder
@@ -255,8 +214,11 @@ class ProgressIndicator(object):
   # https://code.google.com/archive/p/google-ajax-apis/issues/637 is resolved.
   spinner_template = """
             <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" crossorigin="anonymous">
-            <div id="{id}" class="spinner-border text-info" role="status">
-            </div>"""
+            <div id="{id}">
+              <div class="spinner-border text-info" role="status"></div>
+              <span class="text-info">{text}</span>
+            </div>
+            """
   spinner_removal_template = """
             $("#{id}").remove();"""
 
@@ -273,7 +235,10 @@ class ProgressIndicator(object):
       from IPython.display import display
       from apache_beam.runners.interactive import interactive_environment as ie
       if ie.current_env().is_in_notebook:
-        display(HTML(self.spinner_template.format(id=self._id)))
+        display(
+            HTML(
+                self.spinner_template.format(
+                    id=self._id, text=self._enter_text)))
       else:
         display(self._enter_text)
     except ImportError as e:
@@ -308,7 +273,7 @@ def progress_indicated(func):
   execute the given function within."""
   @functools.wraps(func)
   def run_within_progress_indicator(*args, **kwargs):
-    with ProgressIndicator('Processing...', 'Done.'):
+    with ProgressIndicator(f'Processing... {func.__name__}', 'Done.'):
       return func(*args, **kwargs)
 
   return run_within_progress_indicator
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 66ad6aa5833..ecb71a2bdef 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -263,7 +263,7 @@ class ProgressIndicatorTest(unittest.TestCase):
 
       @utils.progress_indicated
       def progress_indicated_dummy():
-        mocked_display.assert_any_call('Processing...')
+        mocked_display.assert_any_call('Processing... progress_indicated_dummy')
 
       progress_indicated_dummy()
       mocked_display.assert_any_call('Done.')
@@ -380,36 +380,6 @@ class GCSUtilsTest(unittest.TestCase):
     utils.assert_bucket_exists('')
 
 
-class BidictTest(unittest.TestCase):
-  def test_inverse_set_correctly(self):
-    bd = utils.bidict()
-    bd['foo'] = 'bar'
-    self.assertEqual(bd.inverse['bar'], 'foo')
-    bd['foo'] = 'baz'
-    self.assertEqual(bd.inverse['baz'], 'foo')
-
-  def test_on_delete_remove_pair(self):
-    bd = utils.bidict()
-    bd['foo'] = 'bar'
-    del bd['foo']
-    self.assertEqual(bd, {})
-    self.assertEqual(bd.inverse, {})
-
-  def test_clear_bidirectionally(self):
-    bd = utils.bidict()
-    bd['foo'] = 'bar'
-    bd.clear()
-    self.assertEqual(bd, {})
-    self.assertEqual(bd.inverse, {})
-
-  def test_on_pop_pair(self):
-    bd = utils.bidict()
-    bd['foo'] = 'bar'
-    value, inverse_value = bd.pop('foo')
-    self.assertEqual(value, 'bar')
-    self.assertEqual(inverse_value, 'foo')
-
-
 class PipelineUtilTest(unittest.TestCase):
   def test_detect_pipeline_underlying_runner(self):
     p = beam.Pipeline(InteractiveRunner(underlying_runner=FlinkRunner()))