Posted to commits@beam.apache.org by ni...@apache.org on 2022/03/02 19:08:39 UTC
[beam] branch master updated: [BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner when FlinkRunner is used (#16904)
This is an automated email from the ASF dual-hosted git repository.
ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1e4106b [BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner when FlinkRunner is used (#16904)
1e4106b is described below
commit 1e4106bf5cdc5a3166e9feceac2d09f533309d47
Author: Victor <ca...@victorplusc.com>
AuthorDate: Wed Mar 2 14:06:56 2022 -0500
[BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner when FlinkRunner is used (#16904)
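In practice, this change means that an interactive pipeline whose underlying runner is a FlinkRunner, and whose options carry a GCP project and region but no flink_master, now gets a Dataproc-backed Flink master URL (and Flink dashboard link) wired in for it. A minimal usage sketch, modeled on the tests in this change — the project and region values are placeholders, and actually running the pipeline would provision a real Dataproc cluster:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.portability.flink_runner import FlinkRunner

# No flink_master is supplied, so the InteractiveRunner is expected to create
# (or reuse) a Dataproc cluster and link its Flink master URL into the options.
p = beam.Pipeline(
    InteractiveRunner(underlying_runner=FlinkRunner()),
    options=PipelineOptions(project='my-project', region='us-central1'))

# After a run, the per-pipeline cluster description also carries the
# Flink dashboard link alongside the cluster metadata and master_url.
print(ie.current_env().clusters.describe(p))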
---
.../dataproc/dataproc_cluster_manager.py | 153 ++++++++++++--
.../dataproc/dataproc_cluster_manager_test.py | 232 ++++++++++++++++++---
.../runners/interactive/interactive_beam.py | 13 +-
.../runners/interactive/interactive_beam_test.py | 17 +-
.../runners/interactive/interactive_runner.py | 23 +-
.../runners/interactive/interactive_runner_test.py | 37 +++-
.../apache_beam/runners/interactive/utils_test.py | 3 -
7 files changed, 412 insertions(+), 66 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
index d1c2734..4a9c688 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
@@ -18,10 +18,26 @@
# pytype: skip-file
import logging
+import re
+import time
from dataclasses import dataclass
from typing import Optional
+from typing import Tuple
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.utils import progress_indicated
+from apache_beam.version import __version__ as beam_version
+
+try:
+ from google.cloud import dataproc_v1
+ from apache_beam.io.gcp import gcsfilesystem #pylint: disable=ungrouped-imports
+except ImportError:
+
+ class UnimportedDataproc:
+ Cluster = None
+
+ dataproc_v1 = UnimportedDataproc()
_LOGGER = logging.getLogger(__name__)
@@ -51,6 +67,9 @@ class DataprocClusterManager:
required for creating and deleting Dataproc clusters for use
under Interactive Beam.
"""
+ IMAGE_VERSION = '2.0.31-debian10'
+ STAGING_LOG_NAME = 'dataproc-startup-script_output'
+
def __init__(self, cluster_metadata: MasterURLIdentifier) -> None:
"""Initializes the DataprocClusterManager with properties required
to interface with the Dataproc ClusterControllerClient.
@@ -69,7 +88,6 @@ class DataprocClusterManager:
self.cluster_metadata.cluster_name = ie.current_env(
).clusters.default_cluster_name
- from google.cloud import dataproc_v1
self._cluster_client = dataproc_v1.ClusterControllerClient(
client_options={
'api_endpoint': \
@@ -79,9 +97,16 @@ class DataprocClusterManager:
if self.cluster_metadata in ie.current_env().clusters.master_urls.inverse:
self.master_url = ie.current_env().clusters.master_urls.inverse[
self.cluster_metadata]
+ self.dashboard = ie.current_env().clusters.master_urls_to_dashboards[
+ self.master_url]
else:
self.master_url = None
+ self.dashboard = None
+ self._fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
+ self._staging_directory = None
+
+ @progress_indicated
def create_cluster(self, cluster: dict) -> None:
"""Attempts to create a cluster using attributes that were
initialized with the DataprocClusterManager instance.
@@ -103,7 +128,10 @@ class DataprocClusterManager:
_LOGGER.info(
'Cluster created successfully: %s',
self.cluster_metadata.cluster_name)
- self.master_url = self.get_master_url(self.cluster_metadata)
+ self._staging_directory = self.get_staging_location(self.cluster_metadata)
+ self.master_url, self.dashboard = self.get_master_url_and_dashboard(
+ self.cluster_metadata,
+ self._staging_directory)
except Exception as e:
if e.code == 409:
_LOGGER.info(
@@ -127,7 +155,6 @@ class DataprocClusterManager:
'Unable to create cluster: %s', self.cluster_metadata.cluster_name)
raise e
- # TODO(victorhc): Add support for user-specified pip packages
def create_flink_cluster(self) -> None:
"""Calls _create_cluster with a configuration that enables FlinkRunner."""
cluster = {
@@ -135,11 +162,13 @@ class DataprocClusterManager:
'cluster_name': self.cluster_metadata.cluster_name,
'config': {
'software_config': {
+ 'image_version': self.IMAGE_VERSION,
'optional_components': ['DOCKER', 'FLINK']
},
'gce_cluster_config': {
'metadata': {
- 'flink-start-yarn-session': 'true'
+ 'flink-start-yarn-session': 'true',
+ 'PIP_PACKAGES': 'apache-beam[gcp]=={}'.format(beam_version)
},
'service_account_scopes': [
'https://www.googleapis.com/auth/cloud-platform'
@@ -156,6 +185,8 @@ class DataprocClusterManager:
"""Deletes the cluster that uses the attributes initialized
with the DataprocClusterManager instance."""
try:
+ if self._staging_directory:
+ self.cleanup_staging_files(self._staging_directory)
self._cluster_client.delete_cluster(
request={
'project_id': self.cluster_metadata.project_id,
@@ -186,15 +217,111 @@ class DataprocClusterManager:
"""Returns a dictionary describing the cluster."""
return {
'cluster_metadata': self.cluster_metadata,
- 'master_url': self.master_url
+ 'master_url': self.master_url,
+ 'dashboard': self.dashboard
}
- def get_master_url(self, identifier) -> None:
+ def get_cluster_details(
+ self, cluster_metadata: MasterURLIdentifier) -> dataproc_v1.Cluster:
+ """Gets the Dataproc_v1 Cluster object for the current cluster manager."""
+ try:
+ return self._cluster_client.get_cluster(
+ request={
+ 'project_id': cluster_metadata.project_id,
+ 'region': cluster_metadata.region,
+ 'cluster_name': cluster_metadata.cluster_name
+ })
+ except Exception as e:
+ if e.code == 403:
+ _LOGGER.error(
+ 'Due to insufficient project permissions, '
+ 'unable to retrieve information for cluster: %s',
+ cluster_metadata.cluster_name)
+ raise ValueError(
+ 'You cannot view clusters in project: {}'.format(
+ cluster_metadata.project_id))
+ elif e.code == 404:
+ _LOGGER.error(
+ 'Cluster does not exist: %s', cluster_metadata.cluster_name)
+ raise ValueError(
+ 'Cluster was not found: {}'.format(cluster_metadata.cluster_name))
+ else:
+ _LOGGER.error(
+ 'Failed to get information for cluster: %s',
+ cluster_metadata.cluster_name)
+ raise e
+
+ def wait_for_cluster_to_provision(
+ self, cluster_metadata: MasterURLIdentifier) -> None:
+ while self.get_cluster_details(
+ cluster_metadata).status.state.name == 'CREATING':
+ time.sleep(15)
+
+ def get_staging_location(self, cluster_metadata: MasterURLIdentifier) -> str:
+ """Gets the staging bucket of an existing Dataproc cluster."""
+ try:
+ self.wait_for_cluster_to_provision(cluster_metadata)
+ cluster_details = self.get_cluster_details(cluster_metadata)
+ bucket_name = cluster_details.config.config_bucket
+ gcs_path = 'gs://' + bucket_name + '/google-cloud-dataproc-metainfo/'
+ for file in self._fs._list(gcs_path):
+ if cluster_metadata.cluster_name in file.path:
+ # this file path split will look something like:
+ # ['gs://.../google-cloud-dataproc-metainfo/{staging_dir}/',
+ # '-{node-type}/dataproc-startup-script_output']
+ return file.path.split(cluster_metadata.cluster_name)[0]
+ except Exception as e:
+ _LOGGER.error(
+ 'Failed to get %s cluster staging bucket.',
+ cluster_metadata.cluster_name)
+ raise e
+
+ def parse_master_url_and_dashboard(
+ self, cluster_metadata: MasterURLIdentifier,
+ line: str) -> Tuple[str, str]:
+ """Parses the master_url and YARN application_id of the Flink process from
+ an input line. The line containing both the master_url and application id
+ is always formatted as such:
+ {text} Found Web Interface {master_url} of application
+ '{application_id}'.\\n
+
+ Truncated example where '...' represents additional text between segments:
+ ... google-dataproc-startup[000]: ... activate-component-flink[0000]:
+ ...org.apache.flink.yarn.YarnClusterDescriptor... [] -
+ Found Web Interface example-master-url:50000 of application
+ 'application_123456789000_0001'.
+
+ Returns the flink_master_url and dashboard link as a tuple."""
+ cluster_details = self.get_cluster_details(cluster_metadata)
+ yarn_endpoint = cluster_details.config.endpoint_config.http_ports[
+ 'YARN ResourceManager']
+ segment = line.split('Found Web Interface ')[1].split(' of application ')
+ master_url = segment[0]
+ application_id = re.sub('\'|.\n', '', segment[1])
+ dashboard = re.sub(
+ '/yarn/',
+ '/gateway/default/yarn/proxy/' + application_id + '/',
+ yarn_endpoint)
+ return master_url, dashboard
+
+ def get_master_url_and_dashboard(
+ self, cluster_metadata: MasterURLIdentifier,
+ staging_bucket) -> Tuple[Optional[str], Optional[str]]:
"""Returns the master_url of the current cluster."""
- # TODO(victorhc): Implement the following method to fetch the cluster
- # master_url from Dataproc.
- return '.'.join([
- self.cluster_metadata.project_id,
- self.cluster_metadata.region,
- self.cluster_metadata.cluster_name
- ])
+ startup_logs = []
+ for file in self._fs._list(staging_bucket):
+ if self.STAGING_LOG_NAME in file.path:
+ startup_logs.append(file.path)
+
+ for log in startup_logs:
+ content = self._fs.open(log)
+ for line in content.readlines():
+ decoded_line = line.decode()
+ if 'Found Web Interface' in decoded_line:
+ return self.parse_master_url_and_dashboard(
+ cluster_metadata, decoded_line)
+ return None, None
+
+ def cleanup_staging_files(self, staging_directory: str) -> None:
+ staging_files = [file.path for file in self._fs._list(staging_directory)]
+ self._fs.delete(staging_files)
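As a reference for the parsing added above, the transformation that parse_master_url_and_dashboard applies to a matching startup-script log line can be reproduced standalone. In this sketch the log line and YARN endpoint are made-up stand-ins shaped like the real inputs:

import re

line = ("... Found Web Interface example-master-url:50000 of application "
        "'application_123456789000_0001'.\n")
# Stand-in for the 'YARN ResourceManager' entry in the cluster's http_ports.
yarn_endpoint = 'http://example-cluster-m:8088/yarn/'

segment = line.split('Found Web Interface ')[1].split(' of application ')
master_url = segment[0]  # 'example-master-url:50000'
application_id = re.sub('\'|.\n', '', segment[1])  # 'application_123456789000_0001'
dashboard = re.sub(
    '/yarn/',
    '/gateway/default/yarn/proxy/' + application_id + '/',
    yarn_endpoint)
# dashboard ends up as
# 'http://example-cluster-m:8088/gateway/default/yarn/proxy/application_123456789000_0001/'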
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
index b34641d..ba59cf6 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
@@ -15,25 +15,57 @@
# limitations under the License.
#
+"""Tests for apache_beam.runners.interactive.dataproc.
+dataproc_cluster_manager."""
# pytype: skip-file
import unittest
from unittest.mock import patch
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier
+
try:
from google.cloud import dataproc_v1 # pylint: disable=unused-import
- from apache_beam.runners.interactive.dataproc import dataproc_cluster_manager
except ImportError:
_dataproc_imported = False
else:
_dataproc_imported = True
+class MockProperty:
+ def __init__(self, property, value):
+ object.__setattr__(self, property, value)
+
+
class MockException(Exception):
def __init__(self, code=-1):
self.code = code
+class MockCluster:
+ def __init__(self, config_bucket=None):
+ self.config = MockProperty('config_bucket', config_bucket)
+ self.status = MockProperty('state', MockProperty('name', None))
+
+
+class MockFileSystem:
+ def _list(self, dir=None):
+ return [MockProperty('path', 'test-path/dataproc-startup-script_output')]
+
+ def open(self, dir=None):
+ return MockFileIO('test-line Found Web Interface test-master-url' \
+ ' of application \'test-app-id\'.\n')
+
+
+class MockFileIO:
+ def __init__(self, contents):
+ self.contents = contents
+
+ def readlines(self):
+ return [self.contents.encode('utf-8')]
+
+
@unittest.skipIf(not _dataproc_imported, 'dataproc package was not imported.')
class DataprocClusterManagerTest(unittest.TestCase):
"""Unit test for DataprocClusterManager"""
@@ -45,10 +77,9 @@ class DataprocClusterManagerTest(unittest.TestCase):
Tests that no exception is thrown when a cluster already exists,
but is using ie.current_env().clusters.default_cluster_name.
"""
- cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+ cluster_metadata = MasterURLIdentifier(
project_id='test-project', region='test-region')
- cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
- cluster_metadata)
+ cluster_manager = DataprocClusterManager(cluster_metadata)
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
with self.assertLogs(_LOGGER, level='INFO') as context_manager:
cluster_manager.create_cluster({})
@@ -62,10 +93,9 @@ class DataprocClusterManagerTest(unittest.TestCase):
Tests that an exception is thrown when a user is trying to write to
a project while having insufficient permissions.
"""
- cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+ cluster_metadata = MasterURLIdentifier(
project_id='test-project', region='test-region')
- cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
- cluster_metadata)
+ cluster_manager = DataprocClusterManager(cluster_metadata)
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
self.assertRaises(ValueError, cluster_manager.create_cluster, {})
@@ -81,10 +111,9 @@ class DataprocClusterManagerTest(unittest.TestCase):
Tests that an exception is thrown when a user specifies a region
that does not exist.
"""
- cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+ cluster_metadata = MasterURLIdentifier(
project_id='test-project', region='test-region')
- cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
- cluster_metadata)
+ cluster_manager = DataprocClusterManager(cluster_metadata)
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
self.assertRaises(ValueError, cluster_manager.create_cluster, {})
@@ -98,27 +127,29 @@ class DataprocClusterManagerTest(unittest.TestCase):
Tests that an exception is thrown when the exception is not handled by
any other case under _create_cluster.
"""
- cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+ cluster_metadata = MasterURLIdentifier(
project_id='test-project', region='test-region')
- cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
- cluster_metadata)
+ cluster_manager = DataprocClusterManager(cluster_metadata)
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
self.assertRaises(MockException, cluster_manager.create_cluster, {})
self.assertTrue('Unable to create cluster' in context_manager.output[0])
@patch(
+ 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+ 'DataprocClusterManager.cleanup_staging_files',
+ return_value=None)
+ @patch(
'google.cloud.dataproc_v1.ClusterControllerClient.delete_cluster',
side_effect=MockException(403))
- def test_cleanup_permission_denied(self, mock_cluster_client):
+ def test_cleanup_permission_denied(self, mock_cluster_client, mock_cleanup):
"""
Tests that an exception is thrown when a user is trying to delete
a project that they have insufficient permissions for.
"""
- cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+ cluster_metadata = MasterURLIdentifier(
project_id='test-project', region='test-region')
- cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
- cluster_metadata)
+ cluster_manager = DataprocClusterManager(cluster_metadata)
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
self.assertRaises(ValueError, cluster_manager.cleanup)
@@ -127,39 +158,190 @@ class DataprocClusterManagerTest(unittest.TestCase):
context_manager.output[0])
@patch(
+ 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+ 'DataprocClusterManager.cleanup_staging_files',
+ return_value=None)
+ @patch(
'google.cloud.dataproc_v1.ClusterControllerClient.delete_cluster',
side_effect=MockException(404))
- def test_cleanup_does_not_exist(self, mock_cluster_client):
+ def test_cleanup_does_not_exist(self, mock_cluster_client, mock_cleanup):
"""
Tests that an exception is thrown when cleanup attempts to delete
a cluster that does not exist.
"""
- cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+ cluster_metadata = MasterURLIdentifier(
project_id='test-project', region='test-region')
- cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
- cluster_metadata)
+ cluster_manager = DataprocClusterManager(cluster_metadata)
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
self.assertRaises(ValueError, cluster_manager.cleanup)
self.assertTrue('Cluster does not exist' in context_manager.output[0])
@patch(
+ 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+ 'DataprocClusterManager.cleanup_staging_files',
+ return_value=None)
+ @patch(
'google.cloud.dataproc_v1.ClusterControllerClient.delete_cluster',
side_effect=MockException())
- def test_cleanup_other_exception(self, mock_cluster_client):
+ def test_cleanup_other_exception(self, mock_cluster_client, mock_cleanup):
"""
Tests that an exception is thrown when the exception is not handled by
any other case under cleanup.
"""
- cluster_metadata = dataproc_cluster_manager.MasterURLIdentifier(
+ cluster_metadata = MasterURLIdentifier(
project_id='test-project', region='test-region')
- cluster_manager = dataproc_cluster_manager.DataprocClusterManager(
- cluster_metadata)
+ cluster_manager = DataprocClusterManager(cluster_metadata)
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
with self.assertLogs(_LOGGER, level='ERROR') as context_manager:
self.assertRaises(MockException, cluster_manager.cleanup)
self.assertTrue('Failed to delete cluster' in context_manager.output[0])
+ @patch(
+ 'apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._list',
+ return_value=[
+ MockProperty(
+ 'path',
+ 'gs://test-bucket/google-cloud-dataproc-metainfo'
+ '/test-cluster/item')
+ ])
+ @patch(
+ 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+ return_value=MockCluster('test-bucket'))
+ def test_get_staging_location(self, mock_cluster_client, mock_list):
+ """
+ Test to receive a mock staging location successfully under
+ get_staging_location.
+ """
+ cluster_metadata = MasterURLIdentifier(
+ project_id='test-project',
+ region='test-region',
+ cluster_name='test-cluster')
+ cluster_manager = DataprocClusterManager(cluster_metadata)
+ self.assertEqual(
+ cluster_manager.get_staging_location(cluster_metadata),
+ 'gs://test-bucket/google-cloud-dataproc-metainfo/')
+
+ @patch(
+ 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+ side_effect=MockException())
+ def test_get_staging_location_exception(self, mock_cluster_client):
+ """
+ Test to catch when an error is raised inside get_staging_location.
+ """
+ cluster_metadata = MasterURLIdentifier(
+ project_id='test-project',
+ region='test-region',
+ cluster_name='test-cluster')
+ cluster_manager = DataprocClusterManager(cluster_metadata)
+ with self.assertRaises(MockException):
+ cluster_manager.get_staging_location(cluster_metadata)
+
+ @patch(
+ 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+ 'DataprocClusterManager.get_cluster_details',
+ return_value=MockProperty(
+ 'config',
+ MockProperty(
+ 'endpoint_config',
+ MockProperty(
+ 'http_ports',
+ {'YARN ResourceManager': 'test-resource-manager/yarn/'}))))
+ def test_parse_master_url_and_dashboard(self, mock_cluster_details):
+ """
+ Tests that parse_master_url_and_dashboard properly parses the input
+ string and produces a mock master_url and mock dashboard link.
+ """
+ cluster_metadata = MasterURLIdentifier(
+ project_id='test-project', region='test-region')
+ cluster_manager = DataprocClusterManager(cluster_metadata)
+ line = 'test-line Found Web Interface test-master-url' \
+ ' of application \'test-app-id\'.\n'
+ master_url, dashboard = cluster_manager.parse_master_url_and_dashboard(
+ cluster_metadata, line)
+ self.assertEqual('test-master-url', master_url)
+ self.assertEqual(
+ 'test-resource-manager/gateway/default/yarn/proxy/test-app-id/',
+ dashboard)
+
+ @patch(
+ 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+ side_effect=MockException(403))
+ def test_get_cluster_details_permission_denied(self, mock_cluster_client):
+ """
+ Tests that an exception is thrown when a user is trying to get information
+ for a project without sufficient permissions to do so.
+ """
+ cluster_metadata = MasterURLIdentifier(
+ project_id='test-project', region='test-region')
+ cluster_manager = DataprocClusterManager(cluster_metadata)
+ from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
+ with self.assertLogs(
+ _LOGGER,
+ level='ERROR') as context_manager, self.assertRaises(ValueError):
+ cluster_manager.get_cluster_details(cluster_metadata)
+ self.assertTrue(
+ 'Due to insufficient project permissions' in
+ context_manager.output[0])
+
+ @patch(
+ 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+ side_effect=MockException(404))
+ def test_get_cluster_details_does_not_exist(self, mock_cluster_client):
+ """
+ Tests that an exception is thrown when cleanup attempts to get information
+ for a cluster that does not exist.
+ """
+ cluster_metadata = MasterURLIdentifier(
+ project_id='test-project', region='test-region')
+ cluster_manager = DataprocClusterManager(cluster_metadata)
+ from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
+ with self.assertLogs(
+ _LOGGER,
+ level='ERROR') as context_manager, self.assertRaises(ValueError):
+ cluster_manager.get_cluster_details(cluster_metadata)
+ self.assertTrue('Cluster does not exist' in context_manager.output[0])
+
+ @patch(
+ 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
+ side_effect=MockException())
+ def test_get_cluster_details_other_exception(self, mock_cluster_client):
+ """
+ Tests that an exception is thrown when the exception is not handled by
+ any other case under get_cluster_details.
+ """
+ cluster_metadata = MasterURLIdentifier(
+ project_id='test-project', region='test-region')
+ cluster_manager = DataprocClusterManager(cluster_metadata)
+ from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import _LOGGER
+ with self.assertLogs(
+ _LOGGER,
+ level='ERROR') as context_manager, self.assertRaises(MockException):
+ cluster_manager.get_cluster_details(cluster_metadata)
+ self.assertTrue(
+ 'Failed to get information for cluster' in context_manager.output[0])
+
+ @patch(
+ 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
+ 'DataprocClusterManager.parse_master_url_and_dashboard',
+ return_value=('test-master-url', 'test-dashboard-link'))
+ def test_get_master_url_and_dashboard(self, mock_parse_method):
+ """
+ Tests that get_master_url_and_dashboard detects the line containing the
+ unique substring which identifies the location of the master_url and
+ application id of the Flink master.
+ """
+ cluster_metadata = MasterURLIdentifier(
+ project_id='test-project', region='test-region')
+ cluster_manager = DataprocClusterManager(cluster_metadata)
+ cluster_manager._fs = MockFileSystem()
+ master_url, dashboard = cluster_manager.get_master_url_and_dashboard(
+ cluster_metadata,
+ 'test-staging-bucket'
+ )
+ self.assertEqual(master_url, 'test-master-url')
+ self.assertEqual(dashboard, 'test-dashboard-link')
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 5eab489..6098b07 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -364,20 +364,23 @@ class Clusters:
# pipelines that use the corresponding master_url.
self.master_urls_to_pipelines: DefaultDict[
str, List[beam.Pipeline]] = defaultdict(list)
+ # self.master_urls_to_dashboards maps string master_urls to the
+ # corresponding Apache Flink dashboards.
+ self.master_urls_to_dashboards: Dict[str, str] = {}
def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
"""Returns a description of the cluster associated to the given pipeline.
If no pipeline is given then this returns a dictionary of descriptions for
- all pipelines.
+ all pipelines, keyed by pipeline id.
"""
description = {
- ie.current_env().pipeline_id_to_pipeline(pid): dcm.describe()
+ pid: dcm.describe()
for pid,
dcm in self.dataproc_cluster_managers.items()
}
if pipeline:
- return description.get(pipeline, None)
+ return description.get(str(id(pipeline)), None)
return description
def cleanup(
@@ -419,6 +422,7 @@ class Clusters:
cluster_manager.cleanup()
self.master_urls.pop(master_url, None)
self.master_urls_to_pipelines.pop(master_url, None)
+ self.master_urls_to_dashboards.pop(master_url, None)
self.dataproc_cluster_managers.pop(str(id(pipeline)), None)
else:
cluster_manager_identifiers = set()
@@ -429,6 +433,7 @@ class Clusters:
self.dataproc_cluster_managers.clear()
self.master_urls.clear()
self.master_urls_to_pipelines.clear()
+ self.master_urls_to_dashboards.clear()
# Users can set options to guide how Interactive Beam works.
@@ -452,7 +457,7 @@ recordings = Recordings()
# Examples:
# ib.clusters.describe(p)
# Check the docstrings for detailed usages.
-# TODO(victorhc): Implement all functionality for Clusters()
+# TODO(victorhc): Resolve connection issue and add a working example
# clusters = Clusters()
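To make the new keying in Clusters.describe() concrete: cluster managers are registered under str(id(pipeline)), and describe(pipeline) looks up that same key, so the no-argument form returns a dictionary keyed by pipeline id strings. A small illustrative sketch (with no clusters registered, the calls simply return an empty dict and None):

import apache_beam as beam
from apache_beam.runners.interactive import interactive_beam as ib

clusters = ib.Clusters()
p = beam.Pipeline()

all_descriptions = clusters.describe()   # dict keyed by str(id(pipeline))
this_description = clusters.describe(p)  # same as all_descriptions.get(str(id(p)))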
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index b14848c..4541463 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -306,15 +306,16 @@ class InteractiveBeamClustersTest(unittest.TestCase):
region=region,
))
cluster_metadata = MasterURLIdentifier(project_id=project, region=region)
- clusters.dataproc_cluster_managers[p] = DataprocClusterManager(
- cluster_metadata)
- self.assertEqual('test-project', clusters.describe()[None] \
- ['cluster_metadata'].project_id)
+ clusters.dataproc_cluster_managers[str(
+ id(p))] = DataprocClusterManager(cluster_metadata)
+ self.assertEqual(
+ 'test-project',
+ clusters.describe()[str(id(p))]['cluster_metadata'].project_id)
@patch(
'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
- 'DataprocClusterManager.get_master_url',
- return_value='test-master-url')
+ 'DataprocClusterManager.get_master_url_and_dashboard',
+ return_value=('test-master-url', None))
@patch(
'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
'DataprocClusterManager.cleanup',
@@ -350,8 +351,8 @@ class InteractiveBeamClustersTest(unittest.TestCase):
@patch(
'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
- 'DataprocClusterManager.get_master_url',
- return_value='test-master-url')
+ 'DataprocClusterManager.get_master_url_and_dashboard',
+ return_value=('test-master-url', None))
def test_clusters_cleanup_skip_on_duplicate(self, mock_master_url):
clusters = ib.Clusters()
project = 'test-project'
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index d76d689..a384be9 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -27,6 +27,7 @@ import warnings
import apache_beam as beam
from apache_beam import runners
+from apache_beam.options.pipeline_options import FlinkRunnerOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct import direct_runner
@@ -137,6 +138,14 @@ class InteractiveRunner(runners.PipelineRunner):
watch_sources(pipeline)
user_pipeline = ie.current_env().user_pipeline(pipeline)
+ if user_pipeline:
+ # When the underlying_runner is a FlinkRunner instance, create a
+ # corresponding DataprocClusterManager for it if no flink_master_url
+ # is provided.
+ master_url = self._get_dataproc_cluster_master_url_if_applicable(
+ user_pipeline)
+ if master_url:
+ options.view_as(FlinkRunnerOptions).flink_master = master_url
pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
# The user_pipeline analyzed might be None if the pipeline given has nothing
@@ -169,11 +178,6 @@ class InteractiveRunner(runners.PipelineRunner):
ie.current_env().set_test_stream_service_controller(
user_pipeline, test_stream_service)
- # When the underlying_runner is a FlinkRunner instance, create a
- # corresponding DataprocClusterManager for it if no flink_master_url
- # is provided.
- self._create_dataproc_cluster_if_applicable(user_pipeline)
-
pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api(
pipeline_instrument.instrumented_pipeline_proto(),
self._underlying_runner,
@@ -220,7 +224,8 @@ class InteractiveRunner(runners.PipelineRunner):
# TODO(victorhc): Move this method somewhere else if performance is impacted
# by generating a cluster during runtime.
- def _create_dataproc_cluster_if_applicable(self, user_pipeline):
+ def _get_dataproc_cluster_master_url_if_applicable(
+ self, user_pipeline: beam.Pipeline) -> str:
""" Creates a Dataproc cluster if the provided user_pipeline is running
FlinkRunner and no flink_master_url was provided as an option. A cluster
is not created when a flink_master_url is detected.
@@ -241,7 +246,6 @@ class InteractiveRunner(runners.PipelineRunner):
])
"""
from apache_beam.runners.portability.flink_runner import FlinkRunner
- from apache_beam.options.pipeline_options import FlinkRunnerOptions
flink_master = user_pipeline.options.view_as(
FlinkRunnerOptions).flink_master
clusters = ie.current_env().clusters
@@ -264,7 +268,7 @@ class InteractiveRunner(runners.PipelineRunner):
cluster_metadata = MasterURLIdentifier(
project_id=project_id, region=region, cluster_name=cluster_name)
else:
- cluster_metadata = clusters.master_urls.inverse.get(flink_master, None)
+ cluster_metadata = clusters.master_urls.get(flink_master, None)
# else noop, no need to log anything because we allow a master_url
# (not managed by us) provided by the user.
if cluster_metadata:
@@ -278,6 +282,9 @@ class InteractiveRunner(runners.PipelineRunner):
id(user_pipeline))] = cluster_manager
clusters.master_urls_to_pipelines[cluster_manager.master_url].append(
str(id(user_pipeline)))
+ clusters.master_urls_to_dashboards[
+ cluster_manager.master_url] = cluster_manager.dashboard
+ return cluster_manager.master_url
class PipelineResult(beam.runners.runner.PipelineResult):
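The relocated hook now runs before pipeline instrumentation and, when a master URL is resolved, applies it through the standard options view. The read-back pattern is plain FlinkRunnerOptions access, as in this sketch where the URL is a placeholder rather than a real Dataproc endpoint:

from apache_beam.options.pipeline_options import FlinkRunnerOptions
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()
# The runner would assign the Dataproc-provided value here; this is a stand-in.
options.view_as(FlinkRunnerOptions).flink_master = 'example-master-url:50000'
assert options.view_as(FlinkRunnerOptions).flink_master == 'example-master-url:50000'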
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index de5f1a5..47dedd6 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -37,6 +37,7 @@ from apache_beam.runners.direct import direct_runner
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import interactive_runner
+from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier
from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import GlobalWindow
@@ -491,7 +492,7 @@ class InteractiveRunnerTest(unittest.TestCase):
'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
'DataprocClusterManager.create_flink_cluster',
return_value=None)
- def test_create_dataproc_cluster_no_flink_master_or_master_url(
+ def test_get_master_url_no_flink_master_or_provided_master_url(
self, mock_create_cluster):
from apache_beam.runners.portability.flink_runner import FlinkRunner
runner = interactive_runner.InteractiveRunner(
@@ -501,8 +502,7 @@ class InteractiveRunnerTest(unittest.TestCase):
project='test-project',
region='test-region',
))
- runner._create_dataproc_cluster_if_applicable(p)
- ie.current_env()._tracked_user_pipelines.add_user_pipeline(p)
+ runner._get_dataproc_cluster_master_url_if_applicable(p)
self.assertEqual(
ie.current_env().clusters.describe(p)['cluster_metadata'].project_id,
'test-project')
@@ -511,14 +511,41 @@ class InteractiveRunnerTest(unittest.TestCase):
@unittest.skipIf(
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
- def test_create_dataproc_cluster_flink_master_provided(self):
+ def test_get_master_url_no_flink_master_and_master_url_exists(self):
+ from apache_beam.runners.portability.flink_runner import FlinkRunner
+ runner = interactive_runner.InteractiveRunner(
+ underlying_runner=FlinkRunner())
+ p = beam.Pipeline(
+ options=PipelineOptions(
+ project='test-project',
+ region='test-region',
+ ))
+ cluster_name = ie.current_env().clusters.default_cluster_name
+ cluster_metadata = MasterURLIdentifier(
+ project_id='test-project',
+ region='test-region',
+ cluster_name=cluster_name)
+ ie.current_env().clusters.master_urls['test-url'] = cluster_metadata
+ ie.current_env(
+ ).clusters.master_urls_to_dashboards['test-url'] = 'test-dashboard'
+ flink_master = runner._get_dataproc_cluster_master_url_if_applicable(p)
+ self.assertEqual(
+ ie.current_env().clusters.describe(p)['cluster_metadata'].project_id,
+ 'test-project')
+ self.assertEqual(
+ flink_master, ie.current_env().clusters.describe(p)['master_url'])
+
+ @unittest.skipIf(
+ not ie.current_env().is_interactive_ready,
+ '[interactive] dependency is not installed.')
+ def test_get_master_url_flink_master_provided(self):
runner = interactive_runner.InteractiveRunner()
from apache_beam.runners.portability.flink_runner import FlinkRunner
p = beam.Pipeline(
interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()),
options=PipelineOptions(
flink_master='--flink_master=example.internal:1'))
- runner._create_dataproc_cluster_if_applicable(p)
+ runner._get_dataproc_cluster_master_url_if_applicable(p)
self.assertEqual(ie.current_env().clusters.describe(), {})
ie.current_env().clusters = ib.Clusters()
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 26734d4..9984791 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -55,9 +55,6 @@ else:
class MockBuckets():
- def __init__(self):
- pass
-
def Get(self, path):
if path == 'test-bucket-not-found':
raise HttpNotFoundError({'status': 404}, {}, '')