Posted to github@beam.apache.org by "damccorm (via GitHub)" <gi...@apache.org> on 2023/08/01 18:14:14 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #27399: [Python] Hugging Face pipeline support

damccorm commented on code in PR #27399:
URL: https://github.com/apache/beam/pull/27399#discussion_r1280973248


##########
sdks/python/apache_beam/ml/inference/huggingface_inference.py:
##########
@@ -109,6 +149,13 @@ def is_gpu_available_tensorflow(device):
   return True
 
 
+def _validate_constructor_args_hf_pipeline(task, model):
+  if not task and not model:
+    raise RuntimeError(
+        'Please provide both task and model to HuggingFacePipelineModelHandler.'
+        ' If the model already defines the task, no need to specify the task.')

Review Comment:
   If we have a task, do we always need a model? Should this error just be `Please provide either a task or a model to HuggingFacePipelineModelHandler` instead of asking the user to provide both?
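
   A rough sketch of the check with the suggested wording. The function name `validate_task_or_model` and the model name in the usage lines are placeholders for illustration, not the code in this PR:

```python
def validate_task_or_model(task, model):
  """Raise if neither a task nor a model was supplied.

  Hypothetical stand-in for _validate_constructor_args_hf_pipeline.
  """
  if not task and not model:
    raise RuntimeError(
        'Please provide either a task or a model to '
        'HuggingFacePipelineModelHandler. If the model already defines '
        'the task, there is no need to specify the task.')


# Either argument alone is accepted.
validate_task_or_model('fill-mask', None)
validate_task_or_model('', '<model-name>')  # placeholder model name
```

   The condition is unchanged; only the message now matches what the check enforces.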



##########
sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py:
##########
@@ -74,6 +75,37 @@ def test_hf_language_modeling(self):
       predicted_predicted_text = predictions_dict[text]
       self.assertEqual(actual_predicted_text, predicted_predicted_text)
 
+  def test_hf_pipeline(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    # Path to text file containing some questions and context
+    input_file = 'gs://apache-beam-ml/datasets/custom/questions.txt'
+    output_file_dir = 'gs://apache-beam-ml/hf/testing/predictions'

Review Comment:
   Will this break if the underlying model changes? Could we pin to a single revision?
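
   One possible way to pin, assuming the `revision` keyword of `transformers.pipeline` can be passed through `load_model_args` (the handler in this PR forwards that dict to `pipeline(...)`). `pinned_load_args` is a hypothetical helper and `<commit-sha>` is a placeholder, not a real revision:

```python
from typing import Any, Dict, Optional


def pinned_load_args(
    revision: str,
    extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
  """Return load arguments that pin the model to a single Hub revision."""
  if not revision:
    raise ValueError('revision must be a non-empty commit hash, tag or branch')
  args = dict(extra or {})  # copy so the caller's dict is not mutated
  args['revision'] = revision
  return args


# '<commit-sha>' is a placeholder, not a real revision.
load_model_args = pinned_load_args('<commit-sha>', {'device': 0})
# The handler would then be built roughly as:
#   HuggingFacePipelineModelHandler(
#       model='<model-name>', load_model_args=load_model_args)
```

   Pinning to a commit hash rather than a branch name should keep the expected predictions stable even if the model repository is updated.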



##########
sdks/python/apache_beam/ml/inference/huggingface_inference.py:
##########
@@ -483,6 +533,130 @@ def share_model_across_processes(self) -> bool:
   def get_metrics_namespace(self) -> str:
     """
     Returns:
-        A namespace for metrics collected by the RunInference transform.
+       A namespace for metrics collected by the RunInference transform.
     """
-    return "BeamML_HuggingFaceModelHandler_Tensor"
+    return 'BeamML_HuggingFaceModelHandler_Tensor'
+
+
+def _convert_to_result(
+    batch: Iterable,
+    predictions: Union[Iterable, Dict[Any, Iterable]],
+    model_id: Optional[str] = None,
+) -> Iterable[PredictionResult]:
+  return [
+      PredictionResult(x, y, model_id) for x, y in zip(batch, [predictions])
+  ]
+
+
+def _default_pipeline_inference_fn(
+    batch, model, inference_args) -> Iterable[PredictionResult]:

Review Comment:
   Nit: Could we change model to pipeline in this function and others? I think that will help for readability, and some might try to use this as an example of what to do.



##########
sdks/python/apache_beam/ml/inference/huggingface_inference.py:
##########
@@ -483,6 +533,130 @@ def share_model_across_processes(self) -> bool:
   def get_metrics_namespace(self) -> str:
     """
     Returns:
-        A namespace for metrics collected by the RunInference transform.
+       A namespace for metrics collected by the RunInference transform.
     """
-    return "BeamML_HuggingFaceModelHandler_Tensor"
+    return 'BeamML_HuggingFaceModelHandler_Tensor'
+
+
+def _convert_to_result(
+    batch: Iterable,
+    predictions: Union[Iterable, Dict[Any, Iterable]],
+    model_id: Optional[str] = None,
+) -> Iterable[PredictionResult]:
+  return [
+      PredictionResult(x, y, model_id) for x, y in zip(batch, [predictions])
+  ]
+
+
+def _default_pipeline_inference_fn(
+    batch, model, inference_args) -> Iterable[PredictionResult]:
+  predictions = model(batch, **inference_args)
+  return predictions
+
+
+class HuggingFacePipelineModelHandler(ModelHandler[str,
+                                                   PredictionResult,
+                                                   Pipeline]):
+  def __init__(
+      self,
+      task: Union[str, PipelineTask] = "",
+      model=None,
+      *,
+      inference_fn: PipelineInferenceFn = _default_pipeline_inference_fn,
+      load_model_args: Optional[Dict[str, Any]] = None,

Review Comment:
   Per https://docs.google.com/document/d/107T71QHMxvu67PyG-mkHwRhm7W2-07jihnqUMz-yuEM/edit, should this be `load_pipeline_args`?



##########
sdks/python/apache_beam/ml/inference/huggingface_inference.py:
##########
@@ -71,6 +72,39 @@
     Iterable[PredictionResult]]
 
 
+class PipelineTask(str, Enum):

Review Comment:
   Could you add a short docstring explaining the purpose of this Enum?
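
   Something along these lines would do. The docstring wording is only a suggestion, and the two member names shown are assumptions picked for illustration:

```python
from enum import Enum


class PipelineTask(str, Enum):
  """Names of tasks that a Hugging Face pipeline can be created for.

  Each member's value is the task string passed to the pipeline, so a member
  can be used anywhere a plain task string is accepted.
  """
  # Only two members are shown; the real Enum would list every supported task.
  FillMask = 'fill-mask'
  QuestionAnswering = 'question-answering'
```

   Because the Enum mixes in `str`, `PipelineTask.FillMask == 'fill-mask'` holds, which is worth stating in the docstring since it explains why `task` accepts either form.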



##########
sdks/python/apache_beam/ml/inference/huggingface_inference.py:
##########
@@ -483,6 +533,130 @@ def share_model_across_processes(self) -> bool:
   def get_metrics_namespace(self) -> str:
     """
     Returns:
-        A namespace for metrics collected by the RunInference transform.
+       A namespace for metrics collected by the RunInference transform.
     """
-    return "BeamML_HuggingFaceModelHandler_Tensor"
+    return 'BeamML_HuggingFaceModelHandler_Tensor'
+
+
+def _convert_to_result(
+    batch: Iterable,
+    predictions: Union[Iterable, Dict[Any, Iterable]],
+    model_id: Optional[str] = None,
+) -> Iterable[PredictionResult]:
+  return [
+      PredictionResult(x, y, model_id) for x, y in zip(batch, [predictions])
+  ]
+
+
+def _default_pipeline_inference_fn(
+    batch, model, inference_args) -> Iterable[PredictionResult]:
+  predictions = model(batch, **inference_args)
+  return predictions
+
+
+class HuggingFacePipelineModelHandler(ModelHandler[str,
+                                                   PredictionResult,
+                                                   Pipeline]):
+  def __init__(
+      self,
+      task: Union[str, PipelineTask] = "",
+      model=None,
+      *,
+      inference_fn: PipelineInferenceFn = _default_pipeline_inference_fn,
+      load_model_args: Optional[Dict[str, Any]] = None,
+      inference_args: Optional[Dict[str, Any]] = None,
+      min_batch_size: Optional[int] = None,
+      max_batch_size: Optional[int] = None,
+      large_model: bool = False,
+      **kwargs):
+    """
+    Implementation of the ModelHandler interface for Hugging Face Pipelines.
+
+    **Note:** To specify which device to use (CPU/GPU),
+    use the load_model_args with key-value as you would do in the usual
+    Hugging Face pipeline. Ex: load_model_args={'device':0}
+
+    Example Usage model::
+      pcoll | RunInference(HuggingFacePipelineModelHandler(
+        task="fill-mask"))
+
+    Args:
+      task (str or enum.Enum): task supported by HuggingFace Pipelines.
+        Accepts a string task or an enum.Enum from PipelineTask.
+      model : path to pretrained model on Hugging Face Models Hub to use custom
+        model for the chosen task. If the model already defines the task then
+        no need to specify the task parameter.
+      inference_fn: the inference function to use during RunInference.
+        Default is _default_pipeline_inference_fn.
+      load_model_args (Dict[str, Any]): keyword arguments to provide load
+        options while loading models from Hugging Face Hub. Defaults to None.
+      inference_args (Dict[str, Any]): Non-batchable arguments
+        required as inputs to the model's inference function.
+        Defaults to None.
+      min_batch_size: the minimum batch size to use when batching inputs.
+      max_batch_size: the maximum batch size to use when batching inputs.
+      large_model: set to true if your model is large enough to run into
+        memory pressure if you load multiple copies. Given a model that
+        consumes N memory and a machine with W cores and M memory, you should
+        set this to True if N*W > M.
+      kwargs: 'env_vars' can be used to set environment variables
+        before loading the model.
+
+    **Supported Versions:** HuggingFacePipelineModelHandler supports
+    transformers>=4.18.0.
+    """
+    self._task = task
+    self._model = model
+    self._inference_fn = inference_fn
+    self._load_model_args = load_model_args if load_model_args else {}
+    self._inference_args = inference_args if inference_args else {}
+    self._batching_kwargs = {}
+    self._framework = "torch"
+    self._env_vars = kwargs.get('env_vars', {})
+    if min_batch_size is not None:
+      self._batching_kwargs['min_batch_size'] = min_batch_size
+    if max_batch_size is not None:
+      self._batching_kwargs['max_batch_size'] = max_batch_size
+    self._large_model = large_model
+    _validate_constructor_args_hf_pipeline(self._task, self._model)
+
+  def load_model(self):
+    return pipeline(task=self._task, model=self._model, **self._load_model_args)
+
+  def run_inference(
+      self,
+      batch: Sequence[str],
+      model: Pipeline,
+      inference_args: Optional[Dict[str, Any]] = None
+  ) -> Iterable[PredictionResult]:
+    """
+    Runs inferences on a batch of examples passed as a string resource.
+    These can either be string sentences, or string path to images or
+    audio files.
+
+    Args:
+      batch: A sequence of string resources.
+      model: A Hugging Face Pipeline.
+      inference_args: Non-batchable arguments required as inputs to the model's
+        inference function.
+    Returns:
+      An Iterable of type PredictionResult.
+    """
+    inference_args = {} if not inference_args else inference_args
+    predictions = self._inference_fn(batch, model, inference_args)
+    return _convert_to_result(batch, predictions)
+
+  def update_model_path(self, model_path: Optional[str] = None):
+    self._model = model_path if model_path else self._model

Review Comment:
   I'm trying to figure out what model updates mean in the context of Hugging Face pipelines. I imagine not too many people would do it, but I think leaving it as is makes sense. If they do updates, it's unlikely to be a task update. Could you add a brief docstring to this function explaining what model updates mean?
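
   For example, a docstring like the one below. `HandlerSketch` is a hypothetical, stripped-down class used only to make the snippet runnable, and the model names are placeholders:

```python
from typing import Optional


class HandlerSketch:
  """Hypothetical, stripped-down stand-in for the pipeline model handler."""
  def __init__(self, task: str = '', model: Optional[str] = None):
    self._task = task
    self._model = model

  def update_model_path(self, model_path: Optional[str] = None):
    """Points the handler at a different model for the same task.

    Only the model name or path changes; the task is left untouched. The new
    model takes effect the next time the pipeline is loaded. Passing None or
    an empty string keeps the current model.
    """
    self._model = model_path if model_path else self._model


handler = HandlerSketch(task='fill-mask', model='<model-a>')
handler.update_model_path('<model-b>')
```

   The point to get across is that an update swaps the model while keeping the task fixed.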



##########
sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py:
##########
@@ -74,6 +75,37 @@ def test_hf_language_modeling(self):
       predicted_predicted_text = predictions_dict[text]
       self.assertEqual(actual_predicted_text, predicted_predicted_text)
 
+  def test_hf_pipeline(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    # Path to text file containing some questions and context
+    input_file = 'gs://apache-beam-ml/datasets/custom/questions.txt'
+    output_file_dir = 'gs://apache-beam-ml/hf/testing/predictions'

Review Comment:
   Ooh, actually same question about the existing tests



##########
sdks/python/apache_beam/examples/inference/huggingface_question_answering.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform Question Answering using the
+model from Hugging Face Models Hub.
+
+This pipeline takes questions and context from a custom text file separated by
+a semicolon. These are converted to SquadExamples by using the utility provided
+by transformers.QuestionAnsweringPipeline and passed to the model handler.
+We just provide the model name here because the model repository specifies the
+task that it will do. The pipeline then writes the prediction to an output
+file in which users can then compare against the original context.
+"""
+
+import argparse
+import logging
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.huggingface_inference import HuggingFacePipelineModelHandler
+from apache_beam.ml.inference.huggingface_inference import PipelineTask
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from transformers import QuestionAnsweringPipeline
+
+
+class PostProcessor(beam.DoFn):
+  """Processes the PredictionResult to get the predicted answer.
+
+  Hugging Face Pipeline for Question Answering returns a dictionary
+  with score, start and end index of answer and the answer.
+  """
+  def process(self, result: Tuple[str, PredictionResult]) -> Iterable[str]:
+    text, prediction = result
+    predicted_answer = prediction.inference['answer']
+    yield text + ';' + predicted_answer
+
+
+def preprocess(text):
+  if len(text.strip()) > 0:
+    question, context = text.split(';')
+    yield (question, context)
+
+
+def create_squad_example(text):
+  question, context = text
+  yield question, QuestionAnsweringPipeline.create_sample(question, context)

Review Comment:
   Could you add a docstring to this function explaining what it is doing? Probably good to do that for the other preprocessing function too.
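
   As a starting point, the two functions with docstrings might look like this. The wording and type hints are suggestions, the sample line at the bottom is made up, and the `transformers` import is moved inside the function only so the sketch can be loaded without the library installed:

```python
from typing import Any, Iterable, Tuple


def preprocess(text: str) -> Iterable[Tuple[str, str]]:
  """Splits one input line into a (question, context) pair.

  Each non-empty line is expected to look like 'question;context'.
  Blank lines produce no output.
  """
  if len(text.strip()) > 0:
    question, context = text.split(';')
    yield (question, context)


def create_squad_example(text: Tuple[str, str]) -> Iterable[Tuple[str, Any]]:
  """Wraps a (question, context) pair in a SquadExample, keyed by question.

  The question is kept as the key so the prediction can be matched back to
  it after inference.
  """
  # Imported lazily so this sketch can be loaded without transformers.
  from transformers import QuestionAnsweringPipeline
  question, context = text
  yield question, QuestionAnsweringPipeline.create_sample(question, context)


# Made-up sample line in the 'question;context' format.
pairs = list(preprocess('What is Beam?;Beam is a data processing model.'))
```

   It may also be worth noting in the `preprocess` docstring that a line with more than one semicolon will fail to unpack.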


