Posted to commits@beam.apache.org by ni...@apache.org on 2022/03/02 19:08:39 UTC

[beam] branch master updated: [BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner when FlinkRunner is used (#16904)

This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e4106b  [BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner when FlinkRunner is used (#16904)
1e4106b is described below

commit 1e4106bf5cdc5a3166e9feceac2d09f533309d47
Author: Victor <ca...@victorplusc.com>
AuthorDate: Wed Mar 2 14:06:56 2022 -0500

    [BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner when FlinkRunner is used (#16904)
---
 .../dataproc/dataproc_cluster_manager.py           | 153 ++++++++++++--
 .../dataproc/dataproc_cluster_manager_test.py      | 232 ++++++++++++++++++---
 .../runners/interactive/interactive_beam.py        |  13 +-
 .../runners/interactive/interactive_beam_test.py   |  17 +-
 .../runners/interactive/interactive_runner.py      |  23 +-
 .../runners/interactive/interactive_runner_test.py |  37 +++-
 .../apache_beam/runners/interactive/utils_test.py  |   3 -
 7 files changed, 412 insertions(+), 66 deletions(-)
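
Taken together, these changes let the InteractiveRunner provision a Dataproc
cluster when a FlinkRunner-backed pipeline supplies no flink_master, then
surface both the discovered master URL and the Flink dashboard. A minimal
usage sketch of the intended flow, with hypothetical project/region values
(note that the module-level 'clusters' instance is still commented out behind
a TODO in this commit, so the ib.clusters calls below are aspirational):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
    from apache_beam.runners.portability.flink_runner import FlinkRunner

    # With no flink_master option set, the runner provisions a Dataproc
    # cluster with the FLINK optional component and injects the discovered
    # master URL into FlinkRunnerOptions before instrumenting the pipeline.
    p = beam.Pipeline(
        InteractiveRunner(underlying_runner=FlinkRunner()),
        options=PipelineOptions(project='my-project', region='us-central1'))

    # The cluster description now also carries the dashboard link parsed
    # from the cluster startup logs:
    # ib.clusters.describe(p)
    # -> {'cluster_metadata': ..., 'master_url': ..., 'dashboard': ...}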

diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
index d1c2734..4a9c688 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
@@ -18,10 +18,26 @@
 # pytype: skip-file
 
 import logging
+import re
+import time
 from dataclasses import dataclass
 from typing import Optional
+from typing import Tuple
 
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.utils import progress_indicated
+from apache_beam.version import __version__ as beam_version
+
+try:
+  from google.cloud import dataproc_v1
+  from apache_beam.io.gcp import gcsfilesystem  #pylint: disable=ungrouped-imports
+except ImportError:
+
+  class UnimportedDataproc:
+    Cluster = None
+
+  dataproc_v1 = UnimportedDataproc()
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -51,6 +67,9 @@ class DataprocClusterManager:
   required for creating and deleting Dataproc clusters for use
   under Interactive Beam.
   """
+  IMAGE_VERSION = '2.0.31-debian10'
+  STAGING_LOG_NAME = 'dataproc-startup-script_output'
+
   def __init__(self, cluster_metadata: MasterURLIdentifier) -> None:
     """Initializes the DataprocClusterManager with properties required
     to interface with the Dataproc ClusterControllerClient.
@@ -69,7 +88,6 @@ class DataprocClusterManager:
       self.cluster_metadata.cluster_name = ie.current_env(
       ).clusters.default_cluster_name
 
-    from google.cloud import dataproc_v1
     self._cluster_client = dataproc_v1.ClusterControllerClient(
         client_options={
             'api_endpoint': \
@@ -79,9 +97,16 @@ class DataprocClusterManager:
     if self.cluster_metadata in ie.current_env().clusters.master_urls.inverse:
       self.master_url = ie.current_env().clusters.master_urls.inverse[
           self.cluster_metadata]
+      self.dashboard = ie.current_env().clusters.master_urls_to_dashboards[
+          self.master_url]
     else:
       self.master_url = None
+      self.dashboard = None
 
+    self._fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
+    self._staging_directory = None
+
+  @progress_indicated
   def create_cluster(self, cluster: dict) -> None:
     """Attempts to create a cluster using attributes that were
     initialized with the DataprocClusterManager instance.
@@ -103,7 +128,10 @@ class DataprocClusterManager:
       _LOGGER.info(
           'Cluster created successfully: %s',
           self.cluster_metadata.cluster_name)
-      self.master_url = self.get_master_url(self.cluster_metadata)
+      self._staging_directory = self.get_staging_location(self.cluster_metadata)
+      self.master_url, self.dashboard = self.get_master_url_and_dashboard(
+        self.cluster_metadata,
+        self._staging_directory)
     except Exception as e:
       if e.code == 409:
         _LOGGER.info(
@@ -127,7 +155,6 @@ class DataprocClusterManager:
             'Unable to create cluster: %s', self.cluster_metadata.cluster_name)
         raise e
 
-  # TODO(victorhc): Add support for user-specified pip packages
   def create_flink_cluster(self) -> None:
     """Calls _create_cluster with a configuration that enables FlinkRunner."""
     cluster = {
@@ -135,11 +162,13 @@ class DataprocClusterManager:
         'cluster_name': self.cluster_metadata.cluster_name,
         'config': {
             'software_config': {
+                'image_version': self.IMAGE_VERSION,
                 'optional_components': ['DOCKER', 'FLINK']
             },
             'gce_cluster_config': {
                 'metadata': {
-                    'flink-start-yarn-session': 'true'
+                    'flink-start-yarn-session': 'true',
+                    'PIP_PACKAGES': 'apache-beam[gcp]=={}'.format(beam_version)
                 },
                 'service_account_scopes': [
                     'https://www.googleapis.com/auth/cloud-platform'
@@ -156,6 +185,8 @@ class DataprocClusterManager:
     """Deletes the cluster that uses the attributes initialized
     with the DataprocClusterManager instance."""
     try:
+      if self._staging_directory:
+        self.cleanup_staging_files(self._staging_directory)
       self._cluster_client.delete_cluster(
           request={
               'project_id': self.cluster_metadata.project_id,
@@ -186,15 +217,111 @@ class DataprocClusterManager:
     """Returns a dictionary describing the cluster."""
     return {
         'cluster_metadata': self.cluster_metadata,
-        'master_url': self.master_url
+        'master_url': self.master_url,
+        'dashboard': self.dashboard
     }
 
-  def get_master_url(self, identifier) -> None:
+  def get_cluster_details(
+      self, cluster_metadata: MasterURLIdentifier) -> dataproc_v1.Cluster:
+    """Gets the Dataproc_v1 Cluster object for the current cluster manager."""
+    try:
+      return self._cluster_client.get_cluster(
+          request={
+              'project_id': cluster_metadata.project_id,
+              'region': cluster_metadata.region,
+              'cluster_name': cluster_metadata.cluster_name
+          })
+    except Exception as e:
+      if e.code == 403:
+        _LOGGER.error(
+            'Due to insufficient project permissions, '
+            'unable to retrieve information for cluster: %s',
+            cluster_metadata.cluster_name)
+        raise ValueError(
+            'You cannot view clusters in project: {}'.format(
+                cluster_metadata.project_id))
+      elif e.code == 404:
+        _LOGGER.error(
+            'Cluster does not exist: %s', cluster_metadata.cluster_name)
+        raise ValueError(
+            'Cluster was not found: {}'.format(cluster_metadata.cluster_name))
+      else:
+        _LOGGER.error(
+            'Failed to get information for cluster: %s',
+            cluster_metadata.cluster_name)
+        raise e
+
+  def wait_for_cluster_to_provision(
+      self, cluster_metadata: MasterURLIdentifier) -> None:
+    while self.get_cluster_details(
+        cluster_metadata).status.state.name == 'CREATING':
+      time.sleep(15)
+
+  def get_staging_location(self, cluster_metadata: MasterURLIdentifier) -> str:
+    """Gets the staging bucket of an existing Dataproc cluster."""
+    try:
+      self.wait_for_cluster_to_provision(cluster_metadata)
+      cluster_details = self.get_cluster_details(cluster_metadata)
+      bucket_name = cluster_details.config.config_bucket
+      gcs_path = 'gs://' + bucket_name + '/google-cloud-dataproc-metainfo/'
+      for file in self._fs._list(gcs_path):
+        if cluster_metadata.cluster_name in file.path:
+          # this file path split will look something like:
+          # ['gs://.../google-cloud-dataproc-metainfo/{staging_dir}/',
+          # '-{node-type}/dataproc-startup-script_output']
+          return file.path.split(cluster_metadata.cluster_name)[0]
+    except Exception as e:
+      _LOGGER.error(
+          'Failed to get %s cluster staging bucket.',
+          cluster_metadata.cluster_name)
+      raise e
+
+  def parse_master_url_and_dashboard(
+      self, cluster_metadata: MasterURLIdentifier,
+      line: str) -> Tuple[str, str]:
+    """Parses the master_url and YARN application_id of the Flink process from
+    an input line. The line containing both the master_url and application id
+    is always formatted as follows:
+    {text} Found Web Interface {master_url} of application
+    '{application_id}'.\\n
+
+    Truncated example where '...' represents additional text between segments:
+    ... google-dataproc-startup[000]: ... activate-component-flink[0000]:
+    ...org.apache.flink.yarn.YarnClusterDescriptor... [] -
+    Found Web Interface example-master-url:50000 of application
+    'application_123456789000_0001'.
+
+    Returns the flink_master_url and dashboard link as a tuple."""
+    cluster_details = self.get_cluster_details(cluster_metadata)
+    yarn_endpoint = cluster_details.config.endpoint_config.http_ports[
+        'YARN ResourceManager']
+    segment = line.split('Found Web Interface ')[1].split(' of application ')
+    master_url = segment[0]
+    application_id = re.sub('\'|.\n', '', segment[1])
+    dashboard = re.sub(
+        '/yarn/',
+        '/gateway/default/yarn/proxy/' + application_id + '/',
+        yarn_endpoint)
+    return master_url, dashboard
+
+  def get_master_url_and_dashboard(
+      self, cluster_metadata: MasterURLIdentifier,
+      staging_bucket) -> Tuple[Optional[str], Optional[str]]:
     """Returns the master_url of the current cluster."""
-    # TODO(victorhc): Implement the following method to fetch the cluster
-    # master_url from Dataproc.
-    return '.'.join([
-        self.cluster_metadata.project_id,
-        self.cluster_metadata.region,
-        self.cluster_metadata.cluster_name
-    ])
+    startup_logs = []
+    for file in self._fs._list(staging_bucket):
+      if self.STAGING_LOG_NAME in file.path:
+        startup_logs.append(file.path)
+
+    for log in startup_logs:
+      content = self._fs.open(log)
+      for line in content.readlines():
+        decoded_line = line.decode()
+        if 'Found Web Interface' in decoded_line:
+          return self.parse_master_url_and_dashboard(
+              cluster_metadata, decoded_line)
+    return None, None
+
+  def cleanup_staging_files(self, staging_directory: str) -> None:
+    staging_files = [file.path for file in self._fs._list(staging_directory)]
+    self._fs.delete(staging_files)
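
The parsing helper added above can be exercised on its own. A self-contained
sketch using a fabricated log line and YARN endpoint that match the format
documented in the docstring (both values are illustrative, not taken from a
real cluster):

    import re

    def parse_line(line: str, yarn_endpoint: str):
      # Split around the documented markers to isolate the two fields.
      segment = line.split('Found Web Interface ')[1].split(' of application ')
      master_url = segment[0]
      # Strip the surrounding quotes and the trailing period/newline.
      application_id = re.sub('\'|.\n', '', segment[1])
      # The dashboard is the YARN endpoint with the proxy path spliced in.
      dashboard = re.sub(
          '/yarn/',
          '/gateway/default/yarn/proxy/' + application_id + '/',
          yarn_endpoint)
      return master_url, dashboard

    line = ('... Found Web Interface example-master-url:50000 of application '
            '\'application_123456789000_0001\'.\n')
    print(parse_line(line, 'http://cluster-host:8088/yarn/'))
    # ('example-master-url:50000', 'http://cluster-host:8088/gateway/default'
    #  '/yarn/proxy/application_123456789000_0001/')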
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
index b34641d..ba59cf6 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
@@ -15,25 +15,57 @@
 # limitations under the License.
 #
 
+"""Tests for apache_beam.runners.interactive.dataproc.
+dataproc_cluster_manager."""
 # pytype: skip-file
 
 import unittest
 from unittest.mock import patch
 
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier
+
 try:
   from google.cloud import dataproc_v1  # pylint: disable=unused-import
-  from apache_beam.runners.interactive.dataproc import dataproc_cluster_manager
 except ImportError:
   _dataproc_imported = False
 else:
   _dataproc_imported = True
 
 
+class MockProperty:
+  def __init__(self, property, value):
+    object.__setattr__(self, property, value)
+
+
 class MockException(Exception):
   def __init__(self, code=-1):
     self.code = code
 
 
+class MockCluster:
+  def __init__(self, config_bucket=None):
+    self.config = MockProperty('config_bucket', config_bucket)
+    self.status = MockProperty('state', MockProperty('name', None))
+
+
+class MockFileSystem:
+  def _list(self, dir=None):
+    return [MockProperty('path', 'test-path/dataproc-startup-script_output')]
+
+  def open(self, dir=None):
+    return MockFileIO('test-line Found Web Interface test-master-url' \
+    ' of application \'test-app-id\'.\n')
+
+
+class MockFileIO:
+  def __init__(self, contents):
+    self.contents = contents
+
+  def readlines(self):
+    return [self.contents.encode('utf-8')]
+
+
 @unittest.skipIf(not _dataproc_imported, 'dataproc package was not imported.')
 class DataprocClusterManagerTest(unittest.TestCase):
   """Unit test for DataprocClusterManager"""
@@ -45,10 +77,9 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that no exception is thrown when a cluster already exists,
     but is using ie.current_env().clusters.default_cluster_name.
     """
-    cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+    cluster_metadata = MasterURLIdentifier(
         project_id='test-project', region='test-region')
-    cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
-        cluster_metadata)
+    cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(_LOGGER, level='INFO') as context_manager:
       cluster_manager.create_cluster({})
@@ -62,10 +93,9 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when a user is trying to write to
     a project while having insufficient permissions.
     """
-    cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+    cluster_metadata = MasterURLIdentifier(
         project_id='test-project', region='test-region')
-    cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
-        cluster_metadata)
+    cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
       self.assertRaises(ValueError, cluster_manager.create_cluster, {})
@@ -81,10 +111,9 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when a user specifies a region
     that does not exist.
     """
-    cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+    cluster_metadata = MasterURLIdentifier(
         project_id='test-project', region='test-region')
-    cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
-        cluster_metadata)
+    cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
       self.assertRaises(ValueError, cluster_manager.create_cluster, {})
@@ -98,27 +127,29 @@ class DataprocClusterManagerTest(unittest.TestCase):
     Tests that an exception is thrown when the exception is not handled by
     any other case under _create_cluster.
     """
-    cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+    cluster_metadata = MasterURLIdentifier(
         project_id='test-project', region='test-region')
-    cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
-        cluster_metadata)
+    cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
       self.assertRaises(MockException, cluster_manager.create_cluster, {})
       self.assertTrue('Unable to create cluster' in context_manager.output[0])
 
   @patch(
+      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+      'DataprocClusterManager.cleanup_staging_files',
+      return_value=None)
+  @patch(
       'google.cloud.dataproc_v1.ClusterControllerClient.delete_cluster',
       side_effect=MockException(403))
-  def test_cleanup_permission_denied(self, mock_cluster_client):
+  def test_cleanup_permission_denied(self, mock_cluster_client, mock_cleanup):
     """
     Tests that an exception is thrown when a user is trying to delete
     a project that they have insufficient permissions for.
     """
-    cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+    cluster_metadata = MasterURLIdentifier(
         project_id='test-project', region='test-region')
-    cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
-        cluster_metadata)
+    cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
       self.assertRaises(ValueError, cluster_manager.cleanup)
@@ -127,39 +158,190 @@ class DataprocClusterManagerTest(unittest.TestCase):
           context_manager.output[0])
 
   @patch(
+      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+      'DataprocClusterManager.cleanup_staging_files',
+      return_value=None)
+  @patch(
       'google.cloud.dataproc_v1.ClusterControllerClient.delete_cluster',
       side_effect=MockException(404))
-  def test_cleanup_does_not_exist(self, mock_cluster_client):
+  def test_cleanup_does_not_exist(self, mock_cluster_client, mock_cleanup):
     """
     Tests that an exception is thrown when cleanup attempts to delete
     a cluster that does not exist.
     """
-    cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+    cluster_metadata = MasterURLIdentifier(
         project_id='test-project', region='test-region')
-    cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
-        cluster_metadata)
+    cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
       self.assertRaises(ValueError, cluster_manager.cleanup)
       self.assertTrue('Cluster does not exist' in context_manager.output[0])
 
   @patch(
+      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+      'DataprocClusterManager.cleanup_staging_files',
+      return_value=None)
+  @patch(
       'google.cloud.dataproc_v1.ClusterControllerClient.delete_cluster',
       side_effect=MockException())
-  def test_cleanup_other_exception(self, mock_cluster_client):
+  def test_cleanup_other_exception(self, mock_cluster_client, mock_cleanup):
     """
     Tests that an exception is thrown when the exception is not handled by
     any other case under cleanup.
     """
-    cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+    cluster_metadata = MasterURLIdentifier(
         project_id='test-project', region='test-region')
-    cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
-        cluster_metadata)
+    cluster_manager = DataprocClusterManager(cluster_metadata)
     from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
     with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
       self.assertRaises(MockException, cluster_manager.cleanup)
       self.assertTrue('Failed to delete cluster' in context_manager.output[0])
 
+  @patch(
+      'apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._list',
+      return_value=[
+          MockProperty(
+              'path',
+              'gs://test-bucket/google-cloud-dataproc-metainfo'
+              '/test-cluster/item')
+      ])
+  @patch(
+      'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+      return_value=MockCluster('test-bucket'))
+  def test_get_staging_location(self, mock_cluster_client, mock_list):
+    """
+    Tests that get_staging_location successfully returns a mock staging
+    location.
+    """
+    cluster_metadata = MasterURLIdentifier(
+        project_id='test-project',
+        region='test-region',
+        cluster_name='test-cluster')
+    cluster_manager = DataprocClusterManager(cluster_metadata)
+    self.assertEqual(
+        cluster_manager.get_staging_location(cluster_metadata),
+        'gs://test-bucket/google-cloud-dataproc-metainfo/')
+
+  @patch(
+      'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+      side_effect=MockException())
+  def test_get_staging_location_exception(self, mock_cluster_client):
+    """
+    Tests that an error raised inside get_staging_location is propagated.
+    """
+    cluster_metadata = MasterURLIdentifier(
+        project_id='test-project',
+        region='test-region',
+        cluster_name='test-cluster')
+    cluster_manager = DataprocClusterManager(cluster_metadata)
+    with self.assertRaises(MockException):
+      cluster_manager.get_staging_location(cluster_metadata)
+
+  @patch(
+      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+      'DataprocClusterManager.get_cluster_details',
+      return_value=MockProperty(
+          'config',
+          MockProperty(
+              'endpoint_config',
+              MockProperty(
+                  'http_ports',
+                  {'YARN ResourceManager': 'test-resource-manager/yarn/'}))))
+  def test_parse_master_url_and_dashboard(self, mock_cluster_details):
+    """
+    Tests that parse_master_url_and_dashboard properly parses the input
+    string and produces a mock master_url and mock dashboard link.
+    """
+    cluster_metadata = MasterURLIdentifier(
+        project_id='test-project', region='test-region')
+    cluster_manager = DataprocClusterManager(cluster_metadata)
+    line = 'test-line Found Web Interface test-master-url' \
+    ' of application \'test-app-id\'.\n'
+    master_url, dashboard = cluster_manager.parse_master_url_and_dashboard(
+      cluster_metadata, line)
+    self.assertEqual('test-master-url', master_url)
+    self.assertEqual(
+        'test-resource-manager/gateway/default/yarn/proxy/test-app-id/',
+        dashboard)
+
+  @patch(
+      'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+      side_effect=MockException(403))
+  def test_get_cluster_details_permission_denied(self, mock_cluster_client):
+    """
+    Tests that an exception is thrown when a user is trying to get information
+    for a project without sufficient permissions to do so.
+    """
+    cluster_metadata = MasterURLIdentifier(
+        project_id='test-project', region='test-region')
+    cluster_manager = DataprocClusterManager(cluster_metadata)
+    from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
+    with self.assertLogs(
+        _LOGGER,
+        level='ERROR') as context_manager, self.assertRaises(ValueError):
+      cluster_manager.get_cluster_details(cluster_metadata)
+      self.assertTrue(
+          'Due to insufficient project permissions' in
+          context_manager.output[0])
+
+  @patch(
+      'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+      side_effect=MockException(404))
+  def test_get_cluster_details_does_not_exist(self, mock_cluster_client):
+    """
+    Tests that an exception is thrown when get_cluster_details is called
+    for a cluster that does not exist.
+    """
+    cluster_metadata = MasterURLIdentifier(
+        project_id='test-project', region='test-region')
+    cluster_manager = DataprocClusterManager(cluster_metadata)
+    from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
+    with self.assertLogs(
+        _LOGGER,
+        level='ERROR') as context_manager, self.assertRaises(ValueError):
+      cluster_manager.get_cluster_details(cluster_metadata)
+      self.assertTrue('Cluster does not exist' in context_manager.output[0])
+
+  @patch(
+      'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+      side_effect=MockException())
+  def test_get_cluster_details_other_exception(self, mock_cluster_client):
+    """
+    Tests that an exception is thrown when the exception is not handled by
+    any other case under get_cluster_details.
+    """
+    cluster_metadata = MasterURLIdentifier(
+        project_id='test-project', region='test-region')
+    cluster_manager = DataprocClusterManager(cluster_metadata)
+    from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
+    with self.assertLogs(
+        _LOGGER,
+        level='ERROR') as context_manager, self.assertRaises(MockException):
+      cluster_manager.get_cluster_details(cluster_metadata)
+      self.assertTrue(
+          'Failed to get information for cluster' in context_manager.output[0])
+
+  @patch(
+      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+      'DataprocClusterManager.parse_master_url_and_dashboard',
+      return_value=('test-master-url', 'test-dashboard-link'))
+  def test_get_master_url_and_dashboard(self, mock_parse_method):
+    """
+    Tests that get_master_url_and_dashboard detects the line containing the
+    unique substring that identifies the location of the master_url and
+    application id of the Flink master.
+    """
+    cluster_metadata = MasterURLIdentifier(
+        project_id='test-project', region='test-region')
+    cluster_manager = DataprocClusterManager(cluster_metadata)
+    cluster_manager._fs = MockFileSystem()
+    master_url, dashboard = cluster_manager.get_master_url_and_dashboard(
+        cluster_metadata,
+        'test-staging-bucket'
+    )
+    self.assertEqual(master_url, 'test-master-url')
+    self.assertEqual(dashboard, 'test-dashboard-link')
+
 
 if __name__ == '__main__':
   unittest.main()
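
One note on the stacked @patch decorators used throughout this file:
unittest.mock applies them bottom-up, so the bottom-most patch becomes the
first mock argument after self. That is why mock_cluster_client
(delete_cluster, the bottom patch) precedes mock_cleanup
(cleanup_staging_files, the top patch) in the signatures above. A tiny
sketch against stdlib targets:

    import os.path
    import unittest
    from unittest.mock import patch

    class PatchOrderTest(unittest.TestCase):
      @patch('os.path.exists', return_value=True)  # top: second mock arg
      @patch('os.path.isdir', return_value=False)  # bottom: first mock arg
      def test_order(self, mock_isdir, mock_exists):
        self.assertFalse(os.path.isdir('anything'))
        self.assertTrue(os.path.exists('anything'))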
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 5eab489..6098b07 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -364,20 +364,23 @@ class Clusters:
     # pipelines that use the corresponding master_url.
     self.master_urls_to_pipelines: DefaultDict[
         str, List[beam.Pipeline]] = defaultdict(list)
+    # self.master_urls_to_dashboards maps each string master_url to its
+    # corresponding Apache Flink dashboard.
+    self.master_urls_to_dashboards: Dict[str, str] = {}
 
   def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
     """Returns a description of the cluster associated to the given pipeline.
 
     If no pipeline is given then this returns a dictionary of descriptions for
-    all pipelines.
+    all pipelines, keyed by pipeline id.
     """
     description = {
-        ie.current_env().pipeline_id_to_pipeline(pid): dcm.describe()
+        pid: dcm.describe()
         for pid,
         dcm in self.dataproc_cluster_managers.items()
     }
     if pipeline:
-      return description.get(pipeline, None)
+      return description.get(str(id(pipeline)), None)
     return description
 
   def cleanup(
@@ -419,6 +422,7 @@ class Clusters:
           cluster_manager.cleanup()
           self.master_urls.pop(master_url, None)
           self.master_urls_to_pipelines.pop(master_url, None)
+          self.master_urls_to_dashboards.pop(master_url, None)
           self.dataproc_cluster_managers.pop(str(id(pipeline)), None)
     else:
       cluster_manager_identifiers = set()
@@ -429,6 +433,7 @@ class Clusters:
       self.dataproc_cluster_managers.clear()
       self.master_urls.clear()
       self.master_urls_to_pipelines.clear()
+      self.master_urls_to_dashboards.clear()
 
 
 # Users can set options to guide how Interactive Beam works.
@@ -452,7 +457,7 @@ recordings = Recordings()
 # Examples:
 # ib.clusters.describe(p)
 # Check the docstrings for detailed usages.
-# TODO(victorhc): Implement all functionality for Clusters()
+# TODO(victorhc): Resolve connection issue and add a working example
 # clusters = Clusters()
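
Since describe() is now keyed by str(id(pipeline)), looking up a single
pipeline's description reduces to a dict lookup on that id string. A toy
illustration with made-up registry contents:

    import apache_beam as beam

    p = beam.Pipeline()
    registry = {
        str(id(p)): {
            'master_url': 'test-master-url', 'dashboard': 'test-dashboard'
        }
    }
    # Equivalent of clusters.describe(p): fetch by the pipeline's id string.
    print(registry.get(str(id(p)), None))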
 
 
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index b14848c..4541463 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -306,15 +306,16 @@ class InteractiveBeamClustersTest(unittest.TestCase):
             region=region,
         ))
     cluster_metadata = MasterURLIdentifier(project_id=project, region=region)
-    clusters.dataproc_cluster_managers[p] = DataprocClusterManager(
-        cluster_metadata)
-    self.assertEqual('test-project', clusters.describe()[None] \
-    ['cluster_metadata'].project_id)
+    clusters.dataproc_cluster_managers[str(
+        id(p))] = DataprocClusterManager(cluster_metadata)
+    self.assertEqual(
+        'test-project',
+        clusters.describe()[str(id(p))]['cluster_metadata'].project_id)
 
   @patch(
       'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
-      'DataprocClusterManager.get_master_url',
-      return_value='test-master-url')
+      'DataprocClusterManager.get_master_url_and_dashboard',
+      return_value=('test-master-url', None))
   @patch(
       'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
       'DataprocClusterManager.cleanup',
@@ -350,8 +351,8 @@ class InteractiveBeamClustersTest(unittest.TestCase):
 
   @patch(
       'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
-      'DataprocClusterManager.get_master_url',
-      return_value='test-master-url')
+      'DataprocClusterManager.get_master_url_and_dashboard',
+      return_value=('test-master-url', None))
   def test_clusters_cleanup_skip_on_duplicate(self, mock_master_url):
     clusters = ib.Clusters()
     project = 'test-project'
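
These tests seed clusters.master_urls directly, while the cluster manager
reads the reverse direction through master_urls.inverse (see the .inverse
lookups in dataproc_cluster_manager.py above). The .inverse attribute
matches the API of the bidict package, which is assumed here; a toy
equivalent:

    from bidict import bidict  # assumed dependency providing .inverse

    master_urls = bidict()
    master_urls['test-url'] = 'cluster-metadata-key'  # master_url -> metadata
    # The inverse view recovers a master_url from its cluster metadata:
    print(master_urls.inverse['cluster-metadata-key'])  # prints 'test-url'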
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index d76d689..a384be9 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -27,6 +27,7 @@ import warnings
 
 import apache_beam as beam
 from apache_beam import runners
+from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.runners.direct import direct_runner
@@ -137,6 +138,14 @@ class InteractiveRunner(runners.PipelineRunner):
     watch_sources(pipeline)
 
     user_pipeline = ie.current_env().user_pipeline(pipeline)
+    if user_pipeline:
+      # When the underlying_runner is a FlinkRunner instance, create a
+      # corresponding DataprocClusterManager for it if no flink_master_url
+      # is provided.
+      master_url = self._get_dataproc_cluster_master_url_if_applicable(
+          user_pipeline)
+      if master_url:
+        options.view_as(FlinkRunnerOptions).flink_master = master_url
     pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
 
     # The user_pipeline analyzed might be None if the pipeline given has nothing
@@ -169,11 +178,6 @@ class InteractiveRunner(runners.PipelineRunner):
           ie.current_env().set_test_stream_service_controller(
               user_pipeline, test_stream_service)
 
-      # When the underlying_runner is a FlinkRunner instance, create a
-      # corresponding DataprocClusterManager for it if no flink_master_url
-      # is provided.
-      self._create_dataproc_cluster_if_applicable(user_pipeline)
-
     pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api(
         pipeline_instrument.instrumented_pipeline_proto(),
         self._underlying_runner,
@@ -220,7 +224,8 @@ class InteractiveRunner(runners.PipelineRunner):
 
   # TODO(victorhc): Move this method somewhere else if performance is impacted
   # by generating a cluster during runtime.
-  def _create_dataproc_cluster_if_applicable(self, user_pipeline):
+  def _get_dataproc_cluster_master_url_if_applicable(
+      self, user_pipeline: beam.Pipeline) -> str:
     """ Creates a Dataproc cluster if the provided user_pipeline is running
     FlinkRunner and no flink_master_url was provided as an option. A cluster
     is not created when a flink_master_url is detected.
@@ -241,7 +246,6 @@ class InteractiveRunner(runners.PipelineRunner):
       ])
     """
     from apache_beam.runners.portability.flink_runner import FlinkRunner
-    from apache_beam.options.pipeline_options import FlinkRunnerOptions
     flink_master = user_pipeline.options.view_as(
         FlinkRunnerOptions).flink_master
     clusters = ie.current_env().clusters
@@ -264,7 +268,7 @@ class InteractiveRunner(runners.PipelineRunner):
         cluster_metadata = MasterURLIdentifier(
             project_id=project_id, region=region, cluster_name=cluster_name)
       else:
-        cluster_metadata = clusters.master_urls.inverse.get(flink_master, None)
+        cluster_metadata = clusters.master_urls.get(flink_master, None)
       # else noop, no need to log anything because we allow a master_url
       # (not managed by us) provided by the user.
       if cluster_metadata:
@@ -278,6 +282,9 @@ class InteractiveRunner(runners.PipelineRunner):
             id(user_pipeline))] = cluster_manager
         clusters.master_urls_to_pipelines[cluster_manager.master_url].append(
             str(id(user_pipeline)))
+        clusters.master_urls_to_dashboards[
+            cluster_manager.master_url] = cluster_manager.dashboard
+        return cluster_manager.master_url
 
 
 class PipelineResult(beam.runners.runner.PipelineResult):
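
The refactored _get_dataproc_cluster_master_url_if_applicable now returns
the master URL so that run_pipeline can inject it into FlinkRunnerOptions
before instrumentation. A condensed, placeholder distillation of its
branching (not the verbatim implementation):

    from typing import Optional, Set

    def resolve_flink_master(
        is_flink_runner: bool, flink_master: str,
        managed_master_urls: Set[str]) -> Optional[str]:
      if not is_flink_runner:
        return None  # nothing to do for non-Flink runners
      if not flink_master or flink_master == '[auto]':
        # No user-provided master: create a Dataproc Flink cluster and use
        # the master URL discovered from its startup logs.
        return 'url-of-newly-created-cluster'
      if flink_master in managed_master_urls:
        return flink_master  # reuse a cluster this environment manages
      return None  # a user-supplied master we do not manage; pass through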
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index de5f1a5..47dedd6 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -37,6 +37,7 @@ from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier
 from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.transforms.window import GlobalWindow
@@ -491,7 +492,7 @@ class InteractiveRunnerTest(unittest.TestCase):
       'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
       'DataprocClusterManager.create_flink_cluster',
       return_value=None)
-  def test_create_dataproc_cluster_no_flink_master_or_master_url(
+  def test_get_master_url_no_flink_master_or_provided_master_url(
       self, mock_create_cluster):
     from apache_beam.runners.portability.flink_runner import FlinkRunner
     runner = interactive_runner.InteractiveRunner(
@@ -501,8 +502,7 @@ class InteractiveRunnerTest(unittest.TestCase):
             project='test-project',
             region='test-region',
         ))
-    runner._create_dataproc_cluster_if_applicable(p)
-    ie.current_env()._tracked_user_pipelines.add_user_pipeline(p)
+    runner._get_dataproc_cluster_master_url_if_applicable(p)
     self.assertEqual(
         ie.current_env().clusters.describe(p)['cluster_metadata'].project_id,
         'test-project')
@@ -511,14 +511,41 @@ class InteractiveRunnerTest(unittest.TestCase):
   @unittest.skipIf(
       not ie.current_env().is_interactive_ready,
       '[interactive] dependency is not installed.')
-  def test_create_dataproc_cluster_flink_master_provided(self):
+  def test_get_master_url_no_flink_master_and_master_url_exists(self):
+    from apache_beam.runners.portability.flink_runner import FlinkRunner
+    runner = interactive_runner.InteractiveRunner(
+        underlying_runner=FlinkRunner())
+    p = beam.Pipeline(
+        options=PipelineOptions(
+            project='test-project',
+            region='test-region',
+        ))
+    cluster_name = ie.current_env().clusters.default_cluster_name
+    cluster_metadata = MasterURLIdentifier(
+        project_id='test-project',
+        region='test-region',
+        cluster_name=cluster_name)
+    ie.current_env().clusters.master_urls['test-url'] = cluster_metadata
+    ie.current_env(
+    ).clusters.master_urls_to_dashboards['test-url'] = 'test-dashboard'
+    flink_master = runner._get_dataproc_cluster_master_url_if_applicable(p)
+    self.assertEqual(
+        ie.current_env().clusters.describe(p)['cluster_metadata'].project_id,
+        'test-project')
+    self.assertEqual(
+        flink_master, ie.current_env().clusters.describe(p)['master_url'])
+
+  @unittest.skipIf(
+      not ie.current_env().is_interactive_ready,
+      '[interactive] dependency is not installed.')
+  def test_get_master_url_flink_master_provided(self):
     runner = interactive_runner.InteractiveRunner()
     from apache_beam.runners.portability.flink_runner import FlinkRunner
     p = beam.Pipeline(
         interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()),
         options=PipelineOptions(
             flink_master='--flink_master=example.internal:1'))
-    runner._create_dataproc_cluster_if_applicable(p)
+    runner._get_dataproc_cluster_master_url_if_applicable(p)
     self.assertEqual(ie.current_env().clusters.describe(), {})
     ie.current_env().clusters = ib.Clusters()
 
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 26734d4..9984791 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -55,9 +55,6 @@ else:
 
 
 class MockBuckets():
-  def __init__(self):
-    pass
-
   def Get(self, path):
     if path == 'test-bucket-not-found':
       raise HttpNotFoundError({'status': 404}, {}, '')