Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/08 21:15:54 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #23094: Concept guide on orchestrating Beam preprocessing

damccorm commented on code in PR #23094:
URL: https://github.com/apache/beam/pull/23094#discussion_r966382199


##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/Dockerfile:
##########
@@ -0,0 +1,12 @@
+FROM python:3.8-slim
+
+# optional install extra dependencies
+# TODO
+
+# install pypi dependencies
+COPY requirements.txt /
+RUN python3 -m pip install --no-cache-dir -r requirements.txt

Review Comment:
   I don't mind leaving this, but it would probably be worth calling out that requirements.txt is empty and that more dependencies can be added there. The empty file threw me off.



##########
website/www/site/content/en/documentation/ml/orchestration-test.md:
##########
@@ -0,0 +1,16 @@
+---
+title: "Orchestration Test"

Review Comment:
   Do we need this file?



##########
website/www/site/content/en/documentation/ml/orchestration.md:
##########
@@ -0,0 +1,227 @@
+---
+title: "Orchestration"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Workflow orchestration
+
+## Understanding the Beam DAG
+
+
+Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. One of the central concepts to the Beam programming model is the DAG (= Directed Acyclic Graph). Each Beam pipeline is a DAG that can be constructed through the Beam SDK in your programming language of choice (from the set of supported beam SDKs). Each node of this DAG represents a processing step (PTransform) that accepts a collection of data as input (PCollection) and outputs a transformed collection of data (PCollection). The edges define how data flows through the pipeline from one processing step to another. The image below shows an example of such a pipeline.  
+
+![A standalone beam pipeline](/images/standalone-beam-pipeline.svg)
+
+Note that simply defining a pipeline and the corresponding DAG does not mean that data will start flowing through the pipeline. To actually execute the pipeline, it has to be deployed to one of the [supported Beam runners](https://beam.apache.org/documentation/runners/capability-matrix/). These distributed processing back-ends include Apache Flink, Apache Spark and Google Cloud Dataflow. A [Direct Runner](https://beam.apache.org/documentation/runners/direct/) is also provided to execute the pipeline locally on your machine for development and debugging purposes. Make sure to check out the [runner capability matrix](https://beam.apache.org/documentation/runners/capability-matrix/) to guarantee that the chosen runner supports the data processing steps defined in your pipeline, especially when using the Direct Runner.  
+
+## Orchestrating frameworks
+
+Successfully delivering machine learning projects is about a lot more than training a model and calling it a day. In addition, a full ML workflow will often contain a range of other steps including data ingestion, data validation, data preprocessing, model evaluation, model deployment, data drift detection… On top of that, it’s essential to keep track of metadata and artifacts from your experiments to answer important questions like: What data was this model trained on and with which training parameters? When was this model deployed and which accuracy did it get on a test dataset? Without this knowledge at your disposal, it will become increasingly difficult to troubleshoot, monitor and improve your ML solutions as they grow in size.  
+
+The solution: MLOps. MLOps is an umbrella term used to describe best practices and guiding principles that aim to make the development and maintenance of machine learning systems seamless and efficient. Simply put, MLOps is most often about automating machine learning workflows throughout the model and data lifecycle. Popular frameworks to create these workflow DAGs are [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/introduction/), [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/index.html) and [TFX](https://www.tensorflow.org/tfx/guide).  
+
+So what does all of this have to do with Beam? Well, since we established that Beam is a great tool for a range of ML tasks, a beam pipeline can either be used as a standalone data processing job or can be part of a larger sequence of steps in such a workflow. In the latter case, the beam DAG is just one node in the overarching DAG composed by the workflow orchestrator. This results in a DAG in a DAG, as illustrated by the example below.  
+
+![An beam pipeline as part of a larger orchestrated workflow](/images/orchestrated-beam-pipeline.svg)
+
+It is important to understand the key difference between the Beam DAG and the orchestrating DAG. The Beam DAG processes data and passes that data between the nodes of its DAG. The focus of Beam is on parallelization and enabling both batch and streaming jobs. In contrast, the orchestration DAG schedules and monitors steps in the workflow and passed between the nodes of the DAG are execution parameters, metadata and artifacts. An example of such an artifact could be a trained model or a dataset. Such artifacts are often passed by a reference URI and not by value.  
+
+Note: TFX creates a workflow DAG, which needs an orchestrator of its own to be executed. [Natively supported orchestrators for TFX](https://www.tensorflow.org/tfx/guide/custom_orchestrator) are Airflow, Kubeflow Pipelines and, here’s the kicker, Beam itself! As mentioned by the [TFX docs](https://www.tensorflow.org/tfx/guide/beam_orchestrator):  
+> "Several TFX components rely on Beam for distributed data processing. In addition, TFX can use Apache Beam to orchestrate and execute the pipeline DAG. Beam orchestrator uses a different BeamRunner than the one which is used for component data processing."  
+
+Caveat: The Beam orchestrator is not meant to be a TFX orchestrator to be used in production environments. It simply enables to debug the TFX pipeline locally on Beam’s DirectRunner without the need for the extra setup that is needed for Airflow or Kubeflow.

Review Comment:
   ```suggestion
   Caveat: The Beam orchestrator is not meant to be a TFX orchestrator to be used in production environments. It simply enables debugging TFX pipelines locally on Beam’s DirectRunner without the need for the extra setup that is needed for Airflow or Kubeflow.
   ```
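
As a side note on the guide's statement that artifacts are passed by reference URI rather than by value, here is a minimal sketch of that hand-off in plain Python. It assumes the orchestrator gives each step a local file path for its output, as the preprocessing component in this PR appears to do. The function names, the file layout and the `gs://example-bucket/...` URI are hypothetical and not taken from the PR.

```python
import tempfile
from pathlib import Path


def publish_artifact_reference(output_file: str, artifact_uri: str) -> None:
  """Write the URI of an artifact, not the artifact itself, to output_file."""
  path = Path(output_file)
  # The parent directory may not exist yet, so create it first.
  path.parent.mkdir(parents=True, exist_ok=True)
  path.write_text(artifact_uri)


def resolve_artifact_reference(input_file: str) -> str:
  """Read back the URI that an upstream step published."""
  return Path(input_file).read_text().strip()


# Hypothetical hand-off between two workflow steps (illustrative URI only).
workdir = tempfile.mkdtemp()
handoff_file = f"{workdir}/outputs/preprocessed_dataset_path"
publish_artifact_reference(
  handoff_file, "gs://example-bucket/preprocessing/dataset_1")
uri = resolve_artifact_reference(handoff_file)
```

Only the short URI string travels between the steps; the dataset itself stays wherever the upstream step wrote it.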



##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/train/Dockerfile:
##########
@@ -0,0 +1,12 @@
+FROM python:3.8-slim
+
+# optional install extra dependencies
+# TODO

Review Comment:
   Same as for the ingestion Dockerfile: can we remove the hanging TODO?



##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/Dockerfile:
##########
@@ -0,0 +1,12 @@
+FROM python:3.8-slim

Review Comment:
   Could we add a brief README at the root of this folder that points users to the website and describes what this example code is for? Otherwise, people who come across it in the repo will probably be confused.



##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py:
##########
@@ -0,0 +1,160 @@
+"""Dummy ingestion function that fetches data from one file and simply copies it to another."""
+import re
+import json
+import io
+import argparse
+import time
+from pathlib import Path
+
+import requests
+from PIL import Image, UnidentifiedImageError
+import numpy as np
+import torch
+import torchvision.transforms as T
+import torchvision.transforms.functional as TF
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+PROJECT_ID = "<project-id>"
+LOCATION = "<project-location>"
+STAGING_DIR = "<uri-to-data-flow-staging-dir>"
+BEAM_RUNNER = "<beam-runner>"
+
+# [START preprocess_component_argparse]
+def parse_args():
+  """Parse preprocessing arguments."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+    "--ingested-dataset-path", type=str,
+    help="Path to the ingested dataset")
+  parser.add_argument(
+    "--preprocessed-dataset-path", type=str,
+    help="The target directory for the ingested dataset.")
+  parser.add_argument(
+    "--base-artifact-path", type=str,
+    help="Base path to store pipeline artifacts.")
+  return parser.parse_args()
+# [END preprocess_component_argparse]
+
+
+def preprocess_dataset(
+  ingested_dataset_path: str,
+  preprocessed_dataset_path: str,
+  base_artifact_path: str):
+  """Preprocess the ingested raw dataset and write the result to avro format.
+
+  Args:
+    ingested_dataset_path (str): Path to the ingested dataset
+    preprocessed_dataset_path (str): Path to where the preprocessed dataset will be saved
+    base_artifact_path (str): path to the base directory of where artifacts can be stored for
+      this component.
+  """
+  # [START kfp_component_input_output]
+  timestamp = time.time()
+  target_path = f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}"
+
+  # the directory where the output file is created may or may not exists
+  # so we have to create it.
+  Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True)
+  with open(preprocessed_dataset_path, 'w') as f:
+    f.write(target_path)
+  # [END kfp_component_input_output]
+
+
+  # [START deploy_preprocessing_beam_pipeline]
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(
+    runner=BEAM_RUNNER,
+    project=PROJECT_ID,
+    job_name=f'preprocessing-{int(time.time())}',
+    temp_location=STAGING_DIR,
+    region=LOCATION,
+    requirements_file="/requirements.txt",
+    save_main_session = True,
+  )
+
+  with beam.Pipeline(options=pipeline_options) as pipeline:
+    (
+      pipeline
+      | "Read input jsonl file" >> beam.io.ReadFromText(ingested_dataset_path)

Review Comment:
   ```suggestion
         | "Read input json file" >> beam.io.ReadFromText(ingested_dataset_path)
   ```
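
For context on the `jsonl` versus `json` wording: `beam.io.ReadFromText` emits one element per line of the input file, so a downstream step would typically parse each line on its own. Below is a minimal plain-Python sketch of that per-line parsing. It assumes the ingested file holds one JSON object per line, which the hunk above does not confirm; `parse_json_lines` and the sample records are hypothetical.

```python
import json


def parse_json_lines(lines):
  """Parse an iterable of text lines, one JSON document per line.

  Blank lines are skipped. This mirrors what a per-element parsing step
  would do after a line-oriented text read.
  """
  records = []
  for line in lines:
    line = line.strip()
    if not line:
      continue
    records.append(json.loads(line))
  return records


# Hypothetical sample input: two records, one per line.
sample = '{"id": 1, "caption": "a cat"}\n{"id": 2, "caption": "a dog"}\n'
records = parse_json_lines(sample.splitlines())
```

If the file were instead a single JSON document spread over several lines, parsing it line by line would generally fail, so the step name should match the actual file layout.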



##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/Dockerfile:
##########
@@ -0,0 +1,12 @@
+FROM python:3.8-slim
+
+# optional install extra dependencies
+# TODO

Review Comment:
   What is this hanging TODO for?


