Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/13 12:01:45 UTC

[GitHub] [beam] yeandy opened a new pull request, #21818: Add Bert Language Modeling example

yeandy opened a new pull request, #21818:
URL: https://github.com/apache/beam/pull/21818

   Add Bert Language Modeling example
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] tvalentyn commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r896037389


##########
sdks/python/apache_beam/examples/inference/pytorch_bert.py:
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from the bookcorpus dataset, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Iterable
+from typing import Dict
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertTokenizer, BertForMaskedLM, BertConfig
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default=
+      'gs://apache-beam-ml/datasets/bookcorpus/bookcorpus_subset.parquet',
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='/Users/yeandy/Downloads/'

Review Comment:
   leftover?





[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897249005


##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')

Review Comment:
   Fixed.



##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)

Review Comment:
   Fixed





[GitHub] [beam] yeandy commented on pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on PR #21818:
URL: https://github.com/apache/beam/pull/21818#issuecomment-1153828624

   R: @AnandInguva @tvalentyn 




[GitHub] [beam] yeandy commented on pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on PR #21818:
URL: https://github.com/apache/beam/pull/21818#issuecomment-1155838098

   Removed the `CoGroupByKey` (CoGBK) step for simplicity.
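
For context, the join is not strictly needed because `PostProcessor` in the quoted diff already yields `(text, decoded_text)`, so each element carries both the original sentence and the predicted word. A minimal sketch of formatting that pair directly follows. The helper name `format_prediction` and the `text;prediction` output layout are assumptions for illustration, not necessarily what the PR ended up with.

```python
from typing import Iterable, Tuple


def format_prediction(element: Tuple[str, str]) -> Iterable[str]:
  """Joins the original sentence and the predicted word into one line.

  Hypothetical helper: each element is already keyed by the original
  sentence, so no join against a second collection is required.
  """
  text, predicted_word = element
  yield text + ';' + predicted_word


# Plain-Python usage; in a pipeline this would be applied to the output of
# the post-processing step (for example with a FlatMap).
lines = list(format_prediction(('The capital of France is Paris .', 'paris')))
```

The trade-off is that the masked sentence no longer appears in the output, only the original sentence and the prediction.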




[GitHub] [beam] tvalentyn commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897237897


##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default .
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models don't return a just a
+    standard torch.Tensor output. Instead, they can return a dictionary of
+    different outputs. To work with current RunInference implementation which
+    returns a PredictionResult object for each example, we must override the
+    `forward()` function and convert the standard Hugging Face forward output
+    into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logit': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of
+          shape (sequence_length, hidden_size)
+      },
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (p | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        | 'PyTorchRunInference' >> RunInference(
+            KeyedModelHandler(model_handler)).with_output_types(
+                Tuple[str, PredictionResult])
+        | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+    combined_text = (({
+        'masked_text': text_and_masked_text_tuple,
+        'predicted_text': text_and_predictions
+    })
+                     | 'Merge' >> beam.CoGroupByKey()
+                     | beam.Map(
+                         lambda x: x[1]['masked_text'][0] + ';' + x[0] + ';' +
+                         x[1]['predicted_text'][0]))
+
+    if known_args.output:

Review Comment:
   If users mostly run this example non-interactively, a run that omits `--output` completes but leaves no way to check the results, so it provides little value. `output` should be a mandatory parameter instead.
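
A minimal sketch of what a mandatory `--output` could look like with `argparse`, assuming the same `parse_known_args` helper as in the diff. The help text and the sample argument values are illustrative only.

```python
import argparse


def parse_known_args(argv):
  """Parses args for the workflow; `--output` is mandatory in this sketch."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--output',
      dest='output',
      required=True,  # argparse exits with a usage error if this is missing.
      help='Path of the text file to write predictions to.')
  return parser.parse_known_args(argv)


# Unrecognized flags are returned separately so they can be forwarded to
# PipelineOptions, as the example already does.
known_args, pipeline_args = parse_known_args(
    ['--output', 'predictions.txt', '--runner', 'DirectRunner'])
```

With `required=True`, the `if known_args.output:` guard around the write step becomes unnecessary, since the parser rejects a missing `--output` before the pipeline is built.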





[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897250659


##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default .
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models don't return a just a
+    standard torch.Tensor output. Instead, they can return a dictionary of
+    different outputs. To work with current RunInference implementation which
+    returns a PredictionResult object for each example, we must override the
+    `forward()` function and convert the standard Hugging Face forward output
+    into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logit': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of
+          shape (sequence_length, hidden_size)
+      },
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (p | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        | 'PyTorchRunInference' >> RunInference(
+            KeyedModelHandler(model_handler)).with_output_types(
+                Tuple[str, PredictionResult])
+        | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+    combined_text = (({
+        'masked_text': text_and_masked_text_tuple,
+        'predicted_text': text_and_predictions
+    })
+                     | 'Merge' >> beam.CoGroupByKey()
+                     | beam.Map(
+                         lambda x: x[1]['masked_text'][0] + ';' + x[0] + ';' +
+                         x[1]['predicted_text'][0]))
+
+    if known_args.output:

Review Comment:
   Made it required.
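A minimal sketch of what making `--output` required could look like with argparse. The exact change in the PR is not shown in this thread, so the `required=True` flag and the help wording below are assumptions rather than the author's final code.

```python
import argparse


def parse_known_args(argv):
  """Parses args for the workflow; --output is mandatory in this sketch."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--output',
      dest='output',
      required=True,  # assumption: one way "required" could be expressed
      help='Path of the text file where output predictions are saved.')
  return parser.parse_known_args(argv)


# Unrecognized flags are passed through untouched as pipeline args.
known_args, pipeline_args = parse_known_args(
    ['--output', 'predictions.txt', '--runner', 'DirectRunner'])
```

With this in place the `if known_args.output:` guard would no longer be needed, since argparse exits with a usage error when the flag is missing.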



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] tvalentyn commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r896917360


##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default .
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models don't return a just a
+    standard torch.Tensor output. Instead, they can return a dictionary of
+    different outputs. To work with current RunInference implementation which
+    returns a PredictionResult object for each example, we must override the
+    `forward()` function and convert the standard Hugging Face forward output
+    into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logit': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of
+          shape (sequence_length, hidden_size)
+      },
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (p | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        | 'PyTorchRunInference' >> RunInference(
+            KeyedModelHandler(model_handler)).with_output_types(
+                Tuple[str, PredictionResult])

Review Comment:
   I don't think we need these hints anymore.





[GitHub] [beam] yeandy commented on pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on PR #21818:
URL: https://github.com/apache/beam/pull/21818#issuecomment-1154077012

   Postcommits and unit tests pass locally. PTAL @tvalentyn 




[GitHub] [beam] tvalentyn commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r896817313


##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)

Review Comment:
   nit: rename to `decoded_word` or `predicted_last_word`?



##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default .
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models don't return a just a
+    standard torch.Tensor output. Instead, they can return a dictionary of
+    different outputs. To work with current RunInference implementation which
+    returns a PredictionResult object for each example, we must override the
+    `forward()` function and convert the standard Hugging Face forward output
+    into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logit': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of
+          shape (sequence_length, hidden_size)
+      },
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (p | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        | 'PyTorchRunInference' >> RunInference(
+            KeyedModelHandler(model_handler)).with_output_types(
+                Tuple[str, PredictionResult])
+        | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+    combined_text = (({
+        'masked_text': text_and_masked_text_tuple,
+        'predicted_text': text_and_predictions
+    })
+                     | 'Merge' >> beam.CoGroupByKey()
+                     | beam.Map(
+                         lambda x: x[1]['masked_text'][0] + ';' + x[0] + ';' +
+                         x[1]['predicted_text'][0]))
+
+    if known_args.output:

Review Comment:
   Just curious, why is `output` optional? Did you consider something like logging the predictions to stdout, or making them visualizable when executed as a notebook?
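One way the logging idea could be sketched, assuming the element shape produced by the `CoGroupByKey` step in the diff above. The helper name `format_and_log` and the sample sentence are hypothetical and only illustrate the shape of the data.

```python
import logging


def format_and_log(element):
  """Formats one CoGroupByKey result as 'masked;original;predicted' and logs it.

  `element` mirrors the shape consumed by the pipeline's final Map:
  (original_text, {'masked_text': [...], 'predicted_text': [...]}).
  """
  text, grouped = element
  line = ';'.join(
      [grouped['masked_text'][0], text, grouped['predicted_text'][0]])
  logging.info(line)
  return line


# Hypothetical element, for illustration only.
example = (
    'The capital of France is Paris .',
    {
        'masked_text': ['The capital of France is [MASK] .'],
        'predicted_text': ['paris'],
    })
result = format_and_log(example)
```

A function like this could presumably replace the lambda in the final `beam.Map`, so predictions appear in the logs whether or not an output file is written.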



##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default .
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models don't return a just a

Review Comment:
   nit: grammar ("don't return a just a" should read "doesn't return just a")
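For readers following the docstring, the dict-of-lists to list-of-dicts conversion it describes can be illustrated with plain Python lists standing in for batched tensors. The values below are placeholders, not real model output; the comprehension is the same expression used in the wrapper's `forward()`.

```python
# Plain lists stand in for batched tensors; batch_size is 2 here.
output = {
    'logits': [[0.1, 0.9], [0.8, 0.2]],
    'hidden_states': [['h0'], ['h1']],
}

# zip(*output.values()) yields one tuple of per-key values per example;
# zipping that with the keys rebuilds a dict for each example.
per_example = [dict(zip(output, v)) for v in zip(*output.values())]
```

Each entry of `per_example` holds the values for a single example, so `len(per_example)` equals the batch size.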



##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path where to save output predictions.'
+      ' text file.')

Review Comment:
   broken help string
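
   For readers following along, a minimal sketch of why the `--output` help text reads as broken and one way it could be repaired. The replacement wording and the `required=True` flag are assumptions made for illustration, not the text that was merged.

```python
import argparse

# As written in the diff: adjacent string literals are concatenated verbatim,
# so the trailing fragment is glued on after the period.
broken_help = ('Path where to save output predictions.'
               ' text file.')

# One possible repair (wording is an assumption, not the merged text).
fixed_help = 'Path of the text file where output predictions are saved.'


def build_parser() -> argparse.ArgumentParser:
  parser = argparse.ArgumentParser()
  # required=True is an assumption; the original argument has no default.
  parser.add_argument(
      '--output', dest='output', required=True, help=fixed_help)
  return parser


if __name__ == '__main__':
  print(repr(broken_help))
  known_args, pipeline_args = build_parser().parse_known_args(
      ['--output', 'predictions.txt', '--runner', 'DirectRunner'])
  print(known_args.output, pipeline_args)
```

   As in the example under review, `parse_known_args` leaves unrecognized flags in the second return value so they can be passed on as pipeline options.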



##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):

Review Comment:
   wouldn't hurt to add a docstring as well.



##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')

Review Comment:
   incorrect help string



##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default .
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models don't return a just a
+    standard torch.Tensor output. Instead, they can return a dictionary of
+    different outputs. To work with current RunInference implementation which
+    returns a PredictionResult object for each example, we must override the
+    `forward()` function and convert the standard Hugging Face forward output
+    into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logit': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of
+          shape (sequence_length, hidden_size)
+      },
+      {
+        'logit': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (p | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        | 'PyTorchRunInference' >> RunInference(
+            KeyedModelHandler(model_handler)).with_output_types(
+                Tuple[str, PredictionResult])
+        | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+    combined_text = (({
+        'masked_text': text_and_masked_text_tuple,
+        'predicted_text': text_and_predictions
+    })
+                     | 'Merge' >> beam.CoGroupByKey()

Review Comment:
   Is this something that can be folded into `PostProcessor`? Isn't the masked_text already a part of the predicted result?
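
   One way to read this suggestion, sketched in plain Python rather than Beam: the element reaching the post-processing step already carries the original sentence (the key) and the masked input ids (the `example` field), so a single step could emit all three pieces without a separate `CoGroupByKey`. `StubPredictionResult`, `VOCAB`, and `post_process` are hypothetical stand-ins; the real pipeline would use `PredictionResult`, `BERT_TOKENIZER`, and torch tensors.

```python
from typing import Dict, Iterable, List, NamedTuple, Tuple


class StubPredictionResult(NamedTuple):
  """Hypothetical stand-in exposing the example and inference fields."""
  example: Dict[str, List[int]]
  inference: Dict[str, List[List[float]]]


# Toy vocabulary (an assumption); token id == index in this list.
VOCAB = ['[MASK]', 'the', 'capital', 'of', 'france', 'is', 'paris', '.']
MASK_ID = VOCAB.index('[MASK]')


def post_process(
    element: Tuple[str, StubPredictionResult]
) -> Iterable[Tuple[str, str, str]]:
  text, result = element
  input_ids = result.example['input_ids']
  logits = result.inference['logits']
  # Locate the masked position and pick the highest-scoring token there.
  mask_pos = input_ids.index(MASK_ID)
  scores = logits[mask_pos]
  predicted_id = max(range(len(scores)), key=scores.__getitem__)
  # Rebuild the masked sentence from the input ids carried in the example.
  masked_text = ' '.join(VOCAB[i] for i in input_ids)
  yield text, masked_text, VOCAB[predicted_id]
```

   Whether this is preferable to the explicit merge depends on how the output should be formatted; the sketch only shows that the information is available in one element.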



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897249453


##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):

Review Comment:
   Added.





[GitHub] [beam] tvalentyn commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897308716


##########
sdks/python/apache_beam/examples/inference/data/sentences.txt:
##########
@@ -0,0 +1,10 @@
+The capital of France is Paris .

Review Comment:
   We could host it in GCS or pass it via `beam.Create([list_of_strings])` (perhaps optionally, if no input is specified).





[GitHub] [beam] yeandy commented on pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on PR #21818:
URL: https://github.com/apache/beam/pull/21818#issuecomment-1155141386

   Changed the dataset to a custom file with my own sentences. If we get the OK for the model, then I think this example should be good.
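
   A note on the file format, as a small sketch: the sample line quoted in this thread ("The capital of France is Paris .") has a space before the final period, and `add_mask_to_last_word` from the diff appears to rely on that, since it replaces the second-to-last whitespace-separated token. The second input below is an assumed counterexample, not a line from the dataset.

```python
from typing import Tuple


def add_mask_to_last_word(text: str) -> Tuple[str, str]:
  # Copied from the quoted diff: masks the second-to-last token.
  text_list = text.split()
  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])


if __name__ == '__main__':
  # Period separated by a space: the last word is masked.
  print(add_mask_to_last_word('The capital of France is Paris .')[1])
  # Period attached to the last word: the word before it is masked instead.
  print(add_mask_to_last_word('The capital of France is Paris.')[1])
```

   Sentences added to the custom file would therefore need to keep the trailing punctuation as its own token for the mask to land on the last word.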




[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897249273


##########
sdks/python/apache_beam/examples/inference/pytorch_bert.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added

Review Comment:
   Added.



##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing image names.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path where to save output predictions.'
+      ' text file.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default .
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models don't return a just a

Review Comment:
   Fixed.
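
   For reference, the conversion this wrapper's docstring describes (a dict of batched outputs turned into one dict per example) can be sketched with plain Python lists. `split_batch` is a hypothetical helper name; the expression inside it is the one used in the quoted `forward()` override, and the sketch assumes every value is indexed by batch position along its first axis.

```python
from typing import Any, Dict, List


def split_batch(output: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
  # zip(*values) groups the i-th entry of every field together;
  # zip(output, v) pairs each field name with that entry.
  return [dict(zip(output, v)) for v in zip(*output.values())]


if __name__ == '__main__':
  batched = {
      'logits': [[0.1, 0.9], [0.8, 0.2]],
      'hidden_states': [['h0'], ['h1']],
  }
  for example in split_batch(batched):
    print(example)
```

   With `max_batch_size` restricted to 1, as in the example under review, the resulting list has a single entry per call.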





[GitHub] [beam] AnandInguva commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
AnandInguva commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897290858


##########
sdks/python/apache_beam/examples/inference/data/sentences.txt:
##########
@@ -0,0 +1,10 @@
+The capital of France is Paris .

Review Comment:
   I think we need to add Apache License here





[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897219305


##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, removes the last word
+of the sentence, and then uses the BertForMaskedLM from Hugging Face to predict
+the best word to follow or continue that sentence given all the words already in
+the sentence. The pipeline then writes the prediction to an output file in
+which users can then compare against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path to the text file where output predictions are saved.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default.
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models doesn't return just a
+    standard torch.Tensor output. Instead, it can return a dictionary of
+    different outputs. To work with the current RunInference implementation,
+    which returns a PredictionResult object for each example, we must override
+    the `forward()` function and convert the standard Hugging Face forward
+    output into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logits': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (p | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        | 'PyTorchRunInference' >> RunInference(
+            KeyedModelHandler(model_handler)).with_output_types(
+                Tuple[str, PredictionResult])
+        | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+    combined_text = (({
+        'masked_text': text_and_masked_text_tuple,
+        'predicted_text': text_and_predictions
+    })
+                     | 'Merge' >> beam.CoGroupByKey()
+                     | beam.Map(
+                         lambda x: x[1]['masked_text'][0] + ';' + x[0] + ';' +
+                         x[1]['predicted_text'][0]))
+
+    if known_args.output:

Review Comment:
   It is there to showcase the ability to write predictions to GCS. For notebook execution, we can take out this `output` part and simply log to the notebook.
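
   A minimal sketch of that "write if `--output` is set, otherwise log" behavior, using only the standard library. The helper name `emit_predictions` is hypothetical and a local file stands in for the GCS sink. In the pipeline itself the same branch would choose between a write transform and a logging step.

```python
import logging
import os
import tempfile
from typing import Iterable, Optional


def emit_predictions(lines: Iterable[str], output: Optional[str] = None) -> int:
  """Writes lines to `output` when it is set, otherwise logs them.

  Hypothetical helper: a local file stands in for the pipeline's sink.
  Returns the number of lines emitted.
  """
  count = 0
  if output:
    with open(output, 'w', encoding='utf-8') as f:
      for line in lines:
        f.write(line + '\n')
        count += 1
  else:
    for line in lines:
      logging.info(line)
      count += 1
  return count


# With an output path: the predictions end up in the file.
with tempfile.TemporaryDirectory() as tmp_dir:
  path = os.path.join(tmp_dir, 'predictions.txt')
  written = emit_predictions(['masked;original;predicted'], output=path)
  with open(path, encoding='utf-8') as f:
    file_contents = f.read()

# Without an output path: the predictions are only logged.
logged = emit_predictions(['masked;original;predicted'])
```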



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897220743


##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, masks the last word
+of each sentence, and then uses the BertForMaskedLM from Hugging Face to
+predict the best word to fill the mask given all the other words in the
+sentence. The pipeline then writes the predictions to an output file, which
+users can then compare against the original sentences.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path to the text file where output predictions are saved.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default.
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models doesn't return just a
+    standard torch.Tensor output. Instead, it can return a dictionary of
+    different outputs. To work with the current RunInference implementation,
+    which returns a PredictionResult object for each example, we must override
+    the `forward()` function and convert the standard Hugging Face forward
+    output into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logits': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (p | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        | 'PyTorchRunInference' >> RunInference(
+            KeyedModelHandler(model_handler)).with_output_types(
+                Tuple[str, PredictionResult])
+        | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+    combined_text = (({
+        'masked_text': text_and_masked_text_tuple,
+        'predicted_text': text_and_predictions
+    })
+                     | 'Merge' >> beam.CoGroupByKey()

Review Comment:
   Yes, we can fold it in. The key of `predicted_text` is the original text, not the masked text. That is, the element formats are:
   `masked_text` is (original_text, masked_text)
   `predicted_text` is (original_text, predicted_word)
   and we then join the two on the original_text key.
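
   The join can be sketched in plain Python, without Beam, to show what each grouped element looks like. The function `co_group_by_key` below is a hypothetical stand-in that only mimics the shape of the grouped output, and the predicted word `'paris'` is an illustrative value, not an actual model result.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def co_group_by_key(
    **tagged: Iterable[Tuple[str, str]]) -> Dict[str, Dict[str, List[str]]]:
  """Groups keyed pairs from several tagged inputs by their shared key.

  Hypothetical stand-in: returns {key: {tag: [values...]}} for every key.
  """
  grouped = defaultdict(lambda: {tag: [] for tag in tagged})
  for tag, pairs in tagged.items():
    for key, value in pairs:
      grouped[key][tag].append(value)
  return dict(grouped)


def format_row(key: str, groups: Dict[str, List[str]]) -> str:
  # Same layout as the lambda in the diff: masked;original;predicted
  return groups['masked_text'][0] + ';' + key + ';' + groups['predicted_text'][0]


original = 'The capital of France is Paris .'
masked = [(original, 'The capital of France is [MASK] .')]
predicted = [(original, 'paris')]  # illustrative value only

joined = co_group_by_key(masked_text=masked, predicted_text=predicted)
rows = [format_row(key, groups) for key, groups in joined.items()]
```

   Because both inputs are keyed by the original sentence, two identical input sentences would land in the same group; the `[0]` index then picks only the first value of each tag.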





[GitHub] [beam] tvalentyn commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897308716


##########
sdks/python/apache_beam/examples/inference/data/sentences.txt:
##########
@@ -0,0 +1,10 @@
+The capital of France is Paris .

Review Comment:
   We could host it in GCS or pass it via `beam.Create([list_of_strings])` if the input is not specified.
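
   A rough sketch of that fallback, in plain Python so it runs without Beam. The helper `read_sentences` and the constant `DEFAULT_SENTENCES` are hypothetical names. In the pipeline, the two branches would correspond to `beam.Create(DEFAULT_SENTENCES)` and `beam.io.ReadFromText(input_path)`.

```python
import os
import tempfile
from typing import List, Optional

# Illustrative default; only this first sentence appears in the quoted diff.
DEFAULT_SENTENCES = ['The capital of France is Paris .']


def read_sentences(input_path: Optional[str] = None) -> List[str]:
  """Returns the built-in sentences unless an input file is specified."""
  if not input_path:
    return list(DEFAULT_SENTENCES)
  with open(input_path, encoding='utf-8') as f:
    # Drop trailing newlines and skip blank lines.
    return [line.rstrip('\n') for line in f if line.strip()]


# No input specified: fall back to the in-memory list.
from_default = read_sentences()

# Input specified: read one sentence per line from the file.
with tempfile.TemporaryDirectory() as tmp_dir:
  path = os.path.join(tmp_dir, 'sentences.txt')
  with open(path, 'w', encoding='utf-8') as f:
    f.write('Hello world .\n\nAnother sentence .\n')
  from_file = read_sentences(path)
```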





[GitHub] [beam] tvalentyn commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897687420


##########
sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py:
##########
@@ -89,6 +90,41 @@ def test_torch_run_inference_imagenet_mobilenetv2(self):
       filename, prediction = prediction.split(',')
       self.assertEqual(_EXPECTED_OUTPUTS[filename], prediction)
 
+  @pytest.mark.uses_pytorch
+  @pytest.mark.it_postcommit
+  def test_torch_run_inference_bert_for_masked_lm(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    # Path to text file containing some sentences
+    file_of_sentences = 'gs://apache-beam-ml/datasets/custom/sentences.txt'  # pylint: disable=line-too-long
+    output_file_dir = 'gs://apache-beam-ml/testing/predictions'

Review Comment:
   For test output, it's better to use a bucket with a lifecycle configured to leave less clutter behind, for example:
   
   :~$ gsutil lifecycle get gs://temp-storage-for-end-to-end-tests/
   {"rule": [{"action": {"type": "Delete"}, "condition": {"age": 14}}]}
   
   Lifecycle may be per bucket (not sure if we can configure it just for `./testing/predictions`), so switching outputs for all tests to gs://temp-storage-for-end-to-end-tests/ may be easiest.
   
   cc: @AnandInguva FYI.
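
   For reference, a small sketch that builds the same lifecycle policy as the `gsutil lifecycle get` output above. The function name `delete_after_days` is hypothetical. The resulting JSON could be saved to a file and, assuming the usual gsutil workflow, applied with `gsutil lifecycle set <file> gs://<bucket>`.

```python
import json


def delete_after_days(age_days: int) -> dict:
  """Builds a lifecycle policy that deletes objects older than `age_days`."""
  return {
      'rule': [{
          'action': {'type': 'Delete'},
          'condition': {'age': age_days},
      }]
  }


# Same 14-day policy as shown for the end-to-end test bucket.
policy_json = json.dumps(delete_after_days(14))
```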





[GitHub] [beam] tvalentyn commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r896805018


##########
sdks/python/apache_beam/examples/inference/pytorch_bert.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from a custom text file, masks the last word
+of each sentence, and then uses the BertForMaskedLM from Hugging Face to
+predict the best word to fill the mask given all the other words in the
+sentence. The pipeline then writes the predictions to an output file, which
+users can then compare against the original sentences.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added

Review Comment:
   Is there an issue tracking this TODO?





[GitHub] [beam] asf-ci commented on pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21818:
URL: https://github.com/apache/beam/pull/21818#issuecomment-1154214423

   Can one of the admins verify this patch?




[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r895866979


##########
sdks/python/apache_beam/examples/inference/pytorch_bert.py:
##########
@@ -0,0 +1,214 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform Language Modeling with Bert.
+
+This pipeline takes sentences from the bookcorpus dataset, masks the last word
+of each sentence, and then uses the BertForMaskedLM from Hugging Face to
+predict the best word to fill the mask given all the other words in the
+sentence. The pipeline then writes the predictions to an output file, which
+users can then compare against the original sentences.
+"""
+
+import argparse
+from typing import Iterable
+from typing import Dict
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertTokenizer, BertForMaskedLM, BertConfig
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default=
+      'gs://apache-beam-ml/datasets/bookcorpus/bookcorpus_subset.parquet',
+      help='Path to the parquet file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path to the text file where output predictions are saved.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='/Users/yeandy/Downloads/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM will be used as default.
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts
+
+    The `forward()` function in Hugging Face models doesn't return just a
+    standard torch.Tensor output. Instead, it can return a dictionary of
+    different outputs. To work with the current RunInference implementation,
+    which returns a PredictionResult object for each example, we must override
+    the `forward()` function and convert the standard Hugging Face forward
+    output into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logits': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (
+        p
+        | 'ReadSentences' >> beam.io.ReadFromParquet(known_args.input)
+        | 'ExtractTextFromDict' >> beam.Map(lambda x: x['text']))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        |
+        'PyTorchRunInference' >> RunInference(model_handler).with_output_types(
+            Tuple[str, PredictionResult])

Review Comment:
   Added.





[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r896774208


##########
sdks/python/apache_beam/examples/inference/pytorch_bert.py:
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform language modeling with BERT.
+
+This pipeline takes sentences from the bookcorpus dataset, masks the last word
+of each sentence, and then uses BertForMaskedLM from Hugging Face to predict
+the best word for the masked position given the rest of the sentence. The
+pipeline then writes each prediction to an output file, so that users can
+compare it against the original sentence.
+"""
+
+import argparse
+from typing import Iterable
+from typing import Dict
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertTokenizer, BertForMaskedLM, BertConfig
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default=
+      'gs://apache-beam-ml/datasets/bookcorpus/bookcorpus_subset.parquet',
+      help='Path to the Parquet file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path to the text file where the output predictions'
+      ' are saved.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='/Users/yeandy/Downloads/'

Review Comment:
   Yeah. Fixed.
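
For context on this fix: the later revision quoted elsewhere in this thread replaces the developer-local default with a shared `gs://` path. The sketch below is a trimmed, standalone version of the example's `parse_known_args` with only that one flag. It also shows how `parse_known_args` leaves unrecognized flags for the pipeline options. The `--runner=DirectRunner` argument is just an illustrative input.

```python
import argparse


def parse_known_args(argv):
    """Splits example-specific flags from flags meant for the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--model_state_dict_path',
        dest='model_state_dict_path',
        # Shared default taken from the later revision in this thread,
        # instead of a path that only exists on one developer's machine.
        default='gs://apache-beam-ml/models/'
        'huggingface.BertForMaskedLM.bert-base-uncased.pth',
        help="Path to the model's state_dict.")
    # Returns (known namespace, list of arguments argparse did not recognize).
    return parser.parse_known_args(argv)


known_args, pipeline_args = parse_known_args(['--runner=DirectRunner'])
```

Here `known_args.model_state_dict_path` falls back to the shared default, while the unrecognized runner flag is returned untouched in `pipeline_args`.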





[GitHub] [beam] yeandy commented on pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on PR #21818:
URL: https://github.com/apache/beam/pull/21818#issuecomment-1155653848

   Added a custom text file of sentences and ran local tests:
   ```
   python -m apache_beam.examples.inference.pytorch_language_modeling \
     --input apache_beam/examples/inference/data/sentences.txt \
     --output predictions.csv \
     --model_state_dict_path BertForMaskedLM.pth
   
   python -m apache_beam.examples.inference.pytorch_language_modeling \
     --output predictions.csv \
     --model_state_dict_path BertForMaskedLM.pth
   ```
   




[GitHub] [beam] tvalentyn merged pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn merged PR #21818:
URL: https://github.com/apache/beam/pull/21818




[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897965718


##########
sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py:
##########
@@ -89,6 +90,41 @@ def test_torch_run_inference_imagenet_mobilenetv2(self):
       filename, prediction = prediction.split(',')
       self.assertEqual(_EXPECTED_OUTPUTS[filename], prediction)
 
+  @pytest.mark.uses_pytorch
+  @pytest.mark.it_postcommit
+  def test_torch_run_inference_bert_for_masked_lm(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    # Path to text file containing some sentences
+    file_of_sentences = 'gs://apache-beam-ml/datasets/custom/sentences.txt'  # pylint: disable=line-too-long
+    output_file_dir = 'gs://apache-beam-ml/testing/predictions'

Review Comment:
   SG, thanks.





[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897320657


##########
sdks/python/apache_beam/examples/inference/data/sentences.txt:
##########
@@ -0,0 +1,10 @@
+The capital of France is Paris .

Review Comment:
   I did the latter as it's easier.
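
The layout of the sentence quoted above matters to the masking step. Below is a minimal standalone sketch of the example's `add_mask_to_last_word` applied to that line. It assumes, as in the quoted file, that the final punctuation mark is separated from the last word by a space, so that the second-to-last token is the word to mask.

```python
from typing import Tuple


def add_mask_to_last_word(text: str) -> Tuple[str, str]:
    """Returns the original text and a copy with its last word masked."""
    tokens = text.split()
    # tokens[-1] is the trailing punctuation; tokens[-2] is the last word,
    # which is replaced by BERT's mask token.
    return text, ' '.join(tokens[:-2] + ['[MASK]', tokens[-1]])


original, masked = add_mask_to_last_word('The capital of France is Paris .')
print(masked)  # The capital of France is [MASK] .
```

If the period were attached to the last word (`Paris.`), the same function would mask the word before it instead, which is why the spacing in the data file is significant.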





[GitHub] [beam] codecov[bot] commented on pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #21818:
URL: https://github.com/apache/beam/pull/21818#issuecomment-1154236014

   # [Codecov](https://codecov.io/gh/apache/beam/pull/21818?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#21818](https://codecov.io/gh/apache/beam/pull/21818?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5eaf3c1) into [master](https://codecov.io/gh/apache/beam/commit/edf9b7906cd188cc7d16737d9a779a5cf585a6a2?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (edf9b79) will **decrease** coverage by `0.07%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #21818      +/-   ##
   ==========================================
   - Coverage   74.15%   74.08%   -0.08%     
   ==========================================
     Files         698      699       +1     
     Lines       92417    92468      +51     
   ==========================================
   - Hits        68530    68502      -28     
   - Misses      22636    22715      +79     
     Partials     1251     1251              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.66% <0.00%> (-0.11%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/21818?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...hon/apache\_beam/examples/inference/pytorch\_bert.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfYmVydC5weQ==) | `0.00% <0.00%> (ø)` | |
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
   | [.../apache\_beam/runners/interactive/dataproc/types.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9kYXRhcHJvYy90eXBlcy5weQ==) | `93.10% <0.00%> (-3.45%)` | :arrow_down: |
   | [...n/apache\_beam/ml/gcp/recommendations\_ai\_test\_it.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvZ2NwL3JlY29tbWVuZGF0aW9uc19haV90ZXN0X2l0LnB5) | `73.46% <0.00%> (-2.05%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.01% <0.00%> (-1.39%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.06% <0.00%> (-1.33%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <0.00%> (-0.65%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.30% <0.00%> (-0.38%)` | :arrow_down: |
   | [...examples/inference/pytorch\_image\_classification.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfaW1hZ2VfY2xhc3NpZmljYXRpb24ucHk=) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/ml/inference/api.py](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL2FwaS5weQ==) | | |
   | ... and [2 more](https://codecov.io/gh/apache/beam/pull/21818/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   




[GitHub] [beam] tvalentyn commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r895686758


##########
sdks/python/apache_beam/examples/inference/pytorch_bert.py:
##########
@@ -0,0 +1,214 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform language modeling with BERT.
+
+This pipeline takes sentences from the bookcorpus dataset, masks the last word
+of each sentence, and then uses BertForMaskedLM from Hugging Face to predict
+the best word for the masked position given the rest of the sentence. The
+pipeline then writes each prediction to an output file, so that users can
+compare it against the original sentence.
+"""
+
+import argparse
+from typing import Iterable
+from typing import Dict
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.api import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertTokenizer, BertForMaskedLM, BertConfig
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default=
+      'gs://apache-beam-ml/datasets/bookcorpus/bookcorpus_subset.parquet',
+      help='Path to the Parquet file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path to the text file where the output predictions'
+      ' are saved.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='/Users/yeandy/Downloads/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM is used by default.
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts.
+
+    The `forward()` function in Hugging Face models doesn't return just a
+    standard torch.Tensor output. Instead, it can return a dictionary of
+    different outputs. To work with the current RunInference implementation,
+    which returns a PredictionResult object for each example, we must override
+    the `forward()` function and convert the standard Hugging Face forward
+    output into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logits': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (
+        p
+        | 'ReadSentences' >> beam.io.ReadFromParquet(known_args.input)
+        | 'ExtractTextFromDict' >> beam.Map(lambda x: x['text']))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        |
+        'PyTorchRunInference' >> RunInference(model_handler).with_output_types(
+            Tuple[str, PredictionResult])

Review Comment:
   is this still necessary with the proposed changes to make keyed/non-keyed model handlers as separate classes?
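
For context on the keyed/non-keyed split mentioned here: in the later revision quoted in this thread, the example wraps its handler as `KeyedModelHandler(model_handler)`, so that `(text, tensors)` tuples can flow through `RunInference`. The sketch below illustrates the general idea in plain Python only. It is not Beam's implementation, and `run_unkeyed` and `run_keyed` are hypothetical names with a toy "model" that counts words.

```python
from typing import Callable, Iterable, List, Sequence, Tuple


def run_unkeyed(batch: Sequence[str]) -> List[int]:
    """Hypothetical stand-in for a model handler: 'predicts' a word count."""
    return [len(example.split()) for example in batch]


def run_keyed(
    batch: Sequence[Tuple[str, str]],
    unkeyed: Callable[[Sequence[str]], Iterable[int]],
) -> List[Tuple[str, int]]:
    """Strips keys, delegates to the unkeyed function, re-attaches keys."""
    # Assumes a non-empty batch; zip(*batch) separates keys from examples.
    keys, examples = zip(*batch)
    return list(zip(keys, unkeyed(examples)))


keyed_batch = [
    ('id-1', 'The capital of France is [MASK] .'),
    ('id-2', 'It is not [MASK] .'),
]
results = run_keyed(keyed_batch, run_unkeyed)
```

The point of the separation is that the inner function never sees the keys, so the same unkeyed logic can serve both keyed and non-keyed pipelines.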





[GitHub] [beam] yeandy commented on a diff in pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #21818:
URL: https://github.com/apache/beam/pull/21818#discussion_r897251766


##########
sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py:
##########
@@ -0,0 +1,213 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform language modeling with BERT.
+
+This pipeline takes sentences from a custom text file, masks the last word
+of each sentence, and then uses BertForMaskedLM from Hugging Face to predict
+the best word for the masked position given the rest of the sentence. The
+pipeline then writes each prediction to an output file, so that users can
+compare it against the original sentence.
+"""
+
+import argparse
+from typing import Dict
+from typing import Iterable
+from typing import Tuple
+
+import apache_beam as beam
+import torch
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import PytorchModelHandler
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from transformers import BertConfig
+from transformers import BertForMaskedLM
+from transformers import BertTokenizer
+
+BERT_TOKENIZER = BertTokenizer.from_pretrained('bert-base-uncased')
+
+
+def add_mask_to_last_word(text: str) -> Tuple[str, str]:
+  text_list = text.split()
+  return text, ' '.join(text_list[:-2] + ['[MASK]', text_list[-1]])
+
+
+def tokenize_sentence(
+    text_and_mask: Tuple[str, str]) -> Tuple[str, Dict[str, torch.Tensor]]:
+  text, masked_text = text_and_mask
+  tokenized_sentence = BERT_TOKENIZER.encode_plus(
+      masked_text, return_tensors="pt")
+
+  # Workaround to manually remove batch dim until we have the feature to
+  # add optional batching flag. TODO: Remove once optional batching flag added
+  return text, {
+      k: torch.squeeze(v)
+      for k, v in dict(tokenized_sentence).items()
+  }
+
+
+class PostProcessor(beam.DoFn):
+  def process(
+      self, element: Tuple[str, PredictionResult]) -> Iterable[Tuple[str, str]]:
+    text, prediction_result = element
+    inputs = prediction_result.example
+    logits = prediction_result.inference['logits']
+    mask_token_index = (
+        inputs['input_ids'] == BERT_TOKENIZER.mask_token_id).nonzero(
+            as_tuple=True)[0]
+    predicted_token_id = logits[mask_token_index].argmax(axis=-1)
+    decoded_text = BERT_TOKENIZER.decode(predicted_token_id)
+    yield (text, decoded_text)
+
+
+def parse_known_args(argv):
+  """Parses args for the workflow."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://apache-beam-ml/datasets/custom/sentences.txt',
+      help='Path to the text file containing sentences.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      help='Path to the text file where the output predictions'
+      ' are saved.')
+  parser.add_argument(
+      '--model_state_dict_path',
+      dest='model_state_dict_path',
+      default='gs://apache-beam-ml/models/'
+      'huggingface.BertForMaskedLM.bert-base-uncased.pth',
+      help="Path to the model's state_dict. "
+      "Default state_dict would be for the bert-base-uncased model.")
+  return parser.parse_known_args(argv)
+
+
+def run(argv=None, model_class=None, model_params=None, save_main_session=True):
+  """
+  Args:
+    argv: Command line arguments defined for this example.
+    model_class: Reference to the class definition of the model.
+                If None, BertForMaskedLM is used by default.
+    model_params: Parameters passed to the constructor of the model_class.
+                  These will be used to instantiate the model object in the
+                  RunInference API.
+  """
+  known_args, pipeline_args = parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  if not model_class:
+    model_config = BertConfig(is_decoder=False, return_dict=True)
+    model_class = BertForMaskedLM
+    model_params = {'config': model_config}
+
+  # TODO: Remove once optional batching flag added
+  class HuggingFaceStripBatchingWrapper(model_class):
+    """Wrapper class to convert output from dict of lists to list of dicts.
+
+    The `forward()` function in Hugging Face models doesn't return just a
+    standard torch.Tensor output. Instead, it can return a dictionary of
+    different outputs. To work with the current RunInference implementation,
+    which returns a PredictionResult object for each example, we must override
+    the `forward()` function and convert the standard Hugging Face forward
+    output into the appropriate format of List[Dict[str, torch.Tensor]].
+
+    Before:
+    output = {
+      'logits': torch.FloatTensor of shape
+        (batch_size, sequence_length, config.vocab_size),
+      'hidden_states': tuple(torch.FloatTensor) of shape
+        (batch_size, sequence_length, hidden_size)
+    }
+    After:
+    output = [
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      {
+        'logits': torch.FloatTensor of shape
+          (sequence_length, config.vocab_size),
+        'hidden_states': tuple(torch.FloatTensor) of shape
+          (sequence_length, hidden_size)
+      },
+      ...
+    ]
+    where len(output) is batch_size
+    """
+    def forward(self, **kwargs):
+      output = super().forward(**kwargs)
+      return [dict(zip(output, v)) for v in zip(*output.values())]
+
+  # TODO: Remove once nested tensors https://github.com/pytorch/nestedtensor
+  # is officially released.
+  class PytorchNoBatchModelHandler(PytorchModelHandler):
+    """Wrapper to PytorchModelHandler to limit batch size to 1.
+
+    The tokenized strings generated from BertTokenizer may have different
+    lengths, which doesn't work with torch.stack() in current RunInference
+    implementation since stack() requires tensors to be the same size.
+
+    Restricting max_batch_size to 1 means there is only 1 example per `batch`
+    in the run_inference() call.
+    """
+    def batch_elements_kwargs(self):
+      return {'max_batch_size': 1}
+
+  model_handler = PytorchNoBatchModelHandler(
+      state_dict_path=known_args.model_state_dict_path,
+      model_class=HuggingFaceStripBatchingWrapper,
+      model_params=model_params)
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    text = (p | 'ReadSentences' >> beam.io.ReadFromText(known_args.input))
+    text_and_masked_text_tuple = (
+        text
+        | 'AddMask' >> beam.Map(add_mask_to_last_word))
+    text_and_tokenized_text_tuple = (
+        text_and_masked_text_tuple
+        | 'TokenizeSentence' >> beam.Map(tokenize_sentence))
+    text_and_predictions = (
+        text_and_tokenized_text_tuple
+        | 'PyTorchRunInference' >> RunInference(
+            KeyedModelHandler(model_handler)).with_output_types(
+                Tuple[str, PredictionResult])
+        | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+    combined_text = (({
+        'masked_text': text_and_masked_text_tuple,
+        'predicted_text': text_and_predictions
+    })
+                     | 'Merge' >> beam.CoGroupByKey()

Review Comment:
   Actually, the CoGBK can't be folded.
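
For readers unfamiliar with the `Merge` step: `beam.CoGroupByKey()` applied to a dict of keyed collections groups the values from every tagged input under their shared key. The sketch below imitates that grouping in plain Python to show the resulting shape. `co_group_by_key` is a hypothetical helper rather than Beam code, and `'paris'` is an illustrative value, not recorded model output.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def co_group_by_key(
    tagged: Dict[str, Iterable[Tuple[str, str]]]
) -> Dict[str, Dict[str, List[str]]]:
    """Groups values from several tagged (key, value) inputs by key."""
    # Every key gets one list per tag, even if a tag has no value for it.
    grouped: Dict[str, Dict[str, List[str]]] = defaultdict(
        lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


sentence = 'The capital of France is Paris .'
merged = co_group_by_key({
    'masked_text': [(sentence, 'The capital of France is [MASK] .')],
    'predicted_text': [(sentence, 'paris')],
})
```

Because both inputs are keyed by the original sentence, the merged record pairs each masked sentence with its prediction. The join needs both branches to be complete, which is consistent with it being kept as its own step.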





[GitHub] [beam] yeandy commented on pull request #21818: Add Bert Language Modeling example

Posted by GitBox <gi...@apache.org>.
yeandy commented on PR #21818:
URL: https://github.com/apache/beam/pull/21818#issuecomment-1153828983

   Run Python 3.8 PostCommit

