You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/04 23:45:39 UTC

[GitHub] [beam] rohdesamuel commented on a change in pull request #15647: [BEAM-10708] Enable submit beam_sql built jobs to Dataflow

rohdesamuel commented on a change in pull request #15647:
URL: https://github.com/apache/beam/pull/15647#discussion_r721785015



##########
File path: sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
##########
@@ -194,45 +220,72 @@ def beam_sql(self, line: str, cell: Optional[str] = None) -> Optional[PValue]:
             pcoll.element_type)
         return
       register_coder_for_schema(pcoll.element_type, verbose=verbose)
-
-    output_name, output = apply_sql(query, output_name, found)
+      # Only care about schemas defined by the user in the main module.
+      if hasattr(main_session, pcoll.element_type.__name__):
+        schemas.add(pcoll.element_type)
+
+    if runner and runner != 'DirectRunner':

Review comment:
       Why do we branch on whether the runner is the DirectRunner?

##########
File path: sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
##########
@@ -181,11 +196,22 @@ def beam_sql(self, line: str, cell: Optional[str] = None) -> Optional[PValue]:
     if not query:
       on_error('Please supply the SQL query to be executed.')
       return
+    if runner and runner not in _SUPPORTED_RUNNERS:
+      on_error(
+          'Runner "%s" is not supported. Supported runners are %s.',
+          runner,
+          _SUPPORTED_RUNNERS)
     query = ' '.join(query)
 
-    found = find_pcolls(query, pcoll_by_name(), verbose=verbose)
+    found = find_pcolls(
+        query,
+        pcoll_by_name(),
+        run=runner in ('DirectRunner', None),
+        verbose=verbose)
+    schemas = set()
+    main_session = importlib.import_module('__main__')

Review comment:
       Move this line to right before the `hasattr(main_session, ...)` check, since it's not used elsewhere.

##########
File path: sdks/python/apache_beam/runners/interactive/sql/utils.py
##########
@@ -123,3 +134,299 @@ def pformat_namedtuple(schema: NamedTuple) -> str:
           '{}: {}'.format(k, v.__name__) for k,
           v in schema.__annotations__.items()
       ]))
+
+
+@dataclass
+class OptionsEntry:
+  """An entry of PipelineOptions that can be visualized through ipywidgets to
+  take inputs in IPython notebooks interactively.
+
+  Attributes:
+    label: The value of the Label widget.
+    help: The help message of the entry, usually the same to the help in
+      PipelineOptions.
+    cls: The PipelineOptions class/subclass the options belong to.
+    arg_builder: Builds the argument/option. If it's a str, this entry
+      assigns the input ipywidget's value directly to the argument. If it's a
+      Dict, use the corresponding Callable to assign the input value to each
+      argument. If Callable is None, fallback to assign the input value
+      directly. This allows building multiple similar PipelineOptions
+      arguments from a single input, such as staging_location and
+      temp_location in GoogleCloudOptions.
+    default: The default value of the entry, None if absent.
+  """
+  label: str
+  help: str
+  cls: Type[PipelineOptions]
+  arg_builder: Union[str, Dict[str, Optional[Callable]]]
+  default: Optional[str] = None
+
+  def __post_init__(self):
+    # The attribute holds an ipywidget, currently only supports Text.
+    # The str value can be accessed by self.input.value.
+    self.input = None
+
+
+class OptionsForm:
+  """A form visualized to take inputs from users in IPython Notebooks and
+  generate PipelineOptions to run pipelines.
+  """
+  def __init__(self):
+    self.options = PipelineOptions()
+    self.entries = []
+
+  def add(self, entry: OptionsEntry) -> 'OptionsForm':
+    """Adds an OptionsEntry to the form.
+    """
+    self.entries.append(entry)
+    return self
+
+  def to_options(self) -> PipelineOptions:
+    """Builds the PipelineOptions based on user inputs.
+
+    Can only be invoked after display_for_input.
+    """
+    for entry in self.entries:
+      assert entry.input, (
+          'to_options invoked before display_for_input. '
+          'Wrong usage.')
+      view = self.options.view_as(entry.cls)
+      if isinstance(entry.arg_builder, str):
+        setattr(view, entry.arg_builder, entry.input.value)
+      else:
+        for arg, builder in entry.arg_builder.items():
+          if builder:
+            setattr(view, arg, builder(entry.input.value))
+          else:
+            setattr(view, arg, entry.input.value)
+    self.additional_options()
+    return self.options
+
+  def additional_options(self):
+    """Alters the self.options with additional config."""
+    pass

Review comment:
       Is this method overridden somewhere?

##########
File path: sdks/python/apache_beam/runners/interactive/sql/utils.py
##########
@@ -123,3 +134,299 @@ def pformat_namedtuple(schema: NamedTuple) -> str:
           '{}: {}'.format(k, v.__name__) for k,
           v in schema.__annotations__.items()
       ]))
+
+
+@dataclass
+class OptionsEntry:
+  """An entry of PipelineOptions that can be visualized through ipywidgets to
+  take inputs in IPython notebooks interactively.
+
+  Attributes:
+    label: The value of the Label widget.
+    help: The help message of the entry, usually the same to the help in
+      PipelineOptions.
+    cls: The PipelineOptions class/subclass the options belong to.
+    arg_builder: Builds the argument/option. If it's a str, this entry
+      assigns the input ipywidget's value directly to the argument. If it's a
+      Dict, use the corresponding Callable to assign the input value to each
+      argument. If Callable is None, fallback to assign the input value
+      directly. This allows building multiple similar PipelineOptions
+      arguments from a single input, such as staging_location and
+      temp_location in GoogleCloudOptions.
+    default: The default value of the entry, None if absent.
+  """
+  label: str
+  help: str
+  cls: Type[PipelineOptions]
+  arg_builder: Union[str, Dict[str, Optional[Callable]]]
+  default: Optional[str] = None
+
+  def __post_init__(self):
+    # The attribute holds an ipywidget, currently only supports Text.
+    # The str value can be accessed by self.input.value.
+    self.input = None
+
+
+class OptionsForm:
+  """A form visualized to take inputs from users in IPython Notebooks and
+  generate PipelineOptions to run pipelines.
+  """
+  def __init__(self):
+    self.options = PipelineOptions()
+    self.entries = []
+
+  def add(self, entry: OptionsEntry) -> 'OptionsForm':
+    """Adds an OptionsEntry to the form.
+    """
+    self.entries.append(entry)
+    return self
+
+  def to_options(self) -> PipelineOptions:
+    """Builds the PipelineOptions based on user inputs.
+
+    Can only be invoked after display_for_input.
+    """
+    for entry in self.entries:
+      assert entry.input, (
+          'to_options invoked before display_for_input. '
+          'Wrong usage.')
+      view = self.options.view_as(entry.cls)
+      if isinstance(entry.arg_builder, str):
+        setattr(view, entry.arg_builder, entry.input.value)
+      else:
+        for arg, builder in entry.arg_builder.items():
+          if builder:
+            setattr(view, arg, builder(entry.input.value))
+          else:
+            setattr(view, arg, entry.input.value)
+    self.additional_options()
+    return self.options
+
+  def additional_options(self):
+    """Alters the self.options with additional config."""
+    pass
+
+  def display_for_input(self) -> 'OptionsForm':
+    """Displays the widgets to take user inputs."""
+    from IPython.display import display
+    from ipywidgets import GridBox
+    from ipywidgets import Label
+    from ipywidgets import Layout
+    from ipywidgets import Text
+    widgets = []
+    for entry in self.entries:
+      text_label = Label(value=entry.label)
+      text_input = entry.input if entry.input else Text(
+          value=entry.default if entry.default else '')
+      text_help = Label(value=entry.help)
+      entry.input = text_input
+      widgets.append(text_label)
+      widgets.append(text_input)
+      widgets.append(text_help)
+    grid = GridBox(widgets, layout=Layout(grid_template_columns='1fr 2fr 6fr'))
+    display(grid)
+    self.display_actions()
+    return self
+
+  def display_actions(self):
+    """Displays actionable widgets to utilize the options, run pipelines and
+    etc."""
+    pass
+
+
+class DataflowOptionsForm(OptionsForm):
+  """A form to take inputs from users in IPython Notebooks to build
+  PipelineOptions to run pipelines on Dataflow.
+
+  Only contains minimum fields needed.
+  """
+  @staticmethod
+  def _build_default_project() -> str:
+    """Builds a default project id."""
+    try:
+      # pylint: disable=c-extension-no-member
+      import google.auth
+      return google.auth.default()[1]
+    except (KeyboardInterrupt, SystemExit):
+      raise
+    except Exception as e:
+      _LOGGER.warning('There is some issue with your gcloud auth: %s', e)
+      return 'your-project-id'
+
+  @staticmethod
+  def _build_req_file_from_pkgs(pkgs) -> Optional[str]:
+    """Builds a requirements file that contains all additional PYPI packages
+    needed."""
+    if pkgs:
+      deps = pkgs.split()
+      req_file = os.path.join(
+          tempfile.mkdtemp(prefix='beam-sql-dataflow-'), 'req.txt')
+      with open(req_file, 'a') as f:
+        for dep in deps:
+          f.write(dep + '\n')
+      return req_file
+    return None
+
+  def __init__(
+      self,
+      output_name: str,
+      output_pcoll: beam.PCollection,
+      verbose: bool = False):
+    """Inits the OptionsForm for setting up Dataflow jobs."""
+    super().__init__()
+    self.p = output_pcoll.pipeline
+    self.output_name = output_name
+    self.output_pcoll = output_pcoll
+    self.verbose = verbose
+    self.notice_shown = False
+    self.add(
+        OptionsEntry(
+            label='Project Id',
+            help='Name of the Cloud project owning the Dataflow job.',
+            cls=GoogleCloudOptions,
+            arg_builder='project',
+            default=DataflowOptionsForm._build_default_project())
+    ).add(
+        OptionsEntry(
+            label='Region',
+            help='The Google Compute Engine region for creating Dataflow job.',
+            cls=GoogleCloudOptions,
+            arg_builder='region',
+            default='us-central1')
+    ).add(
+        OptionsEntry(
+            label='GCS Bucket',
+            help=(
+                'GCS path to stage code packages needed by workers and save '
+                'temporary workflow jobs.'),
+            cls=GoogleCloudOptions,
+            arg_builder={
+                'staging_location': lambda x: x + '/staging',
+                'temp_location': lambda x: x + '/temp'
+            },
+            default='gs://YOUR_GCS_BUCKET_HERE')
+    ).add(
+        OptionsEntry(
+            label='Additional Packages',
+            help=(
+                'PYPI packages installed, whitespace separated. If None, leave '
+                'this field empty.'),
+            cls=SetupOptions,
+            arg_builder={
+                'requirements_file': lambda x: DataflowOptionsForm.
+                _build_req_file_from_pkgs(x)
+            },
+            default=''))
+
+  def additional_options(self):
+    # Use the latest Java SDK by default.
+    sdk_overrides = self.options.view_as(
+        WorkerOptions).sdk_harness_container_image_overrides
+    override = '.*java.*,apache/beam_java11_sdk:latest'
+    if sdk_overrides and override not in sdk_overrides:
+      sdk_overrides.append(override)
+    else:
+      self.options.view_as(
+          WorkerOptions).sdk_harness_container_image_overrides = [override]
+
+  def display_actions(self):
+    from IPython.display import HTML
+    from IPython.display import display
+    from ipywidgets import Button
+    from ipywidgets import GridBox
+    from ipywidgets import Layout
+    from ipywidgets import Output
+    output_area = Output()
+    run_btn = Button(
+        description='Run on Dataflow',
+        button_style='success',
+        tooltip=(
+            'Submit to Dataflow for execution with the configured options. The '
+            'output PCollection\'s data will be written to the GCS bucket you '
+            'configure.'))
+    show_options_btn = Button(
+        description='Show Options',
+        button_style='info',
+        tooltip='Show current pipeline options configured.')
+
+    def _run_on_dataflow(btn):
+      with output_area:
+
+        @progress_indicated
+        def _inner():
+          options = self.to_options()
+          # Caches the output_pcoll to a GCS bucket.
+          try:
+            output_location = '{}/{}'.format(
+                options.view_as(GoogleCloudOptions).staging_location,
+                self.output_name)
+            _ = self.output_pcoll | 'WriteOuput{}ToGCS'.format(
+                self.output_name) >> WriteToText(output_location)
+            _LOGGER.info(
+                'Data of output PCollection %s will be written to %s',
+                self.output_name,
+                output_location)
+          except (KeyboardInterrupt, SystemExit):
+            raise
+          except:  # pylint: disable=bare-except
+            # The transform has been added before, noop.
+            pass
+          if self.verbose:
+            _LOGGER.info(
+                'Running the pipeline on Dataflow with pipeline options %s.',
+                options.display_data())

Review comment:
       Can we pretty print the options? They're a little hard to read at the moment.

##########
File path: sdks/python/apache_beam/runners/interactive/sql/utils.py
##########
@@ -77,21 +86,23 @@ def find_pcolls(
     if verbose:
       _LOGGER.info('Found PCollections used in the magic: %s.', found)
       _LOGGER.info('Collecting data...')
-    for name, pcoll in found.items():
-      try:
-        _ = ib.collect(pcoll)
-      except (KeyboardInterrupt, SystemExit):
-        raise
-      except:
-        _LOGGER.error(
-            'Cannot collect data for PCollection %s. Please make sure the '
-            'PCollections queried in the sql "%s" are all from a single '
-            'pipeline using an InteractiveRunner. Make sure there is no '
-            'ambiguity, for example, same named PCollections from multiple '
-            'pipelines or notebook re-executions.',
-            name,
-            sql)
-        raise
+    if run:
+      from apache_beam.runners.interactive import interactive_beam as ib
+      for name, pcoll in found.items():
+        try:
+          _ = ib.collect(pcoll)
+        except (KeyboardInterrupt, SystemExit):
+          raise
+        except:
+          _LOGGER.error(
+              'Cannot collect data for PCollection %s. Please make sure the '
+              'PCollections queried in the sql "%s" are all from a single '
+              'pipeline using an InteractiveRunner. Make sure there is no '
+              'ambiguity, for example, same named PCollections from multiple '
+              'pipelines or notebook re-executions.',
+              name,
+              sql)
+          raise

Review comment:
       It feels odd for a method named "find_pcolls" to also run a pipeline. Can you please move this functionality out of find_pcolls and into its own method?

##########
File path: sdks/python/apache_beam/runners/interactive/sql/utils.py
##########
@@ -123,3 +134,299 @@ def pformat_namedtuple(schema: NamedTuple) -> str:
           '{}: {}'.format(k, v.__name__) for k,
           v in schema.__annotations__.items()
       ]))
+
+
+@dataclass
+class OptionsEntry:
+  """An entry of PipelineOptions that can be visualized through ipywidgets to
+  take inputs in IPython notebooks interactively.
+
+  Attributes:
+    label: The value of the Label widget.
+    help: The help message of the entry, usually the same to the help in
+      PipelineOptions.
+    cls: The PipelineOptions class/subclass the options belong to.
+    arg_builder: Builds the argument/option. If it's a str, this entry
+      assigns the input ipywidget's value directly to the argument. If it's a
+      Dict, use the corresponding Callable to assign the input value to each
+      argument. If Callable is None, fallback to assign the input value
+      directly. This allows building multiple similar PipelineOptions
+      arguments from a single input, such as staging_location and
+      temp_location in GoogleCloudOptions.
+    default: The default value of the entry, None if absent.
+  """
+  label: str
+  help: str
+  cls: Type[PipelineOptions]
+  arg_builder: Union[str, Dict[str, Optional[Callable]]]
+  default: Optional[str] = None
+
+  def __post_init__(self):
+    # The attribute holds an ipywidget, currently only supports Text.
+    # The str value can be accessed by self.input.value.
+    self.input = None
+
+
+class OptionsForm:
+  """A form visualized to take inputs from users in IPython Notebooks and
+  generate PipelineOptions to run pipelines.
+  """
+  def __init__(self):
+    self.options = PipelineOptions()
+    self.entries = []
+
+  def add(self, entry: OptionsEntry) -> 'OptionsForm':
+    """Adds an OptionsEntry to the form.
+    """
+    self.entries.append(entry)
+    return self
+
+  def to_options(self) -> PipelineOptions:
+    """Builds the PipelineOptions based on user inputs.
+
+    Can only be invoked after display_for_input.
+    """
+    for entry in self.entries:
+      assert entry.input, (
+          'to_options invoked before display_for_input. '
+          'Wrong usage.')
+      view = self.options.view_as(entry.cls)
+      if isinstance(entry.arg_builder, str):
+        setattr(view, entry.arg_builder, entry.input.value)
+      else:
+        for arg, builder in entry.arg_builder.items():
+          if builder:
+            setattr(view, arg, builder(entry.input.value))
+          else:
+            setattr(view, arg, entry.input.value)
+    self.additional_options()
+    return self.options
+
+  def additional_options(self):
+    """Alters the self.options with additional config."""
+    pass
+
+  def display_for_input(self) -> 'OptionsForm':
+    """Displays the widgets to take user inputs."""
+    from IPython.display import display
+    from ipywidgets import GridBox
+    from ipywidgets import Label
+    from ipywidgets import Layout
+    from ipywidgets import Text
+    widgets = []
+    for entry in self.entries:
+      text_label = Label(value=entry.label)
+      text_input = entry.input if entry.input else Text(
+          value=entry.default if entry.default else '')
+      text_help = Label(value=entry.help)
+      entry.input = text_input
+      widgets.append(text_label)
+      widgets.append(text_input)
+      widgets.append(text_help)
+    grid = GridBox(widgets, layout=Layout(grid_template_columns='1fr 2fr 6fr'))
+    display(grid)
+    self.display_actions()
+    return self
+
+  def display_actions(self):
+    """Displays actionable widgets to utilize the options, run pipelines and
+    etc."""
+    pass
+
+
+class DataflowOptionsForm(OptionsForm):
+  """A form to take inputs from users in IPython Notebooks to build
+  PipelineOptions to run pipelines on Dataflow.
+
+  Only contains minimum fields needed.
+  """
+  @staticmethod
+  def _build_default_project() -> str:
+    """Builds a default project id."""
+    try:
+      # pylint: disable=c-extension-no-member
+      import google.auth
+      return google.auth.default()[1]
+    except (KeyboardInterrupt, SystemExit):
+      raise
+    except Exception as e:
+      _LOGGER.warning('There is some issue with your gcloud auth: %s', e)
+      return 'your-project-id'
+
+  @staticmethod
+  def _build_req_file_from_pkgs(pkgs) -> Optional[str]:
+    """Builds a requirements file that contains all additional PYPI packages
+    needed."""
+    if pkgs:
+      deps = pkgs.split()
+      req_file = os.path.join(
+          tempfile.mkdtemp(prefix='beam-sql-dataflow-'), 'req.txt')
+      with open(req_file, 'a') as f:
+        for dep in deps:
+          f.write(dep + '\n')
+      return req_file
+    return None
+
+  def __init__(
+      self,
+      output_name: str,
+      output_pcoll: beam.PCollection,
+      verbose: bool = False):
+    """Inits the OptionsForm for setting up Dataflow jobs."""
+    super().__init__()
+    self.p = output_pcoll.pipeline
+    self.output_name = output_name
+    self.output_pcoll = output_pcoll
+    self.verbose = verbose
+    self.notice_shown = False
+    self.add(
+        OptionsEntry(
+            label='Project Id',
+            help='Name of the Cloud project owning the Dataflow job.',
+            cls=GoogleCloudOptions,
+            arg_builder='project',
+            default=DataflowOptionsForm._build_default_project())
+    ).add(
+        OptionsEntry(
+            label='Region',
+            help='The Google Compute Engine region for creating Dataflow job.',
+            cls=GoogleCloudOptions,
+            arg_builder='region',
+            default='us-central1')
+    ).add(
+        OptionsEntry(
+            label='GCS Bucket',
+            help=(
+                'GCS path to stage code packages needed by workers and save '
+                'temporary workflow jobs.'),
+            cls=GoogleCloudOptions,
+            arg_builder={
+                'staging_location': lambda x: x + '/staging',
+                'temp_location': lambda x: x + '/temp'
+            },
+            default='gs://YOUR_GCS_BUCKET_HERE')
+    ).add(
+        OptionsEntry(
+            label='Additional Packages',
+            help=(
+                'PYPI packages installed, whitespace separated. If None, leave '
+                'this field empty.'),

Review comment:
       Can this instead be comma-separated? That feels more natural.

##########
File path: sdks/python/apache_beam/runners/interactive/sql/utils_test.py
##########
@@ -81,5 +73,35 @@ def test_pformat_namedtuple(self):
         'ANamedTuple(a: int, b: str)', pformat_namedtuple(ANamedTuple))
 
 
+@unittest.skipIf(
+    not ie.current_env().is_interactive_ready,
+    '[interactive] dependency is not installed.')
+@pytest.mark.skipif(
+    not ie.current_env().is_interactive_ready,
+    reason='[interactive] dependency is not installed.')
+class OptionsFormTest(unittest.TestCase):
+  def test_dataflow_options_form(self):
+    p = beam.Pipeline()
+    pcoll = p | beam.Create([1, 2, 3])
+    with patch('google.auth') as ga:
+      ga.default = lambda: ['', 'default_project_id']
+      df_form = DataflowOptionsForm('pcoll', pcoll)
+      df_form.display_for_input()
+      df_form.entries[2].input.value = 'gs://test-bucket'
+      df_form.entries[3].input.value = 'a-pkg'
+      options = df_form.to_options()
+      self.assertEqual(
+          options.view_as(GoogleCloudOptions).project, 'default_project_id')
+      self.assertEqual(
+          options.view_as(GoogleCloudOptions).region, 'us-central1')
+      self.assertEqual(
+          options.view_as(GoogleCloudOptions).staging_location,
+          'gs://test-bucket/staging')
+      self.assertEqual(
+          options.view_as(GoogleCloudOptions).temp_location,
+          'gs://test-bucket/temp')

Review comment:
       It's cleaner to assign options.view_as(GoogleCloudOptions) to a local variable and then make the assertions on it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org