You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/12 15:35:09 UTC

[GitHub] [beam] ryanthompson591 commented on a diff in pull request #22131: TensorRT Initial commit

ryanthompson591 commented on code in PR #22131:
URL: https://github.com/apache/beam/pull/22131#discussion_r918961572


##########
sdks/python/apache_beam/examples/inference/tensorrt_object_detection.py:
##########
@@ -0,0 +1,238 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference API to perform object detection with TensorRT."""
+
+import argparse
+import io
+import numpy as np
+import os
+from PIL import Image
+from typing import Iterable, Optional, Tuple
+
+import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import (KeyedModelHandler, PredictionResult,
+                                           RunInference)
+from apache_beam.ml.inference.tensorrt_inference import \
+    TensorRTEngineHandlerNumPy
+from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
+
+COCO_OBJ_DET_CLASSES = [
+    'person',
+    'bicycle',
+    'car',
+    'motorcycle',
+    'airplane',
+    'bus',
+    'train',
+    'truck',
+    'boat',
+    'traffic light',
+    'fire hydrant',
+    'street sign',
+    'stop sign',
+    'parking meter',
+    'bench',
+    'bird',
+    'cat',
+    'dog',
+    'horse',
+    'sheep',
+    'cow',
+    'elephant',
+    'bear',
+    'zebra',
+    'giraffe',
+    'hat',
+    'backpack',
+    'umbrella',
+    'shoe',
+    'eye glasses',
+    'handbag',
+    'tie',
+    'suitcase',
+    'frisbee',
+    'skis',
+    'snowboard',
+    'sports ball',
+    'kite',
+    'baseball bat',
+    'baseball glove',
+    'skateboard',
+    'surfboard',
+    'tennis racket',
+    'bottle',
+    'plate',
+    'wine glass',
+    'cup',
+    'fork',
+    'knife',
+    'spoon',
+    'bowl',
+    'banana',
+    'apple',
+    'sandwich',
+    'orange',
+    'broccoli',
+    'carrot',
+    'hot dog',
+    'pizza',
+    'donut',
+    'cake',
+    'chair',
+    'couch',
+    'potted plant',
+    'bed',
+    'mirror',
+    'dining table',
+    'window',
+    'desk',
+    'toilet',
+    'door',
+    'tv',
+    'laptop',
+    'mouse',
+    'remote',
+    'keyboard',
+    'cell phone',
+    'microwave',
+    'oven',
+    'toaster',
+    'sink',
+    'refrigerator',
+    'blender',
+    'book',
+    'clock',
+    'vase',
+    'scissors',
+    'teddy bear',
+    'hair drier',
+    'toothbrush',
+    'hair brush',
+]
+
+
+def attach_im_size_to_key(x):
+    width, height = x[1].size
+    return ((x[0], width, height), x[1])
+
+
+def read_image(image_file_name: str,
+               path_to_dir: Optional[str] = None) -> Tuple[str, Image.Image]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+    return image_file_name, data
+
+
+def preprocess_image(image: Image.Image) -> np.ndarray:
+  ssd_mobilenet_v2_320x320_input_dims = (300, 300)
+  image = image.resize(ssd_mobilenet_v2_320x320_input_dims, 
+    resample=Image.Resampling.BILINEAR)
+  image = np.expand_dims(np.asarray(image, dtype=np.float32), axis=0)
+  return image
+
+
+class PostProcessor(beam.DoFn):
+  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:

Review Comment:
   Since this is sample code, it would be nice to have a short description of what post-processing is done and why.
   
   Something like:
   
   Converts a list of prediction results into a human-readable string containing the filename and, for each predicted object, its bounding-box location, score, and name.



##########
sdks/python/apache_beam/ml/inference/tensorrt_inference.py:
##########
@@ -0,0 +1,213 @@
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import numpy as np
+import pycuda.driver as cuda
+import sys
+import tensorrt as trt
+from typing import Any, Dict, Iterable, Optional, Sequence
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler, PredictionResult
+
+LOGGER = trt.Logger(trt.Logger.INFO)
+
+
+def _load_engine(engine_path):
+  file = FileSystems.open(engine_path, 'rb')
+  runtime = trt.Runtime(LOGGER)
+  engine = runtime.deserialize_cuda_engine(file.read())
+  assert engine
+  return engine
+
+
+def _load_onnx(onnx_path):
+  builder = trt.Builder(LOGGER)
+  network = builder.create_network(
+      flags=1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
+  parser = trt.OnnxParser(network, LOGGER)
+  with FileSystems.open(onnx_path) as f:
+    if not parser.parse(f.read()):
+      print("Failed to load ONNX file: {}".format(onnx_path))
+      for error in range(parser.num_errors):
+        print(parser.get_error(error))
+      sys.exit(1)
+  builder.reset()
+  return network
+
+
+def _build_engine(network):
+  builder = trt.Builder(LOGGER)
+  config = builder.create_builder_config()
+  runtime = trt.Runtime(LOGGER)
+  plan = builder.build_serialized_network(network, config)
+  engine = runtime.deserialize_cuda_engine(plan)
+  builder.reset()
+  return engine
+
+
+def _validate_inference_args(inference_args):
+  """Confirms that inference_args is None.
+
+  TensorRT engines do not need extra arguments in their execute_v2() call.
+  However, since inference_args is an argument in the RunInference interface,
+  we want to make sure it is not passed here in TensorRT's implementation of
+  RunInference.
+  """
+  if inference_args:
+    raise ValueError(
+        'inference_args were provided, but should be None because TensorRT '
+        'engines do not need extra arguments in their execute_v2() call.')
+
+
+class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray,
+                                              PredictionResult,
+                                              trt.ICudaEngine]):
+  def __init__(self, min_batch_size: int, max_batch_size: int, **kwargs):
+    """Implementation of the ModelHandler interface for TensorRT.
+
+    Example Usage:
+      pcoll | RunInference(
+        TensorRTEngineHandlerNumPy(
+          min_batch_size=1,
+          max_batch_size=1,
+          engine_path="my_uri"))
+
+    Args:
+      min_batch_size: minimum accepted batch size.
+      max_batch_size: maximum accepted batch size.
+      kwargs: Additional arguments like 'engine_path' and 'onnx_path' are
+      currently supported.
+
+    See https://docs.nvidia.com/deeplearning/tensorrt/api/python_api/
+    for details
+    """
+    self.min_batch_size = min_batch_size
+    self.max_batch_size = max_batch_size
+    if 'engine_path' in kwargs:
+      self.engine_path = kwargs.get('engine_path')
+    elif 'onnx_path' in kwargs:
+      self.onnx_path = kwargs.get('onnx_path')
+
+    trt.init_libnvinfer_plugins(LOGGER, namespace="")
+
+  def batch_elements_kwargs(self):
+    """Sets min_batch_size and max_batch_size of a TensorRT engine."""
+    return {
+        'min_batch_size': self.min_batch_size,
+        'max_batch_size': self.max_batch_size
+    }
+
+  def load_model(self) -> trt.ICudaEngine:
+    """Loads and initializes a TensorRT engine for processing."""
+    return _load_engine(self.engine_path)
+
+  def load_onnx(self) -> trt.INetworkDefinition:

Review Comment:
   I'm not really familiar with what an ONNX model is. Is this only used for tests?



##########
sdks/python/apache_beam/ml/inference/tensorrt_inference.py:
##########
@@ -0,0 +1,213 @@
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long

Review Comment:
   @kennknowles NVIDIA wants to add a copyright notice here. Personally I think it's fine, but I just wanted to double-check that this is acceptable to Apache.



##########
sdks/python/apache_beam/ml/inference/tensorrt_inference.py:
##########
@@ -0,0 +1,213 @@
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import numpy as np
+import pycuda.driver as cuda
+import sys
+import tensorrt as trt
+from typing import Any, Dict, Iterable, Optional, Sequence
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler, PredictionResult
+
+LOGGER = trt.Logger(trt.Logger.INFO)
+
+
+def _load_engine(engine_path):
+  file = FileSystems.open(engine_path, 'rb')
+  runtime = trt.Runtime(LOGGER)
+  engine = runtime.deserialize_cuda_engine(file.read())
+  assert engine
+  return engine
+
+
+def _load_onnx(onnx_path):
+  builder = trt.Builder(LOGGER)
+  network = builder.create_network(
+      flags=1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
+  parser = trt.OnnxParser(network, LOGGER)
+  with FileSystems.open(onnx_path) as f:
+    if not parser.parse(f.read()):
+      print("Failed to load ONNX file: {}".format(onnx_path))

Review Comment:
   Is it possible to log errors instead of printing them? I'm just thinking that if this runs on workers, users might not see printed messages.



##########
sdks/python/apache_beam/ml/inference/tensorrt_inference.py:
##########
@@ -0,0 +1,213 @@
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import numpy as np
+import pycuda.driver as cuda
+import sys
+import tensorrt as trt
+from typing import Any, Dict, Iterable, Optional, Sequence
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler, PredictionResult
+
+LOGGER = trt.Logger(trt.Logger.INFO)
+
+
+def _load_engine(engine_path):
+  file = FileSystems.open(engine_path, 'rb')
+  runtime = trt.Runtime(LOGGER)
+  engine = runtime.deserialize_cuda_engine(file.read())
+  assert engine
+  return engine
+
+
+def _load_onnx(onnx_path):
+  builder = trt.Builder(LOGGER)
+  network = builder.create_network(
+      flags=1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
+  parser = trt.OnnxParser(network, LOGGER)
+  with FileSystems.open(onnx_path) as f:
+    if not parser.parse(f.read()):
+      print("Failed to load ONNX file: {}".format(onnx_path))
+      for error in range(parser.num_errors):
+        print(parser.get_error(error))
+      sys.exit(1)
+  builder.reset()
+  return network
+
+
+def _build_engine(network):
+  builder = trt.Builder(LOGGER)
+  config = builder.create_builder_config()
+  runtime = trt.Runtime(LOGGER)
+  plan = builder.build_serialized_network(network, config)
+  engine = runtime.deserialize_cuda_engine(plan)
+  builder.reset()
+  return engine
+
+
+def _validate_inference_args(inference_args):
+  """Confirms that inference_args is None.
+
+  TensorRT engines do not need extra arguments in their execute_v2() call.
+  However, since inference_args is an argument in the RunInference interface,
+  we want to make sure it is not passed here in TensorRT's implementation of
+  RunInference.
+  """
+  if inference_args:
+    raise ValueError(
+        'inference_args were provided, but should be None because TensorRT '
+        'engines do not need extra arguments in their execute_v2() call.')
+
+
+class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray,
+                                              PredictionResult,
+                                              trt.ICudaEngine]):
+  def __init__(self, min_batch_size: int, max_batch_size: int, **kwargs):
+    """Implementation of the ModelHandler interface for TensorRT.
+
+    Example Usage:

Review Comment:
   I find the documentation generates better if you do this:
   
   ```
   Example Usage::
   
   pcoll | RunInference(
           TensorRTEngineHandlerNumPy(
             min_batch_size=1,
             max_batch_size=1,
             engine_path="my_uri"))
   ```
   
   Like how it's done in sklearn_inference.



##########
sdks/python/apache_beam/ml/inference/tensorrt_inference.py:
##########
@@ -0,0 +1,213 @@
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import numpy as np
+import pycuda.driver as cuda
+import sys
+import tensorrt as trt
+from typing import Any, Dict, Iterable, Optional, Sequence
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler, PredictionResult
+
+LOGGER = trt.Logger(trt.Logger.INFO)
+
+
+def _load_engine(engine_path):
+  file = FileSystems.open(engine_path, 'rb')
+  runtime = trt.Runtime(LOGGER)
+  engine = runtime.deserialize_cuda_engine(file.read())
+  assert engine
+  return engine
+
+
+def _load_onnx(onnx_path):
+  builder = trt.Builder(LOGGER)
+  network = builder.create_network(
+      flags=1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
+  parser = trt.OnnxParser(network, LOGGER)
+  with FileSystems.open(onnx_path) as f:
+    if not parser.parse(f.read()):
+      print("Failed to load ONNX file: {}".format(onnx_path))
+      for error in range(parser.num_errors):
+        print(parser.get_error(error))
+      sys.exit(1)
+  builder.reset()
+  return network
+
+
+def _build_engine(network):
+  builder = trt.Builder(LOGGER)
+  config = builder.create_builder_config()
+  runtime = trt.Runtime(LOGGER)
+  plan = builder.build_serialized_network(network, config)
+  engine = runtime.deserialize_cuda_engine(plan)
+  builder.reset()
+  return engine
+
+
+def _validate_inference_args(inference_args):
+  """Confirms that inference_args is None.
+
+  TensorRT engines do not need extra arguments in their execute_v2() call.
+  However, since inference_args is an argument in the RunInference interface,
+  we want to make sure it is not passed here in TensorRT's implementation of
+  RunInference.
+  """
+  if inference_args:
+    raise ValueError(
+        'inference_args were provided, but should be None because TensorRT '
+        'engines do not need extra arguments in their execute_v2() call.')
+
+
+class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray,
+                                              PredictionResult,
+                                              trt.ICudaEngine]):
+  def __init__(self, min_batch_size: int, max_batch_size: int, **kwargs):
+    """Implementation of the ModelHandler interface for TensorRT.
+
+    Example Usage:
+      pcoll | RunInference(
+        TensorRTEngineHandlerNumPy(
+          min_batch_size=1,
+          max_batch_size=1,
+          engine_path="my_uri"))
+
+    Args:
+      min_batch_size: minimum accepted batch size.
+      max_batch_size: maximum accepted batch size.
+      kwargs: Additional arguments like 'engine_path' and 'onnx_path' are
+      currently supported.
+
+    See https://docs.nvidia.com/deeplearning/tensorrt/api/python_api/
+    for details
+    """
+    self.min_batch_size = min_batch_size
+    self.max_batch_size = max_batch_size
+    if 'engine_path' in kwargs:
+      self.engine_path = kwargs.get('engine_path')
+    elif 'onnx_path' in kwargs:
+      self.onnx_path = kwargs.get('onnx_path')
+
+    trt.init_libnvinfer_plugins(LOGGER, namespace="")
+
+  def batch_elements_kwargs(self):
+    """Sets min_batch_size and max_batch_size of a TensorRT engine."""
+    return {
+        'min_batch_size': self.min_batch_size,
+        'max_batch_size': self.max_batch_size
+    }
+
+  def load_model(self) -> trt.ICudaEngine:
+    """Loads and initializes a TensorRT engine for processing."""
+    return _load_engine(self.engine_path)
+
+  def load_onnx(self) -> trt.INetworkDefinition:
+    """Loads and parses an onnx model for processing."""
+    return _load_onnx(self.onnx_path)
+
+  #
+  def build_engine(self, network: trt.INetworkDefinition) -> trt.ICudaEngine:
+    """Build an engine according to parsed/created network."""
+    return _build_engine(network)
+
+  def run_inference(
+      self,
+      batch: np.ndarray,
+      engine: trt.ICudaEngine,
+      inference_args: Optional[Dict[str, Any]] = None
+  ) -> Iterable[PredictionResult]:
+    """
+    Runs inferences on a batch of Tensors and returns an Iterable of
+    TensorRT Predictions.
+
+    Args:
+      batch: A np.ndarray or a np.ndarray that represents a concatenation
+        of multiple arrays as a batch.
+      engine: A TensorRT engine.
+      inference_args: Any additional arguments for an inference
+        that are not applicable to TensorRT.
+
+    Returns:
+      An Iterable of type PredictionResult.
+    """
+    _validate_inference_args(inference_args)
+    context = engine.create_execution_context()

Review Comment:
   Yes. Just for some context: anything that should only be done once per thread should be done in `load_model` if possible.



##########
sdks/python/apache_beam/examples/inference/tensorrt_object_detection.py:
##########
@@ -0,0 +1,238 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference API to perform object detection with TensorRT."""
+
+import argparse
+import io
+import numpy as np
+import os
+from PIL import Image
+from typing import Iterable, Optional, Tuple
+
+import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import (KeyedModelHandler, PredictionResult,
+                                           RunInference)
+from apache_beam.ml.inference.tensorrt_inference import \
+    TensorRTEngineHandlerNumPy
+from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
+
+COCO_OBJ_DET_CLASSES = [
+    'person',
+    'bicycle',
+    'car',
+    'motorcycle',
+    'airplane',
+    'bus',
+    'train',
+    'truck',
+    'boat',
+    'traffic light',
+    'fire hydrant',
+    'street sign',
+    'stop sign',
+    'parking meter',
+    'bench',
+    'bird',
+    'cat',
+    'dog',
+    'horse',
+    'sheep',
+    'cow',
+    'elephant',
+    'bear',
+    'zebra',
+    'giraffe',
+    'hat',
+    'backpack',
+    'umbrella',
+    'shoe',
+    'eye glasses',
+    'handbag',
+    'tie',
+    'suitcase',
+    'frisbee',
+    'skis',
+    'snowboard',
+    'sports ball',
+    'kite',
+    'baseball bat',
+    'baseball glove',
+    'skateboard',
+    'surfboard',
+    'tennis racket',
+    'bottle',
+    'plate',
+    'wine glass',
+    'cup',
+    'fork',
+    'knife',
+    'spoon',
+    'bowl',
+    'banana',
+    'apple',
+    'sandwich',
+    'orange',
+    'broccoli',
+    'carrot',
+    'hot dog',
+    'pizza',
+    'donut',
+    'cake',
+    'chair',
+    'couch',
+    'potted plant',
+    'bed',
+    'mirror',
+    'dining table',
+    'window',
+    'desk',
+    'toilet',
+    'door',
+    'tv',
+    'laptop',
+    'mouse',
+    'remote',
+    'keyboard',
+    'cell phone',
+    'microwave',
+    'oven',
+    'toaster',
+    'sink',
+    'refrigerator',
+    'blender',
+    'book',
+    'clock',
+    'vase',
+    'scissors',
+    'teddy bear',
+    'hair drier',
+    'toothbrush',
+    'hair brush',
+]
+
+
+def attach_im_size_to_key(x):
+    width, height = x[1].size
+    return ((x[0], width, height), x[1])
+
+
+def read_image(image_file_name: str,
+               path_to_dir: Optional[str] = None) -> Tuple[str, Image.Image]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+    return image_file_name, data
+
+
+def preprocess_image(image: Image.Image) -> np.ndarray:
+  ssd_mobilenet_v2_320x320_input_dims = (300, 300)
+  image = image.resize(ssd_mobilenet_v2_320x320_input_dims, 
+    resample=Image.Resampling.BILINEAR)
+  image = np.expand_dims(np.asarray(image, dtype=np.float32), axis=0)
+  return image
+
+
+class PostProcessor(beam.DoFn):
+  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+    key, prediction_result = element
+    filename, im_width, im_height = key
+    nums = prediction_result.inference[0]
+    boxes = prediction_result.inference[1]
+    scores = prediction_result.inference[2]
+    classes = prediction_result.inference[3]
+    detections = []
+    for i in range(int(nums[0])):
+        detections.append({
+                'ymin': str(boxes[i][0] * im_height),
+                'xmin': str(boxes[i][1] * im_width),
+                'ymax': str(boxes[i][2] * im_height),
+                'xmax': str(boxes[i][3] * im_width),
+                'score': str(scores[i]),
+                'class': COCO_OBJ_DET_CLASSES[int(classes[i])]
+            })
+    yield filename + ',' + str(detections)
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      required=True,
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--engine_path',
+      dest='engine_path',
+      required=True,
+      help='Path to the pre-built TFOD ssd_mobilenet_v2_320x320_coco17_tpu-8' 
+      'TensorRT engine.')
+  parser.add_argument(
+      '--images_dir',
+      default=None,
+      help='Path to the directory where images are stored.'
+      'Not required if image names in the input file have absolute path.')
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  engine_handler = KeyedModelHandler(
+      TensorRTEngineHandlerNumPy(
+          min_batch_size=1, max_batch_size=1, 
+        engine_path=known_args.engine_path))
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    filename_value_pair = (
+        p
+        | 'ReadImageNames' >> beam.io.ReadFromText(
+            known_args.input, skip_header_lines=0)
+        | 'ReadImageData' >> beam.Map(
+            lambda image_name: read_image(
+                image_file_name=image_name, path_to_dir=known_args.images_dir))
+        | 'AttachImageSizeToKey' >> beam.Map(attach_im_size_to_key)
+        | 'PreprocessImages' >> beam.MapTuple(
+            lambda file_name, data: (file_name, preprocess_image(data))))
+    predictions = (
+        filename_value_pair
+        | 'PyTorchRunInference' >> RunInference(engine_handler)
+        | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+
+    if known_args.output:
+      predictions | "WriteOutputToGCS" >> beam.io.WriteToText(
+        known_args.output,
+        shard_name_template='',
+        append_trailing_newlines=True)
+

Review Comment:
   You don't have to take this suggestion, but one thing you could do when there is no output directory is simply print the results.
   ```
   if known_args.output:
         predictions | "WriteOutputToGCS" >> beam.io.WriteToText(
           known_args.output,
           shard_name_template='',
           append_trailing_newlines=True)
   else:
     predictions | "PrintPredictions" >> beam.Map(print)
   ```



##########
sdks/python/apache_beam/ml/inference/tensorrt_inference.py:
##########
@@ -0,0 +1,213 @@
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import numpy as np
+import pycuda.driver as cuda
+import sys
+import tensorrt as trt
+from typing import Any, Dict, Iterable, Optional, Sequence
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler, PredictionResult
+
+LOGGER = trt.Logger(trt.Logger.INFO)
+
+
+def _load_engine(engine_path):
+  file = FileSystems.open(engine_path, 'rb')
+  runtime = trt.Runtime(LOGGER)
+  engine = runtime.deserialize_cuda_engine(file.read())
+  assert engine
+  return engine
+
+
+def _load_onnx(onnx_path):
+  builder = trt.Builder(LOGGER)
+  network = builder.create_network(
+      flags=1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
+  parser = trt.OnnxParser(network, LOGGER)
+  with FileSystems.open(onnx_path) as f:
+    if not parser.parse(f.read()):
+      print("Failed to load ONNX file: {}".format(onnx_path))
+      for error in range(parser.num_errors):
+        print(parser.get_error(error))

Review Comment:
   Same here: maybe log instead of printing (if possible).



##########
sdks/python/apache_beam/examples/inference/tensorrt_object_detection.py:
##########
@@ -0,0 +1,238 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference API to perform object detection with TensorRT."""
+
+import argparse
+import io
+import numpy as np
+import os
+from PIL import Image
+from typing import Iterable, Optional, Tuple
+
+import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import (KeyedModelHandler, PredictionResult,
+                                           RunInference)
+from apache_beam.ml.inference.tensorrt_inference import \
+    TensorRTEngineHandlerNumPy
+from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
+
+COCO_OBJ_DET_CLASSES = [
+    'person',
+    'bicycle',
+    'car',
+    'motorcycle',
+    'airplane',
+    'bus',
+    'train',
+    'truck',
+    'boat',
+    'traffic light',
+    'fire hydrant',
+    'street sign',
+    'stop sign',
+    'parking meter',
+    'bench',
+    'bird',
+    'cat',
+    'dog',
+    'horse',
+    'sheep',
+    'cow',
+    'elephant',
+    'bear',
+    'zebra',
+    'giraffe',
+    'hat',
+    'backpack',
+    'umbrella',
+    'shoe',
+    'eye glasses',
+    'handbag',
+    'tie',
+    'suitcase',
+    'frisbee',
+    'skis',
+    'snowboard',
+    'sports ball',
+    'kite',
+    'baseball bat',
+    'baseball glove',
+    'skateboard',
+    'surfboard',
+    'tennis racket',
+    'bottle',
+    'plate',
+    'wine glass',
+    'cup',
+    'fork',
+    'knife',
+    'spoon',
+    'bowl',
+    'banana',
+    'apple',
+    'sandwich',
+    'orange',
+    'broccoli',
+    'carrot',
+    'hot dog',
+    'pizza',
+    'donut',
+    'cake',
+    'chair',
+    'couch',
+    'potted plant',
+    'bed',
+    'mirror',
+    'dining table',
+    'window',
+    'desk',
+    'toilet',
+    'door',
+    'tv',
+    'laptop',
+    'mouse',
+    'remote',
+    'keyboard',
+    'cell phone',
+    'microwave',
+    'oven',
+    'toaster',
+    'sink',
+    'refrigerator',
+    'blender',
+    'book',
+    'clock',
+    'vase',
+    'scissors',
+    'teddy bear',
+    'hair drier',
+    'toothbrush',
+    'hair brush',
+]
+
+
+def attach_im_size_to_key(x):

Review Comment:
   What is `x` here? Can you use a more descriptive variable name?



##########
sdks/python/apache_beam/examples/inference/tensorrt_object_detection.py:
##########
@@ -0,0 +1,238 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference API to perform object detection with TensorRT."""
+
+import argparse
+import io
+import numpy as np
+import os
+from PIL import Image
+from typing import Iterable, Optional, Tuple
+
+import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import (KeyedModelHandler, PredictionResult,
+                                           RunInference)
+from apache_beam.ml.inference.tensorrt_inference import \
+    TensorRTEngineHandlerNumPy
+from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
+
+COCO_OBJ_DET_CLASSES = [
+    'person',
+    'bicycle',
+    'car',
+    'motorcycle',
+    'airplane',
+    'bus',
+    'train',
+    'truck',
+    'boat',
+    'traffic light',
+    'fire hydrant',
+    'street sign',
+    'stop sign',
+    'parking meter',
+    'bench',
+    'bird',
+    'cat',
+    'dog',
+    'horse',
+    'sheep',
+    'cow',
+    'elephant',
+    'bear',
+    'zebra',
+    'giraffe',
+    'hat',
+    'backpack',
+    'umbrella',
+    'shoe',
+    'eye glasses',
+    'handbag',
+    'tie',
+    'suitcase',
+    'frisbee',
+    'skis',
+    'snowboard',
+    'sports ball',
+    'kite',
+    'baseball bat',
+    'baseball glove',
+    'skateboard',
+    'surfboard',
+    'tennis racket',
+    'bottle',
+    'plate',
+    'wine glass',
+    'cup',
+    'fork',
+    'knife',
+    'spoon',
+    'bowl',
+    'banana',
+    'apple',
+    'sandwich',
+    'orange',
+    'broccoli',
+    'carrot',
+    'hot dog',
+    'pizza',
+    'donut',
+    'cake',
+    'chair',
+    'couch',
+    'potted plant',
+    'bed',
+    'mirror',
+    'dining table',
+    'window',
+    'desk',
+    'toilet',
+    'door',
+    'tv',
+    'laptop',
+    'mouse',
+    'remote',
+    'keyboard',
+    'cell phone',
+    'microwave',
+    'oven',
+    'toaster',
+    'sink',
+    'refrigerator',
+    'blender',
+    'book',
+    'clock',
+    'vase',
+    'scissors',
+    'teddy bear',
+    'hair drier',
+    'toothbrush',
+    'hair brush',
+]
+
+
+def attach_im_size_to_key(x):
+    width, height = x[1].size
+    return ((x[0], width, height), x[1])
+
+
+def read_image(image_file_name: str,
+               path_to_dir: Optional[str] = None) -> Tuple[str, Image.Image]:
+  if path_to_dir is not None:
+    image_file_name = os.path.join(path_to_dir, image_file_name)
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+    return image_file_name, data
+
+
+def preprocess_image(image: Image.Image) -> np.ndarray:
+  ssd_mobilenet_v2_320x320_input_dims = (300, 300)
+  image = image.resize(ssd_mobilenet_v2_320x320_input_dims, 
+    resample=Image.Resampling.BILINEAR)
+  image = np.expand_dims(np.asarray(image, dtype=np.float32), axis=0)
+  return image
+
+
+class PostProcessor(beam.DoFn):
+  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
+    key, prediction_result = element
+    filename, im_width, im_height = key
+    nums = prediction_result.inference[0]

Review Comment:
   Can this variable name be more descriptive? Is `nums` the number of inferences?
   
   maybe:
   num_inferences = prediction_result.inference[0][0]



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org