Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/01 21:05:40 UTC

[GitHub] [beam] yeandy commented on a diff in pull request #17800: [BEAM-14535] Added support for pandas in sklearn inference runner

yeandy commented on code in PR #17800:
URL: https://github.com/apache/beam/pull/17800#discussion_r887267918


##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()

Review Comment:
   nit
   ```suggestion
         dataframe = pandas_dataframe()
   ```



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()
+
+      pcoll = pipeline | 'start' >> beam.Create([data_frame])
+      actual = pcoll | api.RunInference(
+          SklearnModelLoader(model_uri=temp_file_name))
+
+      splits = [data_frame.loc[[i]] for i in data_frame.index]
+      expected = [
+          api.PredictionResult(splits[0], 5),
+          api.PredictionResult(splits[1], 8),
+          api.PredictionResult(splits[2], 1),
+          api.PredictionResult(splits[3], 1),
+          api.PredictionResult(splits[4], 2),
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas_with_keys(self):

Review Comment:
   We already have a keyed test in `base_test.py`. Is it necessary to test a keyed example for each framework?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],

Review Comment:
   I don't think there's a convention pertaining to the order of elements in a Union? Since `pandas.DataFrame` is used more, would this be more useful?
   ```suggestion
         batch: List[Union[pandas.DataFrame, numpy.ndarray]],
   ```
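
For reference, a small sketch of why the order is only a readability choice: `typing.Union` compares equal regardless of argument order, so both spellings describe the same type. The variable names below are illustrative only.

```python
from typing import Union

import numpy
import pandas

# Two spellings of the same annotation, differing only in argument order.
ndarray_first = Union[numpy.ndarray, pandas.DataFrame]
dataframe_first = Union[pandas.DataFrame, numpy.ndarray]

# Union equality ignores the order of its arguments.
same_type = ndarray_first == dataframe_first
print(same_type)
```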



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+  @staticmethod
+  def _predict_np_array(batch: List[numpy.ndarray],
+                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
     predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
 
-  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+  @staticmethod
+  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+                                model: Any) -> Iterable[PredictionResult]:
+    # vectorize data for better performance
+    vectorized_batch = pandas.concat(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]

Review Comment:
   I did a toy example like so:
   ```
   >>> df1 = pd.DataFrame([[1,2,3],[4,5,6]])
   >>> df2 = pd.DataFrame([[11,12,13],[14,15,16]])
   >>> vectorized_batch = pd.concat([df1, df2])
   >>> vectorized_batch.index
   Int64Index([0, 1, 0, 1], dtype='int64')
   >>> vectorized_batch.loc[[0]]
       0   1   2
   0   1   2   3
   0  11  12  13
   >>> vectorized_batch.loc[[1]]
       0   1   2
   1   4   5   6
   1  14  15  16
   >>> splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
   >>> splits
   [    0   1   2
   0   1   2   3
   0  11  12  13,     0   1   2
   1   4   5   6
   1  14  15  16,     0   1   2
   0   1   2   3
   0  11  12  13,     0   1   2
   1   4   5   6
   1  14  15  16]
   ```
   I see that index 0 is found in both the first and second batch. Same for index 1. So indexing by `vectorized_batch.index` appears to mix up data from the different DataFrames.
   
   Can you clarify how `vectorized_batch.index` is supposed to differentiate between the batches?
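
One possible direction, shown only as a sketch and not as the PR's actual fix: split the concatenated frame by row position with `iloc` instead of by label with `loc`, so duplicated index labels cannot pull in rows from other DataFrames. It assumes one prediction per row of the concatenated batch.

```python
import pandas as pd

df1 = pd.DataFrame([[1, 2, 3], [4, 5, 6]])
df2 = pd.DataFrame([[11, 12, 13], [14, 15, 16]])
vectorized_batch = pd.concat([df1, df2], axis=0)

# Positional slicing ignores index labels, so each split is exactly one row,
# even though the labels 0 and 1 each appear twice after concatenation.
splits = [vectorized_batch.iloc[[i]] for i in range(len(vectorized_batch))]

for split in splits:
  print(split.values.tolist())
```

With this approach the i-th split corresponds to the i-th row passed to `model.predict`, so zipping `splits` with the predictions keeps examples and inferences aligned.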



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -62,6 +83,44 @@ def build_model():
   return model
 
 
+def pandas_dataframe():
+  csv_string = (
+      'category_1,number_1,category_2,number_2,label,number_3\n'

Review Comment:
   Does this implicitly add indexes starting from 0? What if the user has custom indexes?
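
To make the question concrete, here is a small sketch. It assumes the helper parses the CSV string with `pandas.read_csv`, which is not visible in this hunk; the two-column CSV and the labels `'x'` and `'y'` are made up for illustration.

```python
import io

import pandas as pd

# Hypothetical CSV, much smaller than the one in the test helper.
csv_string = 'number_1,number_2\n1,2\n3,4\n'
default_indexed = pd.read_csv(io.StringIO(csv_string))

# Without an index column, read_csv assigns a default index of 0..n-1.
default_labels = list(default_indexed.index)

# A user-supplied index replaces those labels, so integer labels no longer exist.
custom_indexed = default_indexed.set_index(pd.Index(['x', 'y']))
custom_labels = list(custom_indexed.index)

# Positional access still works regardless of the labels.
first_row = custom_indexed.iloc[[0]].values.tolist()
print(default_labels, custom_labels, first_row)
```

Any logic that looks rows up by the labels 0, 1, 2, and so on would fail or misbehave on `custom_indexed`, which is why a test with a non-default index seems worthwhile.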



##########
sdks/python/apache_beam/ml/inference/sklearn_inference_test.py:
##########
@@ -172,6 +231,53 @@ def test_bad_input_type_raises(self):
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
 
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      data_frame = pandas_dataframe()
+
+      pcoll = pipeline | 'start' >> beam.Create([data_frame])

Review Comment:
   Here we have a list of one DataFrame. Can we also test a list of two or more DataFrames, which would simulate reading separate text files into their own DataFrames?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)

Review Comment:
   Nice refactor



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,19 +44,46 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2
 
 
-class SklearnInferenceRunner(InferenceRunner[numpy.ndarray,
+class SklearnInferenceRunner(InferenceRunner[Union[numpy.ndarray,
+                                                   pandas.DataFrame],
                                              PredictionResult,
                                              BaseEstimator]):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: BaseEstimator) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
+      model: BaseEstimator) -> Iterable[PredictionResult]:
+    if isinstance(batch[0], numpy.ndarray):
+      return SklearnInferenceRunner._predict_np_array(batch, model)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return SklearnInferenceRunner._predict_pandas_dataframe(batch, model)
+
+  @staticmethod
+  def _predict_np_array(batch: List[numpy.ndarray],
+                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
     predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
 
-  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+  @staticmethod
+  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
+                                model: Any) -> Iterable[PredictionResult]:
+    # vectorize data for better performance
+    vectorized_batch = pandas.concat(batch, axis=0)
+    predictions = model.predict(vectorized_batch)
+    splits = [vectorized_batch.loc[[i]] for i in vectorized_batch.index]
+    return [
+        PredictionResult(example, inference) for example,
+        inference in zip(splits, predictions)
+    ]
+
+  def get_num_bytes(
+      self, batch: List[Union[numpy.ndarray, pandas.DataFrame]]) -> int:
     """Returns the number of bytes of data for a batch."""
-    return sum(sys.getsizeof(element) for element in batch)
+    if isinstance(batch[0], numpy.ndarray):
+      return sum(sys.getsizeof(element) for element in batch)
+    elif isinstance(batch[0], pandas.DataFrame):
+      return sum(df.memory_usage(deep=True).sum() for df in batch)

Review Comment:
   What does `deep=True` mean?
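
For context, a short sketch of the difference as described in the pandas documentation: with `deep=True`, `memory_usage` inspects object-dtype columns and counts the Python objects they reference, while the default counts only the storage for the references. The column names and values are made up, and the object dtype is forced explicitly so the effect is visible.

```python
import pandas as pd

df = pd.DataFrame({
    # Object dtype is set explicitly; each cell holds a reference to a Python str.
    'name': pd.Series(['alpha', 'beta', 'gamma'], dtype=object),
    'value': [1, 2, 3],
})

# Shallow: object columns are counted by the size of their references only.
shallow_bytes = df.memory_usage(deep=False).sum()

# Deep: the referenced Python string objects are measured as well.
deep_bytes = df.memory_usage(deep=True).sum()

print(shallow_bytes, deep_bytes)
```

For purely numeric DataFrames the two numbers are the same; the deep figure only grows when object-dtype columns are present.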


