Posted to commits@airflow.apache.org by po...@apache.org on 2022/01/23 13:22:00 UTC

[airflow] 06/24: Doc: Improve tutorial documentation and code (#19186)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2cc9ed00b22245fcc9c9feb4dac36817a1447ff5
Author: Kian Yang Lee <ke...@gmail.com>
AuthorDate: Wed Oct 27 02:37:01 2021 +0800

    Doc: Improve tutorial documentation and code (#19186)
    
    1. Added instructions on adding a postgres connection.
    2. Used proper SQL syntax.
    3. Removed redundant lines when writing to CSV.
    4. Added QUOTE argument for copy_expert.
    
    (cherry picked from commit f83099cd4c2eaecfd363fce4562aa19e975c146c)
---
 docs/apache-airflow/tutorial.rst | 99 ++++++++++++++++++++++------------------
 1 file changed, 55 insertions(+), 44 deletions(-)

diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index d9587bc..acb7e84 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -377,73 +377,86 @@ We need to have docker and postgres installed.
 We will be using this `Docker Compose file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_.
 Follow the instructions there to set up Airflow.
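+
+For example, the compose file can be fetched with ``curl``; a sketch based on the linked guide (the exact URL/version on that page may differ):
+
+.. code-block:: bash
+
+  curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'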
 
-Create a Employee table in postgres using this
+Create an Employee table in postgres using this:
 
 .. code-block:: sql
 
-  create table "Employees"
+  CREATE TABLE "Employees"
   (
-      "Serial Number" numeric not null
-   constraint employees_pk
-              primary key,
-      "Company Name" text,
-      "Employee Markme" text,
-      "Description" text,
-      "Leave" integer
+      "Serial Number" NUMERIC PRIMARY KEY,
+      "Company Name" TEXT,
+      "Employee Markme" TEXT,
+      "Description" TEXT,
+      "Leave" INTEGER
   );
 
-  create table "Employees_temp"
+  CREATE TABLE "Employees_temp"
   (
-      "Serial Number" numeric not null
-   constraint employees_temp_pk
-              primary key,
-      "Company Name" text,
-      "Employee Markme" text,
-      "Description" text,
-      "Leave" integer
+      "Serial Number" NUMERIC PRIMARY KEY,
+      "Company Name" TEXT,
+      "Employee Markme" TEXT,
+      "Description" TEXT,
+      "Leave" INTEGER
   );
 
+We also need to add a connection to postgres. Go to the UI, click "Admin" >> "Connections", and create a new connection with the following field values:
+
+- Conn Id: LOCAL
+- Conn Type: postgres
+- Host: postgres
+- Schema: <DATABASE_NAME>
+- Login: airflow
+- Password: airflow
+- Port: 5432
+
+After that, you can test your connection, and if you followed all the steps correctly, it should show a success notification. Save the connection; we are now ready to write the DAG.
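+
+Alternatively, the same connection can be created from the command line. A sketch using the Airflow 2 CLI (run it inside one of the Airflow containers, and substitute your actual database name):
+
+.. code-block:: bash
+
+  airflow connections add 'LOCAL' \
+      --conn-uri 'postgres://airflow:airflow@postgres:5432/<DATABASE_NAME>'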
 
 Let's break this down into two steps: get data and merge data:
 
 .. code-block:: python
 
+  import os
+  from datetime import datetime, timedelta
+
+  import requests
+
+  from airflow.decorators import dag, task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
   @task
   def get_data():
-      data_path = "/usr/local/airflow/dags/files/employees.csv"
+      # NOTE: configure this as appropriate for your airflow environment
+      data_path = "/opt/airflow/dags/files/employees.csv"
+      os.makedirs(os.path.dirname(data_path), exist_ok=True)
 
       url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
 
       response = requests.request("GET", url)
 
       with open(data_path, "w") as file:
-          for row in response.text.split("\n"):
-              if row:
-                  file.write(row + "\n")
+          file.write(response.text)
 
       postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
       conn = postgres_hook.get_conn()
       cur = conn.cursor()
       with open(data_path, "r") as file:
           cur.copy_expert(
-              "COPY \"Employees_temp\" FROM stdin WITH CSV HEADER DELIMITER AS ','", file
+              "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+              file,
           )
       conn.commit()
 
-Here we are passing a ``GET`` request to get the data from the URL and save it in ``employees.csv`` file on our Airflow instance and we are dumping the file into a temporary table before merging the data to the final employees table
+Here we send a ``GET`` request to fetch the data from the URL and save it in the ``employees.csv`` file on our Airflow instance, then dump the file into a temporary table before merging the data into the final ``Employees`` table.
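+
+Note that ``HEADER`` makes ``COPY`` skip the first line of the file, and the remaining values map positionally onto the columns of ``Employees_temp``. A hypothetical file of the expected shape (the data row is made up for illustration):
+
+.. code-block:: none
+
+  Serial Number,Company Name,Employee Markme,Description,Leave
+  1,Some Company,Mark,Some description,0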
 
 .. code-block:: python
 
   @task
   def merge_data():
       query = """
-          delete
-          from "Employees" e using "Employees_temp" et
-          where e."Serial Number" = et."Serial Number";
+          DELETE FROM "Employees" e
+          USING "Employees_temp" et
+          WHERE e."Serial Number" = et."Serial Number";
 
-          insert into "Employees"
-          select *
-          from "Employees_temp";
+          INSERT INTO "Employees"
+          SELECT *
+          FROM "Employees_temp";
       """
       try:
           postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
@@ -455,7 +468,7 @@ Here we are passing a ``GET`` request to get the data from the URL and save it i
      except Exception:
           return 1
 
-Here we are first looking for duplicate values and removing them before we insert new values in our final table
+Here we first look for duplicate values and remove them before inserting the new rows into our final table.
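+
+As a side note, because ``"Serial Number"`` is the primary key, the same delete-then-insert merge could be written as a single upsert. A sketch (assumes Postgres 9.5+ for ``ON CONFLICT``; the tutorial code above does not use it):
+
+.. code-block:: sql
+
+  INSERT INTO "Employees"
+  SELECT * FROM "Employees_temp"
+  ON CONFLICT ("Serial Number") DO UPDATE
+  SET "Company Name" = EXCLUDED."Company Name",
+      "Employee Markme" = EXCLUDED."Employee Markme",
+      "Description" = EXCLUDED."Description",
+      "Leave" = EXCLUDED."Leave";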
 
 
 Let's look at our DAG:
@@ -478,23 +491,21 @@ Lets look at our DAG:
       @task
       def get_data():
           # NOTE: configure this as appropriate for your airflow environment
-          data_path = "/usr/local/airflow/dags/files/employees.csv"
+          data_path = "/opt/airflow/dags/files/employees.csv"
+          os.makedirs(os.path.dirname(data_path), exist_ok=True)  # assumes "import os" at the top of the DAG file
 
           url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
 
           response = requests.request("GET", url)
 
           with open(data_path, "w") as file:
-              for row in response.text.split("\n"):
-                  if row:
-                      file.write(row + "\n")
+              file.write(response.text)
 
           postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
           conn = postgres_hook.get_conn()
           cur = conn.cursor()
           with open(data_path, "r") as file:
               cur.copy_expert(
-                  "COPY \"Employees_temp\" FROM stdin WITH CSV HEADER DELIMITER AS ','",
+                  "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                   file,
               )
           conn.commit()
@@ -502,13 +513,13 @@ Lets look at our DAG:
       @task
       def merge_data():
           query = """
-                  delete
-                  from "Employees" e using "Employees_temp" et
-                  where e."Serial Number" = et."Serial Number";
+                  DELETE FROM "Employees" e
+                  USING "Employees_temp" et
+                  WHERE e."Serial Number" = et."Serial Number";
 
-                  insert into "Employees"
-                  select *
-                  from "Employees_temp";
+                  INSERT INTO "Employees"
+                  SELECT *
+                  FROM "Employees_temp";
                   """
           try:
               postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
@@ -526,14 +537,14 @@ Lets look at our DAG:
   dag = Etl()
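+
+The ``@dag`` decorator that wires these tasks together is not shown in full in this diff. Below is a minimal sketch of its shape; apart from the daily schedule, the argument values are assumptions rather than the tutorial's exact ones:
+
+.. code-block:: python
+
+  @dag(
+      schedule_interval="0 0 * * *",  # daily at 00:00
+      start_date=datetime(2021, 1, 1),  # assumption: any fixed start date works
+      catchup=False,  # assumption: do not backfill missed past runs
+  )
+  def Etl():
+      # the get_data and merge_data tasks shown above are defined here, then chained:
+      get_data() >> merge_data()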
 
 This DAG runs daily at 00:00.
-Add this python file to airflow/dags folder (e.g. ``dags/etl.py``) and go back to the main folder and run
+Add this Python file to the ``airflow/dags`` folder (e.g. ``dags/etl.py``), then go back to the main folder and run:
 
 .. code-block:: bash
 
   docker-compose up airflow-init
   docker-compose up
 
-Go to your browser and go to the site http://localhost:8080/home and trigger your DAG Airflow Example
+Go to http://localhost:8080/home in your browser and trigger your Airflow example DAG:
 
 .. image:: img/new_tutorial-1.png
 
@@ -541,7 +552,7 @@ Go to your browser and go to the site http://localhost:8080/home and trigger you
 .. image:: img/new_tutorial-2.png
 
 The DAG ran successfully, as we can see from the green boxes. If there had been an error, the boxes would be red.
-Before the DAG run my local table had 10 rows after the DAG run it had approx 100 rows
+Before the DAG run my local table had 10 rows; after the DAG run it had approximately 100 rows.
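+
+You can verify the row count yourself with a quick query against the same database, for example:
+
+.. code-block:: sql
+
+  SELECT COUNT(*) FROM "Employees";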
 
 
 What's Next?