You are viewing a plain text version of this content. The canonical (HTML) link for it is not available in this plain-text rendering.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/05 09:18:10 UTC
[airflow] branch main updated: AIP-47 - Migrate databricks DAGs to new design #22442 (#24203)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ddf9013098 AIP-47 - Migrate databricks DAGs to new design #22442 (#24203)
ddf9013098 is described below
commit ddf9013098b09176d7b34861b2357ded50b9fe26
Author: chethanuk-plutoflume <ch...@tessian.com>
AuthorDate: Sun Jun 5 10:18:04 2022 +0100
AIP-47 - Migrate databricks DAGs to new design #22442 (#24203)
---
airflow/providers/databricks/example_dags/__init__.py | 17 -----------------
docs/apache-airflow-providers-databricks/index.rst | 2 +-
.../operators/copy_into.rst | 2 +-
.../operators/repos_create.rst | 2 +-
.../operators/repos_delete.rst | 2 +-
.../operators/repos_update.rst | 2 +-
.../operators/sql.rst | 8 ++++----
.../operators/submit_run.rst | 4 ++--
.../system/providers/databricks}/example_databricks.py | 17 ++++++++++++++++-
.../providers/databricks}/example_databricks_repos.py | 17 ++++++++++++++++-
.../providers/databricks}/example_databricks_sql.py | 17 ++++++++++++++++-
11 files changed, 59 insertions(+), 31 deletions(-)
diff --git a/airflow/providers/databricks/example_dags/__init__.py b/airflow/providers/databricks/example_dags/__init__.py
deleted file mode 100644
index 217e5db960..0000000000
--- a/airflow/providers/databricks/example_dags/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/docs/apache-airflow-providers-databricks/index.rst b/docs/apache-airflow-providers-databricks/index.rst
index fed33f60c9..f41834a95b 100644
--- a/docs/apache-airflow-providers-databricks/index.rst
+++ b/docs/apache-airflow-providers-databricks/index.rst
@@ -39,7 +39,7 @@ Content
:maxdepth: 1
:caption: Resources
- Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/databricks/example_dags>
+ Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/databricks>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-databricks/>
Installing from sources <installing-providers-from-sources>
diff --git a/docs/apache-airflow-providers-databricks/operators/copy_into.rst b/docs/apache-airflow-providers-databricks/operators/copy_into.rst
index 79716c256f..71a3fa9e89 100644
--- a/docs/apache-airflow-providers-databricks/operators/copy_into.rst
+++ b/docs/apache-airflow-providers-databricks/operators/copy_into.rst
@@ -46,7 +46,7 @@ Importing CSV data
An example usage of the DatabricksCopyIntoOperator to import CSV data into a table is as follows:
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_copy_into]
:end-before: [END howto_operator_databricks_copy_into]
diff --git a/docs/apache-airflow-providers-databricks/operators/repos_create.rst b/docs/apache-airflow-providers-databricks/operators/repos_create.rst
index fc04340796..6611a51cd6 100644
--- a/docs/apache-airflow-providers-databricks/operators/repos_create.rst
+++ b/docs/apache-airflow-providers-databricks/operators/repos_create.rst
@@ -63,7 +63,7 @@ Create a Databricks Repo
An example usage of the DatabricksReposCreateOperator is as follows:
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
:language: python
:start-after: [START howto_operator_databricks_repo_create]
:end-before: [END howto_operator_databricks_repo_create]
diff --git a/docs/apache-airflow-providers-databricks/operators/repos_delete.rst b/docs/apache-airflow-providers-databricks/operators/repos_delete.rst
index e359deb7c9..74d4b62972 100644
--- a/docs/apache-airflow-providers-databricks/operators/repos_delete.rst
+++ b/docs/apache-airflow-providers-databricks/operators/repos_delete.rst
@@ -55,7 +55,7 @@ Deleting Databricks Repo by specifying path
An example usage of the DatabricksReposDeleteOperator is as follows:
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
:language: python
:start-after: [START howto_operator_databricks_repo_delete]
:end-before: [END howto_operator_databricks_repo_delete]
diff --git a/docs/apache-airflow-providers-databricks/operators/repos_update.rst b/docs/apache-airflow-providers-databricks/operators/repos_update.rst
index 0f63c24685..56af4edabb 100644
--- a/docs/apache-airflow-providers-databricks/operators/repos_update.rst
+++ b/docs/apache-airflow-providers-databricks/operators/repos_update.rst
@@ -60,7 +60,7 @@ Updating Databricks Repo by specifying path
An example usage of the DatabricksReposUpdateOperator is as follows:
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
:language: python
:start-after: [START howto_operator_databricks_repo_update]
:end-before: [END howto_operator_databricks_repo_update]
diff --git a/docs/apache-airflow-providers-databricks/operators/sql.rst b/docs/apache-airflow-providers-databricks/operators/sql.rst
index 93a3b88007..7e80a6b7cf 100644
--- a/docs/apache-airflow-providers-databricks/operators/sql.rst
+++ b/docs/apache-airflow-providers-databricks/operators/sql.rst
@@ -49,7 +49,7 @@ Selecting data
An example usage of the DatabricksSqlOperator to select data from a table is as follows:
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_sql_select]
:end-before: [END howto_operator_databricks_sql_select]
@@ -59,7 +59,7 @@ Selecting data into a file
An example usage of the DatabricksSqlOperator to select data from a table and store in a file is as follows:
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_sql_select_file]
:end-before: [END howto_operator_databricks_sql_select_file]
@@ -69,7 +69,7 @@ Executing multiple statements
An example usage of the DatabricksSqlOperator to perform multiple SQL statements is as follows:
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_sql_multiple]
:end-before: [END howto_operator_databricks_sql_multiple]
@@ -80,7 +80,7 @@ Executing multiple statements from a file
An example usage of the DatabricksSqlOperator to perform statements from a file is as follows:
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_sql_multiple_file]
:end-before: [END howto_operator_databricks_sql_multiple_file]
diff --git a/docs/apache-airflow-providers-databricks/operators/submit_run.rst b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
index 81f9dfd32f..1b32b771de 100644
--- a/docs/apache-airflow-providers-databricks/operators/submit_run.rst
+++ b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
@@ -61,7 +61,7 @@ Specifying parameters as JSON
An example usage of the DatabricksSubmitRunOperator is as follows:
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks.py
:language: python
:start-after: [START howto_operator_databricks_json]
:end-before: [END howto_operator_databricks_json]
@@ -71,7 +71,7 @@ Using named parameters
You can also use named parameters to initialize the operator and run the job.
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks.py
:language: python
:start-after: [START howto_operator_databricks_named]
:end-before: [END howto_operator_databricks_named]
diff --git a/airflow/providers/databricks/example_dags/example_databricks.py b/tests/system/providers/databricks/example_databricks.py
similarity index 84%
rename from airflow/providers/databricks/example_dags/example_databricks.py
rename to tests/system/providers/databricks/example_databricks.py
index bea9038afe..48a3ca922a 100644
--- a/airflow/providers/databricks/example_dags/example_databricks.py
+++ b/tests/system/providers/databricks/example_databricks.py
@@ -31,13 +31,17 @@ For more information about the state of a run refer to
https://docs.databricks.com/api/latest/jobs.html#runstate
"""
+import os
from datetime import datetime
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_operator"
+
with DAG(
- dag_id='example_databricks_operator',
+ dag_id=DAG_ID,
schedule_interval='@daily',
start_date=datetime(2021, 1, 1),
tags=['example'],
@@ -73,3 +77,14 @@ with DAG(
)
# [END howto_operator_databricks_named]
notebook_task >> spark_jar_task
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/databricks/example_dags/example_databricks_repos.py b/tests/system/providers/databricks/example_databricks_repos.py
similarity index 84%
rename from airflow/providers/databricks/example_dags/example_databricks_repos.py
rename to tests/system/providers/databricks/example_databricks_repos.py
index e33d32044f..eb76abcf19 100644
--- a/airflow/providers/databricks/example_dags/example_databricks_repos.py
+++ b/tests/system/providers/databricks/example_databricks_repos.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+import os
from datetime import datetime
from airflow import DAG
@@ -30,8 +31,11 @@ default_args = {
'databricks_conn_id': 'databricks',
}
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_repos_operator"
+
with DAG(
- dag_id='example_databricks_repos_operator',
+ dag_id=DAG_ID,
schedule_interval='@daily',
start_date=datetime(2021, 1, 1),
default_args=default_args,
@@ -72,3 +76,14 @@ with DAG(
# [END howto_operator_databricks_repo_delete]
(create_repo >> update_repo >> notebook_task >> delete_repo)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/databricks/example_dags/example_databricks_sql.py b/tests/system/providers/databricks/example_databricks_sql.py
similarity index 89%
rename from airflow/providers/databricks/example_dags/example_databricks_sql.py
rename to tests/system/providers/databricks/example_databricks_sql.py
index 6032c0fb03..33ee37c972 100644
--- a/airflow/providers/databricks/example_dags/example_databricks_sql.py
+++ b/tests/system/providers/databricks/example_databricks_sql.py
@@ -31,6 +31,7 @@ For more information about the state of a run refer to
https://docs.databricks.com/api/latest/jobs.html#runstate
"""
+import os
from datetime import datetime
from airflow import DAG
@@ -39,8 +40,11 @@ from airflow.providers.databricks.operators.databricks_sql import (
DatabricksSqlOperator,
)
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_sql_operator"
+
with DAG(
- dag_id='example_databricks_sql_operator',
+ dag_id=DAG_ID,
schedule_interval='@daily',
start_date=datetime(2021, 1, 1),
tags=['example'],
@@ -111,3 +115,14 @@ with DAG(
# [END howto_operator_databricks_copy_into]
(create >> create_file >> import_csv >> select >> select_into_file)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)