Posted to commits@beam.apache.org by da...@apache.org on 2023/05/22 14:35:58 UTC

[beam] branch master updated: Feature/online clustering (#25823)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a573def48c9 Feature/online clustering (#25823)
a573def48c9 is described below

commit a573def48c90fe1309e6dd95d94f55cd1cf2ced5
Author: Jasper Van den Bossche <11...@users.noreply.github.com>
AuthorDate: Mon May 22 16:35:44 2023 +0200

    Feature/online clustering (#25823)
    
    * Online K-Means Transform implementation
    
    * Change example input path
    
    * Improve memory usage + performance
    
    * Add docstring for clarity
    
    * Split clustering transform
    
    * Add licenses
    
    * Fix pylint
    
    * Fix pylint formatting
    
    * Fix pylint formatting
    
    * Prediction dependent on training
    
    * Resolve comments
    
    * Fix pylint
    
    * Fix pylint
    
    * Update sdks/python/apache_beam/examples/california_housing_clustering.py
    
    ---------
    
    Co-authored-by: Danny McCormick <da...@google.com>
---
 .../examples/california_housing_clustering.py      | 117 +++++++
 .../apache_beam/examples/online_clustering.py      | 337 +++++++++++++++++++++
 2 files changed, 454 insertions(+)

diff --git a/sdks/python/apache_beam/examples/california_housing_clustering.py b/sdks/python/apache_beam/examples/california_housing_clustering.py
new file mode 100644
index 00000000000..78afa1482b3
--- /dev/null
+++ b/sdks/python/apache_beam/examples/california_housing_clustering.py
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses OnlineClustering transform to group houses
+with a similar value together.
+
+This example uses the California Housing Prices dataset from kaggle.
+https://www.kaggle.com/datasets/camnugent/california-housing-prices
+
+In the first step of the pipeline, the clustering model is trained
+using the OnlineKMeans transform, then the AssignClusterLabels
+transform assigns a cluster to each record in the dataset. This
+transform makes use of the RunInference API under the hood.
+
+In order to run this example:
+1. Download the data from kaggle as csv
+2. Run `python california_housing_clustering.py --input <path/to/housing.csv> --checkpoints_path <path/to/checkpoints>`  # pylint: disable=line-too-long
+"""
+
+import argparse
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam import pvalue
+from apache_beam.dataframe.convert import to_pcollection
+from apache_beam.dataframe.io import read_csv
+from apache_beam.examples.online_clustering import AssignClusterLabelsInMemoryModel
+from apache_beam.examples.online_clustering import OnlineClustering
+from apache_beam.examples.online_clustering import OnlineKMeans
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      required=True,
+      help='A csv file containing the data that needs to be clustered.')
+  parser.add_argument(
+      '--checkpoints_path',
+      dest='checkpoints_path',
+      required=True,
+      help='A path to a directory where model checkpoints can be stored.')
+  return parser.parse_known_args(argv)
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  """
+    Args:
+      argv: Command line arguments defined for this example.
+      save_main_session: Used for internal testing.
+      test_pipeline: Used for internal testing.
+    """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  pipeline = test_pipeline
+  if not test_pipeline:
+    pipeline = beam.Pipeline(options=pipeline_options)
+
+  data = pipeline | read_csv(known_args.input)
+
+  features = ['longitude', 'latitude', 'median_income']
+
+  housing_features = to_pcollection(data[features])
+
+  # 1. Calculate clustering centers and save model to persistent storage
+  model = (
+      housing_features
+      | beam.Map(lambda record: list(record))
+      | "Train clustering model" >> OnlineClustering(
+          OnlineKMeans,
+          n_clusters=6,
+          batch_size=256,
+          cluster_args={},
+          checkpoints_path=known_args.checkpoints_path))
+
+  # 2. Calculate labels for all records in the dataset
+  # using the trained clustering model loaded as an in-memory model
+  _ = (
+      housing_features
+      | beam.Map(lambda sample: np.array(sample))
+      | "RunInference" >> AssignClusterLabelsInMemoryModel(
+          model=pvalue.AsSingleton(model),
+          model_id="kmeans",
+          n_clusters=6,
+          batch_size=512)
+      | beam.Map(print))
+
+  result = pipeline.run()
+  result.wait_until_finish()
+  return result
+
+
+if __name__ == '__main__':
+  run()
diff --git a/sdks/python/apache_beam/examples/online_clustering.py b/sdks/python/apache_beam/examples/online_clustering.py
new file mode 100644
index 00000000000..9746995dce7
--- /dev/null
+++ b/sdks/python/apache_beam/examples/online_clustering.py
@@ -0,0 +1,337 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import datetime
+from operator import itemgetter
+
+try:
+  import joblib
+except ImportError:
+  joblib = None
+import numpy as np
+import pandas as pd
+from sklearn.cluster import MiniBatchKMeans
+
+import apache_beam as beam
+from apache_beam.coders import PickleCoder
+from apache_beam.coders import VarIntCoder
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.sklearn_inference import ModelFileType
+from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
+
+
+class SaveModel(core.DoFn):
+  """Saves trained clustering model to persistent storage"""
+  def __init__(self, checkpoints_path: str):
+    self.checkpoints_path = checkpoints_path
+
+  def process(self, model):
+    # generate an ISO 8601 timestamp
+    iso_timestamp = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
+    checkpoint_name = f'{self.checkpoints_path}/{iso_timestamp}.checkpoint'
+    latest_checkpoint = f'{self.checkpoints_path}/latest.checkpoint'
+    # rename previous checkpoint
+    if FileSystems.exists(latest_checkpoint):
+      FileSystems.rename([latest_checkpoint], [checkpoint_name])
+    file = FileSystems.create(latest_checkpoint)
+    if not joblib:
+      raise ImportError(
+          'Could not import joblib in this execution environment. '
+          'For help with managing dependencies on Python workers, see '
+          'https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'  # pylint: disable=line-too-long
+      )
+
+    joblib.dump(model, file)
+
+    yield checkpoint_name
+
+
+class AssignClusterLabelsFn(core.DoFn):
+  """Takes a trained model and input data and labels
+   all data instances using the trained model."""
+  def process(self, batch, model, model_id):
+    cluster_labels = model.predict(batch)
+    for e, i in zip(batch, cluster_labels):
+      yield PredictionResult(example=e, inference=i, model_id=model_id)
+
+
+class SelectLatestModelState(core.CombineFn):
+  """Selects that latest version of a model after training"""
+  def create_accumulator(self):
+    # create and initialise accumulator
+    return None, 0
+
+  def add_input(self, accumulator, element):
+    # accumulates each element from input in accumulator
+    if element[1] > accumulator[1]:
+      return element
+    return accumulator
+
+  def merge_accumulators(self, accumulators):
+    # Multiple accumulators could be processed in parallel,
+    # this function merges them
+    return max(accumulators, key=itemgetter(1))
+
+  def extract_output(self, accumulator):
+    # Only output the model, not the iteration count
+    return accumulator[0]
+
+
+class ClusteringAlgorithm(core.DoFn):
+  """Abstract class with the interface
+   that clustering algorithms need to follow."""
+
+  MODEL_SPEC = ReadModifyWriteStateSpec("clustering_model", PickleCoder())
+  ITERATION_SPEC = ReadModifyWriteStateSpec(
+      'training_iterations', VarIntCoder())
+  MODEL_ID = 'ClusteringAlgorithm'
+
+  def __init__(
+      self, n_clusters: int, checkpoints_path: str, cluster_args: dict):
+    super().__init__()
+    self.n_clusters = n_clusters
+    self.checkpoints_path = checkpoints_path
+    self.cluster_args = cluster_args
+    self.clustering_algorithm = None
+
+  def process(
+      self,
+      keyed_batch,
+      model_state=core.DoFn.StateParam(MODEL_SPEC),
+      iteration_state=core.DoFn.StateParam(ITERATION_SPEC),
+      *args,
+      **kwargs):
+    raise NotImplementedError
+
+  def load_model_checkpoint(self):
+    latest_checkpoint = f'{self.checkpoints_path}/latest.checkpoint'
+    if FileSystems.exists(latest_checkpoint):
+      file = FileSystems.open(latest_checkpoint)
+      if not joblib:
+        raise ImportError(
+            'Could not import joblib in this execution environment. '
+            'For help with managing dependencies on Python workers, see '
+            'https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'  # pylint: disable=line-too-long
+        )
+      return joblib.load(file)
+    return self.clustering_algorithm(
+        n_clusters=self.n_clusters, **self.cluster_args)
+
+
+class OnlineKMeans(ClusteringAlgorithm):
+  """Online K-Means function. Used the MiniBatchKMeans from sklearn
+    More information: https://scikit-learn.org/stable/modules/generated/sklearn.cluster.MiniBatchKMeans.html"""  # pylint: disable=line-too-long
+  MODEL_SPEC = ReadModifyWriteStateSpec("clustering_model", PickleCoder())
+  ITERATION_SPEC = ReadModifyWriteStateSpec(
+      'training_iterations', VarIntCoder())
+  MODEL_ID = 'OnlineKmeans'
+
+  def __init__(
+      self, n_clusters: int, checkpoints_path: str, cluster_args: dict):
+    super().__init__(n_clusters, checkpoints_path, cluster_args)
+    self.clustering_algorithm = MiniBatchKMeans
+
+  def process(
+      self,
+      keyed_batch,
+      model_state=core.DoFn.StateParam(MODEL_SPEC),
+      iteration_state=core.DoFn.StateParam(ITERATION_SPEC),
+      *args,
+      **kwargs):
+    # 1. Initialise or load states
+    clustering = model_state.read() or self.load_model_checkpoint()
+
+    iteration = iteration_state.read() or 0
+
+    iteration += 1
+
+    # 2. Remove the temporarily assigned key
+    _, batch = keyed_batch
+
+    # 3. Calculate cluster centroids
+    clustering.partial_fit(batch)
+
+    # 4. Store the updated model and iteration count
+    model_state.write(clustering)
+    iteration_state.write(iteration)
+
+    # checkpoint = joblib.dump(clustering, f'kmeans_checkpoint_{iteration}')
+
+    yield clustering, iteration
+
+
+class ConvertToNumpyArray(core.DoFn):
+  """Helper function to convert incoming data
+  to numpy arrays that are accepted by sklearn"""
+  def process(self, element, *args, **kwargs):
+    if isinstance(element, (tuple, list)):
+      yield np.array(element)
+    elif isinstance(element, np.ndarray):
+      yield element
+    elif isinstance(element, (pd.DataFrame, pd.Series)):
+      yield element.to_numpy()
+    else:
+      raise ValueError(f"Unsupported type: {type(element)}")
+
+
+class ClusteringPreprocessing(ptransform.PTransform):
+  def __init__(
+      self, n_clusters: int, batch_size: int, is_batched: bool = False):
+    """ Preprocessing for Clustering Transformation
+        The clustering transform expects batches for performance reasons,
+        therefore this batches the data and converts it to numpy arrays,
+        which are accepted by sklearn. This transform also adds the same key
+        to all batches, such that only 1 state is created and updated during
+        clustering updates.
+
+          Example Usage::
+
+            pcoll | ClusteringPreprocessing(
+              n_clusters=8,
+              batch_size=1024,
+              is_batched=False)
+
+          Args:
+          n_clusters: number of clusters used by the algorithm
+          batch_size: size of the data batches
+          is_batched: boolean value that marks if the collection is already
+            batched and thus doesn't need to be batched by this transform
+          """
+    super().__init__()
+    self.n_clusters = n_clusters
+    self.batch_size = batch_size
+    self.is_batched = is_batched
+
+  def expand(self, pcoll):
+    pcoll = (
+        pcoll
+        |
+        "Convert element to numpy arrays" >> beam.ParDo(ConvertToNumpyArray()))
+
+    if not self.is_batched:
+      pcoll = (
+          pcoll
+          | "Create batches of elements" >> beam.BatchElements(
+              min_batch_size=self.n_clusters, max_batch_size=self.batch_size)
+          | "Covert to 2d numpy array" >>
+          beam.Map(lambda record: np.array(record)))
+
+    return pcoll
+
+
+class OnlineClustering(ptransform.PTransform):
+  def __init__(
+      self,
+      clustering_algorithm,
+      n_clusters: int,
+      cluster_args: dict,
+      checkpoints_path: str,
+      batch_size: int = 1024,
+      is_batched: bool = False):
+    """ Clustering transformation itself, it first preprocesses the data,
+        then it applies the clustering transformation step by step on each
+        of the batches.
+
+          Example Usage::
+
+            pcoll | OnlineClustering(
+                        clustering_algorithm=OnlineKMeansClustering
+                        batch_size=1024,
+                        n_clusters=6
+                        cluster_args={}))
+
+          Args:
+          clustering_algorithm: Clustering algorithm (DoFn)
+          n_clusters: Number of clusters
+          cluster_args: Arguments for the sklearn clustering algorithm
+            (check sklearn documentation for more information)
+          batch_size: size of the data batches
+          is_batched: boolean value that marks if the collection is already
+            batched and thus doesn't need to be batched by this transform
+          """
+    super().__init__()
+    self.clustering_algorithm = clustering_algorithm
+    self.n_clusters = n_clusters
+    self.batch_size = batch_size
+    self.cluster_args = cluster_args
+    self.checkpoints_path = checkpoints_path
+    self.is_batched = is_batched
+
+  def expand(self, pcoll):
+    # 1. Preprocess data for more efficient clustering
+    data = (
+        pcoll
+        | 'Batch data for faster processing' >> ClusteringPreprocessing(
+            n_clusters=self.n_clusters,
+            batch_size=self.batch_size,
+            is_batched=self.is_batched)
+        | "Add a key for stateful processing" >>
+        beam.Map(lambda record: (1, record)))
+
+    # 2. Calculate cluster centers
+    model = (
+        data
+        | 'Cluster' >> core.ParDo(
+            self.clustering_algorithm(
+                n_clusters=self.n_clusters,
+                cluster_args=self.cluster_args,
+                checkpoints_path=self.checkpoints_path))
+        | 'Select latest model state' >> core.CombineGlobally(
+            SelectLatestModelState()).without_defaults())
+
+    # 3. Save the trained model checkpoint to persistent storage,
+    # so it can be loaded for further training in the next window
+    # or loaded into an sklearn model handler for inference
+    _ = (model | core.ParDo(SaveModel(checkpoints_path=self.checkpoints_path)))
+
+    return model
+
+
+class AssignClusterLabelsRunInference(ptransform.PTransform):
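+  """Assigns cluster labels to samples using the RunInference API and the
+  latest model checkpoint saved by OnlineClustering. The input PCollection
+  should contain numpy arrays, matching SklearnModelHandlerNumpy.
+
+        Example Usage (illustrative; point checkpoints_path at the
+        directory used during training)::
+
+          pcoll | AssignClusterLabelsRunInference(
+              checkpoints_path='<path/to/checkpoints>')
+        """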
+  def __init__(self, checkpoints_path):
+    super().__init__()
+    self.clustering_model = SklearnModelHandlerNumpy(
+        model_uri=f'{checkpoints_path}/latest.checkpoint',
+        model_file_type=ModelFileType.JOBLIB)
+
+  def expand(self, pcoll):
+    predictions = (
+        pcoll
+        | "RunInference" >> RunInference(self.clustering_model))
+
+    return predictions
+
+
+class AssignClusterLabelsInMemoryModel(ptransform.PTransform):
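+  """Assigns cluster labels to samples using a trained clustering model
+  that is passed in as a side input (for example pvalue.AsSingleton of
+  the model produced by OnlineClustering), instead of loading it from a
+  checkpoint."""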
+  def __init__(
+      self, model, n_clusters, batch_size, is_batched=False, model_id=None):
+    super().__init__()
+    self.model = model
+    self.n_clusters = n_clusters
+    self.batch_size = batch_size
+    self.is_batched = is_batched
+    self.model_id = model_id
+
+  def expand(self, pcoll):
+    return (
+        pcoll
+        | "Preprocess data for faster prediction" >> ClusteringPreprocessing(
+            n_clusters=self.n_clusters,
+            batch_size=self.batch_size,
+            is_batched=self.is_batched)
+        | "Assign cluster labels" >> core.ParDo(
+            AssignClusterLabelsFn(), model=self.model, model_id=self.model_id))