You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/06 21:49:38 UTC
[airflow] branch master updated: Add How-To guide for
PostgresOperator (#13281)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 9c75ea3 Add How-To guide for PostgresOperator (#13281)
9c75ea3 is described below
commit 9c75ea3c14b71d2f96d997aeef68c764c7d2984c
Author: Dr. Dennis Akpenyi <de...@gmail.com>
AuthorDate: Wed Jan 6 22:49:25 2021 +0100
Add How-To guide for PostgresOperator (#13281)
closes: #11917
---
.../{provider.yaml => example_dags/__init__.py} | 27 ---
.../postgres/example_dags/example_postgres.py | 81 +++++++++
airflow/providers/postgres/provider.yaml | 4 +-
docs/apache-airflow-providers-postgres/index.rst | 9 +-
.../operators/postgres_operator_howto_guide.rst | 181 +++++++++++++++++++++
.../postgres/operators/test_postgres_system.py | 33 ++--
6 files changed, 285 insertions(+), 50 deletions(-)
diff --git a/airflow/providers/postgres/provider.yaml b/airflow/providers/postgres/example_dags/__init__.py
similarity index 57%
copy from airflow/providers/postgres/provider.yaml
copy to airflow/providers/postgres/example_dags/__init__.py
index 54952a7..13a8339 100644
--- a/airflow/providers/postgres/provider.yaml
+++ b/airflow/providers/postgres/example_dags/__init__.py
@@ -14,30 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-package-name: apache-airflow-providers-postgres
-name: PostgreSQL
-description: |
- `PostgreSQL <https://www.postgresql.org/>`__
-
-versions:
- - 1.0.0
-
-integrations:
- - integration-name: PostgreSQL
- external-doc-url: https://www.postgresql.org/
- tags: [software]
-
-operators:
- - integration-name: PostgreSQL
- python-modules:
- - airflow.providers.postgres.operators.postgres
-
-hooks:
- - integration-name: PostgreSQL
- python-modules:
- - airflow.providers.postgres.hooks.postgres
-
-hook-class-names:
- - airflow.providers.postgres.hooks.postgres.PostgresHook
diff --git a/airflow/providers/postgres/example_dags/example_postgres.py b/airflow/providers/postgres/example_dags/example_postgres.py
new file mode 100644
index 0000000..5537b1c
--- /dev/null
+++ b/airflow/providers/postgres/example_dags/example_postgres.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [START postgres_operator_howto_guide]
+import datetime
+
+from airflow import DAG
+from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+default_args = {"owner": "airflow"}
+
+# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
+# instantiating the Postgres Operator
+
+with DAG(
+ dag_id="postgres_operator_dag",
+ start_date=datetime.datetime(2020, 2, 2),
+ schedule_interval="@once",
+ default_args=default_args,
+ catchup=False,
+) as dag:
+ # [START postgres_operator_howto_guide_create_pet_table]
+ create_pet_table = PostgresOperator(
+ task_id="create_pet_table",
+ postgres_conn_id="postgres_default",
+ sql="""
+ CREATE TABLE IF NOT EXISTS pet (
+ pet_id SERIAL PRIMARY KEY,
+ name VARCHAR NOT NULL,
+ pet_type VARCHAR NOT NULL,
+ birth_date DATE NOT NULL,
+ OWNER VARCHAR NOT NULL);
+ """,
+ )
+ # [END postgres_operator_howto_guide_create_pet_table]
+ # [START postgres_operator_howto_guide_populate_pet_table]
+ populate_pet_table = PostgresOperator(
+ task_id="populate_pet_table",
+ postgres_conn_id="postgres_default",
+ sql="""
+ INSERT INTO pet VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
+ INSERT INTO pet VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
+ INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
+ INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
+ """,
+ )
+ # [END postgres_operator_howto_guide_populate_pet_table]
+ # [START postgres_operator_howto_guide_get_all_pets]
+ get_all_pets = PostgresOperator(
+ task_id="get_all_pets", postgres_conn_id="postgres_default", sql="SELECT * FROM pet;"
+ )
+ # [END postgres_operator_howto_guide_get_all_pets]
+ # [START postgres_operator_howto_guide_get_birth_date]
+ get_birth_date = PostgresOperator(
+ task_id="get_birth_date",
+ postgres_conn_id="postgres_default",
+ sql="""
+ SELECT * FROM pet
+ WHERE birth_date
+ BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
+ """,
+ params={'begin_date': '2020-01-01', 'end_date': '2020-12-31'},
+ )
+ # [END postgres_operator_howto_guide_get_birth_date]
+
+ create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date
+ # [END postgres_operator_howto_guide]
diff --git a/airflow/providers/postgres/provider.yaml b/airflow/providers/postgres/provider.yaml
index 54952a7..591c90a 100644
--- a/airflow/providers/postgres/provider.yaml
+++ b/airflow/providers/postgres/provider.yaml
@@ -19,7 +19,7 @@
package-name: apache-airflow-providers-postgres
name: PostgreSQL
description: |
- `PostgreSQL <https://www.postgresql.org/>`__
+ `PostgreSQL <https://www.postgresql.org/>`__
versions:
- 1.0.0
@@ -27,6 +27,8 @@ versions:
integrations:
- integration-name: PostgreSQL
external-doc-url: https://www.postgresql.org/
+ how-to-guide:
+ - /docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
tags: [software]
operators:
diff --git a/docs/apache-airflow-providers-postgres/index.rst b/docs/apache-airflow-providers-postgres/index.rst
index fcd910f..7ab296a 100644
--- a/docs/apache-airflow-providers-postgres/index.rst
+++ b/docs/apache-airflow-providers-postgres/index.rst
@@ -1,4 +1,3 @@
-
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@@ -27,6 +26,8 @@ Content
:caption: Guides
Connection types <connections/postgres>
+ PostgresOperator types <operators/postgres_operator_howto_guide>
+
.. toctree::
:maxdepth: 1
@@ -38,4 +39,10 @@ Content
:maxdepth: 1
:caption: Resources
+ Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/postgres/example_dags>
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Resources
+
PyPI Repository <https://pypi.org/project/apache-airflow-providers-postgres/>
diff --git a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
new file mode 100644
index 0000000..8d9bd96
--- /dev/null
+++ b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
@@ -0,0 +1,181 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+How-to Guide for PostgresOperator
+=================================
+
+Introduction
+------------
+
+Apache Airflow has a robust trove of operators that can be used to implement the various tasks that make up your
+workflow. Airflow is essentially a graph (Directed Acyclic Graph) made up of tasks (nodes) and dependencies (edges).
+
+A task defined or implemented by an operator is a unit of work in your data pipeline.
+
+The purpose of Postgres Operator is to define tasks involving interactions with the PostgreSQL database.
+ In ``Airflow-2.0``, the ``PostgresOperator`` class resides at ``airflow.providers.postgres.operators.postgres``.
+
+Under the hood, the :class:`~airflow.providers.postgres.operators.postgres.PostgresOperator` delegates its heavy lifting to the :class:`~airflow.providers.postgres.hooks.postgres.PostgresHook`.
+
+Common Database Operations with PostgresOperator
+------------------------------------------------
+
+To use the postgres operator to carry out a SQL request, two parameters are required: ``sql`` and ``postgres_conn_id``.
+These two parameters are eventually fed to the postgres hook object that interacts directly with the postgres database.
+
+Creating a Postgres database table
+----------------------------------
+
+The code snippets below are based on Airflow-2.0
+
+.. exampleinclude:: /../../airflow/providers/postgres/example_dags/example_postgres.py
+ :language: python
+ :start-after: [START postgres_operator_howto_guide]
+ :end-before: [END postgres_operator_howto_guide_create_pet_table]
+
+
+Dumping SQL statements into your PostgresOperator isn't very appealing and will create maintainability pains somewhere
+down the road. To prevent this, Airflow offers an elegant solution. This is how it works: you simply create
+a directory inside the DAG folder called ``sql`` and then put all the SQL files containing your SQL queries inside it.
+
+Your ``dags/sql/pet_schema.sql`` should look like this:
+
+::
+
+ -- create pet table
+ CREATE TABLE IF NOT EXISTS pet (
+ pet_id SERIAL PRIMARY KEY,
+ name VARCHAR NOT NULL,
+ pet_type VARCHAR NOT NULL,
+ birth_date DATE NOT NULL,
+ OWNER VARCHAR NOT NULL);
+
+
+Now let's refactor ``create_pet_table`` in our DAG:
+
+.. code-block:: python
+
+ create_pet_table = PostgresOperator(
+ task_id="create_pet_table",
+ postgres_conn_id="postgres_default",
+ sql="sql/pet_schema.sql"
+ )
+
+
+Inserting data into a Postgres database table
+---------------------------------------------
+
+Let's say we already have the SQL insert statement below in our ``dags/sql/pet_schema.sql`` file:
+
+::
+
+ -- populate pet table
+ INSERT INTO pet VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
+ INSERT INTO pet VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
+ INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
+ INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
+
+We can then create a PostgresOperator task that populates the ``pet`` table.
+
+.. code-block:: python
+
+ populate_pet_table = PostgresOperator(
+ task_id="populate_pet_table",
+ postgres_conn_id="postgres_default",
+ sql="sql/pet_schema.sql"
+ )
+
+
+Fetching records from your postgres database table
+--------------------------------------------------
+
+Fetching records from your postgres database table can be as simple as:
+
+.. code-block:: python
+
+ get_all_pets = PostgresOperator(
+ task_id="get_all_pets",
+ postgres_conn_id="postgres_default",
+ sql="SELECT * FROM pet;"
+ )
+
+
+
+Passing Parameters into PostgresOperator
+----------------------------------------
+
+PostgresOperator provides the ``parameters`` attribute, which makes it possible to dynamically inject values into your
+SQL requests during runtime. The BaseOperator class has the ``params`` attribute which is available to the PostgresOperator
+by virtue of inheritance. Both ``parameters`` and ``params`` make it possible to dynamically pass in parameters in many
+interesting ways.
+
+To find the pets born within a given date range:
+
+.. code-block:: python
+
+ get_birth_date = PostgresOperator(
+ task_id="get_birth_date",
+ postgres_conn_id="postgres_default",
+ sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
+ parameters={
+ 'begin_date': '2020-01-01',
+ 'end_date': '2020-12-31'
+ }
+ )
+
+Now let's refactor our ``get_birth_date`` task. Instead of dumping SQL statements directly into our code, let's tidy things up
+by creating a sql file.
+
+::
+
+ -- dags/sql/birth_date.sql
+ SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
+
+And this time we will use the ``params`` attribute which we get for free from the parent ``BaseOperator``
+class.
+
+.. code-block:: python
+
+ get_birth_date = PostgresOperator(
+ task_id="get_birth_date",
+ postgres_conn_id="postgres_default",
+ sql="sql/birth_date.sql",
+ params={
+ 'begin_date': '2020-01-01',
+ 'end_date': '2020-12-31'
+ }
+ )
+
+The complete Postgres Operator DAG
+----------------------------------
+
+When we put everything together, our DAG should look like this:
+
+.. exampleinclude:: /../../airflow/providers/postgres/example_dags/example_postgres.py
+ :language: python
+ :start-after: [START postgres_operator_howto_guide]
+ :end-before: [END postgres_operator_howto_guide]
+
+
+Conclusion
+----------
+
+In this how-to guide we explored the Apache Airflow PostgresOperator. Let's quickly highlight the key takeaways.
+In Airflow-2.0, the PostgresOperator class now resides in the ``providers`` package. It is best practice to create a subdirectory
+called ``sql`` in your ``dags`` directory where you can store your sql files. This will make your code more elegant and more
+maintainable. And finally, we looked at the different ways you can dynamically pass parameters into your postgres operator
+tasks using the ``parameters`` or ``params`` attribute.
diff --git a/airflow/providers/postgres/provider.yaml b/tests/providers/postgres/operators/test_postgres_system.py
similarity index 57%
copy from airflow/providers/postgres/provider.yaml
copy to tests/providers/postgres/operators/test_postgres_system.py
index 54952a7..1d66c23 100644
--- a/airflow/providers/postgres/provider.yaml
+++ b/tests/providers/postgres/operators/test_postgres_system.py
@@ -15,29 +15,20 @@
# specific language governing permissions and limitations
# under the License.
----
-package-name: apache-airflow-providers-postgres
-name: PostgreSQL
-description: |
- `PostgreSQL <https://www.postgresql.org/>`__
+import os
-versions:
- - 1.0.0
+import pytest
-integrations:
- - integration-name: PostgreSQL
- external-doc-url: https://www.postgresql.org/
- tags: [software]
+from tests.test_utils import AIRFLOW_MAIN_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
-operators:
- - integration-name: PostgreSQL
- python-modules:
- - airflow.providers.postgres.operators.postgres
+POSTGRES_OPERATOR_DAG_FOLDER = os.path.join(
+ AIRFLOW_MAIN_FOLDER, "airflow", "providers", "postgres", "example_dags"
+)
-hooks:
- - integration-name: PostgreSQL
- python-modules:
- - airflow.providers.postgres.hooks.postgres
-hook-class-names:
- - airflow.providers.postgres.hooks.postgres.PostgresHook
+@pytest.mark.backend("postgres")
+@pytest.mark.system("postgres")
+class PostgresOperatorExampleDagSystemTest(SystemTest):
+ def test_run_example_dag_postgres_operator(self):
+ self.run_dag('postgres_operator_dag', POSTGRES_OPERATOR_DAG_FOLDER)