You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/09 00:06:11 UTC

[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17800: [BEAM-14535] Added support for pandas in sklearn inference runner

TheNeuralBit commented on code in PR #17800:
URL: https://github.com/apache/beam/pull/17800#discussion_r892963554


##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -41,20 +43,57 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
   def run_inference(
-      self, batch: List[numpy.ndarray], model: BaseEstimator,
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator,
       **kwargs) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)

Review Comment:
   Could we have two separate `InferenceRunner` implementations, one for `numpy.ndarray` and one for `pandas.DataFrame`? Then we wouldn't need to branch on the type here for every batch. The user could indicate which one they prefer with an argument on the model loader.



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -41,20 +43,57 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
   def run_inference(
-      self, batch: List[numpy.ndarray], model: BaseEstimator,
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator,
       **kwargs) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+    raise ValueError('Unsupported data type.')
+
+  @staticmethod
+  def _predict_np_array(batch: List[numpy.ndarray],
+                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
     predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
 
-  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+  @staticmethod
+  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+                                model: Any) -> Iterable[PredictionResult]:
+    # sklearn_inference currently only supports single rowed dataframes.
+    for dataframe in batch:
+      if dataframe.shape[0] != 1:
+        raise ValueError('Only dataframes with single rows are supported.')
+
+    # vectorize data for better performance
+    vectorized_batch = pandas.concat(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    splits = [
+        vectorized_batch.iloc[[i]] for i in range(vectorized_batch.shape[0])
+    ]
+    return [
+        PredictionResult(example, inference) for example,
+        inference in zip(splits, predictions)
+    ]
+
+  def get_num_bytes(
+      self, batch: List[Union[numpy.ndarray, pandas.DataFrame]]) -> int:
     """Returns the number of bytes of data for a batch."""
-    return sum(sys.getsizeof(element) for element in batch)
+    if isinstance(batch[0], numpy.ndarray):
+      return sum(sys.getsizeof(element) for element in batch)
+    elif isinstance(batch[0], pandas.DataFrame):
+      data_frames: List[pandas.DataFrame] = batch
+      return sum(df.memory_usage(deep=True).sum() for df in data_frames)
+    raise ValueError('Unsupported data type.')
 
 
 class SklearnModelLoader(ModelLoader[numpy.ndarray,

Review Comment:
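   I think you missed this type hint: `SklearnModelLoader` still parameterizes `ModelLoader` with `numpy.ndarray` only, while the runner above now accepts `Union[numpy.ndarray, pandas.DataFrame]`.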



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org