Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/05 09:18:10 UTC

[airflow] branch main updated: AIP-47 - Migrate databricks DAGs to new design #22442 (#24203)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ddf9013098 AIP-47 - Migrate databricks DAGs to new design #22442 (#24203)
ddf9013098 is described below

commit ddf9013098b09176d7b34861b2357ded50b9fe26
Author: chethanuk-plutoflume <ch...@tessian.com>
AuthorDate: Sun Jun 5 10:18:04 2022 +0100

    AIP-47 - Migrate databricks DAGs to new design #22442 (#24203)
---
 airflow/providers/databricks/example_dags/__init__.py   | 17 -----------------
 docs/apache-airflow-providers-databricks/index.rst      |  2 +-
 .../operators/copy_into.rst                             |  2 +-
 .../operators/repos_create.rst                          |  2 +-
 .../operators/repos_delete.rst                          |  2 +-
 .../operators/repos_update.rst                          |  2 +-
 .../operators/sql.rst                                   |  8 ++++----
 .../operators/submit_run.rst                            |  4 ++--
 .../system/providers/databricks}/example_databricks.py  | 17 ++++++++++++++++-
 .../providers/databricks}/example_databricks_repos.py   | 17 ++++++++++++++++-
 .../providers/databricks}/example_databricks_sql.py     | 17 ++++++++++++++++-
 11 files changed, 59 insertions(+), 31 deletions(-)
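
Each migrated file now follows the AIP-47 system-test skeleton: a module-level ENV_ID read from the environment, a DAG_ID constant, a watcher task wired to every task in the DAG, and a get_test_run handle so pytest can execute the file directly. A minimal sketch of that skeleton, with the operator body elided and the names taken from the diff below:

    import os
    from datetime import datetime

    from airflow import DAG

    # Read by system tests to parametrize external resources; may be unset locally.
    ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
    DAG_ID = "example_databricks_operator"

    with DAG(
        dag_id=DAG_ID,
        schedule_interval='@daily',
        start_date=datetime(2021, 1, 1),
        tags=['example'],
    ) as dag:
        # ... Databricks operators are defined and chained here as before ...

        from tests.system.utils.watcher import watcher

        # Route every task into the watcher so a failing teardown task still
        # marks the whole test run as failed.
        list(dag.tasks) >> watcher()

    from tests.system.utils import get_test_run  # noqa: E402

    # Exposes the DAG as a pytest-runnable test (see tests/system/README.md#run_via_pytest).
    test_run = get_test_run(dag)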

diff --git a/airflow/providers/databricks/example_dags/__init__.py b/airflow/providers/databricks/example_dags/__init__.py
deleted file mode 100644
index 217e5db960..0000000000
--- a/airflow/providers/databricks/example_dags/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/docs/apache-airflow-providers-databricks/index.rst b/docs/apache-airflow-providers-databricks/index.rst
index fed33f60c9..f41834a95b 100644
--- a/docs/apache-airflow-providers-databricks/index.rst
+++ b/docs/apache-airflow-providers-databricks/index.rst
@@ -39,7 +39,7 @@ Content
     :maxdepth: 1
     :caption: Resources
 
-    Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/databricks/example_dags>
+    Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/databricks>
     PyPI Repository <https://pypi.org/project/apache-airflow-providers-databricks/>
     Installing from sources <installing-providers-from-sources>
 
diff --git a/docs/apache-airflow-providers-databricks/operators/copy_into.rst b/docs/apache-airflow-providers-databricks/operators/copy_into.rst
index 79716c256f..71a3fa9e89 100644
--- a/docs/apache-airflow-providers-databricks/operators/copy_into.rst
+++ b/docs/apache-airflow-providers-databricks/operators/copy_into.rst
@@ -46,7 +46,7 @@ Importing CSV data
 
 An example usage of the DatabricksCopyIntoOperator to import CSV data into a table is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_copy_into]
     :end-before: [END howto_operator_databricks_copy_into]
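
The exampleinclude directive slices the referenced file between matching marker comments, so only the path has to change when an example moves; the markers travel with the file. On the Python side the pairing looks like this (snippet body elided as a placeholder):

    # tests/system/providers/databricks/example_databricks_sql.py

    # [START howto_operator_databricks_copy_into]
    # ... the DatabricksCopyIntoOperator definition rendered in the docs ...
    # [END howto_operator_databricks_copy_into]
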
diff --git a/docs/apache-airflow-providers-databricks/operators/repos_create.rst b/docs/apache-airflow-providers-databricks/operators/repos_create.rst
index fc04340796..6611a51cd6 100644
--- a/docs/apache-airflow-providers-databricks/operators/repos_create.rst
+++ b/docs/apache-airflow-providers-databricks/operators/repos_create.rst
@@ -63,7 +63,7 @@ Create a Databricks Repo
 
 An example usage of the DatabricksReposCreateOperator is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
     :language: python
     :start-after: [START howto_operator_databricks_repo_create]
     :end-before: [END howto_operator_databricks_repo_create]
diff --git a/docs/apache-airflow-providers-databricks/operators/repos_delete.rst b/docs/apache-airflow-providers-databricks/operators/repos_delete.rst
index e359deb7c9..74d4b62972 100644
--- a/docs/apache-airflow-providers-databricks/operators/repos_delete.rst
+++ b/docs/apache-airflow-providers-databricks/operators/repos_delete.rst
@@ -55,7 +55,7 @@ Deleting Databricks Repo by specifying path
 
 An example usage of the DatabricksReposDeleteOperator is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
     :language: python
     :start-after: [START howto_operator_databricks_repo_delete]
     :end-before: [END howto_operator_databricks_repo_delete]
diff --git a/docs/apache-airflow-providers-databricks/operators/repos_update.rst b/docs/apache-airflow-providers-databricks/operators/repos_update.rst
index 0f63c24685..56af4edabb 100644
--- a/docs/apache-airflow-providers-databricks/operators/repos_update.rst
+++ b/docs/apache-airflow-providers-databricks/operators/repos_update.rst
@@ -60,7 +60,7 @@ Updating Databricks Repo by specifying path
 
 An example usage of the DatabricksReposUpdateOperator is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
     :language: python
     :start-after: [START howto_operator_databricks_repo_update]
     :end-before: [END howto_operator_databricks_repo_update]
diff --git a/docs/apache-airflow-providers-databricks/operators/sql.rst b/docs/apache-airflow-providers-databricks/operators/sql.rst
index 93a3b88007..7e80a6b7cf 100644
--- a/docs/apache-airflow-providers-databricks/operators/sql.rst
+++ b/docs/apache-airflow-providers-databricks/operators/sql.rst
@@ -49,7 +49,7 @@ Selecting data
 
 An example usage of the DatabricksSqlOperator to select data from a table is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_sql_select]
     :end-before: [END howto_operator_databricks_sql_select]
@@ -59,7 +59,7 @@ Selecting data into a file
 
 An example usage of the DatabricksSqlOperator to select data from a table and store in a file is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_sql_select_file]
     :end-before: [END howto_operator_databricks_sql_select_file]
@@ -69,7 +69,7 @@ Executing multiple statements
 
 An example usage of the DatabricksSqlOperator to perform multiple SQL statements is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_sql_multiple]
     :end-before: [END howto_operator_databricks_sql_multiple]
@@ -80,7 +80,7 @@ Executing multiple statements from a file
 
 An example usage of the DatabricksSqlOperator to perform statements from a file is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_sql_multiple_file]
     :end-before: [END howto_operator_databricks_sql_multiple_file]
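
For orientation, a minimal sketch of the DatabricksSqlOperator usage these snippets document, assuming the provider's default 'databricks_default' connection; the endpoint and table names below are placeholders, not values from the example file:

    from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

    select = DatabricksSqlOperator(
        task_id="select_data",
        databricks_conn_id="databricks_default",
        sql_endpoint_name="example-endpoint",  # placeholder SQL endpoint name
        sql="SELECT * FROM default.example_table",  # placeholder query
    )
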
diff --git a/docs/apache-airflow-providers-databricks/operators/submit_run.rst b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
index 81f9dfd32f..1b32b771de 100644
--- a/docs/apache-airflow-providers-databricks/operators/submit_run.rst
+++ b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
@@ -61,7 +61,7 @@ Specifying parameters as JSON
 
 An example usage of the DatabricksSubmitRunOperator is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks.py
     :language: python
     :start-after: [START howto_operator_databricks_json]
     :end-before: [END howto_operator_databricks_json]
@@ -71,7 +71,7 @@ Using named parameters
 
 You can also use named parameters to initialize the operator and run the job.
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks.py
     :language: python
     :start-after: [START howto_operator_databricks_named]
     :end-before: [END howto_operator_databricks_named]
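
The named-parameter form referenced above passes the cluster spec and notebook task separately instead of a single json parameter. A sketch under assumed values (the cluster spec and notebook path are illustrative, not taken from the example file):

    from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

    new_cluster = {
        "spark_version": "9.1.x-scala2.12",  # placeholder runtime version
        "node_type_id": "i3.xlarge",  # placeholder node type
        "num_workers": 1,
    }
    notebook_task = DatabricksSubmitRunOperator(
        task_id="notebook_task",
        new_cluster=new_cluster,
        notebook_task={"notebook_path": "/Shared/example-notebook"},  # placeholder path
    )
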
diff --git a/airflow/providers/databricks/example_dags/example_databricks.py b/tests/system/providers/databricks/example_databricks.py
similarity index 84%
rename from airflow/providers/databricks/example_dags/example_databricks.py
rename to tests/system/providers/databricks/example_databricks.py
index bea9038afe..48a3ca922a 100644
--- a/airflow/providers/databricks/example_dags/example_databricks.py
+++ b/tests/system/providers/databricks/example_databricks.py
@@ -31,13 +31,17 @@ For more information about the state of a run refer to
 https://docs.databricks.com/api/latest/jobs.html#runstate
 """
 
+import os
 from datetime import datetime
 
 from airflow import DAG
 from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_operator"
+
 with DAG(
-    dag_id='example_databricks_operator',
+    dag_id=DAG_ID,
     schedule_interval='@daily',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
@@ -73,3 +77,14 @@ with DAG(
     )
     # [END howto_operator_databricks_named]
     notebook_task >> spark_jar_task
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
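
The watcher wiring added above routes every task upstream of a single watcher task, so a failure anywhere in the DAG (including teardown tasks with permissive trigger rules) fails the test run. A sketch of such a watcher, assuming the shape used by the system-test utilities in tests/system/utils/watcher.py:

    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule

    @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
    def watcher():
        # Runs only when at least one upstream task failed, and fails the DAG run.
        raise AirflowException("Failing task because one or more upstream tasks failed.")

With test_run = get_test_run(dag) exported, pytest can then collect and execute the file as described in tests/system/README.md#run_via_pytest.
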
diff --git a/airflow/providers/databricks/example_dags/example_databricks_repos.py b/tests/system/providers/databricks/example_databricks_repos.py
similarity index 84%
rename from airflow/providers/databricks/example_dags/example_databricks_repos.py
rename to tests/system/providers/databricks/example_databricks_repos.py
index e33d32044f..eb76abcf19 100644
--- a/airflow/providers/databricks/example_dags/example_databricks_repos.py
+++ b/tests/system/providers/databricks/example_databricks_repos.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
 from datetime import datetime
 
 from airflow import DAG
@@ -30,8 +31,11 @@ default_args = {
     'databricks_conn_id': 'databricks',
 }
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_repos_operator"
+
 with DAG(
-    dag_id='example_databricks_repos_operator',
+    dag_id=DAG_ID,
     schedule_interval='@daily',
     start_date=datetime(2021, 1, 1),
     default_args=default_args,
@@ -72,3 +76,14 @@ with DAG(
     # [END howto_operator_databricks_repo_delete]
 
     (create_repo >> update_repo >> notebook_task >> delete_repo)
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/databricks/example_dags/example_databricks_sql.py b/tests/system/providers/databricks/example_databricks_sql.py
similarity index 89%
rename from airflow/providers/databricks/example_dags/example_databricks_sql.py
rename to tests/system/providers/databricks/example_databricks_sql.py
index 6032c0fb03..33ee37c972 100644
--- a/airflow/providers/databricks/example_dags/example_databricks_sql.py
+++ b/tests/system/providers/databricks/example_databricks_sql.py
@@ -31,6 +31,7 @@ For more information about the state of a run refer to
 https://docs.databricks.com/api/latest/jobs.html#runstate
 """
 
+import os
 from datetime import datetime
 
 from airflow import DAG
@@ -39,8 +40,11 @@ from airflow.providers.databricks.operators.databricks_sql import (
     DatabricksSqlOperator,
 )
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_sql_operator"
+
 with DAG(
-    dag_id='example_databricks_sql_operator',
+    dag_id=DAG_ID,
     schedule_interval='@daily',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
@@ -111,3 +115,14 @@ with DAG(
     # [END howto_operator_databricks_copy_into]
 
     (create >> create_file >> import_csv >> select >> select_into_file)
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)