Posted to commits@airflow.apache.org by po...@apache.org on 2022/01/23 13:22:00 UTC
[airflow] 06/24: Doc: Improve tutorial documentation and code (#19186)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2cc9ed00b22245fcc9c9feb4dac36817a1447ff5
Author: Kian Yang Lee <ke...@gmail.com>
AuthorDate: Wed Oct 27 02:37:01 2021 +0800
Doc: Improve tutorial documentation and code (#19186)
1. Added instructions on adding postgres connection.
2. Modified proper SQL syntax.
3. Remove redundant lines when writing to CSV.
4. Added QUOTE argument for copy_expert
(cherry picked from commit f83099cd4c2eaecfd363fce4562aa19e975c146c)
---
docs/apache-airflow/tutorial.rst | 99 ++++++++++++++++++++++------------------
1 file changed, 55 insertions(+), 44 deletions(-)
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index d9587bc..acb7e84 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -377,73 +377,86 @@ We need to have docker and postgres installed.
We will be using this `docker file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_
Follow the instructions properly to set up Airflow.
-Create a Employee table in postgres using this
+Create an Employee table in postgres using this:
.. code-block:: sql
- create table "Employees"
+ CREATE TABLE "Employees"
(
- "Serial Number" numeric not null
- constraint employees_pk
- primary key,
- "Company Name" text,
- "Employee Markme" text,
- "Description" text,
- "Leave" integer
+ "Serial Number" NUMERIC PRIMARY KEY,
+ "Company Name" TEXT,
+ "Employee Markme" TEXT,
+ "Description" TEXT,
+ "Leave" INTEGER
);
- create table "Employees_temp"
+ CREATE TABLE "Employees_temp"
(
- "Serial Number" numeric not null
- constraint employees_temp_pk
- primary key,
- "Company Name" text,
- "Employee Markme" text,
- "Description" text,
- "Leave" integer
+ "Serial Number" NUMERIC PRIMARY KEY,
+ "Company Name" TEXT,
+ "Employee Markme" TEXT,
+ "Description" TEXT,
+ "Leave" INTEGER
);
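The DDL above double-quotes identifiers like ``"Serial Number"`` because the column names contain spaces and mixed case, so they must stay quoted in every later statement as well. A minimal sketch of that quoting rule using the standard-library ``sqlite3`` module (an illustration only, not the tutorial's postgres database; the inserted values are invented):

```python
import sqlite3

# Identifiers with spaces or mixed case must be double-quoted in SQL;
# postgres applies the same rule as this sqlite sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE "Employees" ('
    '"Serial Number" NUMERIC PRIMARY KEY, '
    '"Company Name" TEXT, '
    '"Leave" INTEGER)'
)
# Placeholder row -- toy data, not the tutorial's CSV.
conn.execute('INSERT INTO "Employees" VALUES (?, ?, ?)', (1, "Acme", 0))
row = conn.execute(
    'SELECT "Company Name" FROM "Employees" WHERE "Serial Number" = 1'
).fetchone()
print(row[0])  # Acme
```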
+We also need to add a connection to postgres. Go to the UI and click "Admin" >> "Connections". Specify the following for each field:
+
+- Conn id: LOCAL
+- Conn Type: postgres
+- Host: postgres
+- Schema: <DATABASE_NAME>
+- Login: airflow
+- Password: airflow
+- Port: 5432
+
+After that, you can test your connection. If you followed all the steps correctly, it should show a success notification. Save the connection, and we are now ready to write the DAG.
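As an alternative to the UI, Airflow can also pick up connections from environment variables named ``AIRFLOW_CONN_<CONN_ID>``, expressed as a URI. A sketch of the same ``LOCAL`` connection (``employees`` stands in for the elided ``<DATABASE_NAME>`` and is an assumption, not part of the tutorial):

```python
import os

# Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables as
# connection URIs. "employees" is a placeholder database name.
user, password, host, port, db = "airflow", "airflow", "postgres", 5432, "employees"
conn_uri = f"postgres://{user}:{password}@{host}:{port}/{db}"
os.environ["AIRFLOW_CONN_LOCAL"] = conn_uri
print(conn_uri)  # postgres://airflow:airflow@postgres:5432/employees
```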
Let's break this down into 2 steps: get data & merge data:
.. code-block:: python
+ from airflow.decorators import dag, task
+ from airflow.providers.postgres.hooks.postgres import PostgresHook
+ from datetime import datetime, timedelta
+ import requests
+
+
@task
def get_data():
- data_path = "/usr/local/airflow/dags/files/employees.csv"
+ # NOTE: configure this as appropriate for your airflow environment
+ data_path = "/opt/airflow/dags/files/employees.csv"
url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
- for row in response.text.split("\n"):
- if row:
- file.write(row + "\n")
+ file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
- "COPY \"Employees_temp\" FROM stdin WITH CSV HEADER DELIMITER AS ','", file
+ "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+ file,
)
conn.commit()
-Here we are passing a ``GET`` request to get the data from the URL and save it in ``employees.csv`` file on our Airflow instance and we are dumping the file into a temporary table before merging the data to the final employees table
+Here we send a ``GET`` request to fetch the data from the URL, save it as ``employees.csv`` on our Airflow instance, and dump the file into a temporary table before merging the data into the final employees table.
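The ``QUOTE '"'`` argument added to ``copy_expert`` matters whenever a field value itself contains the delimiter. A small sketch with the standard-library ``csv`` module (the company name is invented toy data):

```python
import csv
import io

# A field wrapped in the quote character may contain the delimiter
# without being split into two columns -- the reason the COPY statement
# passes QUOTE '"' explicitly.
raw = 'Serial Number,Company Name\n1,"Acme, Inc."\n'
rows = list(csv.reader(io.StringIO(raw)))
print(rows[1])  # ['1', 'Acme, Inc.']
```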
.. code-block:: python
@task
def merge_data():
query = """
- delete
- from "Employees" e using "Employees_temp" et
- where e."Serial Number" = et."Serial Number";
+ DELETE FROM "Employees" e
+ USING "Employees_temp" et
+ WHERE e."Serial Number" = et."Serial Number";
- insert into "Employees"
- select *
- from "Employees_temp";
+ INSERT INTO "Employees"
+ SELECT *
+ FROM "Employees_temp";
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
@@ -455,7 +468,7 @@ Here we are passing a ``GET`` request to get the data from the URL and save it i
except Exception as e:
return 1
-Here we are first looking for duplicate values and removing them before we insert new values in our final table
+Here we first remove any rows in the final table that are duplicated in the temporary table, and then insert the new values into our final table.
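The delete-then-insert merge above can be sketched in plain Python with dictionaries keyed by serial number (toy data, not the tutorial's CSV):

```python
# Target table and staging table, keyed by "Serial Number".
employees = {1: "stale row", 2: "untouched row"}
employees_temp = {1: "fresh row", 3: "new row"}

# DELETE FROM "Employees" USING "Employees_temp": drop target rows whose
# key also appears in the staging table.
for serial in list(employees):
    if serial in employees_temp:
        del employees[serial]

# INSERT INTO "Employees" SELECT * FROM "Employees_temp".
employees.update(employees_temp)
print(sorted(employees.items()))
# [(1, 'fresh row'), (2, 'untouched row'), (3, 'new row')]
```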
Let's look at our DAG:
@@ -478,23 +491,21 @@ Lets look at our DAG:
@task
def get_data():
# NOTE: configure this as appropriate for your airflow environment
- data_path = "/usr/local/airflow/dags/files/employees.csv"
+ data_path = "/opt/airflow/dags/files/employees.csv"
url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
- for row in response.text.split("\n"):
- if row:
- file.write(row + "\n")
+ file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
- "COPY \"Employees_temp\" FROM stdin WITH CSV HEADER DELIMITER AS ','",
+ "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
@@ -502,13 +513,13 @@ Lets look at our DAG:
@task
def merge_data():
query = """
- delete
- from "Employees" e using "Employees_temp" et
- where e."Serial Number" = et."Serial Number";
+ DELETE FROM "Employees" e
+ USING "Employees_temp" et
+ WHERE e."Serial Number" = et."Serial Number";
- insert into "Employees"
- select *
- from "Employees_temp";
+ INSERT INTO "Employees"
+ SELECT *
+ FROM "Employees_temp";
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
@@ -526,14 +537,14 @@ Lets look at our DAG:
dag = Etl()
This DAG runs daily at 00:00.
-Add this python file to airflow/dags folder (e.g. ``dags/etl.py``) and go back to the main folder and run
+Add this Python file to the airflow/dags folder (e.g. ``dags/etl.py``), go back to the main folder, and run:
.. code-block:: bash
docker-compose up airflow-init
docker-compose up
-Go to your browser and go to the site http://localhost:8080/home and trigger your DAG Airflow Example
+Go to http://localhost:8080/home in your browser and trigger your Airflow Example DAG:
.. image:: img/new_tutorial-1.png
@@ -541,7 +552,7 @@ Go to your browser and go to the site http://localhost:8080/home and trigger you
.. image:: img/new_tutorial-2.png
The DAG ran successfully as we can see the green boxes. If there had been an error the boxes would be red.
-Before the DAG run my local table had 10 rows after the DAG run it had approx 100 rows
+Before the DAG run, my local table had 10 rows; after the DAG run, it had approximately 100 rows.
What's Next?