You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/06 21:49:38 UTC

[airflow] branch master updated: Add How-To guide for PostgresOperator (#13281)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c75ea3  Add How-To guide for PostgresOperator (#13281)
9c75ea3 is described below

commit 9c75ea3c14b71d2f96d997aeef68c764c7d2984c
Author: Dr. Dennis Akpenyi <de...@gmail.com>
AuthorDate: Wed Jan 6 22:49:25 2021 +0100

    Add How-To guide for PostgresOperator (#13281)
    
    closes: #11917
---
 .../{provider.yaml => example_dags/__init__.py}    |  27 ---
 .../postgres/example_dags/example_postgres.py      |  81 +++++++++
 airflow/providers/postgres/provider.yaml           |   4 +-
 docs/apache-airflow-providers-postgres/index.rst   |   9 +-
 .../operators/postgres_operator_howto_guide.rst    | 181 +++++++++++++++++++++
 .../postgres/operators/test_postgres_system.py     |  33 ++--
 6 files changed, 285 insertions(+), 50 deletions(-)

diff --git a/airflow/providers/postgres/provider.yaml b/airflow/providers/postgres/example_dags/__init__.py
similarity index 57%
copy from airflow/providers/postgres/provider.yaml
copy to airflow/providers/postgres/example_dags/__init__.py
index 54952a7..13a8339 100644
--- a/airflow/providers/postgres/provider.yaml
+++ b/airflow/providers/postgres/example_dags/__init__.py
@@ -14,30 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
----
-package-name: apache-airflow-providers-postgres
-name: PostgreSQL
-description: |
-    `PostgreSQL <https://www.postgresql.org/>`__
-
-versions:
-  - 1.0.0
-
-integrations:
-  - integration-name: PostgreSQL
-    external-doc-url: https://www.postgresql.org/
-    tags: [software]
-
-operators:
-  - integration-name: PostgreSQL
-    python-modules:
-      - airflow.providers.postgres.operators.postgres
-
-hooks:
-  - integration-name: PostgreSQL
-    python-modules:
-      - airflow.providers.postgres.hooks.postgres
-
-hook-class-names:
-  - airflow.providers.postgres.hooks.postgres.PostgresHook
diff --git a/airflow/providers/postgres/example_dags/example_postgres.py b/airflow/providers/postgres/example_dags/example_postgres.py
new file mode 100644
index 0000000..5537b1c
--- /dev/null
+++ b/airflow/providers/postgres/example_dags/example_postgres.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [START postgres_operator_howto_guide]
+import datetime
+
+from airflow import DAG
+from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+default_args = {"owner": "airflow"}
+
+# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
+# instantiating the Postgres Operator
+
+with DAG(
+    dag_id="postgres_operator_dag",
+    start_date=datetime.datetime(2020, 2, 2),
+    schedule_interval="@once",
+    default_args=default_args,
+    catchup=False,
+) as dag:
+    # [START postgres_operator_howto_guide_create_pet_table]
+    create_pet_table = PostgresOperator(
+        task_id="create_pet_table",
+        postgres_conn_id="postgres_default",
+        sql="""
+            CREATE TABLE IF NOT EXISTS pet (
+            pet_id SERIAL PRIMARY KEY,
+            name VARCHAR NOT NULL,
+            pet_type VARCHAR NOT NULL,
+            birth_date DATE NOT NULL,
+            OWNER VARCHAR NOT NULL);
+          """,
+    )
+    # [END postgres_operator_howto_guide_create_pet_table]
+    # [START postgres_operator_howto_guide_populate_pet_table]
+    populate_pet_table = PostgresOperator(
+        task_id="populate_pet_table",
+        postgres_conn_id="postgres_default",
+        sql="""
+            INSERT INTO pet VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
+            INSERT INTO pet VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
+            INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
+            INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
+            """,
+    )
+    # [END postgres_operator_howto_guide_populate_pet_table]
+    # [START postgres_operator_howto_guide_get_all_pets]
+    get_all_pets = PostgresOperator(
+        task_id="get_all_pets", postgres_conn_id="postgres_default", sql="SELECT * FROM pet;"
+    )
+    # [END postgres_operator_howto_guide_get_all_pets]
+    # [START postgres_operator_howto_guide_get_birth_date]
+    get_birth_date = PostgresOperator(
+        task_id="get_birth_date",
+        postgres_conn_id="postgres_default",
+        sql="""
+            SELECT * FROM pet
+            WHERE birth_date
+            BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
+            """,
+        params={'begin_date': '2020-01-01', 'end_date': '2020-12-31'},
+    )
+    # [END postgres_operator_howto_guide_get_birth_date]
+
+    create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date
+    # [END postgres_operator_howto_guide]
diff --git a/airflow/providers/postgres/provider.yaml b/airflow/providers/postgres/provider.yaml
index 54952a7..591c90a 100644
--- a/airflow/providers/postgres/provider.yaml
+++ b/airflow/providers/postgres/provider.yaml
@@ -19,7 +19,7 @@
 package-name: apache-airflow-providers-postgres
 name: PostgreSQL
 description: |
-    `PostgreSQL <https://www.postgresql.org/>`__
+  `PostgreSQL <https://www.postgresql.org/>`__
 
 versions:
   - 1.0.0
@@ -27,6 +27,8 @@ versions:
 integrations:
   - integration-name: PostgreSQL
     external-doc-url: https://www.postgresql.org/
+    how-to-guide:
+      - /docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
     tags: [software]
 
 operators:
diff --git a/docs/apache-airflow-providers-postgres/index.rst b/docs/apache-airflow-providers-postgres/index.rst
index fcd910f..7ab296a 100644
--- a/docs/apache-airflow-providers-postgres/index.rst
+++ b/docs/apache-airflow-providers-postgres/index.rst
@@ -1,4 +1,3 @@
-
  .. Licensed to the Apache Software Foundation (ASF) under one
     or more contributor license agreements.  See the NOTICE file
     distributed with this work for additional information
@@ -27,6 +26,8 @@ Content
     :caption: Guides
 
     Connection types <connections/postgres>
+    PostgresOperator how-to guide <operators/postgres_operator_howto_guide>
+
 
 .. toctree::
     :maxdepth: 1
@@ -38,4 +39,10 @@ Content
     :maxdepth: 1
     :caption: Resources
 
+    Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/postgres/example_dags>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Resources
+
     PyPI Repository <https://pypi.org/project/apache-airflow-providers-postgres/>
diff --git a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
new file mode 100644
index 0000000..8d9bd96
--- /dev/null
+++ b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
@@ -0,0 +1,181 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+How-to Guide for PostgresOperator
+=================================
+
+Introduction
+------------
+
+Apache Airflow has a robust trove of operators that can be used to implement the various tasks that make up your
+workflow. Airflow is essentially a graph (Directed Acyclic Graph) made up of tasks (nodes) and dependencies (edges).
+
+A task defined or implemented by an operator is a unit of work in your data pipeline.
+
+The purpose of the PostgresOperator is to define tasks involving interactions with a PostgreSQL database.
+In ``Airflow-2.0``, the ``PostgresOperator`` class resides at ``airflow.providers.postgres.operators.postgres``.
+
+Under the hood, the :class:`~airflow.providers.postgres.operators.postgres.PostgresOperator` delegates its heavy lifting to the :class:`~airflow.providers.postgres.hooks.postgres.PostgresHook`.
+
+Common Database Operations with PostgresOperator
+------------------------------------------------
+
+To use the postgres operator to carry out a SQL request, two parameters are required: ``sql`` and ``postgres_conn_id``.
+These two parameters are eventually fed to the postgres hook object that interacts directly with the postgres database.
+
+Creating a Postgres database table
+----------------------------------
+
+The code snippets below are based on Airflow-2.0
+
+.. exampleinclude:: /../../airflow/providers/postgres/example_dags/example_postgres.py
+    :language: python
+    :start-after: [START postgres_operator_howto_guide]
+    :end-before: [END postgres_operator_howto_guide_create_pet_table]
+
+
+Dumping SQL statements into your PostgresOperator isn't very appealing and will create maintainability pains somewhere
+down the road. To prevent this, Airflow offers an elegant solution. This is how it works: you simply create
+a directory inside the DAG folder called ``sql`` and then put all the SQL files containing your SQL queries inside it.
+
+Your ``dags/sql/pet_schema.sql`` should look like this:
+
+::
+
+      -- create pet table
+      CREATE TABLE IF NOT EXISTS pet (
+          pet_id SERIAL PRIMARY KEY,
+          name VARCHAR NOT NULL,
+          pet_type VARCHAR NOT NULL,
+          birth_date DATE NOT NULL,
+          OWNER VARCHAR NOT NULL);
+
+
+Now let's refactor ``create_pet_table`` in our DAG:
+
+.. code-block:: python
+
+        create_pet_table = PostgresOperator(
+              task_id="create_pet_table",
+              postgres_conn_id="postgres_default",
+              sql="sql/pet_schema.sql"
+              )
+
+
+Inserting data into a Postgres database table
+---------------------------------------------
+
+Let's say we already have the SQL insert statements below in our ``dags/sql/pet_schema.sql`` file:
+
+::
+
+  -- populate pet table
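+  INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
+  INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
+  INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
+  INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');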
+
+We can then create a PostgresOperator task that populates the ``pet`` table.
+
+.. code-block:: python
+
+  populate_pet_table = PostgresOperator(
+            task_id="populate_pet_table",
+            postgres_conn_id="postgres_default",
+            sql="sql/pet_schema.sql"
+            )
+
+
+Fetching records from your postgres database table
+--------------------------------------------------
+
+Fetching records from your postgres database table can be as simple as:
+
+.. code-block:: python
+
+  get_all_pets = PostgresOperator(
+            task_id="get_all_pets",
+            postgres_conn_id="postgres_default",
+            sql="SELECT * FROM pet;"
+            )
+
+
+
+Passing Parameters into PostgresOperator
+----------------------------------------
+
+PostgresOperator provides the ``parameters`` attribute, which makes it possible to dynamically inject values into your
+SQL requests during runtime. The BaseOperator class has the ``params`` attribute, which is available to the PostgresOperator
+by virtue of inheritance. Both ``parameters`` and ``params`` make it possible to dynamically pass in parameters in many
+interesting ways.
+
+To find the pets whose birth date falls within a given date range:
+
+.. code-block:: python
+
+  get_birth_date = PostgresOperator(
+            task_id="get_birth_date",
+            postgres_conn_id="postgres_default",
+            sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
+            parameters={
+                'begin_date': '2020-01-01',
+                'end_date': '2020-12-31'
+                }
+            )
+
+Now let's refactor our ``get_birth_date`` task. Instead of dumping SQL statements directly into our code, let's tidy things up
+by creating a sql file.
+
+::
+
+  -- dags/sql/birth_date.sql
+  SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC '{{ params.begin_date }}' AND '{{ params.end_date }}';
+
+And this time we will use the ``params`` attribute which we get for free from the parent ``BaseOperator``
+class.
+
+.. code-block:: python
+
+  get_birth_date = PostgresOperator(
+          task_id="get_birth_date",
+          postgres_conn_id="postgres_default",
+          sql="sql/birth_date.sql",
+          params={
+             'begin_date': '2020-01-01',
+              'end_date': '2020-12-31'
+            }
+          )
+
+The complete Postgres Operator DAG
+----------------------------------
+
+When we put everything together, our DAG should look like this:
+
+.. exampleinclude:: /../../airflow/providers/postgres/example_dags/example_postgres.py
+    :language: python
+    :start-after: [START postgres_operator_howto_guide]
+    :end-before: [END postgres_operator_howto_guide]
+
+
+Conclusion
+----------
+
+In this how-to guide we explored the Apache Airflow PostgresOperator. Let's quickly highlight the key takeaways.
+In Airflow-2.0, the PostgresOperator class now resides in the ``providers`` package. It is best practice to create a subdirectory
+called ``sql`` in your ``dags`` directory where you can store your sql files. This will make your code more elegant and more
+maintainable. And finally, we looked at the different ways you can dynamically pass parameters into your postgres operator
+tasks using the ``parameters`` or ``params`` attribute.
diff --git a/airflow/providers/postgres/provider.yaml b/tests/providers/postgres/operators/test_postgres_system.py
similarity index 57%
copy from airflow/providers/postgres/provider.yaml
copy to tests/providers/postgres/operators/test_postgres_system.py
index 54952a7..1d66c23 100644
--- a/airflow/providers/postgres/provider.yaml
+++ b/tests/providers/postgres/operators/test_postgres_system.py
@@ -15,29 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
----
-package-name: apache-airflow-providers-postgres
-name: PostgreSQL
-description: |
-    `PostgreSQL <https://www.postgresql.org/>`__
+import os
 
-versions:
-  - 1.0.0
+import pytest
 
-integrations:
-  - integration-name: PostgreSQL
-    external-doc-url: https://www.postgresql.org/
-    tags: [software]
+from tests.test_utils import AIRFLOW_MAIN_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
 
-operators:
-  - integration-name: PostgreSQL
-    python-modules:
-      - airflow.providers.postgres.operators.postgres
+POSTGRES_OPERATOR_DAG_FOLDER = os.path.join(
+    AIRFLOW_MAIN_FOLDER, "airflow", "providers", "postgres", "example_dags"
+)
 
-hooks:
-  - integration-name: PostgreSQL
-    python-modules:
-      - airflow.providers.postgres.hooks.postgres
 
-hook-class-names:
-  - airflow.providers.postgres.hooks.postgres.PostgresHook
+@pytest.mark.backend("postgres")
+@pytest.mark.system("postgres")
+class PostgresOperatorExampleDagSystemTest(SystemTest):
+    def test_run_example_dag_postgres_operator(self):
+        self.run_dag('postgres_operator_dag', POSTGRES_OPERATOR_DAG_FOLDER)