Posted to commits@beam.apache.org by bh...@apache.org on 2022/08/02 21:02:01 UTC

[beam] branch master updated: Beam ml notebooks (#22510)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 63d0f0d31ab Beam ml notebooks (#22510)
63d0f0d31ab is described below

commit 63d0f0d31ab633b1ecd8fec273ecf647585d3423
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Wed Aug 3 02:31:54 2022 +0530

    Beam ml notebooks (#22510)
    
    * Created using Colaboratory
    
    * RunInference API basic notebook
    
    * Change project and bucket to placeholders
    
    * Remove redundant info
    
    * Created using Colaboratory
    
    * Remove redundant info
---
 .../notebooks/beam-ml/run_inference_basic.ipynb    | 1367 ++++++++++++++++++++
 1 file changed, 1367 insertions(+)

diff --git a/examples/notebooks/beam-ml/run_inference_basic.ipynb b/examples/notebooks/beam-ml/run_inference_basic.ipynb
new file mode 100644
index 00000000000..deb5e618ca4
--- /dev/null
+++ b/examples/notebooks/beam-ml/run_inference_basic.ipynb
@@ -0,0 +1,1367 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 2,
+  "metadata": {
+    "colab": {
+      "name": "Beam DataFrames",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "code",
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "metadata": {
+        "id": "C1rAsD2L-hSO",
+        "cellView": "form"
+      },
+      "id": "C1rAsD2L-hSO",
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "id": "b6f8f3af-744e-4eaa-8a30-6d03e8e4d21e",
+      "metadata": {
+        "id": "b6f8f3af-744e-4eaa-8a30-6d03e8e4d21e"
+      },
+      "source": [
+        "# RunInference\n",
+        "\n",
+        "<button>\n",
+        "  <a href=\"https://beam.apache.org/documentation/sdks/python-machine-learning/\">\n",
+        "    <img src=\"https://beam.apache.org/images/favicon.ico\" alt=\"Open the docs\" height=\"16\"/>\n",
+        "    Beam RunInference\n",
+        "  </a>\n",
+        "</button>\n",
+        "\n",
+        "In this notebook, we walk through the use of the RunInference transform.\n",
+        "The transform and its accompanying [ModelHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler) classes handle the following tasks:\n",
+        "\n",
+        "\n",
+        "*   Optimizing model loading from popular frameworks.\n",
+        "*   Batching examples in a scalable fashion.\n",
+        "\n",
+        "\n",
+        "This notebook illustrates common RunInference patterns such as the following:\n",
+        "*   Generating predictions using both PyTorch and scikit-learn.\n",
+        "*   Postprocessing results after RunInference.\n",
+        "*   Inference with multiple models in the same pipeline.\n",
+        "\n",
+        "The linear regression models used in these samples are trained on data that corresponds to the 5 and 10 times tables; that is, `y = 5x` and `y = 10x` respectively."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "299af9bb-b2fc-405c-96e7-ee0a6ae24bdd",
+      "metadata": {
+        "id": "299af9bb-b2fc-405c-96e7-ee0a6ae24bdd"
+      },
+      "source": [
+        "### Dependencies\n",
+        "\n",
+        "The RunInference library is available in Apache Beam version <b>2.40</b> or later.\n",
+        "\n",
+        "The PyTorch module is required to use the PyTorch RunInference API. Use `pip` to install PyTorch."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Issue: https://github.com/apache/beam/issues/22218. Because of updates to the Google Cloud APIs, Beam SDK versions 2.34.0 through 2.40.0 have some dependency conflicts. See the issue for more details.\n",
+        "# Workaround to install Apache Beam without the install hanging for a long time. You might need to restart the runtime after this step.\n",
+        "!pip install google-api-core==1.31.6 --quiet\n",
+        "!pip install google-cloud-pubsub==2.13.1 google-cloud-bigquery-storage==2.13.2 --quiet\n",
+        "!pip install apache-beam[gcp,dataframe] --quiet"
+      ],
+      "metadata": {
+        "id": "loxD-rOVchRn",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "661baa2d-6e0f-4478-b7c1-db911593d5ff"
+      },
+      "id": "loxD-rOVchRn",
+      "execution_count": null,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "\u001b[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.\n",
+            "pandas-gbq 0.13.3 requires google-cloud-bigquery[bqstorage,pandas]<2.0.0dev,>=1.11.1, but you have google-cloud-bigquery 2.34.4 which is incompatible.\u001b[0m\n"
+          ]
+        }
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "7f841596-f217-46d2-b64e-1952db4de4cb",
+      "metadata": {
+        "id": "7f841596-f217-46d2-b64e-1952db4de4cb",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "da04ccb9-0801-47f6-ec9e-e87f0ca4569f"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/\n",
+            "Requirement already satisfied: torch in /usr/local/lib/python3.7/dist-packages (1.12.0+cu113)\n",
+            "Requirement already satisfied: typing-extensions in /usr/local/lib/python3.7/dist-packages (from torch) (4.1.1)\n"
+          ]
+        }
+      ],
+      "source": [
+        "%pip install torch"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "9a92e3a7-beb6-46ae-a5b0-53c15487de38",
+      "metadata": {
+        "id": "9a92e3a7-beb6-46ae-a5b0-53c15487de38"
+      },
+      "outputs": [],
+      "source": [
+        "import argparse\n",
+        "import csv\n",
+        "import json\n",
+        "import os\n",
+        "import torch\n",
+        "from typing import Tuple\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "import numpy\n",
+        "from apache_beam.io.gcp.bigquery import ReadFromBigQuery\n",
+        "from apache_beam.ml.inference.base import KeyedModelHandler\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.dataframe.convert import to_pcollection\n",
+        "from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor\n",
+        "from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "\n",
+        "import warnings\n",
+        "warnings.filterwarnings('ignore')"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "V0E35R5Ka2cE"
+      },
+      "id": "V0E35R5Ka2cE",
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "248458a6-cfd8-474d-ad0e-f37f7ae981ae",
+      "metadata": {
+        "id": "248458a6-cfd8-474d-ad0e-f37f7ae981ae"
+      },
+      "outputs": [],
+      "source": [
+        "# Constants\n",
+        "project = \"<your-project>\"\n",
+        "bucket = \"<your-bucket>\"\n",
+        "\n",
+        "# set the project to avoid warnings.\n",
+        "os.environ['GOOGLE_CLOUD_PROJECT'] = project\n",
+        "\n",
+        "save_model_dir_multiply_five = 'five_times_table_torch.pt'\n",
+        "save_model_dir_multiply_ten = 'ten_times_table_torch.pt'"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "b2b7cedc-79f5-4599-8178-e5da35dba032",
+      "metadata": {
+        "tags": [],
+        "id": "b2b7cedc-79f5-4599-8178-e5da35dba032"
+      },
+      "source": [
+        "## Create data and PyTorch models for the RunInference transform"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "202e5a3e-4ccd-4ae3-9852-e47de0721839",
+      "metadata": {
+        "id": "202e5a3e-4ccd-4ae3-9852-e47de0721839"
+      },
+      "source": [
+        "### Linear regression model in PyTorch."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "68bf8bf0-f735-45ee-8ebb-a2d8bb8a6bc7",
+      "metadata": {
+        "id": "68bf8bf0-f735-45ee-8ebb-a2d8bb8a6bc7"
+      },
+      "outputs": [],
+      "source": [
+        "class LinearRegression(torch.nn.Module):\n",
+        "    def __init__(self, input_dim=1, output_dim=1):\n",
+        "        super().__init__()\n",
+        "        self.linear = torch.nn.Linear(input_dim, output_dim)  \n",
+        "    def forward(self, x):\n",
+        "        out = self.linear(x)\n",
+        "        return out"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "1918435c-0029-4eb6-8eee-bda5470eb2ff",
+      "metadata": {
+        "id": "1918435c-0029-4eb6-8eee-bda5470eb2ff"
+      },
+      "source": [
+        "### Prepare train and test data to train a 5 times model.\n",
+        "* `x` contains values in the range from 0 to 99.\n",
+        "* `y` is a list of 5 * `x`. \n",
+        "* `value_to_predict` holds the four inputs (20, 40, 60, and 90) that the pipelines later pass to the model."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "9302917f-6200-4af4-a410-4bd6f2a070b8",
+      "metadata": {
+        "id": "9302917f-6200-4af4-a410-4bd6f2a070b8"
+      },
+      "outputs": [],
+      "source": [
+        "x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)\n",
+        "y = (x * 5).reshape(-1, 1)\n",
+        "value_to_predict = numpy.array([20, 40, 60, 90], dtype=numpy.float32).reshape(-1, 1)"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "9dc22aec-08c3-43ab-a5ce-451cb63c485a",
+      "metadata": {
+        "id": "9dc22aec-08c3-43ab-a5ce-451cb63c485a"
+      },
+      "source": [
+        "### Train the linear regression model on 5 times data."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "0a8b7924-ff06-4584-8f41-079268387a67",
+      "metadata": {
+        "id": "0a8b7924-ff06-4584-8f41-079268387a67"
+      },
+      "outputs": [],
+      "source": [
+        "five_times_model = LinearRegression()\n",
+        "optimizer = torch.optim.Adam(five_times_model.parameters())\n",
+        "loss_fn = torch.nn.L1Loss()\n",
+        "\n",
+        "\"\"\"\n",
+        "Train the five_times_model\n",
+        "\"\"\"\n",
+        "epochs = 10000\n",
+        "tensor_x = torch.from_numpy(x)\n",
+        "tensor_y = torch.from_numpy(y)\n",
+        "for epoch in range(epochs):\n",
+        "    y_pred = five_times_model(tensor_x)\n",
+        "    loss = loss_fn(y_pred, tensor_y)\n",
+        "    five_times_model.zero_grad()\n",
+        "    loss.backward()\n",
+        "    optimizer.step()"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "bd106b29-6187-42c1-9743-1666c147b5e3",
+      "metadata": {
+        "id": "bd106b29-6187-42c1-9743-1666c147b5e3"
+      },
+      "source": [
+        "Save the model using `torch.save()` and verify that the saved model file exists."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "882bbada-4f6d-4370-a047-c5961e564ee8",
+      "metadata": {
+        "id": "882bbada-4f6d-4370-a047-c5961e564ee8",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "3002ed41-dbd5-4a87-d2d1-d1c7908be2f2"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "True\n"
+          ]
+        }
+      ],
+      "source": [
+        "torch.save(five_times_model.state_dict(), save_model_dir_multiply_five)\n",
+        "print(os.path.exists(save_model_dir_multiply_five)) # verify if the model is saved"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "fa84cfca-83c6-4a91-aea1-3dd034c42ae0",
+      "metadata": {
+        "id": "fa84cfca-83c6-4a91-aea1-3dd034c42ae0"
+      },
+      "source": [
+        "### Prepare train and test data to train a 10 times model.\n",
+        "* `x` contains values in the range from 0 to 99.\n",
+        "* `y` is a list of 10 * `x`. "
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "27e0d1f6-2c3e-4418-8fb0-b5b89ffa66d5",
+      "metadata": {
+        "id": "27e0d1f6-2c3e-4418-8fb0-b5b89ffa66d5"
+      },
+      "outputs": [],
+      "source": [
+        "x = numpy.arange(0, 100, dtype=numpy.float32).reshape(-1, 1)\n",
+        "y = (x * 10).reshape(-1, 1)"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "24d946dc-4fe0-4030-8f6a-aa8d27fd353d",
+      "metadata": {
+        "id": "24d946dc-4fe0-4030-8f6a-aa8d27fd353d"
+      },
+      "source": [
+        "### Train the linear regression model on 10 times data."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "2b352313-b791-48fd-9b9d-b54233176416",
+      "metadata": {
+        "id": "2b352313-b791-48fd-9b9d-b54233176416"
+      },
+      "outputs": [],
+      "source": [
+        "ten_times_model = LinearRegression()\n",
+        "optimizer = torch.optim.Adam(ten_times_model.parameters())\n",
+        "loss_fn = torch.nn.L1Loss()\n",
+        "\n",
+        "epochs = 10000\n",
+        "tensor_x = torch.from_numpy(x)\n",
+        "tensor_y = torch.from_numpy(y)\n",
+        "for epoch in range(epochs):\n",
+        "    y_pred = ten_times_model(tensor_x)\n",
+        "    loss = loss_fn(y_pred, tensor_y)\n",
+        "    ten_times_model.zero_grad()\n",
+        "    loss.backward()\n",
+        "    optimizer.step()"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "6f959e3b-230b-45e2-9df3-dd1f11acacd7",
+      "metadata": {
+        "id": "6f959e3b-230b-45e2-9df3-dd1f11acacd7"
+      },
+      "source": [
+        "Save the model using `torch.save()`"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "42b2ca0f-5d44-4d15-a313-f3d56ae7f675",
+      "metadata": {
+        "id": "42b2ca0f-5d44-4d15-a313-f3d56ae7f675",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "ed9f51c1-8dfe-44bc-c861-28d660ad3799"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "True\n"
+          ]
+        }
+      ],
+      "source": [
+        "torch.save(ten_times_model.state_dict(), save_model_dir_multiply_ten)\n",
+        "print(os.path.exists(save_model_dir_multiply_ten)) # verify if the model is saved"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "2e20efc4-13e8-46e2-9848-c0347deaa5af",
+      "metadata": {
+        "id": "2e20efc4-13e8-46e2-9848-c0347deaa5af"
+      },
+      "source": [
+        "# Pattern 1: RunInference for predictions."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "1099fe94-d4cf-422e-a0d3-0cfba8af64d5",
+      "metadata": {
+        "id": "1099fe94-d4cf-422e-a0d3-0cfba8af64d5"
+      },
+      "source": [
+        "### Step 1 - Use RunInference within the pipeline.\n",
+        "\n",
+        "1. Create a PyTorch model handler object by passing the required arguments, such as `state_dict_path`, `model_class`, and `model_params`, to the `PytorchModelHandlerTensor` class.\n",
+        "2. Pass the `PytorchModelHandlerTensor` object to the RunInference transform to perform prediction on unkeyed data."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "e488a821-3b70-4284-96f3-ddee4dcb9d71",
+      "metadata": {
+        "id": "e488a821-3b70-4284-96f3-ddee4dcb9d71",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "6f4e4136-aa6c-4fd4-8be6-2ed8d7ca4545"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "PredictionResult(example=tensor([20.]), inference=tensor([99.9943], grad_fn=<UnbindBackward0>))\n",
+            "PredictionResult(example=tensor([40.]), inference=tensor([199.9889], grad_fn=<UnbindBackward0>))\n",
+            "PredictionResult(example=tensor([60.]), inference=tensor([299.9835], grad_fn=<UnbindBackward0>))\n",
+            "PredictionResult(example=tensor([90.]), inference=tensor([449.9753], grad_fn=<UnbindBackward0>))\n"
+          ]
+        }
+      ],
+      "source": [
+        "torch_five_times_model_handler = PytorchModelHandlerTensor(\n",
+        "    state_dict_path=save_model_dir_multiply_five,\n",
+        "    model_class=LinearRegression,\n",
+        "    model_params={'input_dim': 1,\n",
+        "                  'output_dim': 1}\n",
+        "                  )\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "with pipeline as p:\n",
+        "      (\n",
+        "      p \n",
+        "      | \"ReadInputData\" >> beam.Create(value_to_predict)\n",
+        "      | \"ConvertNumpyToTensor\" >> beam.Map(torch.Tensor)\n",
+        "      | \"RunInferenceTorch\" >> RunInference(torch_five_times_model_handler)\n",
+        "      | beam.Map(print)\n",
+        "      )"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "9d95e69b-203f-4abb-9abb-360bdf4d769a",
+      "metadata": {
+        "id": "9d95e69b-203f-4abb-9abb-360bdf4d769a"
+      },
+      "source": [
+        "# Pattern 2: Post-process RunInference results.\n",
+        "Add a `PredictionProcessor` to the pipeline after `RunInference`. `PredictionProcessor` processes the output of the `RunInference` transform."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "96f38a5a-4db0-4c39-8ce7-80d9f9911b48",
+      "metadata": {
+        "id": "96f38a5a-4db0-4c39-8ce7-80d9f9911b48",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "1bfa4cc6-ef01-4020-c739-df1efdc632c4"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "input is 20.0 output is 99.99430084228516\n",
+            "input is 40.0 output is 199.98887634277344\n",
+            "input is 60.0 output is 299.98345947265625\n",
+            "input is 90.0 output is 449.9753112792969\n"
+          ]
+        }
+      ],
+      "source": [
+        "class PredictionProcessor(beam.DoFn):\n",
+        "  \"\"\"\n",
+        "  A processor to format the output of the RunInference transform.\n",
+        "  \"\"\"\n",
+        "  def process(\n",
+        "      self,\n",
+        "      element: PredictionResult):\n",
+        "    input_value = element.example\n",
+        "    output_value = element.inference\n",
+        "    yield (f\"input is {input_value.item()} output is {output_value.item()}\")\n",
+        "\n",
+        "pipeline = beam.Pipeline()\n",
+        "\n",
+        "with pipeline as p:\n",
+        "    (\n",
+        "    p\n",
+        "    | \"ReadInputData\" >> beam.Create(value_to_predict)\n",
+        "    | \"ConvertNumpyToTensor\" >> beam.Map(torch.Tensor)\n",
+        "    | \"RunInferenceTorch\" >> RunInference(torch_five_times_model_handler)\n",
+        "    | \"PostProcessPredictions\" >> beam.ParDo(PredictionProcessor())\n",
+        "    | beam.Map(print)\n",
+        "    )"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "2be80463-cf79-481c-9d6a-81e500f1707b",
+      "metadata": {
+        "id": "2be80463-cf79-481c-9d6a-81e500f1707b"
+      },
+      "source": [
+        "# Pattern 3: Attach a key"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "f22da313-5bf8-4334-865b-bbfafc374e63",
+      "metadata": {
+        "id": "f22da313-5bf8-4334-865b-bbfafc374e63"
+      },
+      "source": [
+        "## Step 1 - Create a source with attached key.\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "746b67a7-3562-467f-bea3-d8cd18c14927",
+      "metadata": {
+        "id": "746b67a7-3562-467f-bea3-d8cd18c14927"
+      },
+      "source": [
+        "## Step 2 - Modify model handler and post processor.\n",
+        "* Modify the pipeline to read from sources like CSV files and BigQuery.\n",
+        "\n",
+        "In this step we:\n",
+        "\n",
+        "* Wrap the `PytorchModelHandlerTensor` object in a `KeyedModelHandler` to handle keyed data.\n",
+        "* Add a map transform, which converts a table row into `Tuple[str, float]`.\n",
+        "* Add a map transform, which converts `Tuple[str, float]` to `Tuple[str, torch.Tensor]`.\n",
+        "* Modify the post inference processor to output results along with the key."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "90b263fc-97a5-43dc-8874-083d7e65e96d",
+      "metadata": {
+        "id": "90b263fc-97a5-43dc-8874-083d7e65e96d"
+      },
+      "outputs": [],
+      "source": [
+        "class PredictionWithKeyProcessor(beam.DoFn):\n",
+        "    def __init__(self):\n",
+        "        beam.DoFn.__init__(self)\n",
+        "\n",
+        "    def process(\n",
+        "          self,\n",
+        "          element: Tuple[str, PredictionResult]):\n",
+        "        key = element[0]\n",
+        "        input_value = element[1].example\n",
+        "        output_value = element[1].inference\n",
+        "        yield (f\"key: {key}, input: {input_value.item()} output: {output_value.item()}\" )"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "c9b0fb49-d605-4f26-931a-57f42b0ad253",
+      "metadata": {
+        "id": "c9b0fb49-d605-4f26-931a-57f42b0ad253"
+      },
+      "source": [
+        "#### Use BigQuery as the source."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "45ce4330-7d33-4c53-8033-f4fa02383894",
+      "metadata": {
+        "id": "45ce4330-7d33-4c53-8033-f4fa02383894"
+      },
+      "source": [
+        "Install the Google Cloud BigQuery API using `pip`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "4eb859dd-ba54-45a1-916b-5bbe4dc3f16e",
+      "metadata": {
+        "id": "4eb859dd-ba54-45a1-916b-5bbe4dc3f16e",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "c01594f4-443e-434a-b61a-a38beb00f1a9"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "\u001b[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.\n",
+            "pandas-gbq 0.13.3 requires google-cloud-bigquery[bqstorage,pandas]<2.0.0dev,>=1.11.1, but you have google-cloud-bigquery 3.3.0 which is incompatible.\u001b[0m\n"
+          ]
+        }
+      ],
+      "source": [
+        "%pip install --upgrade google-cloud-bigquery --quiet"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "6e869347-dd49-40be-b1e5-749699dc0d83",
+      "metadata": {
+        "id": "6e869347-dd49-40be-b1e5-749699dc0d83"
+      },
+      "source": [
+        "Use the snippet below to create a BigQuery table with two columns: the first holds the key and the second holds the test value. To use BigQuery, a Google Cloud account with the BigQuery API enabled is required."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "!gcloud config set project $project"
+      ],
+      "metadata": {
+        "id": "7mgnryX-Zlfs",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "ebd3e9e3-9c30-4027-f571-5cd3c1951e18"
+      },
+      "id": "7mgnryX-Zlfs",
+      "execution_count": null,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "Updated property [core/project].\n"
+          ]
+        }
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "a6a984cd-2e92-4c44-821b-9bf1dd52fb7d",
+      "metadata": {
+        "id": "a6a984cd-2e92-4c44-821b-9bf1dd52fb7d",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "8e60b448-1384-4290-c164-cb43d876c350"
+      },
+      "outputs": [
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/plain": [
+              "<google.cloud.bigquery.table._EmptyRowIterator at 0x7f47d0556a50>"
+            ]
+          },
+          "metadata": {},
+          "execution_count": 86
+        }
+      ],
+      "source": [
+        "from google.cloud import bigquery\n",
+        "\n",
+        "client = bigquery.Client(project=project)\n",
+        "\n",
+        "# Make sure the dataset_id is unique in your project.\n",
+        "dataset_id = '{project}.maths'.format(project=project)\n",
+        "dataset = bigquery.Dataset(dataset_id)\n",
+        "\n",
+        "# Modify the location based on your project configuration.\n",
+        "dataset.location = 'US'\n",
+        "dataset = client.create_dataset(dataset, exists_ok=True)\n",
+        "\n",
+        "# Table name in the BigQuery dataset.\n",
+        "table_name = 'maths_problems_1'\n",
+        "\n",
+        "query = \"\"\"\n",
+        "    CREATE OR REPLACE TABLE\n",
+        "      {project}.maths.{table} ( key STRING OPTIONS(description=\"A unique key for the maths problem\"),\n",
+        "    value FLOAT64 OPTIONS(description=\"Our maths problem\" ) );\n",
+        "    INSERT INTO maths.{table}\n",
+        "    VALUES\n",
+        "      (\"first_question\", 105.00),\n",
+        "      (\"second_question\", 108.00),\n",
+        "      (\"third_question\", 1000.00),\n",
+        "      (\"fourth_question\", 1013.00)\n",
+        "\"\"\".format(project=project, table=table_name)\n",
+        "\n",
+        "create_job = client.query(query)\n",
+        "create_job.result()"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "479c9319-3295-4288-971c-dd0f0adfdd1e",
+      "metadata": {
+        "id": "479c9319-3295-4288-971c-dd0f0adfdd1e"
+      },
+      "source": [
+        "Use `BigQuery` as the source in the pipeline to read keyed data."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "34331897-23f5-4850-8974-67e522e956dc",
+      "metadata": {
+        "id": "34331897-23f5-4850-8974-67e522e956dc",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "23092c12-3370-414c-ba67-37be569cd21c"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "key: first_question, input: 105.0 output: 524.9712524414062\n",
+            "key: second_question, input: 108.0 output: 539.970458984375\n",
+            "key: third_question, input: 1000.0 output: 4999.72802734375\n",
+            "key: fourth_question, input: 1013.0 output: 5064.724609375\n"
+          ]
+        }
+      ],
+      "source": [
+        "pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'{bucket}/tmp',\n",
+        "                                                      })\n",
+        "pipeline = beam.Pipeline(options=pipeline_options)\n",
+        "\n",
+        "keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)\n",
+        "\n",
+        "table_name = 'maths_problems_1'\n",
+        "table_spec = f'{project}:maths.{table_name}'\n",
+        "\n",
+        "with pipeline as p:\n",
+        "      (\n",
+        "      p\n",
+        "      | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec) \n",
+        "      | \"PreprocessData\" >> beam.Map(lambda x: (x['key'], x['value']))\n",
+        "      | \"ConvertNumpyToTensor\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
+        "      | \"RunInferenceTorch\" >> RunInference(keyed_torch_five_times_model_handler)\n",
+        "      | \"PostProcessPredictions\" >> beam.ParDo(PredictionWithKeyProcessor())\n",
+        "      | beam.Map(print)\n",
+        "      )"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "53ee7f24-5625-475a-b8cc-9c031591f304",
+      "metadata": {
+        "id": "53ee7f24-5625-475a-b8cc-9c031591f304"
+      },
+      "source": [
+        "#### Use a CSV file as the source."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "06bc4396-ee2e-4228-8548-f953b5020c4e",
+      "metadata": {
+        "id": "06bc4396-ee2e-4228-8548-f953b5020c4e"
+      },
+      "source": [
+        "Create a CSV file with two columns: one named `key` that holds the keys, and a second named `value` that holds the test values."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Create a CSV file with the values below.\n",
+        "csv_values = [(\"first_question\", 105.00),\n",
+        "      (\"second_question\", 108.00),\n",
+        "      (\"third_question\", 1000.00),\n",
+        "      (\"fourth_question\", 1013.00)]\n",
+        "input_csv_file = \"./maths_problem.csv\"\n",
+        "\n",
+        "with open(input_csv_file, 'w', newline='') as f:\n",
+        "  writer = csv.writer(f)\n",
+        "  writer.writerow(['key', 'value'])\n",
+        "  for row in csv_values:\n",
+        "    writer.writerow(row)\n",
+        "\n",
+        "assert os.path.exists(input_csv_file)"
+      ],
+      "metadata": {
+        "id": "exAZjP7cYAFv"
+      },
+      "id": "exAZjP7cYAFv",
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "9a054c2d-4d84-4b37-b067-1dda5347e776",
+      "metadata": {
+        "id": "9a054c2d-4d84-4b37-b067-1dda5347e776",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "000c72cf-6a7f-45d8-9dec-dc9db6ce0662"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "key: first_question, input: 105.0 output: 524.9712524414062\n",
+            "key: second_question, input: 108.0 output: 539.970458984375\n",
+            "key: third_question, input: 1000.0 output: 4999.72802734375\n",
+            "key: fourth_question, input: 1013.0 output: 5064.724609375\n"
+          ]
+        }
+      ],
+      "source": [
+        "pipeline_options = PipelineOptions().from_dictionary({'temp_location':f'{bucket}/tmp',\n",
+        "                                                      })\n",
+        "pipeline = beam.Pipeline(options=pipeline_options)\n",
+        "\n",
+        "keyed_torch_five_times_model_handler = KeyedModelHandler(torch_five_times_model_handler)\n",
+        "\n",
+        "with pipeline as p:\n",
+        "  df = p | beam.dataframe.io.read_csv(input_csv_file)\n",
+        "  pc = to_pcollection(df)\n",
+        "  (pc\n",
+        "    | \"ConvertNumpyToTensor\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
+        "    | \"RunInferenceTorch\" >> RunInference(keyed_torch_five_times_model_handler)\n",
+        "    | \"PostProcessPredictions\" >> beam.ParDo(PredictionWithKeyProcessor())\n",
+        "    | beam.Map(print)\n",
+        "    )"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "742abfbb-545e-435b-8833-2410ce29d22c",
+      "metadata": {
+        "id": "742abfbb-545e-435b-8833-2410ce29d22c"
+      },
+      "source": [
+        "# Pattern 4: Inference with multiple models in the same pipeline.\n",
+        "\n",
+        "## Inference with multiple models in parallel."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "570b2f27-3beb-4295-b926-595592289c06",
+      "metadata": {
+        "id": "570b2f27-3beb-4295-b926-595592289c06"
+      },
+      "source": [
+        "Create a torch model handler for the 10 times model using `PytorchModelHandlerTensor`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "73607c45-afe1-4990-9a55-e687ed40302e",
+      "metadata": {
+        "id": "73607c45-afe1-4990-9a55-e687ed40302e"
+      },
+      "outputs": [],
+      "source": [
+        "torch_ten_times_model_handler = PytorchModelHandlerTensor(state_dict_path=save_model_dir_multiply_ten,\n",
+        "                                        model_class=LinearRegression,\n",
+        "                                        model_params={'input_dim': 1,\n",
+        "                                                      'output_dim': 1}\n",
+        "                                        )\n",
+        "keyed_torch_ten_times_model_handler = KeyedModelHandler(torch_ten_times_model_handler)"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "70ebed52-4ead-4cae-ac96-8cf206012ce1",
+      "metadata": {
+        "id": "70ebed52-4ead-4cae-ac96-8cf206012ce1"
+      },
+      "source": [
+        "In this pattern, the same data is run through two different models: the one that we’ve been using to multiply by 5 \n",
+        "and a second model that multiplies by 10."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "629d070e-9902-42c9-a1e7-56c3d1864f13",
+      "metadata": {
+        "id": "629d070e-9902-42c9-a1e7-56c3d1864f13",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "f798bbc7-3f45-496f-b029-3cff5599bfaa"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "key: first_question * 5, input: 105.0 output: 524.9712524414062\n",
+            "key: second_question * 5, input: 108.0 output: 539.970458984375\n",
+            "key: third_question * 5, input: 1000.0 output: 4999.72802734375\n",
+            "key: fourth_question * 5, input: 1013.0 output: 5064.724609375\n",
+            "key: first_question * 10, input: 105.0 output: 1046.1859130859375\n",
+            "key: second_question * 10, input: 108.0 output: 1075.8590087890625\n",
+            "key: third_question * 10, input: 1000.0 output: 9898.654296875\n",
+            "key: fourth_question * 10, input: 1013.0 output: 10027.2373046875\n"
+          ]
+        }
+      ],
+      "source": [
+        "pipeline_options = PipelineOptions().from_dictionary(\n",
+        "                                      {'temp_location':f'{bucket}/tmp'})\n",
+        "\n",
+        "pipeline = beam.Pipeline(options=pipeline_options)\n",
+        "\n",
+        "read_from_bq = beam.io.ReadFromBigQuery(table=table_spec)\n",
+        "\n",
+        "with pipeline as p:\n",
+        "  multiply_five = (\n",
+        "      p \n",
+        "      |  read_from_bq\n",
+        "      | \"CreateMultiplyFiveTuple\" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 5'), x['value']))\n",
+        "      | \"ConvertNumpyToTensorFiveTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
+        "      | \"RunInferenceTorchFiveTuple\" >> RunInference(keyed_torch_five_times_model_handler)\n",
+        "  )\n",
+        "  multiply_ten = (\n",
+        "      p \n",
+        "      | read_from_bq \n",
+        "      | \"CreateMultiplyTenTuple\" >> beam.Map(lambda x: ('{} {}'.format(x['key'], '* 10'), x['value']))\n",
+        "      | \"ConvertNumpyToTensorTenTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
+        "      | \"RunInferenceTorchTenTuple\" >> RunInference(keyed_torch_ten_times_model_handler)\n",
+        "  )\n",
+        "\n",
+        "  inference_result = ((multiply_five, multiply_ten) | beam.Flatten() \n",
+        "                                 | beam.ParDo(PredictionWithKeyProcessor()))\n",
+        "  inference_result | beam.Map(print)"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "e71e6706-5d8d-4322-9def-ac7fb20d4a50",
+      "metadata": {
+        "id": "e71e6706-5d8d-4322-9def-ac7fb20d4a50"
+      },
+      "source": [
+        "## Inference with multiple models in sequence \n",
+        "\n",
+        "In a sequential pattern, data is sent to one or more models in sequence, \n",
+        "with the output from each model chaining to the next model.\n",
+        "\n",
+        "1. Read the data from BigQuery.\n",
+        "2. Map the data.\n",
+        "3. Run inference with the multiply-by-5 model.\n",
+        "4. Process the results.\n",
+        "5. Run inference with the multiply-by-10 model.\n",
+        "6. Process the results.\n"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "8db9d649-5549-4b58-a9ad-7b8592c2bcbf",
+      "metadata": {
+        "id": "8db9d649-5549-4b58-a9ad-7b8592c2bcbf",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "4f600937-4cb4-42dd-aa50-fa15538cc964"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "key: original input is `first_question tensor([105.])`, input: 524.9712524414062 output: 5200.13232421875\n",
+            "key: original input is `second_question tensor([108.])`, input: 539.970458984375 output: 5348.490234375\n",
+            "key: original input is `third_question tensor([1000.])`, input: 4999.72802734375 output: 49460.0703125\n",
+            "key: original input is `fourth_question tensor([1013.])`, input: 5064.724609375 output: 50102.953125\n"
+          ]
+        }
+      ],
+      "source": [
+        "def process_interim_inference(element):\n",
+        "    key, prediction_result = element\n",
+        "    input_value = prediction_result.example\n",
+        "    inference = prediction_result.inference\n",
+        "    formatted_input_value = 'original input is `{} {}`'.format(key, input_value)\n",
+        "    return formatted_input_value, inference\n",
+        "\n",
+        "\n",
+        "pipeline_options = PipelineOptions().from_dictionary(\n",
+        "                                      {'temp_location':f'{bucket}/tmp'})\n",
+        "pipeline = beam.Pipeline(options=pipeline_options)\n",
+        "\n",
+        "with pipeline as p:\n",
+        "  multiply_five = (\n",
+        "      p \n",
+        "      | beam.io.ReadFromBigQuery(table=table_spec) \n",
+        "      | \"CreateMultiplyFiveTuple\" >> beam.Map(lambda x: (x['key'], x['value']))\n",
+        "      | \"ConvertNumpyToTensorFiveTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
+        "      | \"RunInferenceTorchFiveTuple\" >> RunInference(keyed_torch_five_times_model_handler)\n",
+        "  )\n",
+        "\n",
+        "  inference_result = (\n",
+        "    multiply_five \n",
+        "      | \"ExtractResult\" >> beam.Map(process_interim_inference) \n",
+        "      | \"RunInferenceTorchTenTuple\" >> RunInference(keyed_torch_ten_times_model_handler)\n",
+        "      | beam.ParDo(PredictionWithKeyProcessor())\n",
+        "    )\n",
+        "  inference_result | beam.Map(print)"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "32c9ba40-9396-48f4-9e7f-a2acced98bb2",
+      "metadata": {
+        "id": "32c9ba40-9396-48f4-9e7f-a2acced98bb2"
+      },
+      "source": [
+        "# Sklearn implementation of RunInference API.\n",
+        "\n",
+        "This section demonstrates the Sklearn implementation of the RunInference API on both unkeyed and keyed data.\n",
+        "\n",
+        "Sklearn is a build dependency of Apache Beam. To install a different version of sklearn, use `%pip install scikit-learn==<version>`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "d6142b75-eef1-4e52-9fa4-fe02fe916b26",
+      "metadata": {
+        "id": "d6142b75-eef1-4e52-9fa4-fe02fe916b26"
+      },
+      "outputs": [],
+      "source": [
+        "import pickle\n",
+        "from sklearn import linear_model\n",
+        "\n",
+        "import numpy as np\n",
+        "from apache_beam.ml.inference.sklearn_inference import ModelFileType\n",
+        "from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "6695cd22-e0bf-438f-8223-4a93392e6616",
+      "metadata": {
+        "id": "6695cd22-e0bf-438f-8223-4a93392e6616"
+      },
+      "source": [
+        "## Create the data and the Sklearn model.\n",
+        "This cell performs the following steps:\n",
+        "1. Create the data to train the Sklearn linear regression model.\n",
+        "2. Train the linear regression model.\n",
+        "3. Save the Sklearn model using `pickle`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "c57843e8-f696-4196-ad39-827e34849976",
+      "metadata": {
+        "id": "c57843e8-f696-4196-ad39-827e34849976"
+      },
+      "outputs": [],
+      "source": [
+        "# Input data to train the sklearn model.\n",
+        "x = np.arange(0, 100, dtype=np.float32).reshape(-1, 1)\n",
+        "y = (x * 5).reshape(-1, 1)\n",
+        "\n",
+        "regression = linear_model.LinearRegression()\n",
+        "regression.fit(x, y)\n",
+        "\n",
+        "sklearn_model_filename = 'sklearn_5x_model.pkl'\n",
+        "with open(sklearn_model_filename, 'wb') as f:\n",
+        "    pickle.dump(regression, f)"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "69008a3d-3d15-4643-828c-b0419b347d01",
+      "metadata": {
+        "id": "69008a3d-3d15-4643-828c-b0419b347d01"
+      },
+      "source": [
+        "### Scikit-learn RunInference pipeline.\n",
+        "\n",
+        "1. Define the Sklearn model handler that accepts an array_like object as input.\n",
+        "2. Read the data from BigQuery.\n",
+        "3. Use the Sklearn trained model and the Sklearn RunInference transform on unkeyed data."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "50a648a3-794a-4286-ab2b-fc0458db04ca",
+      "metadata": {
+        "id": "50a648a3-794a-4286-ab2b-fc0458db04ca",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "b305d977-6549-4c01-a402-c4e14f3f2b04"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "PredictionResult(example=[105.0], inference=array([525.]))\n",
+            "PredictionResult(example=[108.0], inference=array([540.]))\n",
+            "PredictionResult(example=[1000.0], inference=array([5000.]))\n",
+            "PredictionResult(example=[1013.0], inference=array([5065.]))\n"
+          ]
+        }
+      ],
+      "source": [
+        "# SklearnModelHandlerNumpy accepts only unkeyed examples.\n",
+        "sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=sklearn_model_filename,\n",
+        "                                                 model_file_type=ModelFileType.PICKLE) # Use ModelFileType.JOBLIB if the model is serialized using joblib.\n",
+        "\n",
+        "\n",
+        "pipeline_options = PipelineOptions().from_dictionary(\n",
+        "                                      {'temp_location':f'{bucket}/tmp'})\n",
+        "pipeline = beam.Pipeline(options=pipeline_options)\n",
+        "\n",
+        "with pipeline as p:\n",
+        "  (\n",
+        "      p \n",
+        "      | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec)\n",
+        "      | \"ExtractInputs\" >> beam.Map(lambda x: [x['value']]) \n",
+        "      | \"RunInferenceSklearn\" >> RunInference(model_handler=sklearn_model_handler)\n",
+        "      | beam.Map(print)\n",
+        "  )"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "33e901d6-ed06-4268-8a5f-685d31b5558f",
+      "metadata": {
+        "id": "33e901d6-ed06-4268-8a5f-685d31b5558f"
+      },
+      "source": [
+        "### Sklearn RunInference on keyed inputs.\n",
+        "1. Wrap the `SklearnModelHandlerNumpy` object in a `KeyedModelHandler` to handle keyed data.\n",
+        "2. Read the data from BigQuery.\n",
+        "3. Use the Sklearn trained model and the Sklearn RunInference transform on keyed data."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "c212916d-b517-4589-ad15-a3a1df926fb3",
+      "metadata": {
+        "id": "c212916d-b517-4589-ad15-a3a1df926fb3",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "1c3ccf35-3cd7-401e-de23-c0e22b5f6ebd"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "('first_question', PredictionResult(example=[105.0], inference=array([525.])))\n",
+            "('second_question', PredictionResult(example=[108.0], inference=array([540.])))\n",
+            "('third_question', PredictionResult(example=[1000.0], inference=array([5000.])))\n",
+            "('fourth_question', PredictionResult(example=[1013.0], inference=array([5065.])))\n"
+          ]
+        }
+      ],
+      "source": [
+        "sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=sklearn_model_filename,\n",
+        "                                                 model_file_type=ModelFileType.PICKLE) # Use ModelFileType.JOBLIB if the model is serialized using joblib.\n",
+        "\n",
+        "keyed_sklearn_model_handler = KeyedModelHandler(sklearn_model_handler)\n",
+        "\n",
+        "pipeline_options = PipelineOptions().from_dictionary(\n",
+        "                                      {'temp_location':f'{bucket}/tmp'})\n",
+        "pipeline = beam.Pipeline(options=pipeline_options)\n",
+        "\n",
+        "with pipeline as p:\n",
+        "  (\n",
+        "  p \n",
+        "  | \"ReadFromBQ\" >> beam.io.ReadFromBigQuery(table=table_spec)\n",
+        "  | \"ExtractInputs\" >> beam.Map(lambda x: (x['key'], [x['value']])) \n",
+        "  | \"RunInferenceSklearn\" >> RunInference(model_handler=keyed_sklearn_model_handler)\n",
+        "  | beam.Map(print)\n",
+        "  )"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "id": "f1481883-423b-4da0-8ae0-1a602b1807c6",
+      "metadata": {
+        "id": "f1481883-423b-4da0-8ae0-1a602b1807c6"
+      },
+      "source": [
+        "# Cross framework transforms in a single pipeline\n",
+        "\n",
+        "This pipeline applies RunInference transforms from different frameworks sequentially.\n",
+        "\n",
+        "The cells below perform the following actions:\n",
+        "1. Create a `KeyedModelHandler` for Sklearn and for Pytorch.\n",
+        "2. Run inference with the Sklearn model and perform intermediate processing using `process_interim_inference`.\n",
+        "3. Take the intermediate result from the Sklearn RunInference transform and run it through the Pytorch RunInference transform."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "a45d496c-4d7b-4173-b27b-253c5767bb8d",
+      "metadata": {
+        "id": "a45d496c-4d7b-4173-b27b-253c5767bb8d"
+      },
+      "outputs": [],
+      "source": [
+        "def process_interim_inference(element: Tuple[str, PredictionResult]):\n",
+        "    \"\"\"\n",
+        "    Returns the key and the prediction to the next RunInference transform.\n",
+        "    \"\"\"\n",
+        "    key, prediction_result = element\n",
+        "    prediction = prediction_result.inference\n",
+        "    return key, prediction\n",
+        "\n",
+        "class PredictionProcessor(beam.DoFn):\n",
+        "    def process(self, element: Tuple[str, PredictionResult]):\n",
+        "        key, prediction_result = element\n",
+        "        input_from_upstream = prediction_result.example\n",
+        "        prediction = prediction_result.inference\n",
+        "        yield (key, prediction.item())"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "id": "ada71e7d-cf29-4441-a921-310c05fa8576",
+      "metadata": {
+        "id": "ada71e7d-cf29-4441-a921-310c05fa8576",
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "outputId": "78eb9a0d-ace2-4c02-8970-13488dc2767c"
+      },
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": [
+            "('first_question', 2624.857421875)\n",
+            "('second_question', 2699.853271484375)\n",
+            "('third_question', 24998.642578125)\n",
+            "('fourth_question', 25323.625)\n"
+          ]
+        }
+      ],
+      "source": [
+        "pipeline_options = PipelineOptions().from_dictionary(\n",
+        "                                      {'temp_location':f'{bucket}/tmp'})\n",
+        "pipeline = beam.Pipeline(options=pipeline_options)\n",
+        "\n",
+        "read_from_bq = beam.io.ReadFromBigQuery(table=table_spec)\n",
+        "keyed_inputs = \"ExtractKeyedInputs\" >> beam.Map(lambda x: (x['key'], [x['value']]))\n",
+        "\n",
+        "keyed_sklearn_model_handler = KeyedModelHandler(SklearnModelHandlerNumpy(\n",
+        "    model_uri=sklearn_model_filename,\n",
+        "    model_file_type=ModelFileType.PICKLE))\n",
+        "\n",
+        "keyed_torch_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(\n",
+        "    state_dict_path=save_model_dir_multiply_ten,\n",
+        "    model_class=LinearRegression,\n",
+        "    model_params={'input_dim': 1,\n",
+        "                  'output_dim': 1}))\n",
+        "\n",
+        "with pipeline as p:\n",
+        "  sklearn_inference_result = (\n",
+        "      p\n",
+        "      | read_from_bq\n",
+        "      | keyed_inputs\n",
+        "      | \"RunInferenceSklearn\" >> RunInference(model_handler=keyed_sklearn_model_handler)\n",
+        "      | \"ExtractOutputs\" >> beam.Map(process_interim_inference)\n",
+        "  )\n",
+        "\n",
+        "  torch_inference_result = (\n",
+        "      sklearn_inference_result\n",
+        "      | \"ConvertNumpyToTensorFiveTuple\" >> beam.Map(lambda x: (x[0], torch.Tensor([x[1]])))\n",
+        "      | \"RunInferenceTorchFiveTuple\" >> RunInference(keyed_torch_five_times_model_handler)\n",
+        "      | \"ProcessPredictions\" >> beam.ParDo(PredictionProcessor())\n",
+        "      | beam.Map(print)\n",
+        "  )\n"
+      ]
+    }
+  ]
+}