You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/02/03 23:38:36 UTC

[GitHub] [beam] KevinGG commented on a change in pull request #16691: [BEAM-13799] Create an Interactive Beam Dataproc package for users to manage clusters with

KevinGG commented on a change in pull request #16691:
URL: https://github.com/apache/beam/pull/16691#discussion_r799043983



##########
File path: sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+
+try:
+  from google.cloud import dataproc_v1
+except ImportError:
+  raise ImportError(
+      'Google Cloud Dataproc not supported for this execution environment.')
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class DataprocClusterManager:
+  """The DataprocClusterManager object simplifies the operations
+  required for creating and deleting Dataproc clusters for use
+  under Interactive Beam.
+  """
+  DEFAULT_NAME = 'interactive-beam-cluster'
+
+  def __init__(self, project_id=None, region=None, cluster_name=None):
+    """Initializes the DataprocClusterManager with properties required
+    to interface with the Dataproc ClusterControllerClient.
+    """
+
+    self.project_id = project_id
+    if region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif region:
+      self.region = region
+    else:
+      _LOGGER.warning(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      self.region = 'us-central1'
+
+    if cluster_name:
+      _LOGGER.warning(
+          'A user-specified cluster_name has been detected. '
+          'Please note that you will have to manually delete the Dataproc '
+          'cluster that will be created under the name: %s',
+          cluster_name)
+      self.cluster_name = cluster_name
+    else:
+      self.cluster_name = self.DEFAULT_NAME
+
+    self.cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={
+            'api_endpoint': f'{self.region}-dataproc.googleapis.com:443'
+        })
+
+  def _create_cluster(self, cluster):
+    """Attempts to create a cluster using attributes that were
+    initialized with the DataprocClusterManager instance.
+
+    Args:
+      cluster: Dictionary representing Dataproc cluster
+    """
+    try:
+      self.cluster_client.create_cluster(
+          request={
+              'project_id': self.project_id,
+              'region': self.region,
+              'cluster': cluster
+          })
+      _LOGGER.info('Cluster created successfully: %s', self.cluster_name)
+    except Exception as e:
+      if e.code == 409:
+        if self.cluster_name == self.DEFAULT_NAME:
+          _LOGGER.info(
+              'Cluster %s already exists. Continuing...', self.DEFAULT_NAME)
+        else:
+          _LOGGER.error(
+              'Cluster already exists - unable to create cluster: %s',
+              self.cluster_name)
+          raise ValueError(
+              'Cluster {} already exists!'.format(self.cluster_name))
+      elif e.code == 403:
+        _LOGGER.error(
+            'Due to insufficient project permissions, '
+            'unable to create cluster: %s',
+            self.cluster_name)
+        raise ValueError(
+            'You cannot create a cluster in project: {}'.format(
+                self.project_id))
+      elif e.code == 501:
+        _LOGGER.error('Invalid region provided: %s', self.region)
+        raise ValueError('Region {} does not exist!'.format(self.region))
+      else:
+        _LOGGER.error('Unable to create cluster: %s', self.cluster_name)
+        raise e
+
+  # TODO(victorhc): Add support for user-specified pip packages
+  def create_flink_cluster(self):
+    """Calls _create_cluster with a configuration that enables FlinkRunner."""
+    cluster = {
+        'project_id': self.project_id,
+        'cluster_name': self.cluster_name,
+        'config': {
+            'software_config': {
+                'optional_components': ['DOCKER', 'FLINK']
+            }
+        }
+    }
+    self._create_cluster(cluster)
+
+  def cleanup(self):
+    """Deletes the cluster that uses the attributes initialized
+    with the DataprocClusterManager instance if the default
+    cluster_name is used."""
+    if self.cluster_name != self.DEFAULT_NAME:
+      return
+
+    try:
+      self.cluster_client.delete_cluster(
+          request={
+              'project_id': self.project_id,
+              'region': self.region,
+              'cluster_name': self.cluster_name,
+          })
+
+    except Exception as e:
+      if e.code == 403:
+        _LOGGER.error(
+            'Due to insufficient project permissions, '
+            'unable to delete cluster: %s',

Review comment:
       Maybe clarify that the cluster to be deleted is the default one. How about:
   `unable to clean up the default cluster: %s`?

##########
File path: sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+
+try:
+  from google.cloud import dataproc_v1
+except ImportError:
+  raise ImportError(
+      'Google Cloud Dataproc not supported for this execution environment.')
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class DataprocClusterManager:
+  """The DataprocClusterManager object simplifies the operations
+  required for creating and deleting Dataproc clusters for use
+  under Interactive Beam.
+  """
+  DEFAULT_NAME = 'interactive-beam-cluster'
+
+  def __init__(self, project_id=None, region=None, cluster_name=None):
+    """Initializes the DataprocClusterManager with properties required
+    to interface with the Dataproc ClusterControllerClient.
+    """
+
+    self.project_id = project_id
+    if region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif region:
+      self.region = region
+    else:
+      _LOGGER.warning(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      self.region = 'us-central1'
+
+    if cluster_name:
+      _LOGGER.warning(
+          'A user-specified cluster_name has been detected. '
+          'Please note that you will have to manually delete the Dataproc '
+          'cluster that will be created under the name: %s',
+          cluster_name)
+      self.cluster_name = cluster_name
+    else:
+      self.cluster_name = self.DEFAULT_NAME
+
+    self.cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={
+            'api_endpoint': f'{self.region}-dataproc.googleapis.com:443'
+        })
+
+  def _create_cluster(self, cluster):
+    """Attempts to create a cluster using attributes that were
+    initialized with the DataprocClusterManager instance.
+
+    Args:
+      cluster: Dictionary representing Dataproc cluster
+    """
+    try:
+      self.cluster_client.create_cluster(
+          request={
+              'project_id': self.project_id,
+              'region': self.region,
+              'cluster': cluster
+          })
+      _LOGGER.info('Cluster created successfully: %s', self.cluster_name)
+    except Exception as e:
+      if e.code == 409:
+        if self.cluster_name == self.DEFAULT_NAME:
+          _LOGGER.info(
+              'Cluster %s already exists. Continuing...', self.DEFAULT_NAME)
+        else:
+          _LOGGER.error(
+              'Cluster already exists - unable to create cluster: %s',
+              self.cluster_name)
+          raise ValueError(
+              'Cluster {} already exists!'.format(self.cluster_name))
+      elif e.code == 403:
+        _LOGGER.error(
+            'Due to insufficient project permissions, '
+            'unable to create cluster: %s',
+            self.cluster_name)
+        raise ValueError(
+            'You cannot create a cluster in project: {}'.format(
+                self.project_id))
+      elif e.code == 501:
+        _LOGGER.error('Invalid region provided: %s', self.region)
+        raise ValueError('Region {} does not exist!'.format(self.region))
+      else:
+        _LOGGER.error('Unable to create cluster: %s', self.cluster_name)
+        raise e
+
+  # TODO(victorhc): Add support for user-specified pip packages

Review comment:
       It seems that the Python packages are installed in many different ways based on the Dataproc image version: https://cloud.google.com/dataproc/docs/tutorials/python-configuration.
   
   - We may pick a stable version for now: `image-version` (I assume it's 1.5 and Python 3.7 happens to be the version we use in notebooks) and install `apache-beam[gcp]==${kernel_version}`
   - We still need `metadata` support for `flink-start-yarn-session=true`
   - We might need [Initialization actions](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/init-actions) for more complicated setups.
   
   These are TODO items we can leave for the next PRs.
   

##########
File path: sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+
+try:
+  from google.cloud import dataproc_v1
+except ImportError:
+  raise ImportError(
+      'Google Cloud Dataproc not supported for this execution environment.')
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class DataprocClusterManager:
+  """The DataprocClusterManager object simplifies the operations
+  required for creating and deleting Dataproc clusters for use
+  under Interactive Beam.
+  """
+  DEFAULT_NAME = 'interactive-beam-cluster'
+
+  def __init__(self, project_id=None, region=None, cluster_name=None):

Review comment:
       Let's add Python 3 type hints to the function signatures in this module. For details, see https://www.python.org/dev/peps/pep-0484/. You can also check this cheat sheet: https://mypy.readthedocs.io/en/stable/cheat_sheet_py3.html.

##########
File path: sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+
+try:
+  from google.cloud import dataproc_v1
+except ImportError:
+  raise ImportError(
+      'Google Cloud Dataproc not supported for this execution environment.')
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class DataprocClusterManager:
+  """The DataprocClusterManager object simplifies the operations
+  required for creating and deleting Dataproc clusters for use
+  under Interactive Beam.
+  """
+  DEFAULT_NAME = 'interactive-beam-cluster'
+
+  def __init__(self, project_id=None, region=None, cluster_name=None):
+    """Initializes the DataprocClusterManager with properties required
+    to interface with the Dataproc ClusterControllerClient.
+    """
+
+    self.project_id = project_id
+    if region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif region:
+      self.region = region
+    else:
+      _LOGGER.warning(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      self.region = 'us-central1'
+
+    if cluster_name:
+      _LOGGER.warning(
+          'A user-specified cluster_name has been detected. '
+          'Please note that you will have to manually delete the Dataproc '
+          'cluster that will be created under the name: %s',
+          cluster_name)
+      self.cluster_name = cluster_name
+    else:
+      self.cluster_name = self.DEFAULT_NAME
+
+    self.cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={
+            'api_endpoint': f'{self.region}-dataproc.googleapis.com:443'
+        })
+
+  def _create_cluster(self, cluster):
+    """Attempts to create a cluster using attributes that were
+    initialized with the DataprocClusterManager instance.
+
+    Args:
+      cluster: Dictionary representing Dataproc cluster
+    """
+    try:
+      self.cluster_client.create_cluster(
+          request={
+              'project_id': self.project_id,
+              'region': self.region,
+              'cluster': cluster
+          })
+      _LOGGER.info('Cluster created successfully: %s', self.cluster_name)
+    except Exception as e:
+      if e.code == 409:
+        if self.cluster_name == self.DEFAULT_NAME:
+          _LOGGER.info(
+              'Cluster %s already exists. Continuing...', self.DEFAULT_NAME)
+        else:
+          _LOGGER.error(
+              'Cluster already exists - unable to create cluster: %s',
+              self.cluster_name)
+          raise ValueError(
+              'Cluster {} already exists!'.format(self.cluster_name))
+      elif e.code == 403:
+        _LOGGER.error(
+            'Due to insufficient project permissions, '
+            'unable to create cluster: %s',
+            self.cluster_name)
+        raise ValueError(
+            'You cannot create a cluster in project: {}'.format(
+                self.project_id))
+      elif e.code == 501:
+        _LOGGER.error('Invalid region provided: %s', self.region)
+        raise ValueError('Region {} does not exist!'.format(self.region))
+      else:
+        _LOGGER.error('Unable to create cluster: %s', self.cluster_name)
+        raise e
+
+  # TODO(victorhc): Add support for user-specified pip packages
+  def create_flink_cluster(self):
+    """Calls _create_cluster with a configuration that enables FlinkRunner."""
+    cluster = {
+        'project_id': self.project_id,
+        'cluster_name': self.cluster_name,
+        'config': {
+            'software_config': {
+                'optional_components': ['DOCKER', 'FLINK']
+            }
+        }
+    }
+    self._create_cluster(cluster)
+
+  def cleanup(self):
+    """Deletes the cluster that uses the attributes initialized
+    with the DataprocClusterManager instance if the default
+    cluster_name is used."""
+    if self.cluster_name != self.DEFAULT_NAME:
+      return
+
+    try:

Review comment:
       This feels a bit unnatural. Since the `cluster_name` is already an attribute of the manager, this function would be better split into two functions:
   - `delete_cluster(self)`: deletes the cluster
   - `cleanup_if_default(self)`: checks the `cluster_name` and invokes `self.delete_cluster` if the name is the default value.

##########
File path: sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+
+try:
+  from google.cloud import dataproc_v1
+except ImportError:
+  raise ImportError(
+      'Google Cloud Dataproc not supported for this execution environment.')
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class DataprocClusterManager:
+  """The DataprocClusterManager object simplifies the operations
+  required for creating and deleting Dataproc clusters for use
+  under Interactive Beam.
+  """
+  DEFAULT_NAME = 'interactive-beam-cluster'
+
+  def __init__(self, project_id=None, region=None, cluster_name=None):
+    """Initializes the DataprocClusterManager with properties required
+    to interface with the Dataproc ClusterControllerClient.
+    """
+
+    self.project_id = project_id
+    if region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif region:
+      self.region = region
+    else:
+      _LOGGER.warning(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      self.region = 'us-central1'
+
+    if cluster_name:
+      _LOGGER.warning(
+          'A user-specified cluster_name has been detected. '
+          'Please note that you will have to manually delete the Dataproc '
+          'cluster that will be created under the name: %s',
+          cluster_name)
+      self.cluster_name = cluster_name
+    else:
+      self.cluster_name = self.DEFAULT_NAME
+
+    self.cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={
+            'api_endpoint': f'{self.region}-dataproc.googleapis.com:443'
+        })
+
+  def _create_cluster(self, cluster):
+    """Attempts to create a cluster using attributes that were
+    initialized with the DataprocClusterManager instance.
+
+    Args:
+      cluster: Dictionary representing Dataproc cluster
+    """
+    try:
+      self.cluster_client.create_cluster(
+          request={
+              'project_id': self.project_id,
+              'region': self.region,
+              'cluster': cluster
+          })
+      _LOGGER.info('Cluster created successfully: %s', self.cluster_name)
+    except Exception as e:
+      if e.code == 409:
+        if self.cluster_name == self.DEFAULT_NAME:
+          _LOGGER.info(
+              'Cluster %s already exists. Continuing...', self.DEFAULT_NAME)
+        else:
+          _LOGGER.error(
+              'Cluster already exists - unable to create cluster: %s',
+              self.cluster_name)
+          raise ValueError(
+              'Cluster {} already exists!'.format(self.cluster_name))
+      elif e.code == 403:
+        _LOGGER.error(
+            'Due to insufficient project permissions, '
+            'unable to create cluster: %s',
+            self.cluster_name)
+        raise ValueError(
+            'You cannot create a cluster in project: {}'.format(
+                self.project_id))
+      elif e.code == 501:
+        _LOGGER.error('Invalid region provided: %s', self.region)
+        raise ValueError('Region {} does not exist!'.format(self.region))
+      else:
+        _LOGGER.error('Unable to create cluster: %s', self.cluster_name)
+        raise e
+
+  # TODO(victorhc): Add support for user-specified pip packages
+  def create_flink_cluster(self):
+    """Calls _create_cluster with a configuration that enables FlinkRunner."""
+    cluster = {
+        'project_id': self.project_id,
+        'cluster_name': self.cluster_name,
+        'config': {
+            'software_config': {
+                'optional_components': ['DOCKER', 'FLINK']
+            }
+        }
+    }
+    self._create_cluster(cluster)
+
+  def cleanup(self):
+    """Deletes the cluster that uses the attributes initialized
+    with the DataprocClusterManager instance if the default
+    cluster_name is used."""
+    if self.cluster_name != self.DEFAULT_NAME:
+      return
+
+    try:
+      self.cluster_client.delete_cluster(
+          request={
+              'project_id': self.project_id,
+              'region': self.region,
+              'cluster_name': self.cluster_name,
+          })
+

Review comment:
       nit: empty line

##########
File path: sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+
+try:
+  from google.cloud import dataproc_v1
+except ImportError:
+  raise ImportError(
+      'Google Cloud Dataproc not supported for this execution environment.')
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class DataprocClusterManager:
+  """The DataprocClusterManager object simplifies the operations
+  required for creating and deleting Dataproc clusters for use
+  under Interactive Beam.
+  """
+  DEFAULT_NAME = 'interactive-beam-cluster'
+
+  def __init__(self, project_id=None, region=None, cluster_name=None):
+    """Initializes the DataprocClusterManager with properties required
+    to interface with the Dataproc ClusterControllerClient.
+    """
+
+    self.project_id = project_id
+    if region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif region:
+      self.region = region
+    else:
+      _LOGGER.warning(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      self.region = 'us-central1'
+
+    if cluster_name:
+      _LOGGER.warning(
+          'A user-specified cluster_name has been detected. '
+          'Please note that you will have to manually delete the Dataproc '
+          'cluster that will be created under the name: %s',
+          cluster_name)
+      self.cluster_name = cluster_name
+    else:
+      self.cluster_name = self.DEFAULT_NAME
+
+    self.cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={
+            'api_endpoint': f'{self.region}-dataproc.googleapis.com:443'
+        })
+
+  def _create_cluster(self, cluster):
+    """Attempts to create a cluster using attributes that were
+    initialized with the DataprocClusterManager instance.
+
+    Args:
+      cluster: Dictionary representing Dataproc cluster

Review comment:
       If possible, could you please also add the link to the schema of this dictionary?

##########
File path: sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+
+try:
+  from google.cloud import dataproc_v1
+except ImportError:
+  raise ImportError(
+      'Google Cloud Dataproc not supported for this execution environment.')
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class DataprocClusterManager:
+  """The DataprocClusterManager object simplifies the operations
+  required for creating and deleting Dataproc clusters for use
+  under Interactive Beam.
+  """
+  DEFAULT_NAME = 'interactive-beam-cluster'
+
+  def __init__(self, project_id=None, region=None, cluster_name=None):
+    """Initializes the DataprocClusterManager with properties required
+    to interface with the Dataproc ClusterControllerClient.
+    """
+
+    self.project_id = project_id
+    if region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif region:
+      self.region = region
+    else:
+      _LOGGER.warning(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      self.region = 'us-central1'
+
+    if cluster_name:
+      _LOGGER.warning(
+          'A user-specified cluster_name has been detected. '
+          'Please note that you will have to manually delete the Dataproc '
+          'cluster that will be created under the name: %s',
+          cluster_name)
+      self.cluster_name = cluster_name
+    else:
+      self.cluster_name = self.DEFAULT_NAME
+
+    self.cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={
+            'api_endpoint': f'{self.region}-dataproc.googleapis.com:443'
+        })
+
+  def _create_cluster(self, cluster):

Review comment:
       Since the manager is DataprocClusterManager, you can make this function public: `create_cluster`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org