Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/16 13:36:23 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #23266: Add drop_example flag to the RunInference and Model Handler

damccorm commented on code in PR #23266:
URL: https://github.com/apache/beam/pull/23266#discussion_r973025417


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -82,6 +82,33 @@ def _to_microseconds(time_ns: int) -> int:
   return int(time_ns / _NANOSECOND_TO_MICROSECOND)
 
 
+def _convert_to_result(
+    batch: Iterable,
+    predictions: Union[Iterable, Dict[Any, Iterable]],
+    drop_example: bool = False) -> Iterable[PredictionResult]:
+  if isinstance(predictions, dict):
+    # Go from one dictionary of type: {key_type1: Iterable<val_type1>,
+    # key_type2: Iterable<val_type2>, ...} where each Iterable is of
+    # length batch_size, to a list of dictionaries:
+    # [{key_type1: value_type1, key_type2: value_type2}]
+    predictions_per_tensor = [
+        dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
+    ]
+    if not drop_example:
+      return [
+          PredictionResult(x, y) for x, y in zip(batch, predictions_per_tensor)
+      ]
+    else:
+      return [
+          PredictionResult(None, y) for x,
+          y in zip(batch, predictions_per_tensor)
+      ]

Review Comment:
   Rather than returning from inside this nested branch, could we just set `predictions = [dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())]` and let the if/else below handle it? That way we don't need to duplicate the logic.
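
A minimal sketch of the restructuring suggested above, not the code that was merged. The diff hunk does not show the non-dict path, so the trailing if/else here is an assumption about its shape. `PredictionResult` is a local stand-in for the Beam class, and the helper is named `convert_to_result` to mark it as illustrative.

```python
from typing import Any, Dict, Iterable, List, NamedTuple, Union


class PredictionResult(NamedTuple):
  # Local stand-in for apache_beam.ml.inference.base.PredictionResult.
  example: Any
  inference: Any


def convert_to_result(
    batch: Iterable,
    predictions: Union[Iterable, Dict[Any, Iterable]],
    drop_example: bool = False) -> List[PredictionResult]:
  if isinstance(predictions, dict):
    # Reshape {key: [v0, v1, ...]} into [{key: v0}, {key: v1}, ...] and
    # rebind `predictions`, so both input shapes share the code below.
    predictions = [
        dict(zip(predictions.keys(), v)) for v in zip(*predictions.values())
    ]
  # Assumed shape of the shared tail: a single if/else on drop_example.
  if drop_example:
    return [PredictionResult(None, y) for _, y in zip(batch, predictions)]
  else:
    return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
```

With this shape the `drop_example` handling appears once, regardless of whether the model returned a dict of per-key outputs or a flat iterable.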



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -48,13 +48,40 @@ def run_inference(
       self,
       batch: Sequence[int],
       model: FakeModel,
-      inference_args=None) -> Iterable[int]:
+      inference_args=None,
+      drop_example=False) -> Iterable[int]:
     if self._fake_clock:
       self._fake_clock.current_time_ns += 3_000_000  # 3 milliseconds
     for example in batch:
       yield model.predict(example)
 
 
+class FakeModelHandlerWithPredictionResult(base.ModelHandler[int,
+                                                             int,
+                                                             FakeModel]):
+  def __init__(self, clock=None):
+    self._fake_clock = clock
+
+  def load_model(self):
+    if self._fake_clock:
+      self._fake_clock.current_time_ns += 500_000_000  # 500ms
+    return FakeModel()
+
+  def run_inference(
+      self,
+      batch: Sequence[int],
+      model: FakeModel,
+      inference_args=None,
+      drop_example=False) -> Iterable[int]:
+    if self._fake_clock:
+      self._fake_clock.current_time_ns += 3_000_000  # 3 milliseconds
+    for example in batch:
+      if not drop_example:
+        yield base.PredictionResult(example, model.predict(example))
+      else:
+        yield base.PredictionResult(None, model.predict(example))

Review Comment:
   Isn't this the behavior we're testing? Why are we baking it into our mock?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -183,8 +167,8 @@ def run_inference(
       self,
       batch: Sequence[pandas.DataFrame],
       model: BaseEstimator,
-      inference_args: Optional[Dict[str, Any]] = None
-  ) -> Iterable[PredictionResult]:
+      inference_args: Optional[Dict[str, Any]] = None,
+      drop_example: bool = False) -> Iterable[PredictionResult]:

Review Comment:
   Don't we need to pass this into `_convert_to_result`? The same question applies elsewhere too.
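
To illustrate the concern, here is a hedged sketch of a handler that forwards the flag to the shared helper. `DoublingModel` and `DoublingModelHandler` are hypothetical and stand in for the sklearn model and handler; `PredictionResult` and `convert_to_result` are simplified local stand-ins, with dict handling omitted.

```python
from typing import Any, Dict, List, NamedTuple, Optional, Sequence


class PredictionResult(NamedTuple):
  # Local stand-in for apache_beam.ml.inference.base.PredictionResult.
  example: Any
  inference: Any


def convert_to_result(batch, predictions, drop_example=False):
  # Simplified stand-in for the shared helper (no dict handling).
  return [
      PredictionResult(None if drop_example else x, y)
      for x, y in zip(batch, predictions)
  ]


class DoublingModel:
  """Hypothetical model used only for this illustration."""
  def predict(self, batch: Sequence[int]) -> List[int]:
    return [2 * x for x in batch]


class DoublingModelHandler:
  """Hypothetical handler; not a real Beam class."""
  def run_inference(
      self,
      batch: Sequence[int],
      model: DoublingModel,
      inference_args: Optional[Dict[str, Any]] = None,
      drop_example: bool = False) -> List[PredictionResult]:
    predictions = model.predict(batch)
    # Forward the flag. If this argument were left out, the helper would
    # fall back to its default and drop_example would have no effect.
    return convert_to_result(batch, predictions, drop_example)
```

Accepting `drop_example` in the signature is not enough on its own: the value has to reach the code that builds each `PredictionResult`.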



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -82,6 +82,33 @@ def _to_microseconds(time_ns: int) -> int:
   return int(time_ns / _NANOSECOND_TO_MICROSECOND)
 
 
+def _convert_to_result(

Review Comment:
   Good call moving this out to the base class.


