You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2023/08/30 12:31:55 UTC
[beam] branch master updated: Add word count to BagOfWords (#28203)
This is an automated email from the ASF dual-hosted git repository.
anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 63f419726dc Add word count to BagOfWords (#28203)
63f419726dc is described below
commit 63f419726dc5e0b71cb4db7f5480ed106857a88f
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Wed Aug 30 12:31:47 2023 +0000
Add word count to BagOfWords (#28203)
* Add word count
* Apply suggestions from code review
Co-authored-by: Danny McCormick <da...@google.com>
---------
Co-authored-by: Danny McCormick <da...@google.com>
---
sdks/python/apache_beam/ml/transforms/tft.py | 27 ++++++++++++++++++-
sdks/python/apache_beam/ml/transforms/tft_test.py | 33 +++++++++++++++++++++++
2 files changed, 59 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py
index b59c1715b7d..1d492642cd6 100644
--- a/sdks/python/apache_beam/ml/transforms/tft.py
+++ b/sdks/python/apache_beam/ml/transforms/tft.py
@@ -59,6 +59,7 @@ __all__ = [
'TFTOperation',
'ScaleByMinMax',
'NGrams',
+ 'BagOfWords',
]
# Register the expected input types for each operation
@@ -570,7 +571,9 @@ class BagOfWords(TFTOperation):
*,
ngram_range: Tuple[int, int] = (1, 1),
ngrams_separator: Optional[str] = None,
- name: Optional[str] = None):
+ compute_word_count: bool = False,
+ name: Optional[str] = None,
+ ):
"""
Bag of words contains the unique words present in the input text.
This operation applies a bag of words transformation to specified
@@ -588,6 +591,10 @@ class BagOfWords(TFTOperation):
ngram_range: A tuple of integers(inclusive) specifying the range of
n-gram sizes.
ngrams_separator: A string that will be inserted between each ngram.
+ compute_word_count: A boolean that specifies whether to compute
+ the unique word count and add it as an artifact to the output.
+ Note that the count will be computed over the entire dataset so
+ it will be the same value for all inputs.
name: A name for the operation (optional).
Note that the original order of the input may not be preserved.
@@ -598,11 +605,19 @@ class BagOfWords(TFTOperation):
self.ngrams_separator = ngrams_separator
self.name = name
self.split_string_by_delimiter = split_string_by_delimiter
+ if compute_word_count:
+ self.compute_word_count_fn = count_unique_words
+ else:
+ self.compute_word_count_fn = lambda *args, **kwargs: {}
if ngram_range != (1, 1) and not ngrams_separator:
raise ValueError(
'ngrams_separator must be specified when ngram_range is not (1, 1)')
+ def get_artifacts(self, data: tf.SparseTensor,
+ col_name: str) -> Dict[str, tf.Tensor]:
+ return self.compute_word_count_fn(data, col_name)
+
def apply_transform(self, data: tf.SparseTensor, output_col_name: str):
if self.split_string_by_delimiter:
data = self._split_string_with_delimiter(
@@ -610,3 +625,13 @@ class BagOfWords(TFTOperation):
output = tft.bag_of_words(
data, self.ngram_range, self.ngrams_separator, self.name)
return {output_col_name: output}
+
+
+def count_unique_words(data: tf.SparseTensor,
+ output_col_name: str) -> Dict[str, tf.Tensor]:
+ keys, count = tft.count_per_key(data)
+ shape = [tf.shape(data)[0], tf.shape(keys)[0]]
+ return {
+ output_col_name + '_unique_elements': tf.broadcast_to(keys, shape),
+ output_col_name + '_counts': tf.broadcast_to(count, shape)
+ }
diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py
index 9be41895701..41f59c868c3 100644
--- a/sdks/python/apache_beam/ml/transforms/tft_test.py
+++ b/sdks/python/apache_beam/ml/transforms/tft_test.py
@@ -730,6 +730,39 @@ class BagOfWordsTest(unittest.TestCase):
], [b'yum', b'yum yum', b'yum yum pie', b'yum pie', b'pie']]
assert_that(result, equal_to(expected_data, equals_fn=np.array_equal))
+ def test_count_per_key_on_list(self):
+ def map_element_to_count(elements, counts):
+ d = {elements[i]: counts[i] for i in range(len(elements))}
+ return d
+
+ data = [{
+ 'x': ['I', 'like', 'pie', 'pie', 'pie'],
+ }, {
+ 'x': ['yum', 'yum', 'pie']
+ }, {
+ 'x': ['Banana', 'Banana', 'Apple', 'Apple', 'Apple', 'Apple']
+ }]
+ with beam.Pipeline() as p:
+ result = (
+ p
+ | "Create" >> beam.Create(data)
+ | "MLTransform" >> base.MLTransform(
+ write_artifact_location=self.artifact_location,
+ transforms=[
+ tft.BagOfWords(columns=['x'], compute_word_count=True)
+ ]))
+
+ # the unique elements and counts are artifacts and will be
+ # stored in the result and same for all the elements in the
+ # PCollection.
+ result = result | beam.Map(
+ lambda x: map_element_to_count(x.x_unique_elements, x.x_counts))
+
+ expected_data = [{
+ b'Apple': 4, b'Banana': 2, b'I': 1, b'like': 1, b'pie': 4, b'yum': 2
+ }] * 3 # since there are 3 elements in input.
+ assert_that(result, equal_to(expected_data))
+
if __name__ == '__main__':
unittest.main()