You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jr...@apache.org on 2023/04/03 19:47:42 UTC

[beam] branch master updated: Remove accidental commit (#26081)

This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fb9c8cecb7 Remove accidental commit (#26081)
0fb9c8cecb7 is described below

commit 0fb9c8cecb7300274c2c4663c4567c37b334cde0
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Mon Apr 3 15:47:33 2023 -0400

    Remove accidental commit (#26081)
---
 ...m-ml auto_model_updates_using_side_inputs.ipynb | 333 ---------------------
 1 file changed, 333 deletions(-)

diff --git a/beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb b/beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb
deleted file mode 100644
index a59d307d5f2..00000000000
--- a/beam/beam/examples/notebooks/beam-ml auto_model_updates_using_side_inputs.ipynb	
+++ /dev/null
@@ -1,333 +0,0 @@
-{
-  "nbformat": 4,
-  "nbformat_minor": 0,
-  "metadata": {
-    "colab": {
-      "provenance": [],
-      "include_colab_link": true
-    },
-    "kernelspec": {
-      "name": "python3",
-      "display_name": "Python 3"
-    },
-    "language_info": {
-      "name": "python"
-    }
-  },
-  "cells": [
-    {
-      "cell_type": "markdown",
-      "metadata": {
-        "id": "view-in-github",
-        "colab_type": "text"
-      },
-      "source": [
-        "<a href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/master/beam/beam/examples/notebooks/beam-ml%20auto_model_updates_using_side_inputs.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
-      ]
-    },
-    {
-      "cell_type": "code",
-      "source": [
-        "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
-        "\n",
-        "# Licensed to the Apache Software Foundation (ASF) under one\n",
-        "# or more contributor license agreements. See the NOTICE file\n",
-        "# distributed with this work for additional information\n",
-        "# regarding copyright ownership. The ASF licenses this file\n",
-        "# to you under the Apache License, Version 2.0 (the\n",
-        "# \"License\"); you may not use this file except in compliance\n",
-        "# with the License. You may obtain a copy of the License at\n",
-        "#\n",
-        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
-        "#\n",
-        "# Unless required by applicable law or agreed to in writing,\n",
-        "# software distributed under the License is distributed on an\n",
-        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
-        "# KIND, either express or implied. See the License for the\n",
-        "# specific language governing permissions and limitations\n",
-        "# under the License"
-      ],
-      "metadata": {
-        "cellView": "form",
-        "id": "OsFaZscKSPvo"
-      },
-      "execution_count": null,
-      "outputs": []
-    },
-    {
-      "cell_type": "markdown",
-      "source": [
-        "# Use WatchFilePattern to auto-update ML models in RunInference\n",
-        "\n",
-        "The pipeline in this notebook uses a `RunInference` PTransform to run inference on images using TensorFlow models. It uses a side input PCollection that emits `ModelMetadata` to update the model.\n",
-        "\n",
-        "Using side inputs, you can update your model (which is passed in a ModelHandler configuration object) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the WatchFilePattern, or by configuring a custom side input PCollection that defines the logic for the model update.\n",
-        "\n",
-        "For more information about side inputs, see the Side inputs section in the Apache Beam Programming Guide.\n",
-        "\n",
-        "This notebook uses `WatchFilePattern` as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which is used in the `RunInference` PTransform to automatically update the ML model without stopping the Beam pipeline.\n"
-      ],
-      "metadata": {
-        "id": "ZUSiAR62SgO8"
-      }
-    },
-    {
-      "cell_type": "markdown",
-      "source": [
-        "### Before you begin\n",
-        "Install the necessary dependencies that are used to run this notebook.\n",
-        "\n",
-        "To use RunInference with side inputs for automatic model updates, install `Apache Beam` version `2.46.0` or later."
-      ],
-      "metadata": {
-        "id": "SPuXFowiTpWx"
-      }
-    },
-    {
-      "cell_type": "code",
-      "execution_count": null,
-      "metadata": {
-        "id": "1RyTYsFEIOlA"
-      },
-      "outputs": [],
-      "source": [
-        "!pip install apache_beam[gcp]>=2.46.0 --quiet\n",
-        "!pip install tensorflow\n",
-        "!pip install tensorflow_hub"
-      ]
-    },
-    {
-      "cell_type": "code",
-      "source": [
-        "# Imports required for the notebook.\n",
-        "import logging\n",
-        "import time\n",
-        "from typing import Iterable\n",
-        "\n",
-        "import apache_beam as beam\n",
-        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation import PostProcessor\n",
-        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation import read_image\n",
-        "from apache_beam.ml.inference.base import PredictionResult\n",
-        "from apache_beam.ml.inference.base import RunInference\n",
-        "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor\n",
-        "from apache_beam.ml.inference.utils import WatchFilePattern\n",
-        "from apache_beam.options.pipeline_options import GoogleCloudOptions\n",
-        "from apache_beam.options.pipeline_options import PipelineOptions\n",
-        "from apache_beam.options.pipeline_options import SetupOptions\n",
-        "from apache_beam.options.pipeline_options import StandardOptions\n",
-        "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
-      ],
-      "metadata": {
-        "id": "Rs4cwwNrIV9H"
-      },
-      "execution_count": null,
-      "outputs": []
-    },
-    {
-      "cell_type": "code",
-      "source": [
-        "# authenticate to your gcp account.\n",
-        "from google.colab import auth\n",
-        "auth.authenticate_user()"
-      ],
-      "metadata": {
-        "id": "jAKpPcmmGm03"
-      },
-      "execution_count": null,
-      "outputs": []
-    },
-    {
-      "cell_type": "markdown",
-      "source": [
-        "# Pipeline options\n",
-        "\n",
-        "Configure the pipeline options for the pipeline to run on Dataflow. Make sure the streaming mode is on for this pipeline."
-      ],
-      "metadata": {
-        "id": "ORYNKhH3WQyP"
-      }
-    },
-    {
-      "cell_type": "code",
-      "source": [
-        "options = PipelineOptions()\n",
-        "options.view_as(StandardOptions).streaming = True\n",
-        "\n",
-        "# provide required pipeline options for DataflowRunner\n",
-        "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
-        "\n",
-        "# Sets the project to the default project in your current Google Cloud environment.\n",
-        "options.view_as(GoogleCloudOptions).project = '<your-project>'\n",
-        "\n",
-        "# Sets the Google Cloud Region in which Cloud Dataflow runs.\n",
-        "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
-        "\n",
-        "# IMPORTANT! Adjust the following to choose a Cloud Storage location.\n",
-        "dataflow_gcs_location = \"gs://<your-bucket>/tmp/\"\n",
-        "\n",
-        "# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.\n",
-        "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location\n",
-        "\n",
-        "# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.\n",
-        "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location\n",
-        "\n"
-      ],
-      "metadata": {
-        "id": "wWjbnq6X-4uE"
-      },
-      "execution_count": null,
-      "outputs": []
-    },
-    {
-      "cell_type": "markdown",
-      "source": [
-        "We need to install the `tensorflow` and `tensorflow_hub` dependencies on Dataflow. We can pass them via `requirements_file` pipeline option."
-      ],
-      "metadata": {
-        "id": "HTJV8pO2Wcw4"
-      }
-    },
-    {
-      "cell_type": "code",
-      "source": [
-        "# define dependencies in a requirements file required for the pipeline.\n",
-        "deps_required_for_pipeline = ['tensorflow>=2.12.0', 'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
-        "requirements_file_path = './requirements.txt'\n",
-        "# write the depencies to a requirements file.\n",
-        "with open(requirements_file_path, 'w') as f:\n",
-        "  for dep in deps_required_for_pipeline:\n",
-        "    f.write(dep + '\\n')\n",
-        "\n",
-        "# the pipeline needs dependencies needed to be installed on Dataflow.\n",
-        "options.view_as(SetupOptions).requirements_file = requirements_file_path"
-      ],
-      "metadata": {
-        "id": "lEy4PkluWbdm"
-      },
-      "execution_count": null,
-      "outputs": []
-    },
-    {
-      "cell_type": "markdown",
-      "source": [
-        "Let's define configuration for the `PeriodicImpulse`.\n",
-        "\n",
-        "  * `PeriodicImpulse` transform generates an infinite sequence of elements with given runtime interval.\n",
-        "\n",
-        "We use `PeriodicImpulse` in this notebook to mimic the `Pub/Sub` source. Since the inputs in a streaming pipleine arrives in intervals, we use `PeriodicImpulse` to output element at `m` intervals.\n",
-        "\n",
-        "To learn more about PeriodicImpulse, please take a look at the [code](https://github.com/apache/beam/blob/9c52e0594d6f0e59cd17ee005acfb41da508e0d5/sdks/python/apache_beam/transforms/periodicsequence.py#L150)"
-      ],
-      "metadata": {
-        "id": "wVRmOh_fRBwd"
-      }
-    },
-    {
-      "cell_type": "code",
-      "source": [
-        "start_timestamp = time.time()\n",
-        "end_timestamp = start_timestamp + 60 * 20\n",
-        "main_input_fire_interval = 60 # interval at which the main input PCollection is emitted.\n",
-        "side_input_fire_interval = 60 # interval at which the side input PCollection is emitted."
-      ],
-      "metadata": {
-        "id": "E7zsnoxFQ_7L"
-      },
-      "execution_count": null,
-      "outputs": []
-    },
-    {
-      "cell_type": "markdown",
-      "source": [
-        "## TensorFlow ModelHandler\n",
-        " In this notebook, we will use `TFModelHandlerTensor` as the ModelHandler. We will use `resnet_101` model trained on imagenet.\n",
-        "\n",
-        " Download the model from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5 and place it in a directory that you would use to auto model updates."
-      ],
-      "metadata": {
-        "id": "_AUNH_GJk_NE"
-      }
-    },
-    {
-      "cell_type": "code",
-      "source": [
-        "model_handler = TFModelHandlerTensor(\n",
-        "    model_uri=\"gs://<your-bucket>/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
-      ],
-      "metadata": {
-        "id": "kkSnsxwUk-Sp"
-      },
-      "execution_count": null,
-      "outputs": []
-    },
-    {
-      "cell_type": "markdown",
-      "source": [
-        "Now, let's jump into the pipeline code.\n",
-        "\n",
-        "**Pipeline steps**:\n",
-        "1. Create a `PeriodImpulse`, which emits output every `n` seconds.\n",
-        "2. Read and pre-process the images using the `read_image` function.\n",
-        "3. Pass the images to the RunInference `PTransform`. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters.\n",
-        "\n",
-        "\n",
-        "The `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` to the RunInference `PTransform`. This side input is used to update the models in the `model_handler` without needing to stop the beam pipeline.\n",
-        "We will use `WatchFilePattern` as side input to watch a glob pattern matching `.h5` files.\n",
-        "\n",
-        "`model_metadata_pcoll` expects a `PCollection` of ModelMetadata compatible with [AsSingleton](https://beam.apache.org/releases/pydoc/2.4.0/apache_beam.pvalue.html#apache_beam.pvalue.AsSingleton) view. Because the pipeline uses `WatchFilePattern` as side input, it will take care of windowing and wrapping the output into `ModelMetadata`.\n",
-        "\n",
-        "**How to watch for auto model update**\n",
-        "\n",
-        "After the pipeline starts processing data and when you see some outputs emitted from the RunInference `PTransform`, upload a `.h5` `TensorFlow` model(for example, [resnet_152](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152_weights_tf_dim_ordering_tf_kernels.h5)) that matches the `file_pattern` to the Google Cloud Storage bucket. RunInference will update the `model_uri` of `TFModelHandlerTensor` using `WatchFilePattern` as a side input.\n",
-        "\n",
-        "**Note**: Side input update frequency is non-deterministic and can have longer intervals between updates.\n",
-        "\n",
-        "When the inference is complete, RunInference outputs a `PredictionResult` object that contains `example`, `inference`, and `model_id` fields. The `model_id` is used to identify which model is used for running the inference."
-      ],
-      "metadata": {
-        "id": "kjnb2Ib3ZpJN"
-      }
-    },
-    {
-      "cell_type": "code",
-      "source": [
-        "pipeline = beam.Pipeline(options=options)\n",
-        "\n",
-        "# file_pattern used in WatchFilePattern to watch for latest model files.\n",
-        "file_pattern = 'gs://<your-bucket>/*.h5'\n",
-        "with beam.Pipeline(options=options) as pipeline:\n",
-        "\n",
-        "  # side input used to watch for .h5 file and auto update the model_uri of the TFModelHandlerTensor.\n",
-        "  side_input_pcoll = (\n",
-        "      pipeline\n",
-        "      | \"WatchFilePattern\" >> WatchFilePattern(file_pattern=file_pattern,\n",
-        "                                                interval=side_input_fire_interval,\n",
-        "                                                stop_timestamp=end_timestamp))\n",
-        "\n",
-        "  read_images = (\n",
-        "      pipeline\n",
-        "      | \"MainInputPcoll\" >> PeriodicImpulse(\n",
-        "          start_timestamp=start_timestamp,\n",
-        "          stop_timestamp=end_timestamp,\n",
-        "          fire_interval=main_input_fire_interval)\n",
-        "      # since this example focuses on the auto model updates, we will use only one image for every prediction.\n",
-        "      | beam.Map(lambda x: \"Cat-with-beanie.jpg\")\n",
-        "      | \"ReadImage\" >> beam.Map(lambda image_name: read_image(\n",
-        "          image_name=image_name, image_dir='https://storage.googleapis.com/apache-beam-samples/image_captioning/')))\n",
-        "\n",
-        "  inferences = (read_images | \"ApplyWindowing\" >> beam.WindowInto(beam.window.FixedWindows(10))\n",
-        "      | \"RunInference\" >> RunInference(model_handler=model_handler,\n",
-        "                                        model_metadata_pcoll=side_input_pcoll))\n",
-        "\n",
-        "  post_processor = (inferences | \"PostProcessResults\" >> beam.ParDo(PostProcessor()))\n",
-        "\n",
-        "  post_processor | \"print\" >> beam.Map(logging.info)\n"
-      ],
-      "metadata": {
-        "id": "ZWcNlixlQ8-Z"
-      },
-      "execution_count": null,
-      "outputs": []
-    }
-  ]
-}
\ No newline at end of file