Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/19 14:00:37 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #23289: Example of Online Clustering

damccorm commented on code in PR #23289:
URL: https://github.com/apache/beam/pull/23289#discussion_r974229358


##########
sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/config.py:
##########
@@ -0,0 +1,10 @@
+PROJECT_ID = "apache-beam-testing"

Review Comment:
   All files will need an Apache header, including the otherwise empty __init__.py files. For example, see https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_classification.py#L1-L16
   
   Also, please run our linter + fix the formatting violations reported by CI (all information on those/how to run the linter should be available in the failing check's logs).
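   
   For reference, the standard ASF header for Python source files looks roughly like the sketch below (copied here for convenience; please take the exact text from the linked example file rather than from this comment):
   
   ```python
   #
   # Licensed to the Apache Software Foundation (ASF) under one or more
   # contributor license agreements.  See the NOTICE file distributed with
   # this work for additional information regarding copyright ownership.
   # The ASF licenses this file to You under the Apache License, Version 2.0
   # (the "License"); you may not use this file except in compliance with
   # the License.  You may obtain a copy of the License at
   #
   #    http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing, software
   # distributed under the License is distributed on an "AS IS" BASIS,
   # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   # See the License for the specific language governing permissions and
   # limitations under the License.
   #
   ```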



##########
sdks/python/apache_beam/examples/inference/online_clustering/write_data_to_pubsub_pipeline/setup.py:
##########
@@ -0,0 +1,25 @@
+"""Setup.py module for the workflow's worker utilities.
+All the workflow related code is gathered in a package that will be built as a
+source distribution, staged in the staging area for the workflow being run and
+then installed in the workers when they start running.
+This behavior is triggered by specifying the --setup_file command line option
+when running the workflow for remote execution.
+"""
+
+import setuptools
+from setuptools import find_packages
+
+REQUIREMENTS = [
+    "apache-beam[gcp]==2.40.0",
+    "datasets==2.4.0",
+]
+
+setuptools.setup(
+    name="write-to-pubsub-pipeline",
+    version="1.1.1",
+    install_requires=REQUIREMENTS,
+    packages=find_packages(),
+    author="Shubham Krishna",
+    author_email="shubham.krishna@ml6.eu",

Review Comment:
   ```suggestion
       author="Apache Software Foundation",
       author_email="dev@beam.apache.org",
   ```



##########
website/www/site/content/en/documentation/ml/online-clustering.md:
##########
@@ -0,0 +1,211 @@
+---
+title: "Online Clustering"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# OnlineClustering Example
+
+The OnlineClustering example demonstrates how to setup a realtime clustering pipeline that can read text from PubSub, convert the text into an embedding using a language model, and cluster them using BIRCH.
+
+### Dataset for Clustering
+For the example, we use a dataset called [emotion](https://huggingface.co/datasets/emotion). It comprises of 20000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train (for training), validation and test (for performance evaluation). It is a supervised dataset as it contains the text and the category(class) of the dataset. This dataset can easily be accessed using [HuggingFace Datasets](https://huggingface.co/docs/datasets/index).
+
+To have a better understanding of the dataset, here are some examples from the train split of the dataset:
+
+
+| Text        | Type of emotion |
+| :---        |    :----:   |
+| im grabbing a minute to post i feel greedy wrong      | Anger       |
+| i am ever feeling nostalgic about the fireplace i will know that it is still on the property   | Love        |
+| ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny | Fear |
+| on a boat trip to denmark | Joy |
+| i feel you know basically like a fake in the realm of science fiction | Sadness |
+| i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations | Fear |
+
+### Clustering Algorithm
+For the clustering of tweets, we use an incremental clustering algorithm called BIRCH. It stands for balanced iterative reducing and clustering using hierarchies and is an unsupervised data mining algorithm used to perform hierarchical clustering over particularly large data-sets. An advantage of BIRCH is its ability to incrementally and dynamically cluster incoming, multi-dimensional metric data points in an attempt to produce the best quality clustering for a given set of resources (memory and time constraints).
+
+
+## Ingestion to PubSub
+We first ingest the data into [PubSub](https://cloud.google.com/pubsub/docs/overview) so that while clustering we can read the tweets from PubSub. PubSub is a messaging service for exchanging event data among applications and services. It is used for streaming analytics and data integration pipelines to ingest and distribute data.
+
+The full example code for ingesting data to PubSub can be found [here](sdks/python/apache_beam/examples/inference/online_clustering/write_data_to_pubsub_pipeline/)
+
+The file structure for ingestion pipeline is:
+
+    write_data_to_pubsub_pipeline/
+    ├── pipeline/
+    │   ├── __init__.py
+    │   ├── options.py
+    │   └── utils.py
+    ├── __init__.py
+    ├── config.py
+    ├── main.py
+    └── setup.py
+
+`pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation
+`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline
+`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times
+`setup.py` defines the packages/requirements for the pipeline to run
+`main.py` contains the pipeline code and some additional function used for running the pipeline

Review Comment:
   ```suggestion
   `pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation.
   
   `pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline.
   
   `config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times.
   
   `setup.py` defines the packages/requirements for the pipeline to run.
   
   `main.py` contains the pipeline code and some additional function used for running the pipeline.
   ```
   
   I like the spacing in the Readme, but in practice this renders as a single sentence if you don't double space - http://apache-beam-website-pull-requests.storage.googleapis.com/23289/documentation/ml/online-clustering/index.html#ingestion-to-pubsub



##########
sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/setup.py:
##########
@@ -0,0 +1,27 @@
+"""Setup.py module for the workflow's worker utilities.
+All the workflow related code is gathered in a package that will be built as a
+source distribution, staged in the staging area for the workflow being run and
+then installed in the workers when they start running.
+This behavior is triggered by specifying the --setup_file command line option
+when running the workflow for remote execution.
+"""
+
+import setuptools
+from setuptools import find_packages
+
+REQUIREMENTS = [
+    "apache-beam[gcp]==2.40.0",
+    "transformers==4.21.1",
+    "torch==1.12.1",
+    "scikit-learn==1.0.2",
+]
+
+setuptools.setup(
+    name="catalog-dataflow-pipeline",
+    version="1.1.1",
+    install_requires=REQUIREMENTS,
+    packages=find_packages(),
+    author="Shubham Krishna",
+    author_email="shubham.krishna@ml6.eu",

Review Comment:
   ```suggestion
       author="Apache Software Foundation'",
       author_email="dev@beam.apache.org",
   ```
   
   By policy these should point to Apache's contact info. Plus, that way people will reach out to dev@beam.apache.org instead of to you directly.



##########
website/www/site/content/en/documentation/ml/online-clustering.md:
##########
@@ -0,0 +1,211 @@
+---
+title: "Online Clustering"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# OnlineClustering Example
+
+The OnlineClustering example demonstrates how to setup a realtime clustering pipeline that can read text from PubSub, convert the text into an embedding using a language model, and cluster them using BIRCH.
+
+### Dataset for Clustering
+For the example, we use a dataset called [emotion](https://huggingface.co/datasets/emotion). It comprises of 20000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train (for training), validation and test (for performance evaluation). It is a supervised dataset as it contains the text and the category(class) of the dataset. This dataset can easily be accessed using [HuggingFace Datasets](https://huggingface.co/docs/datasets/index).
+
+To have a better understanding of the dataset, here are some examples from the train split of the dataset:
+
+
+| Text        | Type of emotion |
+| :---        |    :----:   |
+| im grabbing a minute to post i feel greedy wrong      | Anger       |
+| i am ever feeling nostalgic about the fireplace i will know that it is still on the property   | Love        |
+| ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny | Fear |
+| on a boat trip to denmark | Joy |
+| i feel you know basically like a fake in the realm of science fiction | Sadness |
+| i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations | Fear |
+
+### Clustering Algorithm
+For the clustering of tweets, we use an incremental clustering algorithm called BIRCH. It stands for balanced iterative reducing and clustering using hierarchies and is an unsupervised data mining algorithm used to perform hierarchical clustering over particularly large data-sets. An advantage of BIRCH is its ability to incrementally and dynamically cluster incoming, multi-dimensional metric data points in an attempt to produce the best quality clustering for a given set of resources (memory and time constraints).
+
+
+## Ingestion to PubSub
+We first ingest the data into [PubSub](https://cloud.google.com/pubsub/docs/overview) so that while clustering we can read the tweets from PubSub. PubSub is a messaging service for exchanging event data among applications and services. It is used for streaming analytics and data integration pipelines to ingest and distribute data.
+
+The full example code for ingesting data to PubSub can be found [here](sdks/python/apache_beam/examples/inference/online_clustering/write_data_to_pubsub_pipeline/)
+
+The file structure for ingestion pipeline is:
+
+    write_data_to_pubsub_pipeline/
+    ├── pipeline/
+    │   ├── __init__.py
+    │   ├── options.py
+    │   └── utils.py
+    ├── __init__.py
+    ├── config.py
+    ├── main.py
+    └── setup.py
+
+`pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation
+`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline
+`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times
+`setup.py` defines the packages/requirements for the pipeline to run
+`main.py` contains the pipeline code and some additional function used for running the pipeline
+
+### How to Run the Pipeline ?
+First, make sure you have installed the required packages.
+
+1. Locally on your machine: `python main.py`
+2. On GCP for Dataflow: `python main.py --mode cloud`
+
+
+The `write_data_to_pubsub_pipeline` contains four different transforms:
+1. Load emotion dataset using HuggingFace Datasets (We take samples from 3 classes instead of 6 for simplicity)
+2. Associate each text with a unique identifier (UID)
+3. Convert the text into a format PubSub is expecting
+4. Write the formatted message to PubSub
+
+
+## Clustering on Streaming Data
+
+After having the data ingested to PubSub, we can now look into the second pipeline, where we read the streaming message from PubSub, convert the text to a embedding using a language model and cluster them using BIRCH.
+
+The full example code for all the steps mentioned above can be found [here](sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/).
+
+
+The file structure for clustering_pipeline is:
+
+    clustering_pipeline/
+    ├── pipeline/
+    │   ├── __init__.py
+    │   ├── options.py
+    │   └── transformations.py
+    ├── __init__.py
+    ├── config.py
+    ├── main.py
+    └── setup.py
+
+`pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation
+`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline
+`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times
+`setup.py` defines the packages/requirements for the pipeline to run
+`main.py` contains the pipeline code and some additional function used for running the pipeline

Review Comment:
   ```suggestion
   `pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation.
   
   `pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline.
   
   `config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times.
   
   `setup.py` defines the packages/requirements for the pipeline to run.
   
   `main.py` contains the pipeline code and some additional function used for running the pipeline.
   ```



##########
website/www/site/content/en/documentation/ml/online-clustering.md:
##########
@@ -0,0 +1,211 @@
+---
+title: "Online Clustering"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# OnlineClustering Example
+
+The OnlineClustering example demonstrates how to setup a realtime clustering pipeline that can read text from PubSub, convert the text into an embedding using a language model, and cluster them using BIRCH.
+
+### Dataset for Clustering
+For the example, we use a dataset called [emotion](https://huggingface.co/datasets/emotion). It comprises of 20000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train (for training), validation and test (for performance evaluation). It is a supervised dataset as it contains the text and the category(class) of the dataset. This dataset can easily be accessed using [HuggingFace Datasets](https://huggingface.co/docs/datasets/index).

Review Comment:
   ```suggestion
   ### Dataset for Clustering
   For the example, we use a dataset called [emotion](https://huggingface.co/datasets/emotion). It comprises of 20,000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train, validation and test. It is a supervised dataset as it contains the text and the category(class) of the dataset. This dataset can easily be accessed using [HuggingFace Datasets](https://huggingface.co/docs/datasets/index).
   ```



##########
website/www/site/content/en/documentation/ml/online-clustering.md:
##########
@@ -0,0 +1,211 @@
+---
+title: "Online Clustering"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# OnlineClustering Example
+
+The OnlineClustering example demonstrates how to setup a realtime clustering pipeline that can read text from PubSub, convert the text into an embedding using a language model, and cluster them using BIRCH.
+
+### Dataset for Clustering
+For the example, we use a dataset called [emotion](https://huggingface.co/datasets/emotion). It comprises of 20000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train (for training), validation and test (for performance evaluation). It is a supervised dataset as it contains the text and the category(class) of the dataset. This dataset can easily be accessed using [HuggingFace Datasets](https://huggingface.co/docs/datasets/index).
+
+To have a better understanding of the dataset, here are some examples from the train split of the dataset:
+
+
+| Text        | Type of emotion |
+| :---        |    :----:   |
+| im grabbing a minute to post i feel greedy wrong      | Anger       |
+| i am ever feeling nostalgic about the fireplace i will know that it is still on the property   | Love        |
+| ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny | Fear |
+| on a boat trip to denmark | Joy |
+| i feel you know basically like a fake in the realm of science fiction | Sadness |
+| i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations | Fear |
+
+### Clustering Algorithm
+For the clustering of tweets, we use an incremental clustering algorithm called BIRCH. It stands for balanced iterative reducing and clustering using hierarchies and is an unsupervised data mining algorithm used to perform hierarchical clustering over particularly large data-sets. An advantage of BIRCH is its ability to incrementally and dynamically cluster incoming, multi-dimensional metric data points in an attempt to produce the best quality clustering for a given set of resources (memory and time constraints).
+
+
+## Ingestion to PubSub
+We first ingest the data into [PubSub](https://cloud.google.com/pubsub/docs/overview) so that while clustering we can read the tweets from PubSub. PubSub is a messaging service for exchanging event data among applications and services. It is used for streaming analytics and data integration pipelines to ingest and distribute data.
+
+The full example code for ingesting data to PubSub can be found [here](sdks/python/apache_beam/examples/inference/online_clustering/write_data_to_pubsub_pipeline/)
+
+The file structure for ingestion pipeline is:
+
+    write_data_to_pubsub_pipeline/
+    ├── pipeline/
+    │   ├── __init__.py
+    │   ├── options.py
+    │   └── utils.py
+    ├── __init__.py
+    ├── config.py
+    ├── main.py
+    └── setup.py
+
+`pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation
+`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline
+`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times
+`setup.py` defines the packages/requirements for the pipeline to run
+`main.py` contains the pipeline code and some additional function used for running the pipeline
+
+### How to Run the Pipeline ?
+First, make sure you have installed the required packages.
+
+1. Locally on your machine: `python main.py`
+2. On GCP for Dataflow: `python main.py --mode cloud`
+
+
+The `write_data_to_pubsub_pipeline` contains four different transforms:
+1. Load emotion dataset using HuggingFace Datasets (We take samples from 3 classes instead of 6 for simplicity)
+2. Associate each text with a unique identifier (UID)
+3. Convert the text into a format PubSub is expecting
+4. Write the formatted message to PubSub
+
+
+## Clustering on Streaming Data
+
+After having the data ingested to PubSub, we can now look into the second pipeline, where we read the streaming message from PubSub, convert the text to a embedding using a language model and cluster them using BIRCH.
+
+The full example code for all the steps mentioned above can be found [here](sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/).
+
+
+The file structure for clustering_pipeline is:
+
+    clustering_pipeline/
+    ├── pipeline/
+    │   ├── __init__.py
+    │   ├── options.py
+    │   └── transformations.py
+    ├── __init__.py
+    ├── config.py
+    ├── main.py
+    └── setup.py
+
+`pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation
+`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline
+`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times
+`setup.py` defines the packages/requirements for the pipeline to run
+`main.py` contains the pipeline code and some additional function used for running the pipeline
+
+### How to Run the Pipeline ?
+First, make sure you have installed the required packages and you have pushed data to PubSub.
+
+1. Locally on your machine: `python main.py`
+2. On GCP for Dataflow: `python main.py --mode cloud`
+
+The pipeline can be broken down into few simple steps:
+
+1. Reading the message from PubSub
+2. Converting the PubSub message into a PCollection of dictionary where key is UID and value is twitter text
+3. Encoding the text into transformer-readable token ID integers using a tokenizer
+4. Using RunInference to get the vector embedding from a Transformer based Language Model
+5. Normalizing the embedding for Clustering
+6. Performing BIRCH Clustering using Stateful Processing
+7. Printing the texts assigned to clusters
+
+The code snippet for first two steps of pipeline where message from PubSub is read and converted into a dictionary
+
+{{< highlight >}}
+    docs = (
+        pipeline
+        | "Read from PubSub"
+        >> ReadFromPubSub(subscription=cfg.SUBSCRIPTION_ID, with_attributes=True)
+        | "Decode PubSubMessage" >> beam.ParDo(Decode())
+    )
+{{< /highlight >}}
+
+
+We now closely look at three important steps of pipeline where we tokenize the text, fed the tokenized text to get embedding from a Transformer based Language Model and performing clustering using [Stateful Processing](https://beam.apache.org/blog/stateful-processing/).
+
+
+### Getting Embedding from a Language Model
+
+In order to do clustering with text data, we first need to map the text into vectors of numerical values suitable for statistical analysis. We use a transformer based language model called [sentence-transformers/stsb-distilbert-base/stsb-distilbert-base](https://huggingface.co/sentence-transformers/stsb-distilbert-base). It maps sentences & paragraphs to a 768 dimensional dense vector space and can be used for tasks like clustering or semantic search. But, we first need to tokenize the text as the language model is expecting a tokenized input instead of raw text.
+
+Tokenization can be seen as a preprocessing task as it transforms text in a way that it can be fed into the model for getting predictions.
+
+{{< highlight >}}
+    normalized_embedding = (
+        docs
+        | "Tokenize Text" >> beam.Map(tokenize_sentence)
+{{< /highlight >}}
+
+Here, `tokenize_sentence` is a function that takes a dictionary with a text and an id, tokenizes the text, and returns a tuple (text, id) and the tokenized output.
+
+
+Tokenized output is then passed to the language model for getting the embeddings. For getting embeddings from language model, we use `RunInference()` from beam.
+
+{{< highlight >}}
+    | "Get Embedding" >> RunInference(KeyedModelHandler(model_handler))
+
+{{< /highlight >}}
+
+After getting the embedding for each twitter text, the embeddings are normalized as it helps to make better clusters.
+
+{{< highlight >}}
+    | "Normalize Embedding" >> beam.ParDo(NormalizeEmbedding())
+
+{{< /highlight >}}
+
+
+### StatefulOnlineClustering
+As the data is coming in a streaming fashion, so to cluster them we need an iterative clustering algorithm like BIRCH. As, the algorithm is iterative, we need a mechanism to store the previous state so that when a twitter text arrives, it can be updated accordingly.  ** Stateful Processing ** enables a `DoFn` with the ability to have a persistent state which can be read and written during the processing of each element. One can read about Stateful Processing in the official documentation from Beam: [Link](https://beam.apache.org/blog/stateful-processing/).
+
+In this example, every time a new message is Read from PubSub, we retrieve the existing state of the clustering model, update it and write it back to the state.
+
+{{< highlight >}}
+    clustering = (
+        normalized_embedding
+        | "Map doc to key" >> beam.Map(lambda x: (1, x))
+        | "StatefulClustering using Birch" >> beam.ParDo(StatefulOnlineClustering())
+    )
+{{< /highlight >}}
+
+As BIRCH doesn't support parallelization, so we need to make sure that all the StatefulProcessing is taking place only by one worker. In order to do that, we use the `Beam.Map` to associate each text to the same key `1`.
+
+`StatefulOnlineClustering` is a `DoFn` that an embedding of a text and updates the clustering model. For storing the state it uses `ReadModifyWriteStateSpec` state object that acts as a container for storage.
+
+{{< highlight >}}
+
+class StatefulOnlineClustering(beam.DoFn):
+
+    BIRCH_MODEL_SPEC = ReadModifyWriteStateSpec("clustering_model", PickleCoder())
+    DATA_ITEMS_SPEC = ReadModifyWriteStateSpec("data_items", PickleCoder())
+    EMBEDDINGS_SPEC = ReadModifyWriteStateSpec("embeddings", PickleCoder())
+    UPDATE_COUNTER_SPEC = ReadModifyWriteStateSpec("update_counter", PickleCoder())
+
+{{< /highlight >}}
+
+We declare four different `ReadModifyWriteStateSpec objects`:
+
+* `BIRCH_MODEL_SPEC`: holds the state of clustering model
+* `DATA_ITEMS_SPEC`: holds the twitter texts seen so far
+* `EMBEDDINGS_SPEC`: holds the normalized embeddings
+* `UPDATE_COUNTER_SPEC`: holds the number of texts processed
+
+
+These `ReadModifyWriteStateSpec objects` are passed as an additional argument to the process fun, where whenever a news item comes in, we retrieve the existing state of the different objects, update it and then write it back.
+
+{{< highlight file="sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/pipeline/transformations.py" >}}
+{{< code_sample "sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/pipeline/transformations.py" stateful_clustering >}}
+{{< /highlight >}}
+
+
+`GetUpdates` is a `DoFn` that prints the cluster assigned to each twitter message, every time a new message comes.

Review Comment:
   ```suggestion
   `GetUpdates` is a `DoFn` that prints the cluster assigned to each twitter message every time a new message arrives.
   ```



##########
website/www/site/content/en/documentation/ml/online-clustering.md:
##########
@@ -0,0 +1,211 @@
+---
+title: "Online Clustering"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# OnlineClustering Example
+
+The OnlineClustering example demonstrates how to setup a realtime clustering pipeline that can read text from PubSub, convert the text into an embedding using a language model, and cluster them using BIRCH.
+
+### Dataset for Clustering
+For the example, we use a dataset called [emotion](https://huggingface.co/datasets/emotion). It comprises of 20000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train (for training), validation and test (for performance evaluation). It is a supervised dataset as it contains the text and the category(class) of the dataset. This dataset can easily be accessed using [HuggingFace Datasets](https://huggingface.co/docs/datasets/index).
+
+To have a better understanding of the dataset, here are some examples from the train split of the dataset:
+
+
+| Text        | Type of emotion |
+| :---        |    :----:   |
+| im grabbing a minute to post i feel greedy wrong      | Anger       |
+| i am ever feeling nostalgic about the fireplace i will know that it is still on the property   | Love        |
+| ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny | Fear |
+| on a boat trip to denmark | Joy |
+| i feel you know basically like a fake in the realm of science fiction | Sadness |
+| i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations | Fear |
+
+### Clustering Algorithm
+For the clustering of tweets, we use an incremental clustering algorithm called BIRCH. It stands for balanced iterative reducing and clustering using hierarchies and is an unsupervised data mining algorithm used to perform hierarchical clustering over particularly large data-sets. An advantage of BIRCH is its ability to incrementally and dynamically cluster incoming, multi-dimensional metric data points in an attempt to produce the best quality clustering for a given set of resources (memory and time constraints).
+
+
+## Ingestion to PubSub
+We first ingest the data into [PubSub](https://cloud.google.com/pubsub/docs/overview) so that while clustering we can read the tweets from PubSub. PubSub is a messaging service for exchanging event data among applications and services. It is used for streaming analytics and data integration pipelines to ingest and distribute data.
+
+The full example code for ingesting data to PubSub can be found [here](sdks/python/apache_beam/examples/inference/online_clustering/write_data_to_pubsub_pipeline/)
+
+The file structure for ingestion pipeline is:
+
+    write_data_to_pubsub_pipeline/
+    ├── pipeline/
+    │   ├── __init__.py
+    │   ├── options.py
+    │   └── utils.py
+    ├── __init__.py
+    ├── config.py
+    ├── main.py
+    └── setup.py
+
+`pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation
+`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline
+`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times
+`setup.py` defines the packages/requirements for the pipeline to run
+`main.py` contains the pipeline code and some additional function used for running the pipeline
+
+### How to Run the Pipeline ?
+First, make sure you have installed the required packages.
+
+1. Locally on your machine: `python main.py`
+2. On GCP for Dataflow: `python main.py --mode cloud`
+
+
+The `write_data_to_pubsub_pipeline` contains four different transforms:
+1. Load emotion dataset using HuggingFace Datasets (We take samples from 3 classes instead of 6 for simplicity)
+2. Associate each text with a unique identifier (UID)
+3. Convert the text into a format PubSub is expecting
+4. Write the formatted message to PubSub
+
+
+## Clustering on Streaming Data
+
+After having the data ingested to PubSub, we can now look into the second pipeline, where we read the streaming message from PubSub, convert the text to a embedding using a language model and cluster them using BIRCH.
+
+The full example code for all the steps mentioned above can be found [here](sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/).
+
+
+The file structure for clustering_pipeline is:
+
+    clustering_pipeline/
+    ├── pipeline/
+    │   ├── __init__.py
+    │   ├── options.py
+    │   └── transformations.py
+    ├── __init__.py
+    ├── config.py
+    ├── main.py
+    └── setup.py
+
+`pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation
+`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline
+`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times
+`setup.py` defines the packages/requirements for the pipeline to run
+`main.py` contains the pipeline code and some additional function used for running the pipeline
+
+### How to Run the Pipeline ?
+First, make sure you have installed the required packages and you have pushed data to PubSub.
+
+1. Locally on your machine: `python main.py`
+2. On GCP for Dataflow: `python main.py --mode cloud`
+
+The pipeline can be broken down into few simple steps:
+
+1. Reading the message from PubSub
+2. Converting the PubSub message into a PCollection of dictionary where key is UID and value is twitter text
+3. Encoding the text into transformer-readable token ID integers using a tokenizer
+4. Using RunInference to get the vector embedding from a Transformer based Language Model
+5. Normalizing the embedding for Clustering
+6. Performing BIRCH Clustering using Stateful Processing
+7. Printing the texts assigned to clusters
+
+The code snippet for first two steps of pipeline where message from PubSub is read and converted into a dictionary
+
+{{< highlight >}}
+    docs = (
+        pipeline
+        | "Read from PubSub"
+        >> ReadFromPubSub(subscription=cfg.SUBSCRIPTION_ID, with_attributes=True)
+        | "Decode PubSubMessage" >> beam.ParDo(Decode())
+    )
+{{< /highlight >}}
+
+
+We now closely look at three important steps of pipeline where we tokenize the text, fed the tokenized text to get embedding from a Transformer based Language Model and performing clustering using [Stateful Processing](https://beam.apache.org/blog/stateful-processing/).
+
+
+### Getting Embedding from a Language Model
+
+In order to do clustering with text data, we first need to map the text into vectors of numerical values suitable for statistical analysis. We use a transformer based language model called [sentence-transformers/stsb-distilbert-base/stsb-distilbert-base](https://huggingface.co/sentence-transformers/stsb-distilbert-base). It maps sentences & paragraphs to a 768 dimensional dense vector space and can be used for tasks like clustering or semantic search. But, we first need to tokenize the text as the language model is expecting a tokenized input instead of raw text.
+
+Tokenization can be seen as a preprocessing task as it transforms text in a way that it can be fed into the model for getting predictions.
+
+{{< highlight >}}
+    normalized_embedding = (
+        docs
+        | "Tokenize Text" >> beam.Map(tokenize_sentence)
+{{< /highlight >}}
+
+Here, `tokenize_sentence` is a function that takes a dictionary with a text and an id, tokenizes the text, and returns a tuple (text, id) and the tokenized output.
+
+
+Tokenized output is then passed to the language model for getting the embeddings. For getting embeddings from language model, we use `RunInference()` from beam.
+
+{{< highlight >}}
+    | "Get Embedding" >> RunInference(KeyedModelHandler(model_handler))
+
+{{< /highlight >}}
+
+After getting the embedding for each twitter text, the embeddings are normalized as it helps to make better clusters.
+
+{{< highlight >}}
+    | "Normalize Embedding" >> beam.ParDo(NormalizeEmbedding())
+
+{{< /highlight >}}
+
+
+### StatefulOnlineClustering
+As the data is coming in a streaming fashion, so to cluster them we need an iterative clustering algorithm like BIRCH. As, the algorithm is iterative, we need a mechanism to store the previous state so that when a twitter text arrives, it can be updated accordingly.  ** Stateful Processing ** enables a `DoFn` with the ability to have a persistent state which can be read and written during the processing of each element. One can read about Stateful Processing in the official documentation from Beam: [Link](https://beam.apache.org/blog/stateful-processing/).
+
+In this example, every time a new message is Read from PubSub, we retrieve the existing state of the clustering model, update it and write it back to the state.
+
+{{< highlight >}}
+    clustering = (
+        normalized_embedding
+        | "Map doc to key" >> beam.Map(lambda x: (1, x))
+        | "StatefulClustering using Birch" >> beam.ParDo(StatefulOnlineClustering())
+    )
+{{< /highlight >}}
+
+As BIRCH doesn't support parallelization, so we need to make sure that all the StatefulProcessing is taking place only by one worker. In order to do that, we use the `Beam.Map` to associate each text to the same key `1`.
+
+`StatefulOnlineClustering` is a `DoFn` that an embedding of a text and updates the clustering model. For storing the state it uses `ReadModifyWriteStateSpec` state object that acts as a container for storage.
+
+{{< highlight >}}
+
+class StatefulOnlineClustering(beam.DoFn):
+
+    BIRCH_MODEL_SPEC = ReadModifyWriteStateSpec("clustering_model", PickleCoder())
+    DATA_ITEMS_SPEC = ReadModifyWriteStateSpec("data_items", PickleCoder())
+    EMBEDDINGS_SPEC = ReadModifyWriteStateSpec("embeddings", PickleCoder())
+    UPDATE_COUNTER_SPEC = ReadModifyWriteStateSpec("update_counter", PickleCoder())
+
+{{< /highlight >}}
+
+We declare four different `ReadModifyWriteStateSpec objects`:
+
+* `BIRCH_MODEL_SPEC`: holds the state of clustering model
+* `DATA_ITEMS_SPEC`: holds the twitter texts seen so far
+* `EMBEDDINGS_SPEC`: holds the normalized embeddings
+* `UPDATE_COUNTER_SPEC`: holds the number of texts processed
+
+
+These `ReadModifyWriteStateSpec objects` are passed as an additional argument to the process fun, where whenever a news item comes in, we retrieve the existing state of the different objects, update it and then write it back.

Review Comment:
   ```suggestion
   These `ReadModifyWriteStateSpec objects` are passed as an additional argument to the `process` function. When a news item comes in, we retrieve the existing state of the different objects, update them and then write them back as persistent shared state.
   ```



##########
website/www/site/content/en/documentation/ml/online-clustering.md:
##########
@@ -0,0 +1,211 @@
+---
+title: "Online Clustering"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# OnlineClustering Example
+
+The OnlineClustering example demonstrates how to setup a realtime clustering pipeline that can read text from PubSub, convert the text into an embedding using a language model, and cluster them using BIRCH.
+
+### Dataset for Clustering
+For the example, we use a dataset called [emotion](https://huggingface.co/datasets/emotion). It comprises of 20000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train (for training), validation and test (for performance evaluation). It is a supervised dataset as it contains the text and the category(class) of the dataset. This dataset can easily be accessed using [HuggingFace Datasets](https://huggingface.co/docs/datasets/index).
+
+To have a better understanding of the dataset, here are some examples from the train split of the dataset:
+
+
+| Text        | Type of emotion |
+| :---        |    :----:   |
+| im grabbing a minute to post i feel greedy wrong      | Anger       |
+| i am ever feeling nostalgic about the fireplace i will know that it is still on the property   | Love        |
+| ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny | Fear |
+| on a boat trip to denmark | Joy |
+| i feel you know basically like a fake in the realm of science fiction | Sadness |
+| i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations | Fear |
+
+### Clustering Algorithm
+For the clustering of tweets, we use an incremental clustering algorithm called BIRCH. It stands for balanced iterative reducing and clustering using hierarchies and is an unsupervised data mining algorithm used to perform hierarchical clustering over particularly large data-sets. An advantage of BIRCH is its ability to incrementally and dynamically cluster incoming, multi-dimensional metric data points in an attempt to produce the best quality clustering for a given set of resources (memory and time constraints).
+
+
+## Ingestion to PubSub
+We first ingest the data into [PubSub](https://cloud.google.com/pubsub/docs/overview) so that while clustering we can read the tweets from PubSub. PubSub is a messaging service for exchanging event data among applications and services. It is used for streaming analytics and data integration pipelines to ingest and distribute data.
+
+The full example code for ingesting data to PubSub can be found [here](sdks/python/apache_beam/examples/inference/online_clustering/write_data_to_pubsub_pipeline/)
+
+The file structure for ingestion pipeline is:
+
+    write_data_to_pubsub_pipeline/
+    ├── pipeline/
+    │   ├── __init__.py
+    │   ├── options.py
+    │   └── utils.py
+    ├── __init__.py
+    ├── config.py
+    ├── main.py
+    └── setup.py
+
+`pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation
+`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline
+`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times
+`setup.py` defines the packages/requirements for the pipeline to run
+`main.py` contains the pipeline code and some additional function used for running the pipeline
+
+### How to Run the Pipeline ?
+First, make sure you have installed the required packages.
+
+1. Locally on your machine: `python main.py`
+2. On GCP for Dataflow: `python main.py --mode cloud`
+
+
+The `write_data_to_pubsub_pipeline` contains four different transforms:
+1. Load emotion dataset using HuggingFace Datasets (We take samples from 3 classes instead of 6 for simplicity)
+2. Associate each text with a unique identifier (UID)
+3. Convert the text into a format PubSub is expecting
+4. Write the formatted message to PubSub
+
+
+## Clustering on Streaming Data
+
+After having the data ingested to PubSub, we can now look into the second pipeline, where we read the streaming message from PubSub, convert the text to a embedding using a language model and cluster them using BIRCH.
+
+The full example code for all the steps mentioned above can be found [here](sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/).
+
+
+The file structure for clustering_pipeline is:
+
+    clustering_pipeline/
+    ├── pipeline/
+    │   ├── __init__.py
+    │   ├── options.py
+    │   └── transformations.py
+    ├── __init__.py
+    ├── config.py
+    ├── main.py
+    └── setup.py
+
+`pipeline/utils.py` contains the code for loading the emotion dataset and two `beam.DoFn` that are used for data transformation
+`pipeline/options.py` contains the pipeline options to configure the Dataflow pipeline
+`config.py` defines some variables like GCP PROJECT_ID, NUM_WORKERS that are used multiple times
+`setup.py` defines the packages/requirements for the pipeline to run
+`main.py` contains the pipeline code and some additional function used for running the pipeline
+
+### How to Run the Pipeline ?
+First, make sure you have installed the required packages and you have pushed data to PubSub.
+
+1. Locally on your machine: `python main.py`
+2. On GCP for Dataflow: `python main.py --mode cloud`
+
+The pipeline can be broken down into few simple steps:
+
+1. Reading the message from PubSub
+2. Converting the PubSub message into a PCollection of dictionary where key is UID and value is twitter text
+3. Encoding the text into transformer-readable token ID integers using a tokenizer
+4. Using RunInference to get the vector embedding from a Transformer based Language Model
+5. Normalizing the embedding for Clustering
+6. Performing BIRCH Clustering using Stateful Processing
+7. Printing the texts assigned to clusters
+
+The code snippet for first two steps of pipeline where message from PubSub is read and converted into a dictionary
+
+{{< highlight >}}
+    docs = (
+        pipeline
+        | "Read from PubSub"
+        >> ReadFromPubSub(subscription=cfg.SUBSCRIPTION_ID, with_attributes=True)
+        | "Decode PubSubMessage" >> beam.ParDo(Decode())
+    )
+{{< /highlight >}}
+
+
+We now closely look at three important steps of pipeline where we tokenize the text, fed the tokenized text to get embedding from a Transformer based Language Model and performing clustering using [Stateful Processing](https://beam.apache.org/blog/stateful-processing/).
+
+
+### Getting Embedding from a Language Model
+
+In order to do clustering with text data, we first need to map the text into vectors of numerical values suitable for statistical analysis. We use a transformer based language model called [sentence-transformers/stsb-distilbert-base/stsb-distilbert-base](https://huggingface.co/sentence-transformers/stsb-distilbert-base). It maps sentences & paragraphs to a 768 dimensional dense vector space and can be used for tasks like clustering or semantic search. But, we first need to tokenize the text as the language model is expecting a tokenized input instead of raw text.
+
+Tokenization can be seen as a preprocessing task as it transforms text in a way that it can be fed into the model for getting predictions.
+
+{{< highlight >}}
+    normalized_embedding = (
+        docs
+        | "Tokenize Text" >> beam.Map(tokenize_sentence)
+{{< /highlight >}}
+
+Here, `tokenize_sentence` is a function that takes a dictionary with a text and an id, tokenizes the text, and returns a tuple (text, id) and the tokenized output.
+
+
+Tokenized output is then passed to the language model for getting the embeddings. For getting embeddings from language model, we use `RunInference()` from beam.
+
+{{< highlight >}}
+    | "Get Embedding" >> RunInference(KeyedModelHandler(model_handler))
+
+{{< /highlight >}}
+
+After getting the embedding for each twitter text, the embeddings are normalized as it helps to make better clusters.
+
+{{< highlight >}}
+    | "Normalize Embedding" >> beam.ParDo(NormalizeEmbedding())
+
+{{< /highlight >}}
+
+
+### StatefulOnlineClustering
+As the data is coming in a streaming fashion, so to cluster them we need an iterative clustering algorithm like BIRCH. As, the algorithm is iterative, we need a mechanism to store the previous state so that when a twitter text arrives, it can be updated accordingly.  ** Stateful Processing ** enables a `DoFn` with the ability to have a persistent state which can be read and written during the processing of each element. One can read about Stateful Processing in the official documentation from Beam: [Link](https://beam.apache.org/blog/stateful-processing/).

Review Comment:
   ```suggestion
   As the data is coming in a streaming fashion, so to cluster them we need an iterative clustering algorithm like BIRCH. As, the algorithm is iterative, we need a mechanism to store the previous state so that when a twitter text arrives, it can be updated accordingly.  *Stateful Processing* enables a `DoFn` to have persistent state which can be read and written during the processing of each element. One can read about Stateful Processing in the official documentation from Beam: [Link](https://beam.apache.org/blog/stateful-processing/).
   ```



##########
sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/main.py:
##########
@@ -0,0 +1,95 @@
+import argparse
+import sys
+
+import apache_beam as beam
+from apache_beam.io.gcp.pubsub import ReadFromPubSub
+from apache_beam.ml.inference.base import KeyedModelHandler, RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
+from transformers import AutoConfig
+
+import config as cfg
+from pipeline.options import get_pipeline_options
+from pipeline.transformations import (
+    Decode,
+    GetUpdates,
+    ModelWrapper,
+    NormalizeEmbedding,
+    StatefulOnlineClustering,
+    tokenize_sentence,
+)
+
+
+def parse_arguments(argv):
+    parser = argparse.ArgumentParser(description="online-clustering")
+
+    parser.add_argument(
+        "-m",
+        "--mode",
+        help="Mode to run pipeline in.",
+        choices=["local", "cloud"],
+        default="local",
+    )
+    parser.add_argument(
+        "-p",
+        "--project",
+        help="GCP project to run pipeline on.",
+        default=cfg.PROJECT_ID,
+    )
+
+    args, _ = parser.parse_known_args(args=argv)
+    return args
+
+
+class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):

Review Comment:
   Could we add a comment that links to https://github.com/apache/beam/issues/21863 and mentions that we can remove this workaround once it is fixed?
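   
   Something along these lines would work (a sketch only: the `batch_elements_kwargs` override is an assumption based on how other RunInference examples pin the batch size to 1, so keep whatever the class body already does and just add the TODO/link):
   
   ```python
   # TODO(https://github.com/apache/beam/issues/21863): Remove this workaround
   # once the linked issue is fixed.
   class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
       """Limits the batch size to 1, so elements with different tensor
       shapes are not batched together."""
   
       def batch_elements_kwargs(self):
           # Assumed workaround: disable batching so RunInference processes
           # one keyed tensor at a time.
           return {"max_batch_size": 1}
   ```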



##########
sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/pipeline/transformations.py:
##########
@@ -0,0 +1,162 @@
+from collections import Counter, defaultdict
+
+import apache_beam as beam
+import numpy as np
+import torch
+from apache_beam.coders import PickleCoder
+from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
+from sklearn.cluster import Birch
+from transformers import AutoTokenizer, DistilBertModel
+
+import config as cfg
+
+Tokenizer = AutoTokenizer.from_pretrained(cfg.TOKENIZER_NAME)
+
+
+def tokenize_sentence(input_dict):
+    """
+    It takes a dictionary with a text and an id, tokenizes the text, and returns a tuple of the text and
+    id and the tokenized text
+
+    Args:
+      input_dict: a dictionary with the text and id of the sentence
+
+    Returns:
+      A tuple of the text and id, and a dictionary of the tokens.
+    """
+    text, id = input_dict["text"], input_dict["id"]
+    tokens = Tokenizer([text], padding=True, truncation=True, return_tensors="pt")
+    tokens = {key: torch.squeeze(val) for key, val in tokens.items()}
+    return (text, id), tokens
+
+
+class ModelWrapper(DistilBertModel):
+    def forward(self, **kwargs):
+        output = super().forward(**kwargs)
+        sentence_embedding = (
+            self.mean_pooling(output, kwargs["attention_mask"]).detach().cpu().numpy()
+        )
+        return sentence_embedding
+
+    # Mean Pooling - Take attention mask into account for correct averaging
+    def mean_pooling(self, model_output, attention_mask):
+        token_embeddings = model_output[
+            0
+        ]  # First element of model_output contains all token embeddings
+        input_mask_expanded = (
+            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
+        )
+        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
+            input_mask_expanded.sum(1), min=1e-9
+        )
+
+
+class NormalizeEmbedding(beam.DoFn):
+    def process(self, element, *args, **kwargs):
+        """
+        For each element in the input PCollection, normalize the embedding vector, and
+        yield a new element with the normalized embedding added
+        Args:
+          element: The element to be processed.
+        """
+        (text, id), prediction = element
+        embedding = prediction.inference
+        l2_norm = np.linalg.norm(embedding)
+        yield {"text": text, "id": id, "embedding": embedding / l2_norm}
+
+
+class Decode(beam.DoFn):
+    def process(self, element, *args, **kwargs):
+        """
+        For each element in the input PCollection, retrieve the id and decode the bytes into string
+
+        Args:
+          element: The element that is being processed.
+        """
+        yield {
+            "text": element.data.decode("utf-8"),
+            "id": element.attributes["id"],
+        }
+
+
+class StatefulOnlineClustering(beam.DoFn):
+
+    BIRCH_MODEL_SPEC = ReadModifyWriteStateSpec("clustering_model", PickleCoder())
+    DATA_ITEMS_SPEC = ReadModifyWriteStateSpec("data_items", PickleCoder())
+    EMBEDDINGS_SPEC = ReadModifyWriteStateSpec("embeddings", PickleCoder())
+    UPDATE_COUNTER_SPEC = ReadModifyWriteStateSpec("update_counter", PickleCoder())
+    # [START stateful_clustering]
+    def process(
+        self,
+        element,
+        model_state=beam.DoFn.StateParam(BIRCH_MODEL_SPEC),
+        collected_docs_state=beam.DoFn.StateParam(DATA_ITEMS_SPEC),
+        collected_embeddings_state=beam.DoFn.StateParam(EMBEDDINGS_SPEC),
+        update_counter_state=beam.DoFn.StateParam(UPDATE_COUNTER_SPEC),
+        *args,
+        **kwargs,
+    ):
+        """
+        Takes the embedding of a document and updates the clustering model
+
+        Args:
+          element: The input element to be processed.
+          model_state: This is the state of the clustering model. It is a stateful parameter, which means
+        that it will be updated after each call to the process function.
+          collected_docs_state: This is a stateful dictionary that stores the documents that have been
+        processed so far.
+          collected_embeddings_state: This is a dictionary of document IDs and their embeddings.
+          update_counter_state: This is a counter that keeps track of how many documents have been
+        processed.
+        """
+        # 1. Initialise or load states
+        clustering = model_state.read() or Birch(n_clusters=None, threshold=0.7)
+        collected_documents = collected_docs_state.read() or dict()
+        collected_embeddings = collected_embeddings_state.read() or dict()
+        update_counter = update_counter_state.read() or Counter()
+
+        # 2. Extract document, add to state, and add to clustering model
+        _, doc = element
+        doc_id = doc["id"]
+        embedding_vector = doc["embedding"]
+        collected_embeddings[doc_id] = embedding_vector
+        collected_documents[doc_id] = {"id": doc_id, "text": doc["text"]}
+        update_counter = len(collected_documents)
+
+        clustering.partial_fit(np.atleast_2d(embedding_vector))
+
+        # 3. Predict cluster labels of collected documents
+        cluster_labels = clustering.predict(
+            np.array(list(collected_embeddings.values()))
+        )
+
+        # 4. Write states
+        model_state.write(clustering)
+        collected_docs_state.write(collected_documents)
+        collected_embeddings_state.write(collected_embeddings)
+        update_counter_state.write(update_counter)
+        yield {
+            "labels": cluster_labels,
+            "docs": collected_documents,
+            "id": list(collected_embeddings.keys()),
+            "counter": update_counter,
+        }
+        # [END stateful_clustering]
+
+
+class GetUpdates(beam.DoFn):
+    def process(self, element, *args, **kwargs):
+        """
+        Prints and returns clusters with items contained in it
+        """
+        cluster_labels = element.get("labels")
+        doc_ids = element.get("id")
+        docs = element.get("docs")
+        print(f"Update Number: {element.get('counter')}:::\n")
+        label_items_map = defaultdict(list)
+        for doc_id, cluster_label in zip(doc_ids, cluster_labels):
+            label_items_map[cluster_label].append(docs[doc_id])
+            # print(f"Doc-Text: {docs[doc_id]}, cluster_label: {cluster_label}")

Review Comment:
   Could we please cut this extra comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org