Posted to commits@beam.apache.org by da...@apache.org on 2022/11/30 17:59:54 UTC

[beam] branch master updated: Add Large Language Model RunInference Example (#24350)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 050288545cc Add Large Language Model RunInference Example (#24350)
050288545cc is described below

commit 050288545ccfe80f6d5f660864716be81e23c05d
Author: Shubham Krishna <sh...@gmail.com>
AuthorDate: Wed Nov 30 23:29:47 2022 +0530

    Add Large Language Model RunInference Example (#24350)
    
    * Add Large Language Model RunInference Example
    
    * Fix formatting and linting issues
    
    * Fix formatting and linting issues
    
    * Fix pylinting
    
    * Fix package import pylint issue
    
    * Fix package import pylint issue
    
    * Adapt example according to latest RunInference changes
    
    * Fix linting and formatting issues
    
    * Add documentation
    
    * Add missing link
    
    * Replace closing shortcode with opening shortcode
    
    * Separate pipeline_args and known_args
    
    * Improve docstrings and documentation
    
    * Improve docstrings and documentation
    
    Co-authored-by: Shubham Krishna <“shubham.krishna@ml6.eu”>
---
 .../inference/large_language_modeling/main.py      | 140 +++++++++++++++++++++
 .../large_language_modeling/requirements.txt       |  21 ++++
 .../en/documentation/ml/large-language-modeling.md |  73 +++++++++++
 .../site/content/en/documentation/ml/overview.md   |   3 +-
 .../partials/section-menu/en/documentation.html    |   1 +
 5 files changed, 237 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/examples/inference/large_language_modeling/main.py b/sdks/python/apache_beam/examples/inference/large_language_modeling/main.py
new file mode 100644
index 00000000000..a373f8377e1
--- /dev/null
+++ b/sdks/python/apache_beam/examples/inference/large_language_modeling/main.py
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+
+""""A pipeline that uses RunInference to perform translation
+with a T5 language model.
+
+This pipeline takes a list of english sentences and then uses
+the T5ForConditionalGeneration from Hugging Face to translate the
+english sentence into german.
+"""
+import argparse
+import sys
+
+import apache_beam as beam
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
+from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
+from apache_beam.options.pipeline_options import PipelineOptions
+from transformers import AutoConfig
+from transformers import AutoTokenizer
+from transformers import T5ForConditionalGeneration
+
+
+class Preprocess(beam.DoFn):
+  def __init__(self, tokenizer: AutoTokenizer):
+    self._tokenizer = tokenizer
+
+  def process(self, element):
+    """
+        Process the raw text input to a format suitable for
+        T5ForConditionalGeneration model inference
+
+        Args:
+          element: A string of text
+
+        Returns:
+          A tokenized example that can be read by the
+          T5ForConditionalGeneration
+        """
+    input_ids = self._tokenizer(
+        element, return_tensors="pt", padding="max_length",
+        max_length=512).input_ids
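+    # The tokenizer returns a (1, max_length) tensor. Returning it from
+    # process() lets Beam iterate over it and emit one tokenized tensor per
+    # input sentence.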
+    return input_ids
+
+
+class Postprocess(beam.DoFn):
+  def __init__(self, tokenizer: AutoTokenizer):
+    self._tokenizer = tokenizer
+
+  def process(self, element):
+    """
+        Process the PredictionResult to print the translated texts
+
+        Args:
+          element: The RunInference output to be processed.
+        """
+    decoded_inputs = self._tokenizer.decode(
+        element.example, skip_special_tokens=True)
+    decoded_outputs = self._tokenizer.decode(
+        element.inference, skip_special_tokens=True)
+    print(f"{decoded_inputs} \t Output: {decoded_outputs}")
+
+
+def parse_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      "--model_state_dict_path",
+      dest="model_state_dict_path",
+      required=True,
+      help="Path to the model's state_dict.",
+  )
+  parser.add_argument(
+      "--model_name",
+      dest="model_name",
+      required=True,
+      help="Path to the model's state_dict.",
+      default="t5-small",
+  )
+
+  return parser.parse_known_args(args=argv)
+
+
+def run():
+  """
+    Runs the interjector pipeline which translates English sentences
+    into German using the RunInference API. """
+
+  known_args, pipeline_args = parse_args(sys.argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+
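+  # make_tensor_model_fn('generate') builds an inference function that calls
+  # the model's generate() method instead of its forward pass, which is what
+  # T5 needs to produce translated token ids.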
+  gen_fn = make_tensor_model_fn('generate')
+  model_handler = PytorchModelHandlerTensor(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=T5ForConditionalGeneration,
+      model_params={
+          "config": AutoConfig.from_pretrained(known_args.model_name)
+      },
+      device="cpu",
+      inference_fn=gen_fn)
+
+  eng_sentences = [
+      "The house is wonderful.",
+      "I like to work in NYC.",
+      "My name is Shubham.",
+      "I want to work for Google.",
+      "I am from India."
+  ]
+  task_prefix = "translate English to German: "
+  task_sentences = [task_prefix + sentence for sentence in eng_sentences]
+  tokenizer = AutoTokenizer.from_pretrained(known_args.model_name)
+
+  # [START Pipeline]
+  with beam.Pipeline(options=pipeline_options) as pipeline:
+    _ = (
+        pipeline
+        | "CreateInputs" >> beam.Create(task_sentences)
+        | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
+        | "RunInference" >> RunInference(model_handler=model_handler)
+        | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer)))
+  # [END Pipeline]
+
+
+if __name__ == "__main__":
+  run()
diff --git a/sdks/python/apache_beam/examples/inference/large_language_modeling/requirements.txt b/sdks/python/apache_beam/examples/inference/large_language_modeling/requirements.txt
new file mode 100644
index 00000000000..8c4ba6aeea8
--- /dev/null
+++ b/sdks/python/apache_beam/examples/inference/large_language_modeling/requirements.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+
+torch==1.12.1
+transformers==4.24.0
+sentencepiece==0.1.97
\ No newline at end of file
diff --git a/website/www/site/content/en/documentation/ml/large-language-modeling.md b/website/www/site/content/en/documentation/ml/large-language-modeling.md
new file mode 100644
index 00000000000..7db18335dd1
--- /dev/null
+++ b/website/www/site/content/en/documentation/ml/large-language-modeling.md
@@ -0,0 +1,73 @@
+---
+title: "Large Language Model Inference in Beam"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# RunInference
+In Apache Beam 2.40.0, Beam introduced the RunInference API, which lets you deploy a machine learning model in a Beam pipeline. A `RunInference` transform performs inference on a `PCollection` of examples using a machine learning (ML) model. The transform outputs a `PCollection` that contains the input examples and output predictions. For more information, see the [RunInference documentation](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/). You can also find [inference examples on GitHub](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference).
+
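+Each element that `RunInference` emits is a `PredictionResult`, which pairs an input example with the model's prediction. As a minimal sketch (not part of this example), a downstream step could unpack it like this:
+
+{{< highlight >}}
+from apache_beam.ml.inference.base import PredictionResult
+
+
+def format_result(result: PredictionResult) -> str:
+    # result.example holds the input passed to the model and
+    # result.inference holds the corresponding model output.
+    return f"input: {result.example}, output: {result.inference}"
+{{< /highlight >}}
+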
+
+## Using RunInference with very large models
+RunInference works well on arbitrarily large models as long as they can fit on your hardware.
+
+This example demonstrates running inference with a `T5` language model using `RunInference` in a pipeline. `T5` is an encoder-decoder model pre-trained on a multi-task mixture of unsupervised and supervised tasks, where each task is converted into a text-to-text format. The example uses `T5-11B`, which contains 11 billion parameters and is 45 GB in size. In order to work well on a variety of tasks, `T5` prepends a different prefix to the input corresponding to each task. For example, for translation the input is prefixed with `translate English to German: `, which is the prefix this example uses.
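+
+The pipeline builds its inputs by prepending that prefix to each English sentence (abridged from `main.py`):
+
+{{< highlight >}}
+eng_sentences = [
+    "The house is wonderful.",
+    "I like to work in NYC.",
+]
+task_prefix = "translate English to German: "
+task_sentences = [task_prefix + sentence for sentence in eng_sentences]
+{{< /highlight >}}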
+
+### Run the Pipeline
+First, install the required packages listed in `requirements.txt` and pass the required arguments (`--model_state_dict_path` and `--model_name`).
+You can view the code on [GitHub](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/large_language_modeling/main.py).
+
+1. Locally on your machine: `python main.py --runner DirectRunner --model_state_dict_path <path to your state_dict> --model_name t5-11b`. You need to have 45 GB of disk space available to run this example.
+2. On Google Cloud using Dataflow: `python main.py --runner DataflowRunner --model_state_dict_path <path to your state_dict> --model_name t5-11b`, together with your Dataflow pipeline options.
+
+### Pipeline Steps
+The pipeline contains the following steps:
+1. Read the inputs.
+2. Encode the text into transformer-readable token ID integers using a tokenizer.
+3. Use RunInference to get the output.
+4. Decode the RunInference output and print it.
+
+The following code snippet contains the four steps:
+
+{{< highlight >}}
+    with beam.Pipeline(options=pipeline_options) as pipeline:
+        _ = (
+            pipeline
+            | "CreateInputs" >> beam.Create(task_sentences)
+            | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
+            | "RunInference" >> RunInference(model_handler=model_handler)
+            | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer))
+        )
+{{< /highlight >}}
+
+The third step of the pipeline uses `RunInference`.
+In order to use it, you must first define a `ModelHandler`. RunInference provides model handlers for `PyTorch`, `TensorFlow`, and `Scikit-learn`. Because the example uses a `PyTorch` model, it uses the `PytorchModelHandlerTensor` model handler.
+
+{{< highlight >}}
+  gen_fn = make_tensor_model_fn('generate')
+
+  model_handler = PytorchModelHandlerTensor(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=T5ForConditionalGeneration,
+      model_params={"config": AutoConfig.from_pretrained(args.model_name)},
+      device="cpu",
+      inference_fn=gen_fn)
+{{< /highlight >}}
+
+A `ModelHandler` requires parameters such as:
+* `state_dict_path` – The path to the saved dictionary of the model state (see the sketch after this list for one way to produce this file).
+* `model_class` – The class of the PyTorch model that defines the model structure.
+* `model_params` – A dictionary of arguments required to instantiate the model class.
+* `device` – The device on which to run the model. If `device` is set to `GPU`, a GPU is used when one is available; otherwise the model runs on the CPU.
+* `inference_fn` – The inference function to use during RunInference.
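+
+As a rough sketch (not part of this example; the `t5-11b` model name and the output path are only illustrative), you could produce the `state_dict` file as follows:
+
+{{< highlight >}}
+import torch
+from transformers import T5ForConditionalGeneration
+
+# Download the pretrained weights once, then save only the state_dict so the
+# pipeline can load it through PytorchModelHandlerTensor.
+model = T5ForConditionalGeneration.from_pretrained("t5-11b")
+torch.save(model.state_dict(), "t5-11b.pt")
+{{< /highlight >}}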
diff --git a/website/www/site/content/en/documentation/ml/overview.md b/website/www/site/content/en/documentation/ml/overview.md
index d8df9f02cab..d2737f5fe38 100644
--- a/website/www/site/content/en/documentation/ml/overview.md
+++ b/website/www/site/content/en/documentation/ml/overview.md
@@ -89,4 +89,5 @@ You can find examples of end-to-end AI/ML pipelines for several use cases:
 * [ML Workflow Orchestration](/documentation/ml/orchestration): Illustrates how to orchestrate ML workflows consisting of multiple steps by using Kubeflow Pipelines and Tensorflow Extended.
 * [Multi model pipelines in Beam](/documentation/ml/multi-model-pipelines): Explains how multi-model pipelines work and gives an overview of what you need to know to build one using the RunInference API.
 * [Online Clustering in Beam](/documentation/ml/online-clustering): Demonstrates how to set up a real-time clustering pipeline that can read text from Pub/Sub, convert the text into an embedding using a transformer-based language model with the RunInference API, and cluster the text using BIRCH with stateful processing.
-* [Anomaly Detection in Beam](/documentation/ml/anomaly-detection): Demonstrates how to set up an anomaly detection pipeline that reads text from Pub/Sub in real time and then detects anomalies using a trained HDBSCAN clustering model with the RunInference API.
\ No newline at end of file
+* [Anomaly Detection in Beam](/documentation/ml/anomaly-detection): Demonstrates how to set up an anomaly detection pipeline that reads text from Pub/Sub in real time and then detects anomalies using a trained HDBSCAN clustering model with the RunInference API.
+* [Large Language Model Inference in Beam](/documentation/ml/large-language-modeling): Demonstrates a pipeline that uses RunInference to perform translation with the T5 language model, which contains 11 billion parameters.
\ No newline at end of file
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index c4e10a607df..df00b9d099a 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -220,6 +220,7 @@
     <li><a href="/documentation/ml/online-clustering/">Online Clustering</a></li>
     <li><a href="/documentation/ml/runinference-metrics/">RunInference Metrics</a></li>
     <li><a href="/documentation/ml/anomaly-detection/">Anomaly Detection</a></li>
+    <li><a href="/documentation/ml/large-language-modeling">Large Language Model Inference in Beam</a></li>
   </ul>
 </li>
 <li class="section-nav-item--collapsible">