Posted to commits@beam.apache.org by da...@apache.org on 2022/10/31 22:17:01 UTC
[beam-starter-python] branch main updated: Revert "Add demo code"
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/beam-starter-python.git
The following commit(s) were added to refs/heads/main by this push:
new 3bc87b0 Revert "Add demo code"
3bc87b0 is described below
commit 3bc87b027b508b12aba0f1306276b97599b70cbe
Author: Danny McCormick <da...@google.com>
AuthorDate: Mon Oct 31 18:16:47 2022 -0400
Revert "Add demo code"
This reverts commit 7d083ddb102bc970a19584d9ab238572aae0a946.
---
.github/PULL_REQUEST_TEMPLATE.md | 12 ++
.github/dependabot.yml | 19 +++
.github/workflows/test.yaml | 30 ++++
CONTRIBUTING.md | 69 +++++++++
LICENSE | 236 +++++++++++++++++++++++++++++
MANIFEST.in | 1 +
NOTICE | 5 +
README.md | 79 +++++++++-
main.py | 32 +---
my_app/app.py | 314 ++-------------------------------------
requirements.txt | 1 +
setup.py | 7 +-
test/__init__.py | 7 +
test/test_my_app.py | 34 +++++
14 files changed, 507 insertions(+), 339 deletions(-)
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 0000000..50a629d
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,12 @@
+**Please** add a meaningful description for your change here
+
+------------------------
+
+Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
+
+ - [ ] I agree that my contributions are licensed with both [Apache ASL2](../../LICENSE-APACHE) and [MIT](../../LICENSE-MIT).
+ - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
+ - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
+ - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
+
+See the [`CONTRIBUTING.md`](../../CONTRIBUTING.md) guide for more information.
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
new file mode 100644
index 0000000..6285813
--- /dev/null
+++ b/.github/dependabot.yml
@@ -0,0 +1,19 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
+# option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+version: 2
+updates:
+ - package-ecosystem: "pip"
+ directory: "/"
+ schedule:
+ interval: "daily"
+
+ - package-ecosystem: "github-actions"
+ directory: "/"
+ schedule:
+ interval: "daily"
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
new file mode 100644
index 0000000..be3d2a7
--- /dev/null
+++ b/.github/workflows/test.yaml
@@ -0,0 +1,30 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
+# option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+name: Test
+
+on: [push, pull_request]
+
+jobs:
+ tests:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: ["3.7", "3.8", "3.9"]
+ steps:
+ - uses: actions/checkout@v3
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v3
+ with:
+ python-version: ${{ matrix.python-version }}
+ - name: Install local package
+ run: |
+ pip install -U pip
+ pip install -e .
+ - name: Run tests
+ run: python -m unittest -v
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..45e824a
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,69 @@
+# Contributing
+
+Thanks for taking the time to contribute!
+
+There are many ways to contribute; here are some.
+
+## Filing an issue
+
+If there's any issue you encounter or anything that needs to be fixed, feel free to [create a GitHub issue](https://github.com/apache/beam-starter-python/issues).
+
+## Contributing to this starter project
+
+If this is your first time contributing to a GitHub repo,
+we recommend going through the
+[GitHub quickstart](https://docs.github.com/en/get-started/quickstart/hello-world).
+
+It's a good idea to discuss your plans with the Beam community through the dev@beam.apache.org mailing list before making any changes.
+
+Here's a small overview of the process.
+
+1. [Fork the repo](https://docs.github.com/en/get-started/quickstart/fork-a-repo).
+
+1. Clone the repo.
+
+ ```sh
+ export GITHUB_USERNAME="my-github-username"
+
+ git clone git@github.com:$GITHUB_USERNAME/beam-starter-python.git
+ ```
+
+1. Set the [upstream remote branch](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/configuring-a-remote-for-a-fork).
+
+ ```sh
+ cd beam-starter-python
+ git remote add upstream git@github.com:apache/beam-starter-python.git
+ ```
+
+1. Create and change to a new branch.
+
+ ```sh
+ git checkout -B my-branch-name
+ ```
+
+1. Modify the code!
+
+1. Run the tests. For steps on how to run them see the [`README.md`](README.md).
+
+1. Commit and push your changes to your branch in `origin`.
+
+ ```sh
+ git commit -m "one line description of your changes"
+ git push
+ ```
+
+1. [Create a Pull Request](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request).
+
+1. Add reviewers, and [address review comments](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests).
+
+1. Once it's approved, we can merge the Pull Request.
+
+For more information about proposing changes to a GitHub repository, see the
+[Propose changes](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/about-branches)
+page in the GitHub docs.
+
+## Contributing to Apache Beam
+
+For information on how to contribute to
+[Apache Beam](https://github.com/apache/beam), see the
+[Contribution guide](https://beam.apache.org/contribute/).
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..c7e6385
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,236 @@
+This software is distributed under the terms of both the MIT license and the
+Apache License (Version 2.0).
+
+
+MIT license
+
+Copyright 2022 Google LLC
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+
+
+Apache 2 license
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
\ No newline at end of file
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..540b720
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+include requirements.txt
\ No newline at end of file
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..454af2e
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,5 @@
+Apache Beam
+Copyright [2022-] The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index 020b71e..dcd218e 100644
--- a/README.md
+++ b/README.md
@@ -1,11 +1,78 @@
-# ato-demo-2022
+# Apache Beam starter for Python
-To run on Dataflow:
+If you want to clone this repository to start your own project,
+you can choose the license you prefer and feel free to delete anything related to the license you are dropping.
+## Before you begin
+
+Make sure you have a [Python 3](https://www.python.org/) development environment ready.
+If you don't, you can download and install it from the
+[Python downloads page](https://www.python.org/downloads/).
+
+We recommend using a virtual environment to isolate your project's dependencies.
+
+```sh
+# Create a new Python virtual environment.
+python -m venv env
+
+# Activate the virtual environment.
+source env/bin/activate
```
-python3 -m venv ~/.virtualenvs/env
-. ~/.virtualenvs/env/bin/activate
+While activated, your `python` and `pip` commands will point to the virtual environment,
+so any changes or installed dependencies are self-contained.
+
+As a one-time setup, let's install the project's dependencies from the [`requirements.txt`](requirements.txt) file.
+
+```sh
+# It's always a good idea to update pip before installing dependencies.
+pip install -U pip
+
+# Install the project as a local package; this installs all the dependencies as well.
pip install -e .
-python main.py --region us-central1 --runner DataflowRunner --project <your project> --temp_location gs://path/to/temp/location --streaming --use_runner_v2 --allow_unsafe_triggers
-```
\ No newline at end of file
+```
+
+> ℹ️ Once you are done, you can run the `deactivate` command to go back to your global Python installation.
+
+### Running the pipeline
+
+Running your pipeline in Python is as easy as running the script file directly.
+
+```sh
+# You can run the script file directly.
+python main.py
+
+# To run it with command-line arguments.
+python main.py --input-text="🎉"
+
+# To run the tests.
+python -m unittest -v
+```
+
+## GitHub Actions automated testing
+
+This project already comes with automated testing via [GitHub Actions](https://github.com/features/actions).
+
+To configure it, look at the [`.github/workflows/test.yaml`](.github/workflows/test.yaml) file.
+
+## Using other runners
+
+To keep this template small, it only includes the [Direct Runner](https://beam.apache.org/documentation/runners/direct/).
+
+For a comparison of what each runner currently supports, look at the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).
+
+To add a new runner, visit the runner's page for instructions on how to include it.
+
+## Contributing
+
+Thank you for your interest in contributing!
+All contributions are welcome!
+
+Please refer to the [`CONTRIBUTING.md`](CONTRIBUTING.md) file for more information.
+
+# License
+
+This software is distributed under the terms of both the MIT license and the
+Apache License (Version 2.0).
+
+See [LICENSE](LICENSE) for details.
diff --git a/main.py b/main.py
index f909c2d..2de2540 100644
--- a/main.py
+++ b/main.py
@@ -18,35 +18,15 @@ if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--input-text",
+ default="Default input text",
+ help="Input text to print.",
+ )
args, beam_args = parser.parse_known_args()
- example_images = ['https://storage.googleapis.com/apache-beam-samples/image_captioning/bear.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/bear.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/bear.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/bee.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/bee.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/fox.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/fox.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/German-Shepherd.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/ladybug.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/ladybug.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/ladybug.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/starfish.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/tiger.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/tiger.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/tiger.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/tiger.jpeg',
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/zebra.jpeg'
- 'https://storage.googleapis.com/apache-beam-samples/image_captioning/zebra.jpeg']* 333
-
beam_options = PipelineOptions(save_main_session=True, setup_file="./setup.py")
app.run(
+ input_text=args.input_text,
beam_options=beam_options,
- example_images=example_images,
)
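Note on the main.py hunk above: the restored code defines a single `--input-text` flag and calls `parser.parse_known_args()`, which returns both the parsed namespace and the list of arguments the parser did not recognize. The following is a minimal sketch of that split only, not the repository's code. The helper name `split_args` and the sample `--runner=DirectRunner` flag are hypothetical and used purely for illustration, and the sketch leaves out Beam entirely so that it runs on its own.

```python
import argparse


def split_args(argv):
    """Hypothetical helper: separate the app's own flag from everything else."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input-text",
        default="Default input text",
        help="Input text to print.",
    )
    # parse_known_args returns (namespace, leftovers) instead of raising an
    # error on flags it does not know, so unrelated flags pass through.
    args, beam_args = parser.parse_known_args(argv)
    return args.input_text, beam_args


# Illustrative argument list (assumed, not taken from the commit).
input_text, beam_args = split_args(["--input-text=hello", "--runner=DirectRunner"])
print(input_text)
print(beam_args)
```

With no arguments, `split_args([])` falls back to the default text and an empty leftover list, which matches how `python main.py` behaves when run without flags.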
diff --git a/my_app/app.py b/my_app/app.py
index c7627d4..7edcd7a 100644
--- a/my_app/app.py
+++ b/my_app/app.py
@@ -6,314 +6,22 @@
# option. This file may not be copied, modified, or distributed
# except according to those terms.
-from io import BytesIO
-import logging
-import math
-import random
-import requests
-import torch
-from torchvision import models
-from torchvision import transforms
-import time
-from typing import Iterable
-from typing import Optional
-from typing import Tuple
+from typing import Callable, Optional
import apache_beam as beam
-from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
-from apache_beam.ml.inference import RunInference
-from apache_beam.ml.inference.base import KeyedModelHandler
-from apache_beam.ml.inference.base import PredictionResult
-from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.transforms import window
-# from apache_beam.transforms.periodicsequence import PeriodicImpulse
-# from my_app.transformations.impulse import PeriodicImpulse
-from apache_beam.transforms.trigger import AccumulationMode
-from apache_beam.transforms.trigger import AfterCount
-from apache_beam.transforms.trigger import DefaultTrigger
-from apache_beam.transforms.trigger import OrFinally
-from apache_beam.transforms.trigger import Repeatedly
-from apache_beam.utils import timestamp
-from PIL import Image
-from apache_beam.io.restriction_trackers import OffsetRange
-from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
-from apache_beam.runners import sdf_utils
-from apache_beam.transforms import core
-from apache_beam.transforms.ptransform import PTransform
-from apache_beam.transforms.window import TimestampedValue
-from apache_beam.utils.timestamp import MAX_TIMESTAMP
-from apache_beam.utils.timestamp import Timestamp
-# Everything surrounding impulsegen + periodic sequence/impulse can mostly be ignored.
-# This is a local patch of a bug in Beam 2.42.0 which will be fixed shortly (likely in 2.44.0).
-class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
- def initial_restriction(self, element):
- start, end, interval = element
- if isinstance(start, Timestamp):
- start = start.micros / 1000000
- if isinstance(end, Timestamp):
- end = end.micros / 1000000
-
- assert start <= end
- assert interval > 0
- total_outputs = math.ceil((end - start) / interval)
- return OffsetRange(0, total_outputs)
-
- def create_tracker(self, restriction):
- return OffsetRestrictionTracker(restriction)
-
- def restriction_size(self, unused_element, restriction):
- return restriction.size()
-
-
-class ImpulseSeqGenDoFn(beam.DoFn):
- '''
- ImpulseSeqGenDoFn fn receives tuple elements with three parts:
-
- * first_timestamp = first timestamp to output element for.
- * last_timestamp = last timestamp/time to output element for.
- * fire_interval = how often to fire an element.
-
- For each input element received, ImpulseSeqGenDoFn fn will start
- generating output elements in following pattern:
-
- * if element timestamp is less than current runtime then output element.
- * if element timestamp is greater than current runtime, wait until next
- element timestamp.
-
- ImpulseSeqGenDoFn can't guarantee that each element is output at exact time.
- ImpulseSeqGenDoFn guarantees that elements would not be output prior to
- given runtime timestamp.
- '''
- @beam.DoFn.unbounded_per_element()
- def process(
- self,
- element,
- restriction_tracker=beam.DoFn.RestrictionParam(
- ImpulseSeqGenRestrictionProvider()),
- watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
- ManualWatermarkEstimator.default_provider())):
- '''
- :param element: (start_timestamp, end_timestamp, interval)
- :param restriction_tracker:
- :return: yields elements at processing real-time intervals with value of
- target output timestamp for the element.
- '''
- start, _, interval = element
-
- if isinstance(start, Timestamp):
- start = start.micros / 1000000
-
- assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
-
- current_output_index = restriction_tracker.current_restriction().start
- current_output_timestamp = start + interval * current_output_index
- current_time = time.time()
- watermark_estimator.set_watermark(
- timestamp.Timestamp(current_output_timestamp))
-
- while current_output_timestamp <= current_time:
- if restriction_tracker.try_claim(current_output_index):
- yield current_output_timestamp
- current_output_index += 1
- current_output_timestamp = start + interval * current_output_index
- current_time = time.time()
- watermark_estimator.set_watermark(
- timestamp.Timestamp(current_output_timestamp))
- else:
- return
-
- restriction_tracker.defer_remainder(
- timestamp.Timestamp(current_output_timestamp))
-
-
-class PeriodicSequence(PTransform):
- '''
- PeriodicSequence transform receives tuple elements with three parts:
-
- * first_timestamp = first timestamp to output element for.
- * last_timestamp = last timestamp/time to output element for.
- * fire_interval = how often to fire an element.
-
- For each input element received, PeriodicSequence transform will start
- generating output elements in following pattern:
-
- * if element timestamp is less than current runtime then output element.
- * if element timestamp is greater than current runtime, wait until next
- element timestamp.
-
- PeriodicSequence can't guarantee that each element is output at exact time.
- PeriodicSequence guarantees that elements would not be output prior to given
- runtime timestamp.
- The PCollection generated by PeriodicSequence is unbounded.
- '''
- def __init__(self):
- pass
-
- def expand(self, pcoll):
- return (
- pcoll
- | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn())
- | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt)))
-
-
-class PeriodicImpulse(PTransform):
- '''
- PeriodicImpulse transform generates an infinite sequence of elements with
- given runtime interval.
-
- PeriodicImpulse transform behaves same as {@link PeriodicSequence} transform,
- but can be used as first transform in pipeline.
- The PCollection generated by PeriodicImpulse is unbounded.
- '''
- def __init__(
- self,
- start_timestamp=Timestamp.now(),
- stop_timestamp=MAX_TIMESTAMP,
- fire_interval=360.0,
- apply_windowing=False):
- '''
- :param start_timestamp: Timestamp for first element.
- :param stop_timestamp: Timestamp after which no elements will be output.
- :param fire_interval: Interval at which to output elements.
- :param apply_windowing: Whether each element should be assigned to
- individual window. If false, all elements will reside in global window.
- '''
- self.start_ts = start_timestamp
- self.stop_ts = stop_timestamp
- self.interval = fire_interval
- self.apply_windowing = apply_windowing
-
- def expand(self, pbegin):
- result = (
- pbegin
- | 'ImpulseElement' >> beam.Create(
- [(self.start_ts, self.stop_ts, self.interval)])
- | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn())
- | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt)))
- if self.apply_windowing:
- result = result | 'ApplyWindowing' >> beam.WindowInto(
- window.FixedWindows(self.interval))
- return result
-
-
-# Everything above can largely be ignored. This is a recreation of PeriodicImpulse because the built in version has a bug.
-
-def read_image(image_url: str) -> Tuple[str, Image.Image]:
- response = requests.get(image_url)
- image = Image.open(BytesIO(response.content)).convert('RGB')
- return image_url, image
-
-def preprocess_image(data: Image.Image) -> torch.Tensor:
- image_size = (224, 224)
- # Pre-trained PyTorch models expect input images normalized with the
- # below values (see: https://pytorch.org/vision/stable/models.html)
- normalize = transforms.Normalize(
- mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
- transform = transforms.Compose([
- transforms.Resize(image_size),
- transforms.ToTensor(),
- normalize,
- ])
- return transform(data)
-
-class PostProcessor(beam.DoFn):
- def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
- image_url, prediction_result = element
- prediction = torch.argmax(prediction_result.inference, dim=0)
- # Map back to items based on https://deeplearning.cms.waikato.ac.nz/user-guide/class-maps/IMAGENET/
- if prediction.item() == 294:
- yield 'bear'
- elif prediction.item() == 309:
- yield 'bee'
- elif prediction.item() == 277:
- yield 'fox'
- elif prediction.item() == 235:
- yield 'dog'
- elif prediction.item() == 301:
- yield 'ladybug'
- elif prediction.item() == 327:
- yield 'starfish'
- elif prediction.item() == 292:
- yield 'tiger'
- elif prediction.item() == 340:
- yield 'zebra'
- else:
- yield 'no_matches'
-
-class Logger(beam.DoFn):
-
- def __init__(self, name="main"):
- self._name = name
-
- def process(self, element, w=beam.DoFn.WindowParam,
- ts=beam.DoFn.TimestampParam):
- logging.warning('%s', element)
- yield element
-
-@beam.DoFn.unbounded_per_element()
-class AddWatermark(beam.DoFn):
-
- def process(self, element, ts=beam.DoFn.TimestampParam, restriction_tracker=beam.DoFn.RestrictionParam(
- ImpulseSeqGenRestrictionProvider()), watermark_estimator=beam.DoFn.WatermarkEstimatorParam(ManualWatermarkEstimator.default_provider())):
- logging.warning('Setting watermark: %s', ts)
- watermark_estimator.set_watermark(ts)
- yield element
-
-class RandomlySampleImages(beam.DoFn):
-
- def __init__(self, example_images):
- self.example_images = example_images
-
- def process(self, unused_element):
- # Expecting 8,000 elements; this will return around 800 each time
- for tweet in self.example_images:
- num = random.random()
- # Sample 10% of the elements.
- if num > 0.9:
- yield tweet
-
def run(
- beam_options: Optional[PipelineOptions],
- example_images,
+ input_text: str,
+ beam_options: Optional[PipelineOptions] = None,
+ test: Callable[[beam.PCollection], None] = lambda _: None,
) -> None:
- model_class = models.mobilenet_v2
- model_params = {'num_classes': 1000}
-
- class PytorchModelHandlerTensorWithBatchSize(PytorchModelHandlerTensor):
- def batch_elements_kwargs(self):
- return {'min_batch_size': 10, 'max_batch_size': 100}
-
- # In this example we pass keyed inputs to RunInference transform.
- # Therefore, we use KeyedModelHandler wrapper over PytorchModelHandler.
- model_handler = KeyedModelHandler(
- PytorchModelHandlerTensorWithBatchSize(
- state_dict_path="gs://world-readable-mkcq69tkcu/dannymccormick/ato-model/mobilenet.pth",
- model_class=model_class,
- model_params=model_params))
-
with beam.Pipeline(options=beam_options) as pipeline:
- it = time.time()
- duration = 300000
- et = it + duration
- interval = 1
- sample_image_urls = (
+ elements = (
pipeline
- | PeriodicImpulse(it, et, interval, False)
- | "Randomly sample images" >> beam.ParDo(RandomlySampleImages(example_images)))
- inferences = (
- sample_image_urls
- | beam.WindowInto(
- window.FixedWindows(1 * 60),
- # Fire trigger once we've accumulated at least 200 elements for a key
- trigger=OrFinally(Repeatedly(AfterCount(200)), DefaultTrigger()),
- accumulation_mode=AccumulationMode.ACCUMULATING)
- | 'ReadImageData' >> beam.Map(lambda image_url: read_image(image_url=image_url))
- | 'PreprocessImages' >> beam.MapTuple(lambda file_name, data: (file_name, preprocess_image(data)))
- | 'PyTorchRunInference' >> RunInference(model_handler)
- | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
- (inferences
- | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
- | 'Count elements per key' >> beam.combiners.Count.PerKey()
- | 'LOG' >> beam.ParDo(Logger()))
\ No newline at end of file
+ | "Create elements" >> beam.Create(["Hello", "World!", input_text])
+ | "Print elements" >> beam.Map(print)
+ )
+
+ # Used for testing only.
+ test(elements)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..34a4d2e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+apache-beam==2.39.0
diff --git a/setup.py b/setup.py
index 06dc274..cbee563 100644
--- a/setup.py
+++ b/setup.py
@@ -8,15 +8,14 @@
from setuptools import setup, find_packages
-requirements = [
- "apache-beam[gcp]==2.42.0", "transformers==4.21.0", "torch==1.13.0", "torchvision==0.14.0"
-]
+with open("requirements.txt") as f:
+ requirements = f.readlines()
setup(
name="My app",
version="1.0",
description="Python Apache Beam pipeline.",
- author="Danny McCormick",
+ author="My name",
author_email="my@email.com",
packages=find_packages(),
install_requires=requirements,
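Note on the setup.py hunk above: `f.readlines()` keeps each line's trailing newline, and would also pass along blank lines or comment lines if requirements.txt ever gained any. A minimal sketch of a stricter reader follows. The helper name `parse_requirements` is hypothetical and not part of this commit, and the sketch only handles whole-line comments.

```python
from typing import Iterable, List


def parse_requirements(lines: Iterable[str]) -> List[str]:
    """Hypothetical helper: return requirement specifiers from raw lines."""
    requirements = []
    for line in lines:
        # Remove surrounding whitespace, including the trailing newline.
        stripped = line.strip()
        # Skip blank lines and whole-line comments.
        if not stripped or stripped.startswith("#"):
            continue
        requirements.append(stripped)
    return requirements


# Sample input shaped like the requirements.txt added in this commit,
# plus a blank line and a comment line added for illustration.
sample = ["apache-beam==2.39.0\n", "\n", "# pinned for the starter\n"]
print(parse_requirements(sample))
```

In setup.py this could be called as `parse_requirements(f)` inside the existing `with open("requirements.txt") as f:` block, since a file object iterates over its lines.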
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..bfe3779
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,7 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
+# option. This file may not be copied, modified, or distributed
+# except according to those terms.
diff --git a/test/test_my_app.py b/test/test_my_app.py
new file mode 100644
index 0000000..c3ecedb
--- /dev/null
+++ b/test/test_my_app.py
@@ -0,0 +1,34 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
+# option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+# For more information on unittest, see:
+# https://docs.python.org/3/library/unittest.html
+
+import unittest
+from unittest.mock import patch
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
+
+import my_app
+
+
+@patch("apache_beam.Pipeline", TestPipeline)
+@patch("builtins.print", lambda x: x)
+class TestApp(unittest.TestCase):
+ def test_run_direct_runner(self):
+ # Note that the order of the elements doesn't matter.
+ expected = ["Test", "Hello", "World!"]
+ my_app.run(
+ input_text="Test",
+ test=lambda elements: assert_that(elements, equal_to(expected)),
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()
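Note on the `test` parameter restored to `run` in my_app/app.py: the function accepts a callback that defaults to a no-op, so the test can inspect the pipeline's elements without changing normal runs. The sketch below shows the same hook pattern using only the standard library. It is a stand-in under stated assumptions, not the Beam code: a plain list replaces the PCollection, `check` is a hypothetical callback, and this `run` returns the list purely for illustration.

```python
from typing import Callable, List


def run(
    input_text: str,
    test: Callable[[List[str]], None] = lambda _: None,
) -> List[str]:
    # Stand-in for the pipeline: a plain list replaces the PCollection.
    elements = ["Hello", "World!", input_text]
    # Used for testing only; the default hook does nothing.
    test(elements)
    return elements


def check(elements: List[str]) -> None:
    # Order-insensitive comparison, since element order doesn't matter.
    assert sorted(elements) == sorted(["Test", "Hello", "World!"])


# Normal call: no hook supplied, so nothing is checked.
run("Test")
# Test call: the hook receives the elements and asserts on them.
run("Test", test=check)
```

In the real test, `assert_that(elements, equal_to(expected))` plays the role of `check`, and the assertion is evaluated when the pipeline runs rather than at the call site.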