You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/10/31 22:17:01 UTC

[beam-starter-python] branch main updated: Revert "Add demo code"

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/beam-starter-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bc87b0  Revert "Add demo code"
3bc87b0 is described below

commit 3bc87b027b508b12aba0f1306276b97599b70cbe
Author: Danny McCormick <da...@google.com>
AuthorDate: Mon Oct 31 18:16:47 2022 -0400

    Revert "Add demo code"
    
    This reverts commit 7d083ddb102bc970a19584d9ab238572aae0a946.
---
 .github/PULL_REQUEST_TEMPLATE.md |  12 ++
 .github/dependabot.yml           |  19 +++
 .github/workflows/test.yaml      |  30 ++++
 CONTRIBUTING.md                  |  69 +++++++++
 LICENSE                          | 236 +++++++++++++++++++++++++++++
 MANIFEST.in                      |   1 +
 NOTICE                           |   5 +
 README.md                        |  79 +++++++++-
 main.py                          |  32 +---
 my_app/app.py                    | 314 ++-------------------------------------
 requirements.txt                 |   1 +
 setup.py                         |   7 +-
 test/__init__.py                 |   7 +
 test/test_my_app.py              |  34 +++++
 14 files changed, 507 insertions(+), 339 deletions(-)

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 0000000..50a629d
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,12 @@
+**Please** add a meaningful description for your change here
+
+------------------------
+
+Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
+
+ - [ ] I agree that my contributions are licensed with both [Apache ASL2](../../LICENSE-APACHE) and [MIT](../../LICENSE-MIT).
+ - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
+ - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
+ - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
+
+See the [`CONTRIBUTING.md`](../../CONTRIBUTING.md) guide for more information.
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
new file mode 100644
index 0000000..6285813
--- /dev/null
+++ b/.github/dependabot.yml
@@ -0,0 +1,19 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
+# option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+version: 2
+updates:
+  - package-ecosystem: "pip"
+    directory: "/"
+    schedule:
+      interval: "daily"
+
+  - package-ecosystem: "github-actions"
+    directory: "/"
+    schedule:
+      interval: "daily"
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
new file mode 100644
index 0000000..be3d2a7
--- /dev/null
+++ b/.github/workflows/test.yaml
@@ -0,0 +1,30 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
+# option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+name: Test
+
+on: [push, pull_request]
+
+jobs:
+  tests:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: ["3.7", "3.8", "3.9"]
+    steps:
+    - uses: actions/checkout@v3
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v3
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install local package
+      run: |
+        pip install -U pip
+        pip install -e .
+    - name: Run tests
+      run: python -m unittest -v
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..45e824a
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,69 @@
+# Contributing
+
+🎉🎊 Thanks for taking the time to contribute! 🎉🎊
+
+There are many ways to contribute, here are some.
+
+## Filing an issue
+
+If there's any issue you encounter or anything that needs to be fixed, feel free to [create a GitHub issue](https://github.com/apache/beam-starter-python/issues).
+
+## Contributing to this starter project
+
+If this is your first time contributing to a GitHub repo,
+we recommend going through the
+[GitHub quickstart](https://docs.github.com/en/get-started/quickstart/hello-world).
+
+It's a good idea to discuss your plans with the Beam community through the dev@beam.apache.org mailing list before making any changes.
+
+Here's a small overview of the process.
+
+1. [Fork the repo](https://docs.github.com/en/get-started/quickstart/fork-a-repo).
+
+1. Clone the repo.
+
+    ```sh
+    export GITHUB_USERNAME="my-github-username"
+
+    git clone git@github.com:$GITHUB_USERNAME/beam-starter-python.git
+    ```
+
+1. Set the [upstream remote branch](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/configuring-a-remote-for-a-fork).
+
+    ```sh
+    cd beam-starter-python
+    git remote add upstream git@github.com:apache/beam-starter-python.git
+    ```
+
+1. Create and change to a new branch.
+
+    ```sh
+    git checkout -B my-branch-name
+    ```
+
+1. Modify the code! 😱
+
+1. Run the tests. For steps on how to run them see the [`README.md`](README.md).
+
+1. Commit and push your changes to your branch in `origin`.
+
+    ```sh
+    git commit -m "one line description of your changes"
+    git push
+    ```
+
+1. [Create a Pull Request](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request).
+
+1. Add reviewers, and [address review comments](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests).
+
+1. Once it's approved, we can merge the Pull Request.
+
+For more information about proposing changes to a GitHub repository, see the
+[Propose changes](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/about-branches)
+page in the GitHub docs.
+
+## Contributing to Apache Beam
+
+For information on how to contribute to
+[Apache Beam](https://github.com/apache/beam), see the
+[Contribution guide](https://beam.apache.org/contribute/).
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..c7e6385
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,236 @@
+This software is distributed under the terms of both the MIT license and the
+Apache License (Version 2.0).
+
+
+MIT license
+
+Copyright 2022 Google LLC
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+
+
+Apache 2 license
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..540b720
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+include requirements.txt
\ No newline at end of file
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..454af2e
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,5 @@
+Apache Beam
+Copyright [2022-] The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index 020b71e..dcd218e 100644
--- a/README.md
+++ b/README.md
@@ -1,11 +1,78 @@
-# ato-demo-2022
+# Apache Beam starter for Python
 
-To run on Dataflow:
+If you want to clone this repository to start your own project,
+you can choose the license you prefer and feel free to delete anything related to the license you are dropping.
 
+## Before you begin
+
+Make sure you have a [Python 3](https://www.python.org/) development environment ready.
+If you don't, you can download and install it from the
+[Python downloads page](https://www.python.org/downloads/).
+
+We recommend using a virtual environment to isolate your project's dependencies.
+
+```sh
+# Create a new Python virtual environment.
+python -m venv env
+
+# Activate the virtual environment.
+source env/bin/activate
 ```
-python3 -m venv ~/.virtualenvs/env
-. ~/.virtualenvs/env/bin/activate
 
+While activated, your `python` and `pip` commands will point to the virtual environment,
+so any changes or installed dependencies are self-contained.
+
+As a one time setup, let's install the project's dependencies from the [`requirements.txt`](requirements.txt) file.
+
+```sh
+# It's always a good idea to update pip before installing dependencies.
+pip install -U pip
+
+# Install the project as a local package, this installs all the dependencies as well.
 pip install -e .
-python main.py --region us-central1 --runner DataflowRunner --project <your project> --temp_location gs://path/to/temp/location --streaming --use_runner_v2 --allow_unsafe_triggers
-```
\ No newline at end of file
+```
+
+> ℹī¸ Once you are done, you can run the `deactivate` command to go back to your global Python installation.
+
+### Running the pipeline
+
+Running your pipeline in Python is as easy as running the script file directly.
+
+```sh
+# You can run the script file directly.
+python main.py
+
+# To run passing command line arguments.
+python main.py --input-text="🎉"
+
+# To run the tests.
+python -m unittest -v
+```
+
+## GitHub Actions automated testing
+
+This project already comes with automated testing via [GitHub Actions](https://github.com/features/actions).
+
+To configure it, look at the [`.github/workflows/test.yaml`](.github/workflows/test.yaml) file.
+
+## Using other runners
+
+To keep this template small, it only includes the [Direct Runner](https://beam.apache.org/documentation/runners/direct/).
+
+For a comparison of what each runner currently supports, look at the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).
+
+To add a new runner, visit the runner's page for instructions on how to include it.
+
+## Contributing
+
+Thank you for your interest in contributing!
+All contributions are welcome! 🎉🎊
+
+Please refer to the [`CONTRIBUTING.md`](CONTRIBUTING.md) file for more information.
+
+# License
+
+This software is distributed under the terms of both the MIT license and the
+Apache License (Version 2.0).
+
+See [LICENSE](LICENSE) for details.
diff --git a/main.py b/main.py
index f909c2d..2de2540 100644
--- a/main.py
+++ b/main.py
@@ -18,35 +18,15 @@ if __name__ == "__main__":
     logging.getLogger().setLevel(logging.INFO)
 
     parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--input-text",
+        default="Default input text",
+        help="Input text to print.",
+    )
     args, beam_args = parser.parse_known_args()
 
-    example_images = ['https://storage.googleapis.com/apache-beam-samples/image_captioning/bear.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/bear.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/bear.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/bee.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/bee.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/fox.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/fox.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/ladybug.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/ladybug.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/ladybug.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/starfish.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/tiger.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/tiger.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/tiger.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/tiger.jpeg',
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/zebra.jpeg'
-              'https://storage.googleapis.com/apache-beam-samples/image_captioning/zebra.jpeg']* 333
-
     beam_options = PipelineOptions(save_main_session=True, setup_file="./setup.py")
     app.run(
+        input_text=args.input_text,
         beam_options=beam_options,
-        example_images=example_images,
     )
diff --git a/my_app/app.py b/my_app/app.py
index c7627d4..7edcd7a 100644
--- a/my_app/app.py
+++ b/my_app/app.py
@@ -6,314 +6,22 @@
 # option. This file may not be copied, modified, or distributed
 # except according to those terms.
 
-from io import BytesIO
-import logging
-import math
-import random
-import requests
-import torch
-from torchvision import models
-from torchvision import transforms
-import time
-from typing import Iterable
-from typing import Optional
-from typing import Tuple
+from typing import Callable, Optional
 import apache_beam as beam
-from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
-from apache_beam.ml.inference import RunInference
-from apache_beam.ml.inference.base import KeyedModelHandler
-from apache_beam.ml.inference.base import PredictionResult
-from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
 from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.transforms import window
-# from apache_beam.transforms.periodicsequence import PeriodicImpulse
-# from my_app.transformations.impulse import PeriodicImpulse
-from apache_beam.transforms.trigger import AccumulationMode
-from apache_beam.transforms.trigger import AfterCount
-from apache_beam.transforms.trigger import DefaultTrigger
-from apache_beam.transforms.trigger import OrFinally
-from apache_beam.transforms.trigger import Repeatedly
-from apache_beam.utils import timestamp
-from PIL import Image
-from apache_beam.io.restriction_trackers import OffsetRange
-from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
-from apache_beam.runners import sdf_utils
-from apache_beam.transforms import core
-from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.window import TimestampedValue
-from apache_beam.utils.timestamp import MAX_TIMESTAMP
-from apache_beam.utils.timestamp import Timestamp
 
 
-# Everything surrounding impulsegen + periodic sequence/impulse can mostly be ignored.
-# This is a local patch of a bug in Beam 2.42.0 which will be fixed shortly (likely in 2.44.0).
-class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
-  def initial_restriction(self, element):
-    start, end, interval = element
-    if isinstance(start, Timestamp):
-      start = start.micros / 1000000
-    if isinstance(end, Timestamp):
-      end = end.micros / 1000000
-
-    assert start <= end
-    assert interval > 0
-    total_outputs = math.ceil((end - start) / interval)
-    return OffsetRange(0, total_outputs)
-
-  def create_tracker(self, restriction):
-    return OffsetRestrictionTracker(restriction)
-
-  def restriction_size(self, unused_element, restriction):
-    return restriction.size()
-
-
-class ImpulseSeqGenDoFn(beam.DoFn):
-  '''
-  ImpulseSeqGenDoFn fn receives tuple elements with three parts:
-
-  * first_timestamp = first timestamp to output element for.
-  * last_timestamp = last timestamp/time to output element for.
-  * fire_interval = how often to fire an element.
-
-  For each input element received, ImpulseSeqGenDoFn fn will start
-  generating output elements in following pattern:
-
-  * if element timestamp is less than current runtime then output element.
-  * if element timestamp is greater than current runtime, wait until next
-    element timestamp.
-
-  ImpulseSeqGenDoFn can't guarantee that each element is output at exact time.
-  ImpulseSeqGenDoFn guarantees that elements would not be output prior to
-  given runtime timestamp.
-  '''
-  @beam.DoFn.unbounded_per_element()
-  def process(
-      self,
-      element,
-      restriction_tracker=beam.DoFn.RestrictionParam(
-          ImpulseSeqGenRestrictionProvider()),
-      watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
-          ManualWatermarkEstimator.default_provider())):
-    '''
-    :param element: (start_timestamp, end_timestamp, interval)
-    :param restriction_tracker:
-    :return: yields elements at processing real-time intervals with value of
-      target output timestamp for the element.
-    '''
-    start, _, interval = element
-
-    if isinstance(start, Timestamp):
-      start = start.micros / 1000000
-
-    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
-
-    current_output_index = restriction_tracker.current_restriction().start
-    current_output_timestamp = start + interval * current_output_index
-    current_time = time.time()
-    watermark_estimator.set_watermark(
-        timestamp.Timestamp(current_output_timestamp))
-
-    while current_output_timestamp <= current_time:
-      if restriction_tracker.try_claim(current_output_index):
-        yield current_output_timestamp
-        current_output_index += 1
-        current_output_timestamp = start + interval * current_output_index
-        current_time = time.time()
-        watermark_estimator.set_watermark(
-            timestamp.Timestamp(current_output_timestamp))
-      else:
-        return
-
-    restriction_tracker.defer_remainder(
-        timestamp.Timestamp(current_output_timestamp))
-
-
-class PeriodicSequence(PTransform):
-  '''
-  PeriodicSequence transform receives tuple elements with three parts:
-
-  * first_timestamp = first timestamp to output element for.
-  * last_timestamp = last timestamp/time to output element for.
-  * fire_interval = how often to fire an element.
-
-  For each input element received, PeriodicSequence transform will start
-  generating output elements in following pattern:
-
-  * if element timestamp is less than current runtime then output element.
-  * if element timestamp is greater than current runtime, wait until next
-    element timestamp.
-
-  PeriodicSequence can't guarantee that each element is output at exact time.
-  PeriodicSequence guarantees that elements would not be output prior to given
-  runtime timestamp.
-  The PCollection generated by PeriodicSequence is unbounded.
-  '''
-  def __init__(self):
-    pass
-
-  def expand(self, pcoll):
-    return (
-        pcoll
-        | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn())
-        | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt)))
-
-
-class PeriodicImpulse(PTransform):
-  '''
-  PeriodicImpulse transform generates an infinite sequence of elements with
-  given runtime interval.
-
-  PeriodicImpulse transform behaves same as {@link PeriodicSequence} transform,
-  but can be used as first transform in pipeline.
-  The PCollection generated by PeriodicImpulse is unbounded.
-  '''
-  def __init__(
-      self,
-      start_timestamp=Timestamp.now(),
-      stop_timestamp=MAX_TIMESTAMP,
-      fire_interval=360.0,
-      apply_windowing=False):
-    '''
-    :param start_timestamp: Timestamp for first element.
-    :param stop_timestamp: Timestamp after which no elements will be output.
-    :param fire_interval: Interval at which to output elements.
-    :param apply_windowing: Whether each element should be assigned to
-      individual window. If false, all elements will reside in global window.
-    '''
-    self.start_ts = start_timestamp
-    self.stop_ts = stop_timestamp
-    self.interval = fire_interval
-    self.apply_windowing = apply_windowing
-
-  def expand(self, pbegin):
-    result = (
-        pbegin
-        | 'ImpulseElement' >> beam.Create(
-            [(self.start_ts, self.stop_ts, self.interval)])
-        | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn())
-        | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt)))
-    if self.apply_windowing:
-      result = result | 'ApplyWindowing' >> beam.WindowInto(
-          window.FixedWindows(self.interval))
-    return result
-
-
-# Everything above can largely be ignored. This is a recreation of PeriodicImpulse because the built in version has a bug.
-
-def read_image(image_url: str) -> Tuple[str, Image.Image]:
-    response = requests.get(image_url)
-    image = Image.open(BytesIO(response.content)).convert('RGB')
-    return image_url, image
-
-def preprocess_image(data: Image.Image) -> torch.Tensor:
-  image_size = (224, 224)
-  # Pre-trained PyTorch models expect input images normalized with the
-  # below values (see: https://pytorch.org/vision/stable/models.html)
-  normalize = transforms.Normalize(
-      mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
-  transform = transforms.Compose([
-      transforms.Resize(image_size),
-      transforms.ToTensor(),
-      normalize,
-  ])
-  return transform(data)
-
-class PostProcessor(beam.DoFn):
-  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
-    image_url, prediction_result = element
-    prediction = torch.argmax(prediction_result.inference, dim=0)
-    # Map back to items based on https://deeplearning.cms.waikato.ac.nz/user-guide/class-maps/IMAGENET/
-    if prediction.item() == 294:
-        yield 'bear'
-    elif prediction.item() == 309:
-        yield 'bee'
-    elif prediction.item() == 277:
-        yield 'fox'
-    elif prediction.item() == 235:
-        yield 'dog'
-    elif prediction.item() == 301:
-        yield 'ladybug'
-    elif prediction.item() == 327:
-        yield 'starfish'
-    elif prediction.item() == 292:
-        yield 'tiger'
-    elif prediction.item() == 340:
-        yield 'zebra'
-    else:
-        yield 'no_matches'
-
-class Logger(beam.DoFn):
-
-  def __init__(self, name="main"):
-    self._name = name
-
-  def process(self, element, w=beam.DoFn.WindowParam,
-              ts=beam.DoFn.TimestampParam):
-    logging.warning('%s', element)
-    yield element
-
-@beam.DoFn.unbounded_per_element()
-class AddWatermark(beam.DoFn):
-
-    def process(self, element, ts=beam.DoFn.TimestampParam, restriction_tracker=beam.DoFn.RestrictionParam(
-          ImpulseSeqGenRestrictionProvider()), watermark_estimator=beam.DoFn.WatermarkEstimatorParam(ManualWatermarkEstimator.default_provider())):
-        logging.warning('Setting watermark: %s', ts)
-        watermark_estimator.set_watermark(ts)
-        yield element
-
-class RandomlySampleImages(beam.DoFn):
-
-    def __init__(self, example_images):
-        self.example_images = example_images
-    
-    def process(self, unused_element):
-        # Expecting 8,000 elements; this will return around 800 each time
-        for tweet in self.example_images:
-            num = random.random()
-            # Sample 10% of the elements.
-            if num > 0.9:
-                yield tweet
-
 def run(
-    beam_options: Optional[PipelineOptions],
-    example_images,
+    input_text: str,
+    beam_options: Optional[PipelineOptions] = None,
+    test: Callable[[beam.PCollection], None] = lambda _: None,
 ) -> None:
-    model_class = models.mobilenet_v2
-    model_params = {'num_classes': 1000}
-
-    class PytorchModelHandlerTensorWithBatchSize(PytorchModelHandlerTensor):
-        def batch_elements_kwargs(self):
-            return {'min_batch_size': 10, 'max_batch_size': 100}
-
-    # In this example we pass keyed inputs to RunInference transform.
-    # Therefore, we use KeyedModelHandler wrapper over PytorchModelHandler.
-    model_handler = KeyedModelHandler(
-        PytorchModelHandlerTensorWithBatchSize(
-            state_dict_path="gs://world-readable-mkcq69tkcu/dannymccormick/ato-model/mobilenet.pth",
-            model_class=model_class,
-            model_params=model_params))
-
     with beam.Pipeline(options=beam_options) as pipeline:
-        it = time.time()
-        duration = 300000
-        et = it + duration
-        interval = 1
-        sample_image_urls = (
+        elements = (
             pipeline
-            | PeriodicImpulse(it, et, interval, False)
-            | "Randomly sample images" >> beam.ParDo(RandomlySampleImages(example_images)))
-        inferences = (
-            sample_image_urls
-            | beam.WindowInto(
-                window.FixedWindows(1 * 60),
-                # Fire trigger once we've accumulated at least 200 elements for a key
-                trigger=OrFinally(Repeatedly(AfterCount(200)), DefaultTrigger()),
-                accumulation_mode=AccumulationMode.ACCUMULATING)
-            | 'ReadImageData' >> beam.Map(lambda image_url: read_image(image_url=image_url))
-            | 'PreprocessImages' >> beam.MapTuple(lambda file_name, data: (file_name, preprocess_image(data)))
-            | 'PyTorchRunInference' >> RunInference(model_handler)
-            | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
-        (inferences
-        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
-        | 'Count elements per key' >> beam.combiners.Count.PerKey()
-        | 'LOG' >> beam.ParDo(Logger()))
\ No newline at end of file
+            | "Create elements" >> beam.Create(["Hello", "World!", input_text])
+            | "Print elements" >> beam.Map(print)
+        )
+
+        # Used for testing only.
+        test(elements)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..34a4d2e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+apache-beam==2.39.0
diff --git a/setup.py b/setup.py
index 06dc274..cbee563 100644
--- a/setup.py
+++ b/setup.py
@@ -8,15 +8,14 @@
 
 from setuptools import setup, find_packages
 
-requirements = [
-    "apache-beam[gcp]==2.42.0", "transformers==4.21.0", "torch==1.13.0", "torchvision==0.14.0"
-]
+with open("requirements.txt") as f:
+    requirements = f.readlines()
 
 setup(
     name="My app",
     version="1.0",
     description="Python Apache Beam pipeline.",
-    author="Danny McCormick",
+    author="My name",
     author_email="my@email.com",
     packages=find_packages(),
     install_requires=requirements,
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..bfe3779
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,7 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
+# option. This file may not be copied, modified, or distributed
+# except according to those terms.
diff --git a/test/test_my_app.py b/test/test_my_app.py
new file mode 100644
index 0000000..c3ecedb
--- /dev/null
+++ b/test/test_my_app.py
@@ -0,0 +1,34 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
+# option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+# For more information on unittest, see:
+#   https://docs.python.org/3/library/unittest.html
+
+import unittest
+from unittest.mock import patch
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
+
+import my_app
+
+
+@patch("apache_beam.Pipeline", TestPipeline)
+@patch("builtins.print", lambda x: x)
+class TestApp(unittest.TestCase):
+    def test_run_direct_runner(self):
+        # Note that the order of the elements doesn't matter.
+        expected = ["Test", "Hello", "World!"]
+        my_app.run(
+            input_text="Test",
+            test=lambda elements: assert_that(elements, equal_to(expected)),
+        )
+
+
+if __name__ == "__main__":
+    unittest.main()