Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/22 14:37:31 UTC

[airflow] 25/28: Fix postgres part of pipeline example of tutorial (#21586)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 13cc4881684219f8fb36cbc0106bea28a6d398aa
Author: KevinYanesG <75...@users.noreply.github.com>
AuthorDate: Tue Feb 15 19:26:30 2022 +0100

    Fix postgres part of pipeline example of tutorial (#21586)
    
    (cherry picked from commit 40028f3ea3e78a9cf0db9de6b16fa67fa730dd7a)
---
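Side note, not part of the patch: besides the UI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> (here AIRFLOW_CONN_POSTGRES_DEFAULT) whose value is a connection URI. The minimal sketch below assumes you can set environment variables for the Airflow containers. It builds that URI from the postgres_default fields listed in the first hunk. connection_uri is a hypothetical helper for illustration, not an Airflow API.

```python
from urllib.parse import quote


def connection_uri(conn_type: str, host: str, schema: str, login: str, password: str) -> str:
    """Hypothetical helper: return conn_type://login:password@host/schema.

    Login, password and schema are percent-encoded so that characters
    such as '@' or ':' cannot break the URI structure.
    """
    user = quote(login, safe="")
    secret = quote(password, safe="")
    return f"{conn_type}://{user}:{secret}@{host}/{quote(schema, safe='')}"


# Fields from the tutorial: Conn Type postgres, Host postgres,
# Schema airflow, Login airflow, Password airflow.
uri = connection_uri("postgres", "postgres", "airflow", "airflow", "airflow")
print(uri)  # postgres://airflow:airflow@postgres/airflow
```

The value printed above would go into AIRFLOW_CONN_POSTGRES_DEFAULT. Percent-encoding matters only when a password contains reserved characters.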
 docs/apache-airflow/tutorial.rst | 67 ++++++++++++++++++++++++----------------
 1 file changed, 41 insertions(+), 26 deletions(-)
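Why the table names change: the patch replaces the quoted names "Employees" and "Employees_temp" with the unquoted EMPLOYEES and EMPLOYEES_TEMP. PostgreSQL folds unquoted identifiers to lower case but keeps quoted identifiers exactly as written. Every statement therefore has to quote a name the same way to reach the same table. The sketch below is a simplified model of that rule and is not PostgreSQL's parser. It assumes that only ASCII A-Z are folded, and resolve_identifier is a hypothetical name.

```python
import string

# Simplifying assumption: fold only ASCII A-Z to a-z.
_ASCII_FOLD = str.maketrans(string.ascii_uppercase, string.ascii_lowercase)


def resolve_identifier(token: str) -> str:
    """Return the name PostgreSQL would use for a single identifier token."""
    if len(token) >= 2 and token.startswith('"') and token.endswith('"'):
        # Quoted identifier: case is preserved; "" inside it stands for one ".
        return token[1:-1].replace('""', '"')
    # Unquoted identifier: folded to lower case.
    return token.translate(_ASCII_FOLD)


for token in ["EMPLOYEES", "Employees", '"Employees"', '"Serial Number"']:
    print(f"{token:<17} -> {resolve_identifier(token)}")
```

Under this model, EMPLOYEES, Employees and employees all resolve to employees, while "Employees" is a different name. The columns such as "Serial Number" stay quoted in the patch because they contain a space.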

diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 085be42..7a2245f 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -381,11 +381,30 @@ We need to have docker and postgres installed.
 We will be using this `docker file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_
 Follow the instructions properly to set up Airflow.
 
-Create a Employee table in postgres using this:
+Go to the UI, click "Admin" >> "Connections", and set up the postgres_default connection with these fields:
+
+- Conn id: postgres_default
+- Conn Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
+
+
+After that, you can test your connection; if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection.
+
+
+Open up a postgres shell:
+
+.. code-block:: bash
+
+  ./airflow.sh airflow db shell
+
+Create the Employees table with:
 
 .. code-block:: sql
 
-  CREATE TABLE "Employees"
+  CREATE TABLE EMPLOYEES
   (
       "Serial Number" NUMERIC PRIMARY KEY,
       "Company Name" TEXT,
@@ -394,7 +413,11 @@ Create a Employee table in postgres using this:
       "Leave" INTEGER
   );
 
-  CREATE TABLE "Employees_temp"
+Afterwards, create the Employees_temp table:
+
+.. code-block:: sql
+
+  CREATE TABLE EMPLOYEES_TEMP
   (
       "Serial Number" NUMERIC PRIMARY KEY,
       "Company Name" TEXT,
@@ -403,17 +426,9 @@ Create a Employee table in postgres using this:
       "Leave" INTEGER
   );
 
-We also need to add a connection to postgres. Go to the UI and click "Admin" >> "Connections". Specify the following for each field:
+We are now ready to write the DAG.
 
-- Conn id: LOCAL
-- Conn Type: postgres
-- Host: postgres
-- Schema: <DATABASE_NAME>
-- Login: airflow
-- Password: airflow
-- Port: 5432
 
-After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection and we are now ready write the DAG.
 
 Let's break this down into 2 steps: get data & merge data:
 
@@ -436,12 +451,12 @@ Let's break this down into 2 steps: get data & merge data:
       with open(data_path, "w") as file:
           file.write(response.text)
 
-      postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+      postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
       conn = postgres_hook.get_conn()
       cur = conn.cursor()
       with open(data_path, "r") as file:
           cur.copy_expert(
-              "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+              "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
               file,
           )
       conn.commit()
@@ -457,16 +472,16 @@ Here we are passing a ``GET`` request to get the data from the URL and save it i
   @task
   def merge_data():
       query = """
-          DELETE FROM "Employees" e
-          USING "Employees_temp" et
+          DELETE FROM EMPLOYEES e
+          USING EMPLOYEES_TEMP et
           WHERE e."Serial Number" = et."Serial Number";
 
-          INSERT INTO "Employees"
+          INSERT INTO EMPLOYEES
           SELECT *
-          FROM "Employees_temp";
+          FROM EMPLOYEES_TEMP;
       """
       try:
-          postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+          postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
           conn = postgres_hook.get_conn()
           cur = conn.cursor()
           cur.execute(query)
@@ -509,12 +524,12 @@ Lets look at our DAG:
           with open(data_path, "w") as file:
               file.write(response.text)
 
-          postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+          postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
           conn = postgres_hook.get_conn()
           cur = conn.cursor()
           with open(data_path, "r") as file:
               cur.copy_expert(
-                  "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+                  "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                   file,
               )
           conn.commit()
@@ -522,16 +537,16 @@ Lets look at our DAG:
       @task
       def merge_data():
           query = """
-                  DELETE FROM "Employees" e
-                  USING "Employees_temp" et
+                  DELETE FROM EMPLOYEES e
+                  USING EMPLOYEES_TEMP et
                   WHERE e."Serial Number" = et."Serial Number";
 
-                  INSERT INTO "Employees"
+                  INSERT INTO EMPLOYEES
                   SELECT *
-                  FROM "Employees_temp";
+                  FROM EMPLOYEES_TEMP;
                   """
           try:
-              postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+              postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
               conn = postgres_hook.get_conn()
               cur = conn.cursor()
               cur.execute(query)
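For readers without a Postgres container at hand, the sketch below walks through the same load-then-merge flow as get_data and merge_data, using Python's built-in sqlite3 module. It is a stand-in and not the tutorial's code. SQLite replaces PostgreSQL, and csv.reader replaces COPY ... WITH CSV HEADER. DELETE ... USING is PostgreSQL syntax, so it becomes DELETE ... WHERE ... IN (subquery). The three-column table and the sample rows are made up.

```python
import csv
import io
import sqlite3

# Reduced, hypothetical column set; the tutorial's tables have more columns.
DDL = """
CREATE TABLE {name} (
    "Serial Number" NUMERIC PRIMARY KEY,
    "Company Name" TEXT,
    "Leave" INTEGER
);
"""

# Same two statements as merge_data, with DELETE ... USING rewritten
# for SQLite as DELETE ... WHERE ... IN (subquery).
MERGE_SQL = """
DELETE FROM employees
WHERE "Serial Number" IN (SELECT "Serial Number" FROM employees_temp);

INSERT INTO employees
SELECT *
FROM employees_temp;
"""


def load_csv(conn: sqlite3.Connection, table: str, csv_text: str) -> None:
    """Rough equivalent of COPY ... WITH CSV HEADER: skip the header, insert the rest."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # HEADER: the first row holds column names, not data
    # table is a trusted constant here, so formatting it into the SQL is safe.
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?, ?)", reader)


def merge(conn: sqlite3.Connection) -> None:
    # sqlite3's execute() runs one statement; executescript() runs several
    # and commits any pending transaction first.
    conn.executescript(MERGE_SQL)


conn = sqlite3.connect(":memory:")
conn.executescript(DDL.format(name="employees") + DDL.format(name="employees_temp"))

# Hypothetical existing row, then a "download" that updates it and adds one more.
conn.execute("INSERT INTO employees VALUES (1, 'Old Name', 5)")
load_csv(conn, "employees_temp", 'Serial Number,Company Name,Leave\n1,New Name,7\n2,"Acme, Inc.",3\n')
merge(conn)

rows = conn.execute('SELECT * FROM employees ORDER BY "Serial Number"').fetchall()
print(rows)
```

Delete-then-insert replaces every row whose "Serial Number" appears in the new download and leaves all other rows untouched. In PostgreSQL, INSERT ... ON CONFLICT ("Serial Number") DO UPDATE is another way to express the same upsert.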