Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/05 15:47:02 UTC

[airflow] branch main updated: AIP-47 - Migrate beam DAGs to new design #22439 (#24211)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 34e0ab9b23 AIP-47 - Migrate beam DAGs to new design #22439 (#24211)
34e0ab9b23 is described below

commit 34e0ab9b23e0dcf416169777240b97f4de08f772
Author: chethanuk-plutoflume <ch...@tessian.com>
AuthorDate: Sun Jun 5 16:46:56 2022 +0100

    AIP-47 - Migrate beam DAGs to new design #22439 (#24211)
    
    * AIP-47 - Migrate beam DAGs to new design #22439
---
 .../apache/beam/example_dags/example_beam.py       | 437 ---------------------
 .../apache-airflow-providers-apache-beam/index.rst |   2 +-
 .../operators.rst                                  |  20 +-
 .../apache/beam/operators/test_beam_system.py      |   2 +-
 .../system/providers/apache/beam}/__init__.py      |   0
 tests/system/providers/apache/beam/example_beam.py |  66 ++++
 .../apache/beam/example_beam_java_flink.py         |  65 +++
 .../apache/beam/example_beam_java_spark.py         |  65 +++
 tests/system/providers/apache/beam/example_go.py   | 108 +++++
 .../providers/apache/beam/example_go_dataflow.py   |  80 ++++
 .../providers/apache/beam/example_java_dataflow.py |  70 ++++
 .../system/providers/apache/beam/example_python.py | 124 ++++++
 .../apache/beam/example_python_dataflow.py         |  83 ++++
 tests/system/providers/apache/beam/utils.py        |  75 ++++
 14 files changed, 748 insertions(+), 449 deletions(-)
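
Every migrated module in this diff follows the same AIP-47 system-test shape: an ordinary DAG definition, followed by a small pytest hook at the bottom of the file. A minimal sketch of that shape, assembled only from names that appear in the new files below (the DAG body is elided, and the behaviour of get_test_run is an assumption based on its usage here, since the helper itself is not part of this diff):

    from airflow import models
    from tests.system.providers.apache.beam.utils import START_DATE

    with models.DAG(
        "example_beam_native_java_direct_runner",
        schedule_interval=None,  # Override to match your needs
        start_date=START_DATE,
        catchup=False,
        tags=['example'],
    ) as dag:
        ...  # operators and dependencies, as in the full files below

    from tests.system.utils import get_test_run

    # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
    test_run = get_test_run(dag)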

diff --git a/airflow/providers/apache/beam/example_dags/example_beam.py b/airflow/providers/apache/beam/example_dags/example_beam.py
deleted file mode 100644
index ea52458303..0000000000
--- a/airflow/providers/apache/beam/example_dags/example_beam.py
+++ /dev/null
@@ -1,437 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG for Apache Beam operators
-"""
-import os
-from datetime import datetime
-from urllib.parse import urlparse
-
-from airflow import models
-from airflow.providers.apache.beam.operators.beam import (
-    BeamRunGoPipelineOperator,
-    BeamRunJavaPipelineOperator,
-    BeamRunPythonPipelineOperator,
-)
-from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
-from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
-from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
-from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
-from airflow.utils.trigger_rule import TriggerRule
-
-GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
-GCS_INPUT = os.environ.get('APACHE_BEAM_PYTHON', 'gs://INVALID BUCKET NAME/shakespeare/kinglear.txt')
-GCS_TMP = os.environ.get('APACHE_BEAM_GCS_TMP', 'gs://INVALID BUCKET NAME/temp/')
-GCS_STAGING = os.environ.get('APACHE_BEAM_GCS_STAGING', 'gs://INVALID BUCKET NAME/staging/')
-GCS_OUTPUT = os.environ.get('APACHE_BEAM_GCS_OUTPUT', 'gs://INVALID BUCKET NAME/output')
-GCS_PYTHON = os.environ.get('APACHE_BEAM_PYTHON', 'gs://INVALID BUCKET NAME/wordcount_debugging.py')
-GCS_PYTHON_DATAFLOW_ASYNC = os.environ.get(
-    'APACHE_BEAM_PYTHON_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.py'
-)
-GCS_GO = os.environ.get('APACHE_BEAM_GO', 'gs://INVALID BUCKET NAME/wordcount_debugging.go')
-GCS_GO_DATAFLOW_ASYNC = os.environ.get(
-    'APACHE_BEAM_GO_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.go'
-)
-GCS_JAR_DIRECT_RUNNER = os.environ.get(
-    'APACHE_BEAM_DIRECT_RUNNER_JAR',
-    'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-DirectRunner.jar',
-)
-GCS_JAR_DATAFLOW_RUNNER = os.environ.get(
-    'APACHE_BEAM_DATAFLOW_RUNNER_JAR', 'gs://INVALID BUCKET NAME/word-count-beam-bundled-0.1.jar'
-)
-GCS_JAR_SPARK_RUNNER = os.environ.get(
-    'APACHE_BEAM_SPARK_RUNNER_JAR',
-    'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-SparkRunner.jar',
-)
-GCS_JAR_FLINK_RUNNER = os.environ.get(
-    'APACHE_BEAM_FLINK_RUNNER_JAR',
-    'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-FlinkRunner.jar',
-)
-
-GCS_JAR_DIRECT_RUNNER_PARTS = urlparse(GCS_JAR_DIRECT_RUNNER)
-GCS_JAR_DIRECT_RUNNER_BUCKET_NAME = GCS_JAR_DIRECT_RUNNER_PARTS.netloc
-GCS_JAR_DIRECT_RUNNER_OBJECT_NAME = GCS_JAR_DIRECT_RUNNER_PARTS.path[1:]
-GCS_JAR_DATAFLOW_RUNNER_PARTS = urlparse(GCS_JAR_DATAFLOW_RUNNER)
-GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME = GCS_JAR_DATAFLOW_RUNNER_PARTS.netloc
-GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME = GCS_JAR_DATAFLOW_RUNNER_PARTS.path[1:]
-GCS_JAR_SPARK_RUNNER_PARTS = urlparse(GCS_JAR_SPARK_RUNNER)
-GCS_JAR_SPARK_RUNNER_BUCKET_NAME = GCS_JAR_SPARK_RUNNER_PARTS.netloc
-GCS_JAR_SPARK_RUNNER_OBJECT_NAME = GCS_JAR_SPARK_RUNNER_PARTS.path[1:]
-GCS_JAR_FLINK_RUNNER_PARTS = urlparse(GCS_JAR_FLINK_RUNNER)
-GCS_JAR_FLINK_RUNNER_BUCKET_NAME = GCS_JAR_FLINK_RUNNER_PARTS.netloc
-GCS_JAR_FLINK_RUNNER_OBJECT_NAME = GCS_JAR_FLINK_RUNNER_PARTS.path[1:]
-
-
-DEFAULT_ARGS = {
-    'default_pipeline_options': {'output': '/tmp/example_beam'},
-    'trigger_rule': TriggerRule.ALL_DONE,
-}
-START_DATE = datetime(2021, 1, 1)
-
-
-with models.DAG(
-    "example_beam_native_java_direct_runner",
-    schedule_interval=None,  # Override to match your needs
-    start_date=START_DATE,
-    catchup=False,
-    tags=['example'],
-) as dag_native_java_direct_runner:
-
-    # [START howto_operator_start_java_direct_runner_pipeline]
-    jar_to_local_direct_runner = GCSToLocalFilesystemOperator(
-        task_id="jar_to_local_direct_runner",
-        bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
-        object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
-        filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
-    )
-
-    start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
-        task_id="start_java_pipeline_direct_runner",
-        jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
-        pipeline_options={
-            'output': '/tmp/start_java_pipeline_direct_runner',
-            'inputFile': GCS_INPUT,
-        },
-        job_class='org.apache.beam.examples.WordCount',
-    )
-
-    jar_to_local_direct_runner >> start_java_pipeline_direct_runner
-    # [END howto_operator_start_java_direct_runner_pipeline]
-
-with models.DAG(
-    "example_beam_native_java_dataflow_runner",
-    schedule_interval=None,  # Override to match your needs
-    start_date=START_DATE,
-    catchup=False,
-    tags=['example'],
-) as dag_native_java_dataflow_runner:
-    # [START howto_operator_start_java_dataflow_runner_pipeline]
-    jar_to_local_dataflow_runner = GCSToLocalFilesystemOperator(
-        task_id="jar_to_local_dataflow_runner",
-        bucket=GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
-        object_name=GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
-        filename="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
-    )
-
-    start_java_pipeline_dataflow = BeamRunJavaPipelineOperator(
-        task_id="start_java_pipeline_dataflow",
-        runner="DataflowRunner",
-        jar="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
-        pipeline_options={
-            'tempLocation': GCS_TMP,
-            'stagingLocation': GCS_STAGING,
-            'output': GCS_OUTPUT,
-        },
-        job_class='org.apache.beam.examples.WordCount',
-        dataflow_config={"job_name": "{{task.task_id}}", "location": "us-central1"},
-    )
-
-    jar_to_local_dataflow_runner >> start_java_pipeline_dataflow
-    # [END howto_operator_start_java_dataflow_runner_pipeline]
-
-with models.DAG(
-    "example_beam_native_java_spark_runner",
-    schedule_interval=None,  # Override to match your needs
-    start_date=START_DATE,
-    catchup=False,
-    tags=['example'],
-) as dag_native_java_spark_runner:
-
-    jar_to_local_spark_runner = GCSToLocalFilesystemOperator(
-        task_id="jar_to_local_spark_runner",
-        bucket=GCS_JAR_SPARK_RUNNER_BUCKET_NAME,
-        object_name=GCS_JAR_SPARK_RUNNER_OBJECT_NAME,
-        filename="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
-    )
-
-    start_java_pipeline_spark_runner = BeamRunJavaPipelineOperator(
-        task_id="start_java_pipeline_spark_runner",
-        runner="SparkRunner",
-        jar="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
-        pipeline_options={
-            'output': '/tmp/start_java_pipeline_spark_runner',
-            'inputFile': GCS_INPUT,
-        },
-        job_class='org.apache.beam.examples.WordCount',
-    )
-
-    jar_to_local_spark_runner >> start_java_pipeline_spark_runner
-
-with models.DAG(
-    "example_beam_native_java_flink_runner",
-    schedule_interval=None,  # Override to match your needs
-    start_date=START_DATE,
-    catchup=False,
-    tags=['example'],
-) as dag_native_java_flink_runner:
-
-    jar_to_local_flink_runner = GCSToLocalFilesystemOperator(
-        task_id="jar_to_local_flink_runner",
-        bucket=GCS_JAR_FLINK_RUNNER_BUCKET_NAME,
-        object_name=GCS_JAR_FLINK_RUNNER_OBJECT_NAME,
-        filename="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
-    )
-
-    start_java_pipeline_flink_runner = BeamRunJavaPipelineOperator(
-        task_id="start_java_pipeline_flink_runner",
-        runner="FlinkRunner",
-        jar="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
-        pipeline_options={
-            'output': '/tmp/start_java_pipeline_flink_runner',
-            'inputFile': GCS_INPUT,
-        },
-        job_class='org.apache.beam.examples.WordCount',
-    )
-
-    jar_to_local_flink_runner >> start_java_pipeline_flink_runner
-
-
-with models.DAG(
-    "example_beam_native_python",
-    start_date=START_DATE,
-    schedule_interval=None,  # Override to match your needs
-    catchup=False,
-    default_args=DEFAULT_ARGS,
-    tags=['example'],
-) as dag_native_python:
-
-    # [START howto_operator_start_python_direct_runner_pipeline_local_file]
-    start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
-        task_id="start_python_pipeline_local_direct_runner",
-        py_file='apache_beam.examples.wordcount',
-        py_options=['-m'],
-        py_requirements=['apache-beam[gcp]==2.26.0'],
-        py_interpreter='python3',
-        py_system_site_packages=False,
-    )
-    # [END howto_operator_start_python_direct_runner_pipeline_local_file]
-
-    # [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
-    start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
-        task_id="start_python_pipeline_direct_runner",
-        py_file=GCS_PYTHON,
-        py_options=[],
-        pipeline_options={"output": GCS_OUTPUT},
-        py_requirements=['apache-beam[gcp]==2.26.0'],
-        py_interpreter='python3',
-        py_system_site_packages=False,
-    )
-    # [END howto_operator_start_python_direct_runner_pipeline_gcs_file]
-
-    # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
-    start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
-        task_id="start_python_pipeline_dataflow_runner",
-        runner="DataflowRunner",
-        py_file=GCS_PYTHON,
-        pipeline_options={
-            'tempLocation': GCS_TMP,
-            'stagingLocation': GCS_STAGING,
-            'output': GCS_OUTPUT,
-        },
-        py_options=[],
-        py_requirements=['apache-beam[gcp]==2.26.0'],
-        py_interpreter='python3',
-        py_system_site_packages=False,
-        dataflow_config=DataflowConfiguration(
-            job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
-        ),
-    )
-    # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
-
-    start_python_pipeline_local_spark_runner = BeamRunPythonPipelineOperator(
-        task_id="start_python_pipeline_local_spark_runner",
-        py_file='apache_beam.examples.wordcount',
-        runner="SparkRunner",
-        py_options=['-m'],
-        py_requirements=['apache-beam[gcp]==2.26.0'],
-        py_interpreter='python3',
-        py_system_site_packages=False,
-    )
-
-    start_python_pipeline_local_flink_runner = BeamRunPythonPipelineOperator(
-        task_id="start_python_pipeline_local_flink_runner",
-        py_file='apache_beam.examples.wordcount',
-        runner="FlinkRunner",
-        py_options=['-m'],
-        pipeline_options={
-            'output': '/tmp/start_python_pipeline_local_flink_runner',
-        },
-        py_requirements=['apache-beam[gcp]==2.26.0'],
-        py_interpreter='python3',
-        py_system_site_packages=False,
-    )
-
-    (
-        [
-            start_python_pipeline_local_direct_runner,
-            start_python_pipeline_direct_runner,
-        ]
-        >> start_python_pipeline_local_flink_runner
-        >> start_python_pipeline_local_spark_runner
-    )
-
-
-with models.DAG(
-    "example_beam_native_python_dataflow_async",
-    default_args=DEFAULT_ARGS,
-    start_date=START_DATE,
-    schedule_interval=None,  # Override to match your needs
-    catchup=False,
-    tags=['example'],
-) as dag_native_python_dataflow_async:
-    # [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
-    start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
-        task_id="start_python_job_dataflow_runner_async",
-        runner="DataflowRunner",
-        py_file=GCS_PYTHON_DATAFLOW_ASYNC,
-        pipeline_options={
-            'tempLocation': GCS_TMP,
-            'stagingLocation': GCS_STAGING,
-            'output': GCS_OUTPUT,
-        },
-        py_options=[],
-        py_requirements=['apache-beam[gcp]==2.26.0'],
-        py_interpreter='python3',
-        py_system_site_packages=False,
-        dataflow_config=DataflowConfiguration(
-            job_name='{{task.task_id}}',
-            project_id=GCP_PROJECT_ID,
-            location="us-central1",
-            wait_until_finished=False,
-        ),
-    )
-
-    wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
-        task_id="wait-for-python-job-async-done",
-        job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
-        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
-        project_id=GCP_PROJECT_ID,
-        location='us-central1',
-    )
-
-    start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
-    # [END howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
-
-
-with models.DAG(
-    "example_beam_native_go",
-    start_date=START_DATE,
-    schedule_interval="@once",
-    catchup=False,
-    default_args=DEFAULT_ARGS,
-    tags=['example'],
-) as dag_native_go:
-
-    # [START howto_operator_start_go_direct_runner_pipeline_local_file]
-    start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
-        task_id="start_go_pipeline_local_direct_runner",
-        go_file='files/apache_beam/examples/wordcount.go',
-    )
-    # [END howto_operator_start_go_direct_runner_pipeline_local_file]
-
-    # [START howto_operator_start_go_direct_runner_pipeline_gcs_file]
-    start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
-        task_id="start_go_pipeline_direct_runner",
-        go_file=GCS_GO,
-        pipeline_options={"output": GCS_OUTPUT},
-    )
-    # [END howto_operator_start_go_direct_runner_pipeline_gcs_file]
-
-    # [START howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
-    start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
-        task_id="start_go_pipeline_dataflow_runner",
-        runner="DataflowRunner",
-        go_file=GCS_GO,
-        pipeline_options={
-            'tempLocation': GCS_TMP,
-            'stagingLocation': GCS_STAGING,
-            'output': GCS_OUTPUT,
-            'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
-        },
-        dataflow_config=DataflowConfiguration(
-            job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
-        ),
-    )
-    # [END howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
-
-    start_go_pipeline_local_spark_runner = BeamRunGoPipelineOperator(
-        task_id="start_go_pipeline_local_spark_runner",
-        go_file='/files/apache_beam/examples/wordcount.go',
-        runner="SparkRunner",
-        pipeline_options={
-            'endpoint': '/your/spark/endpoint',
-        },
-    )
-
-    start_go_pipeline_local_flink_runner = BeamRunGoPipelineOperator(
-        task_id="start_go_pipeline_local_flink_runner",
-        go_file='/files/apache_beam/examples/wordcount.go',
-        runner="FlinkRunner",
-        pipeline_options={
-            'output': '/tmp/start_go_pipeline_local_flink_runner',
-        },
-    )
-
-    (
-        [
-            start_go_pipeline_local_direct_runner,
-            start_go_pipeline_direct_runner,
-        ]
-        >> start_go_pipeline_local_flink_runner
-        >> start_go_pipeline_local_spark_runner
-    )
-
-
-with models.DAG(
-    "example_beam_native_go_dataflow_async",
-    default_args=DEFAULT_ARGS,
-    start_date=START_DATE,
-    schedule_interval="@once",
-    catchup=False,
-    tags=['example'],
-) as dag_native_go_dataflow_async:
-    # [START howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
-    start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
-        task_id="start_go_job_dataflow_runner_async",
-        runner="DataflowRunner",
-        go_file=GCS_GO_DATAFLOW_ASYNC,
-        pipeline_options={
-            'tempLocation': GCS_TMP,
-            'stagingLocation': GCS_STAGING,
-            'output': GCS_OUTPUT,
-            'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
-        },
-        dataflow_config=DataflowConfiguration(
-            job_name='{{task.task_id}}',
-            project_id=GCP_PROJECT_ID,
-            location="us-central1",
-            wait_until_finished=False,
-        ),
-    )
-
-    wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
-        task_id="wait-for-go-job-async-done",
-        job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
-        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
-        project_id=GCP_PROJECT_ID,
-        location='us-central1',
-    )
-
-    start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done
-    # [END howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
diff --git a/docs/apache-airflow-providers-apache-beam/index.rst b/docs/apache-airflow-providers-apache-beam/index.rst
index d3573e9309..740e7c8909 100644
--- a/docs/apache-airflow-providers-apache-beam/index.rst
+++ b/docs/apache-airflow-providers-apache-beam/index.rst
@@ -27,7 +27,7 @@ Content
 
     Python API <_api/airflow/providers/apache/beam/index>
     PyPI Repository <https://pypi.org/project/apache-airflow-providers-apache-beam/>
-    Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/apache/beam/example_dags>
+    Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/apache/beam>
 
 .. toctree::
     :maxdepth: 1
diff --git a/docs/apache-airflow-providers-apache-beam/operators.rst b/docs/apache-airflow-providers-apache-beam/operators.rst
index bb18191124..2468c917f3 100644
--- a/docs/apache-airflow-providers-apache-beam/operators.rst
+++ b/docs/apache-airflow-providers-apache-beam/operators.rst
@@ -49,13 +49,13 @@ recommend avoiding unless the Dataflow job requires it.
 Python Pipelines with DirectRunner
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_python_direct_runner_pipeline_local_file]
     :end-before: [END howto_operator_start_python_direct_runner_pipeline_local_file]
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
@@ -64,13 +64,13 @@ Python Pipelines with DirectRunner
 Python Pipelines with DataflowRunner
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
     :end-before: [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_dataflow.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
@@ -92,7 +92,7 @@ has the ability to download or available on the local filesystem (provide the ab
 Java Pipelines with DirectRunner
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_beam.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_java_direct_runner_pipeline]
@@ -101,7 +101,7 @@ Java Pipelines with DirectRunner
 Java Pipelines with DataflowRunner
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_java_dataflow.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_java_dataflow_runner_pipeline]
@@ -125,13 +125,13 @@ init the module and install dependencies with ``go run init example.com/main`` a
 Go Pipelines with DirectRunner
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_go_direct_runner_pipeline_local_file]
     :end-before: [END howto_operator_start_go_direct_runner_pipeline_local_file]
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_go_direct_runner_pipeline_gcs_file]
@@ -140,13 +140,13 @@ Go Pipelines with DirectRunner
 Go Pipelines with DataflowRunner
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
     :end-before: [END howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
 
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go_dataflow.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
diff --git a/tests/providers/apache/beam/operators/test_beam_system.py b/tests/providers/apache/beam/operators/test_beam_system.py
index 4defe58c2d..a589169d5d 100644
--- a/tests/providers/apache/beam/operators/test_beam_system.py
+++ b/tests/providers/apache/beam/operators/test_beam_system.py
@@ -23,7 +23,7 @@ import pytest
 from tests.test_utils import AIRFLOW_MAIN_FOLDER
 from tests.test_utils.system_tests_class import SystemTest
 
-BEAM_DAG_FOLDER = os.path.join(AIRFLOW_MAIN_FOLDER, "airflow", "providers", "apache", "beam", "example_dags")
+BEAM_DAG_FOLDER = os.path.join(AIRFLOW_MAIN_FOLDER, "tests", "system", "providers", "apache", "beam")
 
 
 @pytest.mark.system("apache.beam")
diff --git a/airflow/providers/apache/beam/example_dags/__init__.py b/tests/system/providers/apache/beam/__init__.py
similarity index 100%
rename from airflow/providers/apache/beam/example_dags/__init__.py
rename to tests/system/providers/apache/beam/__init__.py
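
The rename above is what makes tests/system/providers/apache/beam an importable package; each new module below relies on it to pull shared configuration from the sibling utils.py added at the end of this commit, e.g.:

    from tests.system.providers.apache.beam.utils import GCS_INPUT, START_DATE
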
diff --git a/tests/system/providers/apache/beam/example_beam.py b/tests/system/providers/apache/beam/example_beam.py
new file mode 100644
index 0000000000..94d34f1140
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_beam.py
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
+from tests.system.providers.apache.beam.utils import (
+    GCS_INPUT,
+    GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
+    GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
+    START_DATE,
+)
+
+with models.DAG(
+    "example_beam_native_java_direct_runner",
+    schedule_interval=None,  # Override to match your needs
+    start_date=START_DATE,
+    catchup=False,
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_start_java_direct_runner_pipeline]
+    jar_to_local_direct_runner = GCSToLocalFilesystemOperator(
+        task_id="jar_to_local_direct_runner",
+        bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
+        object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
+        filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
+    )
+
+    start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
+        task_id="start_java_pipeline_direct_runner",
+        jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
+        pipeline_options={
+            'output': '/tmp/start_java_pipeline_direct_runner',
+            'inputFile': GCS_INPUT,
+        },
+        job_class='org.apache.beam.examples.WordCount',
+    )
+
+    jar_to_local_direct_runner >> start_java_pipeline_direct_runner
+    # [END howto_operator_start_java_direct_runner_pipeline]
+
+
+from tests.system.utils import get_test_run
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/apache/beam/example_beam_java_flink.py b/tests/system/providers/apache/beam/example_beam_java_flink.py
new file mode 100644
index 0000000000..a82331e6b5
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_beam_java_flink.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
+from tests.system.providers.apache.beam.utils import (
+    GCS_INPUT,
+    GCS_JAR_FLINK_RUNNER_BUCKET_NAME,
+    GCS_JAR_FLINK_RUNNER_OBJECT_NAME,
+    START_DATE,
+)
+
+with models.DAG(
+    "example_beam_native_java_flink_runner",
+    schedule_interval=None,  # Override to match your needs
+    start_date=START_DATE,
+    catchup=False,
+    tags=['example'],
+) as dag:
+
+    jar_to_local_flink_runner = GCSToLocalFilesystemOperator(
+        task_id="jar_to_local_flink_runner",
+        bucket=GCS_JAR_FLINK_RUNNER_BUCKET_NAME,
+        object_name=GCS_JAR_FLINK_RUNNER_OBJECT_NAME,
+        filename="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
+    )
+
+    start_java_pipeline_flink_runner = BeamRunJavaPipelineOperator(
+        task_id="start_java_pipeline_flink_runner",
+        runner="FlinkRunner",
+        jar="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
+        pipeline_options={
+            'output': '/tmp/start_java_pipeline_flink_runner',
+            'inputFile': GCS_INPUT,
+        },
+        job_class='org.apache.beam.examples.WordCount',
+    )
+
+    jar_to_local_flink_runner >> start_java_pipeline_flink_runner
+
+
+from tests.system.utils import get_test_run
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/apache/beam/example_beam_java_spark.py b/tests/system/providers/apache/beam/example_beam_java_spark.py
new file mode 100644
index 0000000000..ca602b30c4
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_beam_java_spark.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
+from tests.system.providers.apache.beam.utils import (
+    GCS_INPUT,
+    GCS_JAR_SPARK_RUNNER_BUCKET_NAME,
+    GCS_JAR_SPARK_RUNNER_OBJECT_NAME,
+    START_DATE,
+)
+
+with models.DAG(
+    "example_beam_native_java_spark_runner",
+    schedule_interval=None,  # Override to match your needs
+    start_date=START_DATE,
+    catchup=False,
+    tags=['example'],
+) as dag:
+
+    jar_to_local_spark_runner = GCSToLocalFilesystemOperator(
+        task_id="jar_to_local_spark_runner",
+        bucket=GCS_JAR_SPARK_RUNNER_BUCKET_NAME,
+        object_name=GCS_JAR_SPARK_RUNNER_OBJECT_NAME,
+        filename="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
+    )
+
+    start_java_pipeline_spark_runner = BeamRunJavaPipelineOperator(
+        task_id="start_java_pipeline_spark_runner",
+        runner="SparkRunner",
+        jar="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
+        pipeline_options={
+            'output': '/tmp/start_java_pipeline_spark_runner',
+            'inputFile': GCS_INPUT,
+        },
+        job_class='org.apache.beam.examples.WordCount',
+    )
+
+    jar_to_local_spark_runner >> start_java_pipeline_spark_runner
+
+
+from tests.system.utils import get_test_run
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/apache/beam/example_go.py b/tests/system/providers/apache/beam/example_go.py
new file mode 100644
index 0000000000..768136560a
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_go.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator
+from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
+from tests.system.providers.apache.beam.utils import (
+    DEFAULT_ARGS,
+    GCP_PROJECT_ID,
+    GCS_GO,
+    GCS_OUTPUT,
+    GCS_STAGING,
+    GCS_TMP,
+    START_DATE,
+)
+
+with models.DAG(
+    "example_beam_native_go",
+    start_date=START_DATE,
+    schedule_interval="@once",
+    catchup=False,
+    default_args=DEFAULT_ARGS,
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_start_go_direct_runner_pipeline_local_file]
+    start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_local_direct_runner",
+        go_file='files/apache_beam/examples/wordcount.go',
+    )
+    # [END howto_operator_start_go_direct_runner_pipeline_local_file]
+
+    # [START howto_operator_start_go_direct_runner_pipeline_gcs_file]
+    start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_direct_runner",
+        go_file=GCS_GO,
+        pipeline_options={"output": GCS_OUTPUT},
+    )
+    # [END howto_operator_start_go_direct_runner_pipeline_gcs_file]
+
+    # [START howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
+    start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_dataflow_runner",
+        runner="DataflowRunner",
+        go_file=GCS_GO,
+        pipeline_options={
+            'tempLocation': GCS_TMP,
+            'stagingLocation': GCS_STAGING,
+            'output': GCS_OUTPUT,
+            'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
+        },
+        dataflow_config=DataflowConfiguration(
+            job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
+        ),
+    )
+    # [END howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
+
+    start_go_pipeline_local_spark_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_local_spark_runner",
+        go_file='/files/apache_beam/examples/wordcount.go',
+        runner="SparkRunner",
+        pipeline_options={
+            'endpoint': '/your/spark/endpoint',
+        },
+    )
+
+    start_go_pipeline_local_flink_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_local_flink_runner",
+        go_file='/files/apache_beam/examples/wordcount.go',
+        runner="FlinkRunner",
+        pipeline_options={
+            'output': '/tmp/start_go_pipeline_local_flink_runner',
+        },
+    )
+
+    (
+        [
+            start_go_pipeline_local_direct_runner,
+            start_go_pipeline_direct_runner,
+        ]
+        >> start_go_pipeline_local_flink_runner
+        >> start_go_pipeline_local_spark_runner
+    )
+
+
+from tests.system.utils import get_test_run
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/apache/beam/example_go_dataflow.py b/tests/system/providers/apache/beam/example_go_dataflow.py
new file mode 100644
index 0000000000..590318f25a
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_go_dataflow.py
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator
+from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
+from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
+from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
+from tests.system.providers.apache.beam.utils import (
+    DEFAULT_ARGS,
+    GCP_PROJECT_ID,
+    GCS_GO_DATAFLOW_ASYNC,
+    GCS_OUTPUT,
+    GCS_STAGING,
+    GCS_TMP,
+    START_DATE,
+)
+
+with models.DAG(
+    "example_beam_native_go_dataflow_async",
+    default_args=DEFAULT_ARGS,
+    start_date=START_DATE,
+    schedule_interval="@once",
+    catchup=False,
+    tags=['example'],
+) as dag:
+    # [START howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
+    start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
+        task_id="start_go_job_dataflow_runner_async",
+        runner="DataflowRunner",
+        go_file=GCS_GO_DATAFLOW_ASYNC,
+        pipeline_options={
+            'tempLocation': GCS_TMP,
+            'stagingLocation': GCS_STAGING,
+            'output': GCS_OUTPUT,
+            'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
+        },
+        dataflow_config=DataflowConfiguration(
+            job_name='{{task.task_id}}',
+            project_id=GCP_PROJECT_ID,
+            location="us-central1",
+            wait_until_finished=False,
+        ),
+    )
+
+    wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
+        task_id="wait-for-go-job-async-done",
+        job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
+        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
+        project_id=GCP_PROJECT_ID,
+        location='us-central1',
+    )
+
+    start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done
+    # [END howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
+
+
+from tests.system.utils import get_test_run
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/apache/beam/example_java_dataflow.py b/tests/system/providers/apache/beam/example_java_dataflow.py
new file mode 100644
index 0000000000..092dff4aaf
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_java_dataflow.py
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
+from tests.system.providers.apache.beam.utils import (
+    GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
+    GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
+    GCS_OUTPUT,
+    GCS_STAGING,
+    GCS_TMP,
+    START_DATE,
+)
+
+with models.DAG(
+    "example_beam_native_java_dataflow_runner",
+    schedule_interval=None,  # Override to match your needs
+    start_date=START_DATE,
+    catchup=False,
+    tags=['example'],
+) as dag:
+    # [START howto_operator_start_java_dataflow_runner_pipeline]
+    jar_to_local_dataflow_runner = GCSToLocalFilesystemOperator(
+        task_id="jar_to_local_dataflow_runner",
+        bucket=GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
+        object_name=GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
+        filename="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
+    )
+
+    start_java_pipeline_dataflow = BeamRunJavaPipelineOperator(
+        task_id="start_java_pipeline_dataflow",
+        runner="DataflowRunner",
+        jar="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
+        pipeline_options={
+            'tempLocation': GCS_TMP,
+            'stagingLocation': GCS_STAGING,
+            'output': GCS_OUTPUT,
+        },
+        job_class='org.apache.beam.examples.WordCount',
+        dataflow_config={"job_name": "{{task.task_id}}", "location": "us-central1"},
+    )
+
+    jar_to_local_dataflow_runner >> start_java_pipeline_dataflow
+    # [END howto_operator_start_java_dataflow_runner_pipeline]
+
+
+from tests.system.utils import get_test_run
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/apache/beam/example_python.py b/tests/system/providers/apache/beam/example_python.py
new file mode 100644
index 0000000000..3349e30565
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_python.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
+from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
+from tests.system.providers.apache.beam.utils import (
+    DEFAULT_ARGS,
+    GCP_PROJECT_ID,
+    GCS_OUTPUT,
+    GCS_PYTHON,
+    GCS_STAGING,
+    GCS_TMP,
+    START_DATE,
+)
+
+with models.DAG(
+    "example_beam_native_python",
+    start_date=START_DATE,
+    schedule_interval=None,  # Override to match your needs
+    catchup=False,
+    default_args=DEFAULT_ARGS,
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_start_python_direct_runner_pipeline_local_file]
+    start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
+        task_id="start_python_pipeline_local_direct_runner",
+        py_file='apache_beam.examples.wordcount',
+        py_options=['-m'],
+        py_requirements=['apache-beam[gcp]==2.26.0'],
+        py_interpreter='python3',
+        py_system_site_packages=False,
+    )
+    # [END howto_operator_start_python_direct_runner_pipeline_local_file]
+
+    # [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
+    start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
+        task_id="start_python_pipeline_direct_runner",
+        py_file=GCS_PYTHON,
+        py_options=[],
+        pipeline_options={"output": GCS_OUTPUT},
+        py_requirements=['apache-beam[gcp]==2.26.0'],
+        py_interpreter='python3',
+        py_system_site_packages=False,
+    )
+    # [END howto_operator_start_python_direct_runner_pipeline_gcs_file]
+
+    # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
+    start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
+        task_id="start_python_pipeline_dataflow_runner",
+        runner="DataflowRunner",
+        py_file=GCS_PYTHON,
+        pipeline_options={
+            'tempLocation': GCS_TMP,
+            'stagingLocation': GCS_STAGING,
+            'output': GCS_OUTPUT,
+        },
+        py_options=[],
+        py_requirements=['apache-beam[gcp]==2.26.0'],
+        py_interpreter='python3',
+        py_system_site_packages=False,
+        dataflow_config=DataflowConfiguration(
+            job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
+        ),
+    )
+    # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
+
+    start_python_pipeline_local_spark_runner = BeamRunPythonPipelineOperator(
+        task_id="start_python_pipeline_local_spark_runner",
+        py_file='apache_beam.examples.wordcount',
+        runner="SparkRunner",
+        py_options=['-m'],
+        py_requirements=['apache-beam[gcp]==2.26.0'],
+        py_interpreter='python3',
+        py_system_site_packages=False,
+    )
+
+    start_python_pipeline_local_flink_runner = BeamRunPythonPipelineOperator(
+        task_id="start_python_pipeline_local_flink_runner",
+        py_file='apache_beam.examples.wordcount',
+        runner="FlinkRunner",
+        py_options=['-m'],
+        pipeline_options={
+            'output': '/tmp/start_python_pipeline_local_flink_runner',
+        },
+        py_requirements=['apache-beam[gcp]==2.26.0'],
+        py_interpreter='python3',
+        py_system_site_packages=False,
+    )
+
+    (
+        [
+            start_python_pipeline_local_direct_runner,
+            start_python_pipeline_direct_runner,
+        ]
+        >> start_python_pipeline_local_flink_runner
+        >> start_python_pipeline_local_spark_runner
+    )
+
+
+from tests.system.utils import get_test_run
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/apache/beam/example_python_dataflow.py b/tests/system/providers/apache/beam/example_python_dataflow.py
new file mode 100644
index 0000000000..f119e31a66
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_python_dataflow.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
+from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
+from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
+from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
+from tests.system.providers.apache.beam.utils import (
+    DEFAULT_ARGS,
+    GCP_PROJECT_ID,
+    GCS_OUTPUT,
+    GCS_PYTHON_DATAFLOW_ASYNC,
+    GCS_STAGING,
+    GCS_TMP,
+    START_DATE,
+)
+
+with models.DAG(
+    "example_beam_native_python_dataflow_async",
+    default_args=DEFAULT_ARGS,
+    start_date=START_DATE,
+    schedule_interval=None,  # Override to match your needs
+    catchup=False,
+    tags=['example'],
+) as dag:
+    # [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
+    start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
+        task_id="start_python_job_dataflow_runner_async",
+        runner="DataflowRunner",
+        py_file=GCS_PYTHON_DATAFLOW_ASYNC,
+        pipeline_options={
+            'tempLocation': GCS_TMP,
+            'stagingLocation': GCS_STAGING,
+            'output': GCS_OUTPUT,
+        },
+        py_options=[],
+        py_requirements=['apache-beam[gcp]==2.26.0'],
+        py_interpreter='python3',
+        py_system_site_packages=False,
+        dataflow_config=DataflowConfiguration(
+            job_name='{{task.task_id}}',
+            project_id=GCP_PROJECT_ID,
+            location="us-central1",
+            wait_until_finished=False,
+        ),
+    )
+
+    wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
+        task_id="wait-for-python-job-async-done",
+        job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
+        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
+        project_id=GCP_PROJECT_ID,
+        location='us-central1',
+    )
+
+    start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
+    # [END howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
+
+
+from tests.system.utils import get_test_run
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/apache/beam/utils.py b/tests/system/providers/apache/beam/utils.py
new file mode 100644
index 0000000000..462e22169a
--- /dev/null
+++ b/tests/system/providers/apache/beam/utils.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example utils for Apache Beam operator example DAGs
+"""
+import os
+from datetime import datetime
+from urllib.parse import urlparse
+
+from airflow.utils.trigger_rule import TriggerRule
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCS_INPUT = os.environ.get('APACHE_BEAM_PYTHON', 'gs://INVALID BUCKET NAME/shakespeare/kinglear.txt')
+GCS_TMP = os.environ.get('APACHE_BEAM_GCS_TMP', 'gs://INVALID BUCKET NAME/temp/')
+GCS_STAGING = os.environ.get('APACHE_BEAM_GCS_STAGING', 'gs://INVALID BUCKET NAME/staging/')
+GCS_OUTPUT = os.environ.get('APACHE_BEAM_GCS_OUTPUT', 'gs://INVALID BUCKET NAME/output')
+GCS_PYTHON = os.environ.get('APACHE_BEAM_PYTHON', 'gs://INVALID BUCKET NAME/wordcount_debugging.py')
+GCS_PYTHON_DATAFLOW_ASYNC = os.environ.get(
+    'APACHE_BEAM_PYTHON_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.py'
+)
+GCS_GO = os.environ.get('APACHE_BEAM_GO', 'gs://INVALID BUCKET NAME/wordcount_debugging.go')
+GCS_GO_DATAFLOW_ASYNC = os.environ.get(
+    'APACHE_BEAM_GO_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.go'
+)
+GCS_JAR_DIRECT_RUNNER = os.environ.get(
+    'APACHE_BEAM_DIRECT_RUNNER_JAR',
+    'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-DirectRunner.jar',
+)
+GCS_JAR_DATAFLOW_RUNNER = os.environ.get(
+    'APACHE_BEAM_DATAFLOW_RUNNER_JAR', 'gs://INVALID BUCKET NAME/word-count-beam-bundled-0.1.jar'
+)
+GCS_JAR_SPARK_RUNNER = os.environ.get(
+    'APACHE_BEAM_SPARK_RUNNER_JAR',
+    'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-SparkRunner.jar',
+)
+GCS_JAR_FLINK_RUNNER = os.environ.get(
+    'APACHE_BEAM_FLINK_RUNNER_JAR',
+    'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-FlinkRunner.jar',
+)
+
+GCS_JAR_DIRECT_RUNNER_PARTS = urlparse(GCS_JAR_DIRECT_RUNNER)
+GCS_JAR_DIRECT_RUNNER_BUCKET_NAME = GCS_JAR_DIRECT_RUNNER_PARTS.netloc
+GCS_JAR_DIRECT_RUNNER_OBJECT_NAME = GCS_JAR_DIRECT_RUNNER_PARTS.path[1:]
+GCS_JAR_DATAFLOW_RUNNER_PARTS = urlparse(GCS_JAR_DATAFLOW_RUNNER)
+GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME = GCS_JAR_DATAFLOW_RUNNER_PARTS.netloc
+GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME = GCS_JAR_DATAFLOW_RUNNER_PARTS.path[1:]
+GCS_JAR_SPARK_RUNNER_PARTS = urlparse(GCS_JAR_SPARK_RUNNER)
+GCS_JAR_SPARK_RUNNER_BUCKET_NAME = GCS_JAR_SPARK_RUNNER_PARTS.netloc
+GCS_JAR_SPARK_RUNNER_OBJECT_NAME = GCS_JAR_SPARK_RUNNER_PARTS.path[1:]
+GCS_JAR_FLINK_RUNNER_PARTS = urlparse(GCS_JAR_FLINK_RUNNER)
+GCS_JAR_FLINK_RUNNER_BUCKET_NAME = GCS_JAR_FLINK_RUNNER_PARTS.netloc
+GCS_JAR_FLINK_RUNNER_OBJECT_NAME = GCS_JAR_FLINK_RUNNER_PARTS.path[1:]
+
+
+DEFAULT_ARGS = {
+    'default_pipeline_options': {'output': '/tmp/example_beam'},
+    'trigger_rule': TriggerRule.ALL_DONE,
+}
+START_DATE = datetime(2021, 1, 1)
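
The gs://INVALID BUCKET NAME defaults above appear to be deliberate placeholders: the bucket name is not a valid GCS name, so each example fails fast unless the corresponding environment variable is set. A hedged sketch of overriding them before running a single system test (the project, bucket, and path values here are assumptions, not values from this commit):

    import os

    # Must be set before tests.system.providers.apache.beam.utils is imported,
    # because the constants are read once at module import time.
    os.environ['GCP_PROJECT_ID'] = 'my-project'                        # assumption
    os.environ['APACHE_BEAM_GCS_TMP'] = 'gs://my-bucket/temp/'         # assumption
    os.environ['APACHE_BEAM_GCS_STAGING'] = 'gs://my-bucket/staging/'  # assumption
    os.environ['APACHE_BEAM_GCS_OUTPUT'] = 'gs://my-bucket/output'     # assumption

    # Then run one example via pytest, as the in-file comments suggest:
    #   pytest tests/system/providers/apache/beam/example_beam.py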