Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/16 09:59:59 UTC
[airflow] 12/19: Fix postgres part of pipeline example of tutorial (#21586)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 445ae53db7a086c2153cdcbf5447baa7191d05af
Author: KevinYanesG <75...@users.noreply.github.com>
AuthorDate: Tue Feb 15 19:26:30 2022 +0100
Fix postgres part of pipeline example of tutorial (#21586)
(cherry picked from commit 40028f3ea3e78a9cf0db9de6b16fa67fa730dd7a)
---
docs/apache-airflow/tutorial.rst | 67 ++++++++++++++++++++++++----------------
1 file changed, 41 insertions(+), 26 deletions(-)
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 085be42..7a2245f 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -381,11 +381,30 @@ We need to have docker and postgres installed.
We will be using this `docker file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_
Follow the instructions properly to set up Airflow.
-Create a Employee table in postgres using this:
+You can use the postgres_default connection (in the UI, go to "Admin" >> "Connections") with the following fields:
+
+- Conn id: postgres_default
+- Conn Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
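The connection fields listed above can also be written as a single connection URI, which Airflow reads from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (here `AIRFLOW_CONN_POSTGRES_DEFAULT`). The following is a minimal sketch that assumes the `conn_type://login:password@host[:port]/schema` layout. The helper name `airflow_conn_uri` is hypothetical and is not part of Airflow.

```python
from urllib.parse import quote


def airflow_conn_uri(conn_type, host, schema, login, password, port=None):
    """Build a connection URI of the form conn_type://login:password@host[:port]/schema.

    Hypothetical helper for illustration; it is not an Airflow API.
    """
    # Percent-encode the credentials and schema so characters such as '@' or '/'
    # cannot be mistaken for URI delimiters.
    userinfo = f"{quote(login, safe='')}:{quote(password, safe='')}"
    netloc = f"{userinfo}@{host}"
    if port is not None:
        netloc += f":{port}"
    return f"{conn_type}://{netloc}/{quote(schema, safe='')}"


if __name__ == "__main__":
    # Field values taken from the postgres_default list in this change.
    uri = airflow_conn_uri("postgres", "postgres", "airflow", "airflow", "airflow")
    # Airflow looks up connection IDs in AIRFLOW_CONN_<CONN_ID> environment variables.
    print(f"export AIRFLOW_CONN_POSTGRES_DEFAULT='{uri}'")
```

If you export the printed value in the scheduler and worker environments, the connection should be available without using the UI. The port is omitted because the list above does not set one, so the PostgreSQL client default applies.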
+
+
+After that, you can test your connection. If you followed all the steps correctly, it should show a success notification. Proceed with saving the connection.
+
+
+Open up a postgres shell:
+
+.. code-block:: bash
+
+ ./airflow.sh airflow db shell
+
+Create the Employees table with:
.. code-block:: sql
- CREATE TABLE "Employees"
+ CREATE TABLE EMPLOYEES
(
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
@@ -394,7 +413,11 @@ Create a Employee table in postgres using this:
"Leave" INTEGER
);
- CREATE TABLE "Employees_temp"
+Afterwards, create the Employees_temp table:
+
+.. code-block:: sql
+
+ CREATE TABLE EMPLOYEES_TEMP
(
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
@@ -403,17 +426,9 @@ Create a Employee table in postgres using this:
"Leave" INTEGER
);
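This change drops the double quotes around the table names. PostgreSQL folds unquoted identifiers to lower case, so `EMPLOYEES` and `EMPLOYEES_TEMP` are stored as `employees` and `employees_temp`. The old `"Employees"` kept its mixed case and therefore had to be quoted in every query. The column names stay quoted because they contain spaces. Below is a simplified sketch of the folding rule. The function name is hypothetical, and the sketch ignores PostgreSQL's handling of non-ASCII letters.

```python
def pg_fold_identifier(ident):
    """Return the name PostgreSQL stores for an identifier as written in SQL.

    Simplified sketch: unquoted identifiers are folded to lower case, while
    double-quoted identifiers keep their exact spelling ("" unescapes to ").
    """
    if len(ident) >= 2 and ident.startswith('"') and ident.endswith('"'):
        return ident[1:-1].replace('""', '"')
    return ident.lower()


if __name__ == "__main__":
    for written in ['"Employees"', "EMPLOYEES", "EMPLOYEES_TEMP", '"Serial Number"']:
        print(f"{written:<18} -> {pg_fold_identifier(written)}")
```

The DDL and the queries in this change both use the unquoted form, so they resolve to the same lower-case table names.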
-We also need to add a connection to postgres. Go to the UI and click "Admin" >> "Connections". Specify the following for each field:
+We are now ready to write the DAG.
-- Conn id: LOCAL
-- Conn Type: postgres
-- Host: postgres
-- Schema: <DATABASE_NAME>
-- Login: airflow
-- Password: airflow
-- Port: 5432
-After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection and we are now ready write the DAG.
Let's break this down into 2 steps: get data & merge data:
@@ -436,12 +451,12 @@ Let's break this down into 2 steps: get data & merge data:
with open(data_path, "w") as file:
file.write(response.text)
- postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+ postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
- "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+ "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
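The `copy_expert` call streams the downloaded file into `COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '"'`. With these options, the first line is treated as a header and skipped, fields are split on commas, and a double-quoted field may contain commas. The sketch below approximates that parsing with Python's `csv` module so it can be checked without a database. It does not model COPY's NULL handling or type conversion, and the sample rows are hypothetical.

```python
import csv
import io
from typing import List


def parse_like_copy_csv_header(text: str) -> List[List[str]]:
    """Approximate how COPY ... WITH CSV HEADER DELIMITER AS ',' QUOTE '"' splits input.

    Sketch only: COPY's NULL handling and column type conversion are not modelled.
    """
    reader = csv.reader(io.StringIO(text), delimiter=",", quotechar='"')
    next(reader, None)  # HEADER: the first line is skipped, not loaded
    return [row for row in reader]


if __name__ == "__main__":
    # Hypothetical sample: the second column of row 1 contains the delimiter.
    sample = 'Serial Number,Company Name,Leave\n1,"Acme, Inc.",3\n2,Globex,0\n'
    for row in parse_like_copy_csv_header(sample):
        print(row)
```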
@@ -457,16 +472,16 @@ Here we are passing a ``GET`` request to get the data from the URL and save it i
@task
def merge_data():
query = """
- DELETE FROM "Employees" e
- USING "Employees_temp" et
+ DELETE FROM EMPLOYEES e
+ USING EMPLOYEES_TEMP et
WHERE e."Serial Number" = et."Serial Number";
- INSERT INTO "Employees"
+ INSERT INTO EMPLOYEES
SELECT *
- FROM "Employees_temp";
+ FROM EMPLOYEES_TEMP;
"""
try:
- postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+ postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
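The `merge_data` query behaves like an upsert keyed on `"Serial Number"`. First it deletes rows in `EMPLOYEES` whose serial number also appears in `EMPLOYEES_TEMP`, and then it inserts every staged row. To show this without a running Postgres, the sketch below substitutes an in-memory SQLite database. SQLite has no `DELETE ... USING`, so the join is rewritten as `IN (subquery)`. The two-column schema and the sample rows are hypothetical.

```python
import sqlite3

# Hypothetical two-column schema standing in for the tutorial's tables.
DDL = """
CREATE TABLE employees ("Serial Number" NUMERIC PRIMARY KEY, "Leave" INTEGER);
CREATE TABLE employees_temp ("Serial Number" NUMERIC PRIMARY KEY, "Leave" INTEGER);
"""

# Same delete-then-insert merge as merge_data, with DELETE ... USING rewritten
# as IN (subquery) because SQLite has no USING clause on DELETE.
MERGE = """
BEGIN;
DELETE FROM employees
WHERE "Serial Number" IN (SELECT "Serial Number" FROM employees_temp);
INSERT INTO employees SELECT * FROM employees_temp;
COMMIT;
"""


def demo():
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript(DDL)
        conn.executemany("INSERT INTO employees VALUES (?, ?)", [(1, 5), (2, 7)])
        conn.executemany("INSERT INTO employees_temp VALUES (?, ?)", [(2, 0), (3, 4)])
        conn.commit()
        conn.executescript(MERGE)  # delete and insert run in one transaction
        return conn.execute('SELECT * FROM employees ORDER BY "Serial Number"').fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    print(demo())
```

In this run, row 2 is replaced by its staged value and row 3 is added. Row 1 is absent from the staging table, so it is left untouched. `INSERT ... SELECT *` matches columns by position, so it relies on both tables sharing the same column order. This is why the two tables appear to be created with matching definitions.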
@@ -509,12 +524,12 @@ Lets look at our DAG:
with open(data_path, "w") as file:
file.write(response.text)
- postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+ postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
- "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+ "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
@@ -522,16 +537,16 @@ Lets look at our DAG:
@task
def merge_data():
query = """
- DELETE FROM "Employees" e
- USING "Employees_temp" et
+ DELETE FROM EMPLOYEES e
+ USING EMPLOYEES_TEMP et
WHERE e."Serial Number" = et."Serial Number";
- INSERT INTO "Employees"
+ INSERT INTO EMPLOYEES
SELECT *
- FROM "Employees_temp";
+ FROM EMPLOYEES_TEMP;
"""
try:
- postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+ postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)