You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2022/03/17 03:58:25 UTC

[airflow] branch main updated: Add recipe for BeamRunGoPipelineOperator (#22296)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a1503b  Add recipe for BeamRunGoPipelineOperator (#22296)
4a1503b is described below

commit 4a1503b39b0aaf50940c29ac886c6eeda35a79ff
Author: pierrejeambrun <pi...@gmail.com>
AuthorDate: Thu Mar 17 04:57:22 2022 +0100

    Add recipe for BeamRunGoPipelineOperator (#22296)
---
 airflow/providers/apache/beam/hooks/beam.py        | 10 +++++-
 .../docker-images-recipes/go-beam.Dockerfile       | 37 ++++++++++++++++++++++
 docs/docker-stack/recipes.rst                      | 20 ++++++++++++
 tests/providers/apache/beam/hooks/test_beam.py     | 21 +++++++++++-
 4 files changed, 86 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/apache/beam/hooks/beam.py b/airflow/providers/apache/beam/hooks/beam.py
index 9be1a75..0644e02 100644
--- a/airflow/providers/apache/beam/hooks/beam.py
+++ b/airflow/providers/apache/beam/hooks/beam.py
@@ -20,12 +20,13 @@ import json
 import os
 import select
 import shlex
+import shutil
 import subprocess
 import textwrap
 from tempfile import TemporaryDirectory
 from typing import Callable, List, Optional
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.hooks.base import BaseHook
 from airflow.providers.google.go_module_utils import init_module, install_dependencies
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -307,6 +308,13 @@ class BeamHook(BaseHook):
             source with GCSHook.
         :return:
         """
+        if shutil.which("go") is None:
+            raise AirflowConfigException(
+                "You need to have Go installed to run beam go pipeline. See https://go.dev/doc/install "
+                "installation guide. If you are running airflow in Docker see more info at "
+                "'https://airflow.apache.org/docs/docker-stack/recipes.html'."
+            )
+
         if "labels" in variables:
             variables["labels"] = json.dumps(variables["labels"], separators=(",", ":"))
 
diff --git a/docs/docker-stack/docker-images-recipes/go-beam.Dockerfile b/docs/docker-stack/docker-images-recipes/go-beam.Dockerfile
new file mode 100644
index 0000000..b224fe1
--- /dev/null
+++ b/docs/docker-stack/docker-images-recipes/go-beam.Dockerfile
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ARG BASE_AIRFLOW_IMAGE
+FROM ${BASE_AIRFLOW_IMAGE}
+
+SHELL ["/bin/bash", "-o", "pipefail", "-e", "-u", "-x", "-c"]
+
+USER 0
+
+ARG GO_VERSION=1.16.4
+ENV GO_INSTALL_DIR=/usr/local/go
+
+# Install Go into GO_INSTALL_DIR: the amd64 build on x86_64 machines, the arm64 build otherwise.
+RUN if [[ "$(uname -m)" = "x86_64" ]] ; then export ARCH=amd64 ; else export ARCH=arm64 ; fi \
+    && DOWNLOAD_URL="https://dl.google.com/go/go${GO_VERSION}.linux-${ARCH}.tar.gz" \
+    && TMP_DIR="$(mktemp -d)" \
+    && curl -fL "${DOWNLOAD_URL}" --output "${TMP_DIR}/go.linux-${ARCH}.tar.gz" \
+    && mkdir -p "${GO_INSTALL_DIR}" \
+    && tar xzf "${TMP_DIR}/go.linux-${ARCH}.tar.gz" -C "${GO_INSTALL_DIR}" --strip-components=1 \
+    && rm -rf "${TMP_DIR}"
+
+ENV GOROOT=${GO_INSTALL_DIR}
+ENV PATH="$GOROOT/bin:$PATH"
+
+USER ${AIRFLOW_UID}
diff --git a/docs/docker-stack/recipes.rst b/docs/docker-stack/recipes.rst
index a1c5777..1d258ab 100644
--- a/docs/docker-stack/recipes.rst
+++ b/docs/docker-stack/recipes.rst
@@ -70,3 +70,23 @@ Then build a new image.
     --pull \
     --build-arg BASE_AIRFLOW_IMAGE="apache/airflow:2.0.2" \
     --tag my-airflow-image:0.0.1
+
+Apache Beam Go Stack installation
+---------------------------------
+
+To be able to run a Beam Go pipeline with the :class:`~airflow.providers.apache.beam.operators.beam.BeamRunGoPipelineOperator`,
+you will need Go in your container. Install Airflow with ``apache-airflow-providers-google>=6.5.0`` and ``apache-airflow-providers-apache-beam>=3.2.0``.
+
+Create a new Dockerfile like the one shown below.
+
+.. exampleinclude:: /docker-images-recipes/go-beam.Dockerfile
+    :language: dockerfile
+
+Then build a new image.
+
+.. code-block:: bash
+
+  docker build . \
+    --pull \
+    --build-arg BASE_AIRFLOW_IMAGE="apache/airflow:2.0.2" \
+    --tag my-airflow-image:0.0.1
diff --git a/tests/providers/apache/beam/hooks/test_beam.py b/tests/providers/apache/beam/hooks/test_beam.py
index 859cf3b..a69560f 100644
--- a/tests/providers/apache/beam/hooks/test_beam.py
+++ b/tests/providers/apache/beam/hooks/test_beam.py
@@ -233,8 +233,10 @@ class TestBeamHook(unittest.TestCase):
         )
         wait_for_done.assert_called_once_with()
 
+    @mock.patch(BEAM_STRING.format('shutil.which'))
     @mock.patch(BEAM_STRING.format('BeamCommandRunner'))
-    def test_start_go_pipeline(self, mock_runner):
+    def test_start_go_pipeline(self, mock_runner, mock_which):
+        mock_which.return_value = "/some_path/to/go"
         hook = BeamHook(runner=DEFAULT_RUNNER)
         wait_for_done = mock_runner.return_value.wait_for_done
         process_line_callback = MagicMock()
@@ -260,6 +262,23 @@ class TestBeamHook(unittest.TestCase):
         )
         wait_for_done.assert_called_once_with()
 
+    @mock.patch(BEAM_STRING.format('shutil.which'))
+    def test_start_go_pipeline_without_go_installed_raises(self, mock_which):
+        mock_which.return_value = None  # simulate no `go` binary found on PATH
+        hook = BeamHook(runner=DEFAULT_RUNNER)
+
+        with self.assertRaises(AirflowException) as ex_ctx:  # NOTE: hook raises AirflowConfigException
+            hook.start_go_pipeline(
+                go_file=GO_FILE,
+                variables=copy.deepcopy(BEAM_VARIABLES_GO),  # hook may mutate variables in place
+            )
+
+        assert (  # message must match the hook's error text verbatim
+            "You need to have Go installed to run beam go pipeline. See https://go.dev/doc/install "
+            "installation guide. If you are running airflow in Docker see more info at "
+            "'https://airflow.apache.org/docs/docker-stack/recipes.html'." == str(ex_ctx.exception)
+        )
+
 
 class TestBeamRunner(unittest.TestCase):
     @mock.patch('airflow.providers.apache.beam.hooks.beam.BeamCommandRunner.log')