Posted to commits@beam.apache.org by ro...@apache.org on 2022/06/24 15:22:28 UTC

[beam] branch master updated: Removes examples of unscalable sinks from documentation. (#22020)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 10dab960d96 Removes examples of unscalable sinks from documentation. (#22020)
10dab960d96 is described below

commit 10dab960d9695266fbbbeb040a378550fb440be6
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Fri Jun 24 08:22:14 2022 -0700

    Removes examples of unscalable sinks from documentation. (#22020)
    
    These sinks would only work on small data, and do not properly
    handle failures.  Users should instead be encouraged to use the
    builtin IOs or write their own following best practices documented
    in the IO guides.
    
    Incorporated some of the relevant information into the IO page.
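For reference, the builtin IOs that this message points users toward already cover
the common cases the removed notebook examples handled. A minimal sketch, using the
standard `WriteToText` sink (the output path here is a placeholder):

    import apache_beam as beam

    with beam.Pipeline() as pipeline:
      (
          pipeline
          | beam.Create([
              'Each element is written as one line.',
              'The builtin sink handles sharding for you.',
          ])
          # WriteToText writes to temporary shards and finalizes them on success.
          | beam.io.WriteToText('outputs/lines', file_name_suffix='.txt'))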
---
 .../tour-of-beam/reading-and-writing-data.ipynb    | 266 ---------------------
 .../en/documentation/io/developing-io-overview.md  |  10 +-
 2 files changed, 9 insertions(+), 267 deletions(-)

diff --git a/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb b/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
index 3d0333e6ce6..2fb541236f3 100644
--- a/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
+++ b/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
@@ -726,272 +726,6 @@
         "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
       ]
     },
-    {
-      "cell_type": "markdown",
-      "metadata": {
-        "id": "FpM368NEhc-q"
-      },
-      "source": [
-        "## Creating an output transform\n",
-        "\n",
-        "The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this would result in a lot of overhead creating, connecting to, and/or deleting resources.\n",
-        "\n",
-        "Instead, most data services are optimized to write _batches_ of elements at a time. Batch writes only connects to the service once, and can load many elements at a time.\n",
-        "\n",
-        "Here, we discuss two common ways of batching elements for optimized writes: _fixed-sized batches_, and\n",
-        "_[windows](https://beam.apache.org/documentation/programming-guide/#windowing)\n",
-        "of elements_."
-      ]
-    },
-    {
-      "cell_type": "markdown",
-      "metadata": {
-        "id": "5gypFFh4hM48"
-      },
-      "source": [
-        "## Writing fixed-sized batches\n",
-        "\n",
-        "If the order of the elements _is not_ important, we can simply create fixed-sized batches and write those independently.\n",
-        "\n",
-        "We can use\n",
-        "[`GroupIntoBatches`](https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches)\n",
-        "to get fixed-sized batches. Note that it expects `(key, value)` pairs. Since `GroupIntoBatches` is an _aggregation_, all the elements in a batch _must_ fit into memory for each worker.\n",
-        "\n",
-        "> ℹ️ `GroupIntoBatches` requires a `(key, value)` pair. For simplicity, this example uses a placeholder `None` key and discards it later. Depending on your data, there might be a key that makes more sense. Using a _balanced_ key, where each key contains around the same number of elements, may help parallelize the batching process.\n",
-        "\n",
-        "Let's create something similar to `WriteToText` but keep it simple with a unique identifier in the file name instead of the file count.\n",
-        "\n",
-        "To write a file using the Beam `filesystems` module, we need to use [`create`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filesystems.html#apache_beam.io.filesystems.FileSystems.create), which writes `bytes` into the file.\n",
-        "\n",
-        "> ℹ️ To read a file instead, use the [`open`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filesystems.html#apache_beam.io.filesystems.FileSystems.open)\n",
-        "> function instead.\n",
-        "\n",
-        "For the output type hint, we can use [`PDone`](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/pvalue.html) to indicate this is the last transform in a given pipeline."
-      ]
-    },
-    {
-      "cell_type": "code",
-      "metadata": {
-        "id": "LcRHXwyT8Rrj"
-      },
-      "source": [
-        "import apache_beam as beam\n",
-        "from apache_beam.io.filesystems import FileSystems as beam_fs\n",
-        "from apache_beam.options.pipeline_options import PipelineOptions\n",
-        "import os\n",
-        "import uuid\n",
-        "from typing import Iterable\n",
-        "\n",
-        "@beam.ptransform_fn\n",
-        "@beam.typehints.with_input_types(str)\n",
-        "@beam.typehints.with_output_types(beam.pvalue.PDone)\n",
-        "def WriteBatchesToFiles(\n",
-        "    pcollection: beam.PCollection[str],\n",
-        "    file_name_prefix: str,\n",
-        "    file_name_suffix: str = '.txt',\n",
-        "    batch_size: int = 100,\n",
-        ") -> beam.pvalue.PDone:\n",
-        "  def expand_pattern(pattern):\n",
-        "    for match_result in beam_fs.match([pattern])[0].metadata_list:\n",
-        "      yield match_result.path\n",
-        "\n",
-        "  def write_file(lines: Iterable[str]):\n",
-        "    file_name = f\"{file_name_prefix}-{uuid.uuid4().hex}{file_name_suffix}\"\n",
-        "    with beam_fs.create(file_name) as f:\n",
-        "      for line in lines:\n",
-        "        f.write(f\"{line}\\n\".encode('utf-8'))\n",
-        "\n",
-        "  # Remove existing files matching the output file_name pattern.\n",
-        "  for path in expand_pattern(f\"{file_name_prefix}*{file_name_suffix}\"):\n",
-        "    os.remove(path)\n",
-        "  return (\n",
-        "      pcollection\n",
-        "      # For simplicity we key with `None` and discard it.\n",
-        "      | 'Key with None' >> beam.WithKeys(lambda _: None)\n",
-        "      | 'Group into batches' >> beam.GroupIntoBatches(batch_size)\n",
-        "      | 'Discard key' >> beam.Values()\n",
-        "      | 'Write file' >> beam.Map(write_file)\n",
-        "  )\n",
-        "\n",
-        "output_file_name_prefix = 'outputs/batch'\n",
-        "options = PipelineOptions(flags=[], type_check_additional='all')\n",
-        "with beam.Pipeline(options=options) as pipeline:\n",
-        "  (\n",
-        "      pipeline\n",
-        "      | 'Create file lines' >> beam.Create([\n",
-        "          'Each element must be a string.',\n",
-        "          'It writes one element per line.',\n",
-        "          'There are no guarantees on the line order.',\n",
-        "          'The data might be written into multiple files.',\n",
-        "      ])\n",
-        "      | 'Write batches to files' >> WriteBatchesToFiles(\n",
-        "          file_name_prefix=output_file_name_prefix,\n",
-        "          file_name_suffix='.txt',\n",
-        "          batch_size=3,\n",
-        "      )\n",
-        "  )"
-      ],
-      "execution_count": null,
-      "outputs": []
-    },
-    {
-      "cell_type": "code",
-      "metadata": {
-        "colab": {
-          "base_uri": "https://localhost:8080/"
-        },
-        "id": "CUklk4JtEbft",
-        "outputId": "adddbd9f-e66d-4def-ba59-1eafccdbe793"
-      },
-      "source": [
-        "# Lets look at the output files and contents.\n",
-        "!head outputs/batch*.txt"
-      ],
-      "execution_count": null,
-      "outputs": [
-        {
-          "output_type": "stream",
-          "text": [
-            "==> outputs/batch-30d399fb3f24430db193e8130f439cb0.txt <==\n",
-            "Each element must be a string.\n",
-            "It writes one element per line.\n",
-            "There are no guarantees on the line order.\n",
-            "\n",
-            "==> outputs/batch-ab16a5c2018e4c32b01a5acaa2671fd0.txt <==\n",
-            "The data might be written into multiple files.\n"
-          ],
-          "name": "stdout"
-        }
-      ]
-    },
-    {
-      "cell_type": "markdown",
-      "metadata": {
-        "id": "hbmPT317hP5K"
-      },
-      "source": [
-        "## Writing windows of elements\n",
-        "\n",
-        "If the order of the elements _is_ important, we could batch the elements by windows. This could be useful in _streaming_ pipelines, where we have an indefinite number of incoming elements and we would like to write windows as they are being processed.\n",
-        "\n",
-        "> ℹ️ For more information about windows and triggers, check the [Windowing](https://beam.apache.org/documentation/programming-guide/#windowing) page.\n",
-        "\n",
-        "We use a\n",
-        "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-2-pardo-with-timestamp-and-window-information)\n",
-        "to extract the window start time and end time.\n",
-        "We use this for the file names of the output files."
-      ]
-    },
-    {
-      "cell_type": "code",
-      "metadata": {
-        "id": "v_qK300FG9js"
-      },
-      "source": [
-        "import apache_beam as beam\n",
-        "from apache_beam.io.filesystems import FileSystems as beam_fs\n",
-        "from apache_beam.options.pipeline_options import PipelineOptions\n",
-        "from datetime import datetime\n",
-        "import time\n",
-        "from typing import Any, Dict\n",
-        "\n",
-        "def unix_time(time_str: str) -> int:\n",
-        "  return time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S'))\n",
-        "\n",
-        "class WithWindowInfo(beam.DoFn):\n",
-        "  def process(self, element: Any, window=beam.DoFn.WindowParam) -> Iterable[Dict[str, Any]]:\n",
-        "    yield {\n",
-        "        'element': element,\n",
-        "        'window_start': window.start.to_utc_datetime(),\n",
-        "        'window_end': window.end.to_utc_datetime(),\n",
-        "    }\n",
-        "\n",
-        "@beam.ptransform_fn\n",
-        "@beam.typehints.with_input_types(str)\n",
-        "@beam.typehints.with_output_types(beam.pvalue.PDone)\n",
-        "def WriteWindowsToFiles(\n",
-        "    pcollection: beam.PCollection[str],\n",
-        "    file_name_prefix: str,\n",
-        "    file_name_suffix: str = '.txt',\n",
-        ") -> beam.pvalue.PDone:\n",
-        "  def write_file(batch: Dict[str, Any]):\n",
-        "    start_date = batch['window_start'].date()\n",
-        "    start_time = batch['window_start'].time()\n",
-        "    end_time = batch['window_end'].time()\n",
-        "    file_name = f\"{file_name_prefix}-{start_date}-{start_time}-{end_time}{file_name_suffix}\"\n",
-        "    with beam_fs.create(file_name) as f:\n",
-        "      for x in batch['element']:\n",
-        "        f.write(f\"{x}\\n\".encode('utf-8'))\n",
-        "\n",
-        "  return (\n",
-        "      pcollection\n",
-        "      | 'Group all per window' >> beam.GroupBy(lambda _: None)\n",
-        "      | 'Discard key' >> beam.Values()\n",
-        "      | 'Get window info' >> beam.ParDo(WithWindowInfo())\n",
-        "      | 'Write files' >> beam.Map(write_file)\n",
-        "  )\n",
-        "\n",
-        "output_file_name_prefix = 'outputs/window'\n",
-        "window_size_sec = 5 * 60  # 5 minutes\n",
-        "options = PipelineOptions(flags=[], type_check_additional='all')\n",
-        "with beam.Pipeline(options=options) as pipeline:\n",
-        "  (\n",
-        "      pipeline\n",
-        "      | 'Create elements' >> beam.Create([\n",
-        "          {'timestamp': unix_time('2020-03-19 08:49:00'), 'event': 'login'},\n",
-        "          {'timestamp': unix_time('2020-03-19 08:49:20'), 'event': 'view_account'},\n",
-        "          {'timestamp': unix_time('2020-03-19 08:50:00'), 'event': 'view_orders'},\n",
-        "          {'timestamp': unix_time('2020-03-19 08:51:00'), 'event': 'track_order'},\n",
-        "          {'timestamp': unix_time('2020-03-19 09:00:00'), 'event': 'logout'},\n",
-        "      ])\n",
-        "      | 'With timestamps' >> beam.Map(\n",
-        "          lambda x: beam.window.TimestampedValue(x, x['timestamp']))\n",
-        "      | 'Fixed-sized windows' >> beam.WindowInto(\n",
-        "            beam.window.FixedWindows(window_size_sec))\n",
-        "      | 'To string' >> beam.Map(\n",
-        "          lambda x: f\"{datetime.fromtimestamp(x['timestamp'])}: {x['event']}\")\n",
-        "      | 'Write windows to files' >> WriteWindowsToFiles(\n",
-        "          file_name_prefix=output_file_name_prefix,\n",
-        "          file_name_suffix='.txt',\n",
-        "      )\n",
-        "  )"
-      ],
-      "execution_count": null,
-      "outputs": []
-    },
-    {
-      "cell_type": "code",
-      "metadata": {
-        "colab": {
-          "base_uri": "https://localhost:8080/"
-        },
-        "id": "4QXKKVawTJ2_",
-        "outputId": "96a84b29-3fd2-46f4-b21b-d3f07daa928b"
-      },
-      "source": [
-        "# Lets look at the output files and contents.\n",
-        "!head outputs/window*.txt"
-      ],
-      "execution_count": null,
-      "outputs": [
-        {
-          "output_type": "stream",
-          "text": [
-            "==> outputs/window-2020-03-19-08:45:00-08:50:00.txt <==\n",
-            "2020-03-19 08:49:00: login\n",
-            "2020-03-19 08:49:20: view_account\n",
-            "\n",
-            "==> outputs/window-2020-03-19-08:50:00-08:55:00.txt <==\n",
-            "2020-03-19 08:50:00: view_orders\n",
-            "2020-03-19 08:51:00: track_order\n",
-            "\n",
-            "==> outputs/window-2020-03-19-09:00:00-09:05:00.txt <==\n",
-            "2020-03-19 09:00:00: logout\n"
-          ],
-          "name": "stdout"
-        }
-      ]
-    },
     {
       "cell_type": "markdown",
       "metadata": {
diff --git a/website/www/site/content/en/documentation/io/developing-io-overview.md b/website/www/site/content/en/documentation/io/developing-io-overview.md
index c8e0482aa9d..7a01b460ba5 100644
--- a/website/www/site/content/en/documentation/io/developing-io-overview.md
+++ b/website/www/site/content/en/documentation/io/developing-io-overview.md
@@ -180,9 +180,17 @@ To create a Beam sink, we recommend that you use a `ParDo` that writes the
 received records to the data store. To develop more complex sinks (for example,
 to support data de-duplication when failures are retried by a runner), use
 `ParDo`, `GroupByKey`, and other available Beam transforms.
+Many data services are optimized to write batches of elements at a time,
+so it may make sense to group the elements into batches before writing.
+Persistent connections can likewise be initialized in a DoFn's `setUp` or
+`startBundle` method rather than on receipt of every element.
+Note also that in a large-scale, distributed system, work can
+[fail and/or be retried](/documentation/runtime/model/), so it is preferable to
+make external interactions idempotent where possible.
 
 For **file-based sinks**, you can use the `FileBasedSink` abstraction that is
-provided by both the Java and Python SDKs. See our language specific
+provided by both the Java and Python SDKs. Beam's `FileSystems` utility classes
+can also be useful for reading and writing files. See our language specific
 implementation guides for more details:
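As a rough illustration of the ParDo-based sink pattern the added paragraph above
describes (batched writes, a connection created once per bundle, idempotent
interactions), a minimal sketch could look like the following. The `my_service`
module and its `Client`/`upsert` API are hypothetical placeholders for an external
data service:

    import apache_beam as beam


    class _WriteBatchToService(beam.DoFn):
      """Writes batches of elements, reusing one connection per bundle."""

      def __init__(self, endpoint):
        self._endpoint = endpoint
        self._client = None

      def start_bundle(self):
        # Connect once per bundle rather than once per element.
        import my_service  # hypothetical client library
        self._client = my_service.Client(self._endpoint)

      def process(self, batch):
        # Upserting keyed records keeps the write idempotent, so a retried
        # bundle does not create duplicates.
        self._client.upsert(batch)

      def finish_bundle(self):
        if self._client is not None:
          self._client.close()
          self._client = None


    @beam.ptransform_fn
    def WriteToMyService(pcollection, endpoint, batch_size=100):
      return (
          pcollection
          # A placeholder key; a balanced real key parallelizes batching better.
          | 'Key for batching' >> beam.WithKeys(lambda _: None)
          | 'Batch elements' >> beam.GroupIntoBatches(batch_size)
          | 'Drop key' >> beam.Values()
          | 'Write batches' >> beam.ParDo(_WriteBatchToService(endpoint)))

Grouping into batches before the ParDo amortizes each external call over many
elements, which is the optimization the updated page recommends.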