You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2023/08/30 12:31:55 UTC

[beam] branch master updated: Add word count to BagOfWords (#28203)

This is an automated email from the ASF dual-hosted git repository.

anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 63f419726dc Add word count to BagOfWords (#28203)
63f419726dc is described below

commit 63f419726dc5e0b71cb4db7f5480ed106857a88f
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Wed Aug 30 12:31:47 2023 +0000

    Add word count to BagOfWords (#28203)
    
    * Add word count
    
    * Apply suggestions from code review
    
    Co-authored-by: Danny McCormick <da...@google.com>
    
    ---------
    
    Co-authored-by: Danny McCormick <da...@google.com>
---
 sdks/python/apache_beam/ml/transforms/tft.py      | 27 ++++++++++++++++++-
 sdks/python/apache_beam/ml/transforms/tft_test.py | 33 +++++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py
index b59c1715b7d..1d492642cd6 100644
--- a/sdks/python/apache_beam/ml/transforms/tft.py
+++ b/sdks/python/apache_beam/ml/transforms/tft.py
@@ -59,6 +59,7 @@ __all__ = [
     'TFTOperation',
     'ScaleByMinMax',
     'NGrams',
+    'BagOfWords',
 ]
 
 # Register the expected input types for each operation
@@ -570,7 +571,9 @@ class BagOfWords(TFTOperation):
       *,
       ngram_range: Tuple[int, int] = (1, 1),
       ngrams_separator: Optional[str] = None,
-      name: Optional[str] = None):
+      compute_word_count: bool = False,
+      name: Optional[str] = None,
+  ):
     """
     Bag of words contains the unique words present in the input text.
     This operation applies a bag of words transformation to specified
@@ -588,6 +591,10 @@ class BagOfWords(TFTOperation):
       ngram_range: A tuple of integers(inclusive) specifying the range of
         n-gram sizes.
      ngrams_separator: A string that will be inserted between each ngram.
+      compute_word_count: A boolean that specifies whether to compute
+        the unique word count and add it as an artifact to the output.
+        Note that the count will be computed over the entire dataset so
+        it will be the same value for all inputs.
       name: A name for the operation (optional).
 
     Note that original order of the input may not be preserved.
@@ -598,11 +605,19 @@ class BagOfWords(TFTOperation):
     self.ngrams_separator = ngrams_separator
     self.name = name
     self.split_string_by_delimiter = split_string_by_delimiter
+    if compute_word_count:
+      self.compute_word_count_fn = count_unique_words
+    else:
+      self.compute_word_count_fn = lambda *args, **kwargs: {}
 
     if ngram_range != (1, 1) and not ngrams_separator:
       raise ValueError(
           'ngrams_separator must be specified when ngram_range is not (1, 1)')
 
+  def get_artifacts(self, data: tf.SparseTensor,
+                    col_name: str) -> Dict[str, tf.Tensor]:
+    return self.compute_word_count_fn(data, col_name)
+
   def apply_transform(self, data: tf.SparseTensor, output_col_name: str):
     if self.split_string_by_delimiter:
       data = self._split_string_with_delimiter(
@@ -610,3 +625,13 @@
     output = tft.bag_of_words(
         data, self.ngram_range, self.ngrams_separator, self.name)
     return {output_col_name: output}
+
+
+def count_unique_words(data: tf.SparseTensor,
+                       output_col_name: str) -> Dict[str, tf.Tensor]:
+  keys, count = tft.count_per_key(data)
+  shape = [tf.shape(data)[0], tf.shape(keys)[0]]
+  return {
+      output_col_name + '_unique_elements': tf.broadcast_to(keys, shape),
+      output_col_name + '_counts': tf.broadcast_to(count, shape)
+  }
diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py
index 9be41895701..41f59c868c3 100644
--- a/sdks/python/apache_beam/ml/transforms/tft_test.py
+++ b/sdks/python/apache_beam/ml/transforms/tft_test.py
@@ -730,6 +730,39 @@ class BagOfWordsTest(unittest.TestCase):
       ], [b'yum', b'yum yum', b'yum yum pie', b'yum pie', b'pie']]
       assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
 
+  def test_count_per_key_on_list(self):
+    def map_element_to_count(elements, counts):
+      d = {elements[i]: counts[i] for i in range(len(elements))}
+      return d
+
+    data = [{
+        'x': ['I', 'like', 'pie', 'pie', 'pie'],
+    }, {
+        'x': ['yum', 'yum', 'pie']
+    }, {
+        'x': ['Banana', 'Banana', 'Apple', 'Apple', 'Apple', 'Apple']
+    }]
+    with beam.Pipeline() as p:
+      result = (
+          p
+          | "Create" >> beam.Create(data)
+          | "MLTransform" >> base.MLTransform(
+              write_artifact_location=self.artifact_location,
+              transforms=[
+                  tft.BagOfWords(columns=['x'], compute_word_count=True)
+              ]))
+
+      # The unique elements and counts are artifacts and will be
+      # stored in the result; they are the same for every element in the
+      # PCollection.
+      result = result | beam.Map(
+          lambda x: map_element_to_count(x.x_unique_elements, x.x_counts))
+
+      expected_data = [{
+          b'Apple': 4, b'Banana': 2, b'I': 1, b'like': 1, b'pie': 4, b'yum': 2
+      }] * 3  # since there are 3 elements in input.
+      assert_that(result, equal_to(expected_data))
+
 
 if __name__ == '__main__':
   unittest.main()