Posted to github@beam.apache.org by "Juta (via GitHub)" <gi...@apache.org> on 2023/02/09 16:08:06 UTC

[GitHub] [beam] Juta commented on a diff in pull request #25381: TFMA notebook showing ExtractEvaluateAndWriteResults Transform

Juta commented on code in PR #25381:
URL: https://github.com/apache/beam/pull/25381#discussion_r1101701712


##########
examples/notebooks/beam-ml/tfma_beam.ipynb:
##########
@@ -0,0 +1,669 @@
+{
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# TensorFlow Model Analysis in Beam\n",
+        "[TensorFlow Model Analysis (TFMA)](https://www.tensorflow.org/tfx/guide/tfma) is a library for performing model evaluation across different slices of data. TFMA performs its computations in a distributed manner over large amounts of data using Apache Beam.\n",
+        "\n",
+        "This example colab notebook illustrates how TFMA can be used to investigate and visualize the performance of a model as part of your Beam pipeline. This allows for scalable and flexible execution of your evaluation pipeline. For this we will use [**ExtractEvaluateAndWriteResults**](https://www.tensorflow.org/tfx/model_analysis/api_docs/python/tfma/ExtractEvaluateAndWriteResults), which is a PTransform for performing extraction, evaluation, and writing results all in one step.\n",
+        "\n",
+        "For additional information on TFMA, you can refer to the [TFMA basic notebook](https://www.tensorflow.org/tfx/tutorials/model_analysis/tfma_basic) which provides a more in-depth look at its capabilities"
+      ],
+      "metadata": {
+        "id": "1m9dEIsQAP_-"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Install Jupyter extensions\n",
+        "Note: If running in a local Jupyter notebook, then these Jupyter extensions must be installed in the environment before running Jupyter.\n",
+        "\n",
+        "```bash\n",
+        "jupyter nbextension enable --py widgetsnbextension --sys-prefix \n",
+        "jupyter nbextension install --py --symlink tensorflow_model_analysis --sys-prefix \n",
+        "jupyter nbextension enable --py tensorflow_model_analysis --sys-prefix \n",
+        "```"
+      ],
+      "metadata": {
+        "id": "GKcUZKcTRhW_"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "##  Install TensorFlow Model Analysis (TFMA)\n",
+        "\n",
+        "This will pull in all the dependencies, and will take a minute."
+      ],
+      "metadata": {
+        "id": "-01Hts8eR9OV"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "8aLJh4pFasK0"
+      },
+      "outputs": [],
+      "source": [
+        "# Upgrade pip to the latest, and install TFMA.\n",
+        "!pip install -U pip\n",
+        "!pip install tensorflow-model-analysis\n",
+        "\n",
+        "# To use the newly installed version, restart the runtime.\n",
+        "exit() "
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "SUTczmH2dWk2"
+      },
+      "outputs": [],
+      "source": [
+        "# This setup was tested with TF 2.11, TFMA 0.43 and Beam 2.44 (using colab),\n",
+        "# but it should also work with the latest release.\n",
+        "import sys\n",
+        "\n",
+        "# Confirm that we're using Python 3\n",
+        "assert sys.version_info.major==3, 'This notebook must be run using Python 3.'\n",
+        "\n",
+        "import tensorflow as tf\n",
+        "print('TF version: {}'.format(tf.__version__))\n",
+        "import apache_beam as beam\n",
+        "print('Beam version: {}'.format(beam.__version__))\n",
+        "import tensorflow_model_analysis as tfma\n",
+        "print('TFMA version: {}'.format(tfma.__version__))\n",
+        "import tensorflow_datasets as tfds\n",
+        "print('TFDS version: {}'.format(tfds.__version__))"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**NOTE: The output above should be clear of errors before proceeding. Re-run the install and restart your kernel if you are still seeing errors.**"
+      ],
+      "metadata": {
+        "id": "KP7V1pIU_9H2"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "SHL89whuLqmq"
+      },
+      "source": [
+        "# Data preprocessing"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "6yapf7rN_lB7"
+      },
+      "source": [
+        "## Diamonds price prediction\n",
+        "\n",
+        "We will be using the [TFDS diamonds dataset](https://www.tensorflow.org/datasets/catalog/diamonds) to train a linear regression model that will predict the price of a diamond. This dataset contains various physical attributes such as the weight of the diamond (carat), the cut quality, color, clarity and the price of 53940 diamonds. The model's performance will be evaluated using metrics such as mean squared error and mean absolute error.\n"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "In order to simulate a scenario where a model's performance improves over time as new data is added to the dataset, we will first train a model called v1 using half of the diamonds dataset. Later on, we will train a second model called v2 using additional data. This will enable us to demonstrate the use of TFMA when comparing the performance of the two models for the same task."
+      ],
+      "metadata": {
+        "id": "y9a9rv3_YYhE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "hQCHORIG8Ixv"
+      },
+      "outputs": [],
+      "source": [
+        "# Load the data from TFDS and create a train, test and validation dataset by splitting the dataset into parts\n",
+        "(ds_train_v1, ds_test, ds_val), info = tfds.load('diamonds', split=['train[:40%]', 'train[80%:90%]', 'train[90%:]'], as_supervised=True, with_info=True)"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "import numpy as np\n",
+        "\n",
+        "# Load in the numerical training data to use for normalization\n",
+        "def extract_numerical_features(item):\n",
+        "  carat = item['carat']\n",
+        "  depth = item['depth']\n",
+        "  table = item['table']\n",
+        "  x = item['x']\n",
+        "  y = item['y']\n",
+        "  z = item['z']\n",
+        "  \n",
+        "  return [carat, depth, table, x, y, z]\n",
+        "\n",
+        "def get_train_data(ds_train):\n",
+        "  train_data = []\n",
+        "  for item, label in ds_train:\n",
+        "    features = extract_numerical_features(item)\n",
+        "    train_data.append(features)\n",
+        "\n",
+        "  train_data = np.array(train_data)\n",
+        "\n",
+        "  return train_data"
+      ],
+      "metadata": {
+        "id": "_sNqOzwNGo6V"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "train_data_v1 = get_train_data(ds_train_v1)"
+      ],
+      "metadata": {
+        "id": "PVVUZ1LOwQAf"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Define the features length\n",
+        "NUMERICAL_FEATURES = 6\n",
+        "NUM_FEATURES = (NUMERICAL_FEATURES +\n",
+        "                info.features['features']['color'].num_classes +\n",
+        "                info.features['features']['cut'].num_classes +\n",
+        "                info.features['features']['clarity'].num_classes)"
+      ],
+      "metadata": {
+        "id": "__6xLw92aI9a"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "ZJt8Wu2LmX9j"
+      },
+      "outputs": [],
+      "source": [
+        "# Transform the input data into a feature vector and label by selecting the input and output for the model\n",
+        "def transform_data(item, label):\n",
+        "  numerical_features = extract_numerical_features(item)\n",
+        "\n",
+        "  # Categorical features will be encoded using one-hot encoding\n",
+        "  color = tf.one_hot(item['color'], info.features['features']['color'].num_classes)\n",
+        "  cut = tf.one_hot(item['cut'], info.features['features']['cut'].num_classes)\n",
+        "  clarity = tf.one_hot(item['clarity'], info.features['features']['clarity'].num_classes)\n",
+        "  \n",
+        "  # Create output tensor\n",
+        "  output = tf.concat([tf.stack(numerical_features, axis=0), color, cut, clarity], 0)\n",
+        "  return output, [label]"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "f3ZxhNZDsbkx"
+      },
+      "outputs": [],
+      "source": [
+        "ds_train_v1 = ds_train_v1.map(transform_data)\n",
+        "ds_test = ds_test.map(transform_data)\n",
+        "ds_val = ds_val.map(transform_data)"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Prepare the data for training by structuring it in batches\n",
+        "BATCH_SIZE = 32\n",
+        "ds_train_v1 = ds_train_v1.batch(BATCH_SIZE)\n",
+        "ds_test = ds_test.batch(BATCH_SIZE)"
+      ],
+      "metadata": {
+        "id": "sw3udkicwVZE"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## TFRecords creation\n",
+        "\n",
+        "TFMA and Beam need to read the dataset used during evaluation from a file. We will create a TFRecords file that contains our validation dataset."
+      ],
+      "metadata": {
+        "id": "KFd6NwPAacSM"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "ipHOHiRqMJOi"
+      },
+      "outputs": [],
+      "source": [
+        "!mkdir data"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "TkeG4uw1K9Dt"
+      },
+      "outputs": [],
+      "source": [
+        "# Write the validation record to a file (used by TFMA)\n",
+        "tfrecord_file = 'data/val_data.tfrecord'\n",
+        "\n",
+        "with tf.io.TFRecordWriter(tfrecord_file) as file_writer:\n",
+        "  for x, y in ds_val:\n",
+        "    record_bytes = tf.train.Example(features=tf.train.Features(feature={\n",
+        "        \"inputs\": tf.train.Feature(float_list=tf.train.FloatList(value=x)),\n",
+        "        \"output\": tf.train.Feature(float_list=tf.train.FloatList(value=[y])),\n",
+        "    })).SerializeToString()\n",
+        "    file_writer.write(record_bytes)"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "zhunMzSOLmdG"
+      },
+      "source": [
+        "# Model definition and training"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Now let's train a linear regression model that will predict the price of a diamond. The model we will train is a neural network with one hidden layer. We also will use one normalisation layer to scale all the numerical features between 0 and 1."
+      ],
+      "metadata": {
+        "id": "Svsic-PSbGu7"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "so7l2WDzd2kg"
+      },
+      "outputs": [],
+      "source": [
+        "def construct_model(model_name, train_data):\n",
+        "  inputs = tf.keras.Input(shape=(NUM_FEATURES,), name='inputs')\n",
+        "\n",
+        "  # Normalize numerical features\n",
+        "  normalization_layer = tf.keras.layers.Normalization()\n",
+        "  # Fit normalization layer on training data\n",
+        "  normalization_layer.adapt(train_data)\n",
+        "  # Split input between numerical and categorical input\n",
+        "  input_numerical = tf.gather(inputs, indices=[*range(NUMERICAL_FEATURES)], axis=1)\n",
+        "  input_normalized = normalization_layer(input_numerical)\n",
+        "  input_one_hot = tf.gather(inputs, indices=[*range(NUMERICAL_FEATURES, NUM_FEATURES)], axis=1)\n",
+        "  # Define one hidden layer with 8 neurons\n",
+        "  x = tf.keras.layers.Dense(8, activation='relu')(tf.concat([input_normalized, input_one_hot], 1))\n",
+        "  outputs = tf.keras.layers.Dense(1, name='output')(x)\n",
+        "  model = tf.keras.Model(inputs=inputs, outputs=outputs, name=model_name)\n",
+        "\n",
+        "  model.compile(\n",
+        "    optimizer=tf.keras.optimizers.Adam(learning_rate=0.1),\n",
+        "    loss='mean_absolute_error')\n",
+        "  \n",
+        "  return model"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "buc0NIc4oRv4"
+      },
+      "outputs": [],
+      "source": [
+        "model_v1 = construct_model('model_v1', train_data_v1)"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "8bvna8ZJAKj5"
+      },
+      "outputs": [],
+      "source": [
+        "# Train the model\n",
+        "history = model_v1.fit(\n",
+        "    ds_train_v1,\n",
+        "    validation_data=ds_test,\n",
+        "    epochs=5,\n",
+        "    verbose=1)"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "pyCfKRY9DUhO"
+      },
+      "outputs": [],
+      "source": [
+        "# Save the model to disk\n",
+        "model_path_v1 = 'saved_model_v1'\n",
+        "model_v1.save(model_path_v1)"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "kwUycnjpLvTX"
+      },
+      "source": [
+        "# Evaluation"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Now that we have trained a model, we can use TFMA to analyze the performance. The first thing we need to do is define our evaluation config. For our use case, we will use the most common metrics used for a linear regression model: MAE & MSE. See [TFMA metrics and plots](https://www.tensorflow.org/tfx/model_analysis/metrics) for more information about the supported evaluation parameters. "
+      ],
+      "metadata": {
+        "id": "7gmbEx6wG2Za"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "b2NN9zVr-AE1"
+      },
+      "outputs": [],
+      "source": [
+        "from google.protobuf import text_format\n",
+        "\n",
+        "# Define TFMA evaluation config\n",
+        "eval_config = text_format.Parse(\"\"\"\n",
+        "  ## Model information\n",
+        "  model_specs {\n",
+        "    # For keras (and serving models) we need to add a `label_key`.\n",
+        "    label_key: \"output\"\n",
+        "  }\n",
+        "\n",
+        "  ## Post training metric information. These will be merged with any built-in\n",
+        "  ## metrics from training.\n",
+        "  metrics_specs {\n",
+        "    metrics { class_name: \"ExampleCount\" }\n",
+        "    metrics { class_name: \"MeanAbsoluteError\" }\n",
+        "    metrics { class_name: \"MeanSquaredError\" }\n",
+        "    metrics { class_name: \"MeanPrediction\" }\n",
+        "  }\n",
+        "\n",
+        "  slicing_specs {}\n",
+        "\"\"\", tfma.EvalConfig())"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "We will now use [ExtractEvaluateAndWriteResults](https://www.tensorflow.org/tfx/model_analysis/api_docs/python/tfma/ExtractEvaluateAndWriteResults), which is a PTransform for performing extraction, evaluation, and writing results. This PTransform can directly be used in our Beam pipeline if we combine it with reading in our TFRecords via [TFXIO](https://www.tensorflow.org/tfx/tfx_bsl/api_docs/python/tfx_bsl/public/tfxio)"
+      ],
+      "metadata": {
+        "id": "8JVSVK4iH-d2"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "4NDEQAMDhJWL"
+      },
+      "outputs": [],
+      "source": [
+        "from tfx_bsl.public import tfxio\n",
+        "\n",
+        "output_path = 'evaluation_results'\n",
+        "\n",
+        "eval_shared_model = tfma.default_eval_shared_model(\n",
+        "    eval_saved_model_path=model_path_v1, eval_config=eval_config)\n",
+        "\n",
+        "tfx_io = tfxio.TFExampleRecord(\n",
+        "          file_pattern=tfrecord_file,\n",
+        "          raw_record_column_name=tfma.ARROW_INPUT_COLUMN)\n",
+        "\n",
+        "# Run Evaluation\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "    _ = (\n",
+        "        pipeline\n",
+        "        | 'ReadData' >> tfx_io.BeamSource()\n",
+        "        | 'EvalModel' >> tfma.ExtractEvaluateAndWriteResults(\n",
+        "           eval_shared_model=eval_shared_model,\n",
+        "           eval_config=eval_config,\n",
+        "           output_path=output_path))"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "He97LPB_NbtU"
+      },
+      "outputs": [],
+      "source": [
+        "# Visualize results\n",
+        "result = tfma.load_eval_result(output_path=output_path)\n",
+        "tfma.view.render_slicing_metrics(result)"
+      ]
+    },
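+    {
+      "cell_type": "markdown",
+      "source": [
+        "If the interactive widget does not render (for example, when viewing the notebook statically), the metrics can also be inspected programmatically. The cell below is a minimal sketch that simply prints the raw `slicing_metrics` of the loaded `EvalResult`; the exact nesting of the metrics dictionaries may vary across TFMA versions."
+      ],
+      "metadata": {}
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {},
+      "outputs": [],
+      "source": [
+        "# Print the computed metrics per slice; with an empty slicing spec there\n",
+        "# is a single overall slice covering the whole validation set.\n",
+        "for slice_key, metrics in result.slicing_metrics:\n",
+        "  print(slice_key, metrics)"
+      ]
+    },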
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Comparing multiple models"
+      ],
+      "metadata": {
+        "id": "I7ZuqLcdwpCs"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "An interesting and common use case it to compare the performance of multiple models to select the best candidate to put into production. We can also use Beam to evaluate and compare multiple models in one step."
+      ],
+      "metadata": {
+        "id": "ebkJsE3zJ2W2"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Model v2"
+      ],
+      "metadata": {
+        "id": "ah1xX9jl44O8"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "In order to showcase this use case, we will now train a second model on the full dataset."
+      ],
+      "metadata": {
+        "id": "6GcGg08YKRhg"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Preprocess data\n",
+        "ds_train_v2 = tfds.load('diamonds', split=['train[:80%]'], as_supervised=True)[0]\n",
+        "train_data_v2 = get_train_data(ds_train_v2)\n",
+        "ds_train_v2 = ds_train_v2.map(transform_data)\n",
+        "ds_train_v2 = ds_train_v2.batch(BATCH_SIZE)"
+      ],
+      "metadata": {
+        "id": "cQYA0cdzwoXr"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Define and train model\n",
+        "model_v2 = construct_model('model_v2', train_data_v2)\n",
+        "history = model_v2.fit(\n",
+        "    ds_train_v2,\n",
+        "    validation_data=ds_test,\n",
+        "    epochs=5,\n",
+        "    verbose=1)"
+      ],
+      "metadata": {
+        "id": "WII_PC5rxzTc"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Save model to file\n",
+        "model_path_v2 = 'saved_model_v2'\n",
+        "model_v2.save(model_path_v2)"
+      ],
+      "metadata": {
+        "id": "ppVFHy7myhXu"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Evaluation"
+      ],
+      "metadata": {
+        "id": "QKO-vh1X48tv"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Define TFMA evaluation config, including two model specs for the two models we want to compare\n",
+        "eval_config_compare = text_format.Parse(\"\"\"\n",
+        "  ## Model information\n",
+        "  model_specs {\n",
+        "    name: \"model_v1\"\n",
+        "    # For keras (and serving models) we need to add a `label_key`.\n",
+        "    label_key: \"output\"\n",
+        "    is_baseline: true\n",
+        "  }\n",
+        "  model_specs {\n",
+        "    name: \"model_v2\"\n",
+        "    # For keras (and serving models) we need to add a `label_key`.\n",
+        "    label_key: \"output\"\n",
+        "  }\n",
+        "\n",
+        "  ## Post training metric information. These will be merged with any built-in\n",
+        "  ## metrics from training.\n",
+        "  metrics_specs {\n",
+        "    metrics { class_name: \"ExampleCount\" }\n",
+        "    metrics { class_name: \"MeanAbsoluteError\" }\n",
+        "    metrics { class_name: \"MeanSquaredError\" }\n",
+        "    metrics { class_name: \"MeanPrediction\" }\n",
+        "  }\n",
+        "\n",
+        "  slicing_specs {}\n",
+        "\"\"\", tfma.EvalConfig())"
+      ],
+      "metadata": {
+        "id": "pB4aXUo45RAg"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "from tfx_bsl.public import tfxio\n",
+        "\n",
+        "output_path_compare = 'evaluation_results_compare'\n",
+        "\n",
+        "eval_shared_models = [\n",
+        "  tfma.default_eval_shared_model(\n",
+        "      model_name='model_v1',\n",
+        "      eval_saved_model_path=model_path_v1,\n",
+        "      eval_config=eval_config_compare),\n",
+        "  tfma.default_eval_shared_model(\n",
+        "      model_name='model_v2',\n",
+        "      eval_saved_model_path=model_path_v2,\n",
+        "      eval_config=eval_config_compare),\n",
+        "]\n",
+        "\n",
+        "tfx_io = tfxio.TFExampleRecord(\n",
+        "          file_pattern=tfrecord_file,\n",
+        "          raw_record_column_name=tfma.ARROW_INPUT_COLUMN)\n",
+        "\n",
+        "# Run Evaluation\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "    _ = (\n",
+        "        pipeline\n",
+        "        | 'ReadData' >> tfx_io.BeamSource()\n",
+        "        | 'EvalModel' >> tfma.ExtractEvaluateAndWriteResults(\n",
+        "           eval_shared_model=eval_shared_models,\n",
+        "           eval_config=eval_config_compare,\n",
+        "           output_path=output_path_compare))"
+      ],
+      "metadata": {
+        "id": "4eVdpW0aWTah"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Visualize results\n",
+        "results = tfma.load_eval_results(output_paths=output_path_compare)\n",
+        "tfma.view.render_time_series(results)"

Review Comment:
   I first committed with the output of the two visualisations, but they did not render on GitHub, so I removed them. I can try again if you want, but I don't know if/how I can solve this.


