You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/06 17:39:57 UTC

[GitHub] [beam] ryanthompson591 commented on a diff in pull request #17800: [BEAM-14535] Added support for pandas in sklearn inference runner

ryanthompson591 commented on code in PR #17800:
URL: https://github.com/apache/beam/pull/17800#discussion_r888238813


##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+  @staticmethod
+  def _predict_np_array(batch: List[numpy.ndarray],
+                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
     predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
 
-  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+  @staticmethod
+  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+                                model: Any) -> Iterable[PredictionResult]:
+    # vectorize data for better performance
+    vectorized_batch = pandas.concat(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
+    return [
+        PredictionResult(example, inference) for example,
+        inference in zip(splits, predictions)
+    ]
+
+  def get_num_bytes(
+      self, batch: List[Union[numpy.ndarray, pandas.DataFrame]]) -> int:
     """Returns the number of bytes of data for a batch."""
-    return sum(sys.getsizeof(element) for element in batch)
+    if isinstance(batch[0], numpy.ndarray):
+      return sum(sys.getsizeof(element) for element in batch)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return sum(df.memory_usage(deep=True).sum() for df in batch)

Review Comment:
   `deep=True` makes `memory_usage` introspect the data deeply, interrogating object dtypes for system-level memory consumption and including it in the returned values.
   



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()
+
+      pcoll = pipeline | 'start' >> beam.Create([data_frame])
+      actual = pcoll | api.RunInference(
+          SklearnModelLoader(model_uri=temp_file_name))
+
+      splits = [data_frame.loc[[i]] for i in data_frame.index]
+      expected = [
+          api.PredictionResult(splits[0], 5),
+          api.PredictionResult(splits[1], 8),
+          api.PredictionResult(splits[2], 1),
+          api.PredictionResult(splits[3], 1),
+          api.PredictionResult(splits[4], 2),
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas_with_keys(self):

Review Comment:
   I wanted it for this case because I'm worried about a regression: dataframes might be stacked or come from different inputs.



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -62,6 +83,44 @@ def build_model():
   return model
 
 
+def pandas_dataframe():
+  csv_string = (
+      'category_1,number_1,category_2,number_2,label,number_3\n'

Review Comment:
   I've decided not to look closely at user-defined indexes and instead just use the row index.



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+  @staticmethod
+  def _predict_np_array(batch: List[numpy.ndarray],
+                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
     predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
 
-  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+  @staticmethod
+  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+                                model: Any) -> Iterable[PredictionResult]:
+    # vectorize data for better performance
+    vectorized_batch = pandas.concat(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]

Review Comment:
   I think this is actually a bug that I have.
   
   I made a change to account for this. Basically, take your code and add this line:
   `vectorized_batch.reset_index(inplace=True, drop=True)`



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)

Review Comment:
   Yeah, I was thinking about your PR, and I think this sort of refactor would also look nice if it worked out.



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],

Review Comment:
   I don't know if one is used more than the other so I just went alphabetical.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org