Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/25 11:01:28 UTC

[GitHub] [airflow] potiuk commented on a diff in pull request #25890: Reorganize tutorials into a section

potiuk commented on code in PR #25890:
URL: https://github.com/apache/airflow/pull/25890#discussion_r954799592


##########
docs/apache-airflow/tutorial/pipeline.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Building a Running Pipeline
+===========================
+
+Let's look at another example: we need to get some data from a file which is hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
+
+Initial setup
+-------------
+
+We need to have Docker installed as we will be using the `quick-start docker-compose installation <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html>`_ for this example.
+The steps below should be sufficient, but see the quick-start documentation for full instructions.
+
+.. code-block:: bash
+
+  # Download the docker-compose.yaml file
+  curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
+
+  # Make expected directories and set an expected environment variable
+  mkdir -p ./dags ./logs ./plugins
+  echo -e "AIRFLOW_UID=$(id -u)" > .env
+
+  # Initialize the database
+  docker-compose up airflow-init
+
+  # Start up all services
+  docker-compose up
+
+After all services have started up, the web UI will be available at: ``http://localhost:8080``. The default account has the username ``airflow`` and the password ``airflow``.
+
+We will also need to create a `connection <https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html>`_ to the postgres db. To create one via the web UI, from the "Admin" menu, select "Connections", then click the Plus sign to "Add a new record" to the list of connections.
+
+Fill in the fields as shown below. Note the Connection Id value, which we'll pass as a parameter for the ``postgres_conn_id`` kwarg.
+
+- Connection Id: tutorial_pg_conn
+- Connection Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
+- Port: 5432
+
+Test your connection and if the test is successful, save your connection.
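+
+If you would rather verify the connection from code, the sketch below (the ``check_connection`` task is a hypothetical helper, not part of the tutorial DAG) runs a trivial query through the same hook the later tasks will use:
+
+.. code-block:: python
+
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def check_connection():
+      # Must match the Connection Id created above.
+      hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+      # get_first returns the first row of the result set, here simply (1,).
+      print(hook.get_first("SELECT 1"))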
+
+Table Creation Tasks
+--------------------
+
+We can use the `PostgresOperator <https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html#creating-a-postgres-database-table>`_ to define tasks that create tables in our postgres db.
+
+We'll create one table to facilitate data cleaning steps (``employees_temp``) and another table to store our cleaned data (``employees``).
+
+.. code-block:: python
+
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          CREATE TABLE IF NOT EXISTS employees (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+  create_employees_temp_table = PostgresOperator(
+      task_id="create_employees_temp_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          DROP TABLE IF EXISTS employees_temp;
+          CREATE TABLE employees_temp (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+Optional: Using SQL From Files
+------------------------------
+
+If you want to abstract these SQL statements out of your DAG, you can move the statements into SQL files somewhere within the ``dags/`` directory and pass the SQL file path (relative to ``dags/``) to the ``sql`` kwarg. For example, for ``employees``, create a ``sql`` directory in ``dags/``, put the ``employees`` DDL in ``dags/sql/employees_schema.sql``, and modify the ``PostgresOperator()`` call to:
+
+.. code-block:: python
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="sql/employees_schema.sql",
+  )
+
+and repeat for the ``employees_temp`` table.
+
+Data Retrieval Task
+-------------------
+
+Here we retrieve data, save it to a file on our Airflow instance, and load the data from that file into an intermediate table where we can execute data cleaning steps.
+
+.. code-block:: python
+
+  import os
+  import requests
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def get_data():
+      # NOTE: configure this as appropriate for your airflow environment
+      data_path = "/opt/airflow/dags/files/employees.csv"
+      os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+      url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
+
+      response = requests.request("GET", url)
+
+      with open(data_path, "w") as file:
+          file.write(response.text)
+
+      postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+      conn = postgres_hook.get_conn()
+      cur = conn.cursor()
+      with open(data_path, "r") as file:
+          cur.copy_expert(
+              "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+              file,
+          )
+      conn.commit()
+
+Data Merge Task
+---------------
+
+Here we select the distinct records from the retrieved data, then check whether any employee ``Serial Number`` values are already in the database (if they are, we update those records with the new data).
+
+.. code-block:: python
+
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def merge_data():
+      query = """
+          INSERT INTO employees
+          SELECT *
+          FROM (
+              SELECT DISTINCT *
+              FROM employees_temp
+          )
+          ON CONFLICT ("Serial Number") DO UPDATE
+          SET "Serial Number" = excluded."Serial Number";
+      """
+      try:
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          cur.execute(query)
+          conn.commit()
+          return 0
+      except Exception as e:
+          return 1
+
+
+
+Completing our DAG
+------------------
+
+We've developed our tasks; now we need to wrap them in a DAG, which enables us to define when and how tasks should run and to state any dependencies that tasks have on other tasks. The DAG below is configured to:
+
+* run every day at midnight starting on Jan 1, 2021,
+* only run once in the event that days are missed, and
+* timeout after 60 minutes
+
+And from the last line in the definition of the ``Etl`` DAG, we see:

Review Comment:
   I think we should get rid of ETL here (and also in the examples - including the TaskFlow tutorial).
   
   This is something that has always bothered me: even if you do ETL with Airflow, you usually do it completely differently than described in this tutorial.
   
   I do not think ETL is as "catchy" a thing as it was when the documentation was initially created, and I see no harm in completely getting rid of it here. We should stop pretending this is a somewhat "real" ETL example that people should follow - because it is as far from real ETL as it can be.
   



##########
docs/apache-airflow/tutorial/pipeline.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Building a Running Pipeline
+===========================
+
+Let's look at another example: we need to get some data from a file which is hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
+
+Initial setup
+-------------
+
+We need to have Docker installed as we will be using the `quick-start docker-compose installation <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html>`_ for this example.
+The steps below should be sufficient, but see the quick-start documentation for full instructions.
+
+.. code-block:: bash
+
+  # Download the docker-compose.yaml file
+  curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
+
+  # Make expected directories and set an expected environment variable
+  mkdir -p ./dags ./logs ./plugins
+  echo -e "AIRFLOW_UID=$(id -u)" > .env
+
+  # Initialize the database
+  docker-compose up airflow-init
+
+  # Start up all services
+  docker-compose up
+
+After all services have started up, the web UI will be available at: ``http://localhost:8080``. The default account has the username ``airflow`` and the password ``airflow``.
+
+We will also need to create a `connection <https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html>`_ to the postgres db. To create one via the web UI, from the "Admin" menu, select "Connections", then click the Plus sign to "Add a new record" to the list of connections.
+
+Fill in the fields as shown below. Note the Connection Id value, which we'll pass as a parameter for the ``postgres_conn_id`` kwarg.
+
+- Connection Id: tutorial_pg_conn
+- Connection Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
+- Port: 5432
+
+Test your connection and if the test is successful, save your connection.
+
+Table Creation Tasks
+--------------------
+
+We can use the `PostgresOperator <https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html#creating-a-postgres-database-table>`_ to define tasks that create tables in our postgres db.
+
+We'll create one table to facilitate data cleaning steps (``employees_temp``) and another table to store our cleaned data (``employees``).
+
+.. code-block:: python
+
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          CREATE TABLE IF NOT EXISTS employees (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+  create_employees_temp_table = PostgresOperator(
+      task_id="create_employees_temp_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          DROP TABLE IF EXISTS employees_temp;
+          CREATE TABLE employees_temp (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+Optional: Using SQL From Files
+------------------------------
+
+If you want to abstract these SQL statements out of your DAG, you can move the statements into SQL files somewhere within the ``dags/`` directory and pass the SQL file path (relative to ``dags/``) to the ``sql`` kwarg. For example, for ``employees``, create a ``sql`` directory in ``dags/``, put the ``employees`` DDL in ``dags/sql/employees_schema.sql``, and modify the ``PostgresOperator()`` call to:
+
+.. code-block:: python
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="sql/employees_schema.sql",
+  )
+
+and repeat for the ``employees_temp`` table.
+
+Data Retrieval Task
+-------------------
+
+Here we retrieve data, save it to a file on our Airflow instance, and load the data from that file into an intermediate table where we can execute data cleaning steps.
+
+.. code-block:: python
+
+  import os
+  import requests
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def get_data():
+      # NOTE: configure this as appropriate for your airflow environment
+      data_path = "/opt/airflow/dags/files/employees.csv"
+      os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+      url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
+
+      response = requests.request("GET", url)
+
+      with open(data_path, "w") as file:
+          file.write(response.text)
+
+      postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+      conn = postgres_hook.get_conn()
+      cur = conn.cursor()
+      with open(data_path, "r") as file:
+          cur.copy_expert(
+              "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+              file,
+          )
+      conn.commit()
+
+Data Merge Task
+---------------
+
+Here we select the distinct records from the retrieved data, then check whether any employee ``Serial Number`` values are already in the database (if they are, we update those records with the new data).
+
+.. code-block:: python
+
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def merge_data():
+      query = """
+          INSERT INTO employees
+          SELECT *
+          FROM (
+              SELECT DISTINCT *
+              FROM employees_temp
+          )
+          ON CONFLICT ("Serial Number") DO UPDATE
+          SET "Serial Number" = excluded."Serial Number";
+      """
+      try:
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          cur.execute(query)
+          conn.commit()
+          return 0
+      except Exception as e:
+          return 1
+
+
+
+Completing our DAG
+------------------
+
+We've developed our tasks; now we need to wrap them in a DAG, which enables us to define when and how tasks should run and to state any dependencies that tasks have on other tasks. The DAG below is configured to:
+
+* run every day at midnight starting on Jan 1, 2021,
+* only run once in the event that days are missed, and
+* timeout after 60 minutes
+
+And from the last line in the definition of the ``Etl`` DAG, we see:
+
+.. code-block:: python
+
+      [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
+
+* the ``merge_data()`` task depends on the ``get_data()`` task,
+* the ``get_data()`` depends on both the ``create_employees_table`` and ``create_employees_temp_table`` tasks, and
+* the ``create_employees_table`` and ``create_employees_temp_table`` tasks can run independently.
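+
+As an aside (a sketch, not part of the tutorial code), the same dependencies could also be declared with the ``chain`` helper instead of the bitshift operators:
+
+.. code-block:: python
+
+  from airflow.models.baseoperator import chain
+
+  # Equivalent to the bitshift expression above.
+  chain([create_employees_table, create_employees_temp_table], get_data(), merge_data())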
+
+Putting all of the pieces together, we have our completed DAG.
+
+.. code-block:: python
+
+  import datetime
+  import pendulum
+  import os
+
+  import requests
+  from airflow.decorators import dag, task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+
+  @dag(
+      schedule="0 0 * * *",
+      start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+      catchup=False,
+      dagrun_timeout=datetime.timedelta(minutes=60),
+  )
+  def Etl():
+      create_employees_table = PostgresOperator(
+          task_id="create_employees_table",
+          postgres_conn_id="tutorial_pg_conn",
+          sql="""
+              CREATE TABLE IF NOT EXISTS employees (
+                  "Serial Number" NUMERIC PRIMARY KEY,
+                  "Company Name" TEXT,
+                  "Employee Markme" TEXT,
+                  "Description" TEXT,
+                  "Leave" INTEGER
+              );""",
+      )
+
+      create_employees_temp_table = PostgresOperator(
+          task_id="create_employees_temp_table",
+          postgres_conn_id="tutorial_pg_conn",
+          sql="""
+              DROP TABLE IF EXISTS employees_temp;
+              CREATE TABLE employees_temp (
+                  "Serial Number" NUMERIC PRIMARY KEY,
+                  "Company Name" TEXT,
+                  "Employee Markme" TEXT,
+                  "Description" TEXT,
+                  "Leave" INTEGER
+              );""",
+      )
+
+      @task
+      def get_data():
+          # NOTE: configure this as appropriate for your airflow environment
+          data_path = "/opt/airflow/dags/files/employees.csv"
+          os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+          url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
+
+          response = requests.request("GET", url)
+
+          with open(data_path, "w") as file:
+              file.write(response.text)
+
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          with open(data_path, "r") as file:
+              cur.copy_expert(
+                  "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+                  file,
+              )
+          conn.commit()
+
+      @task
+      def merge_data():
+          query = """
+              INSERT INTO employees
+              SELECT *
+              FROM (
+                  SELECT DISTINCT *
+                  FROM employees_temp
+              )
+              ON CONFLICT ("Serial Number") DO UPDATE
+              SET "Serial Number" = excluded."Serial Number";
+          """
+          try:
+              postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+              conn = postgres_hook.get_conn()
+              cur = conn.cursor()
+              cur.execute(query)
+              conn.commit()
+              return 0
+          except Exception as e:
+              return 1
+
+      [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
+
+
+  dag = Etl()
+
+Save this code to a Python file in the ``dags/`` folder (e.g. ``dags/etl.py``) and, after a `brief delay <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-dir-list-interval>`_, the ``Etl`` DAG will be included in the list of available DAGs in the web UI.
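+
+To confirm the file parses cleanly and the DAG is registered, a quick check you can run inside the scheduler container (a sketch, not part of the tutorial) is:
+
+.. code-block:: python
+
+  from airflow.models import DagBag
+
+  # Parses everything under the configured DAGs folder; import errors show up here.
+  dag_bag = DagBag()
+  print(dag_bag.import_errors)
+  assert "Etl" in dag_bag.dags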
+
+.. image:: ../img/new_tutorial-1.png
+
+You can trigger the ``Etl`` DAG by unpausing it (via the slider on the left end) and running it (via the Run button under **Actions**).
+
+.. image:: ../img/new_tutorial-3.png

Review Comment:
   This image and the description should be redone. They show the Tree View, which is gone now.



##########
docs/apache-airflow/tutorial/pipeline.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Building a Running Pipeline
+===========================
+
+Let's look at another example: we need to get some data from a file which is hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
+
+Initial setup
+-------------
+
+We need to have Docker installed as we will be using the `quick-start docker-compose installation <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html>`_ for this example.

Review Comment:
   This is a direct URL, which is wrong. It should point to the moved page using the :doc: directive and a different name (it's not a quick-start any more).



##########
docs/apache-airflow/tutorial/pipeline.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Building a Running Pipeline
+===========================
+
+Let's look at another example: we need to get some data from a file which is hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
+
+Initial setup
+-------------
+
+We need to have Docker installed as we will be using the `quick-start docker-compose installation <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html>`_ for this example.
+The steps below should be sufficient, but see the quick-start documentation for full instructions.
+
+.. code-block:: bash
+
+  # Download the docker-compose.yaml file
+  curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
+
+  # Make expected directories and set an expected environment variable
+  mkdir -p ./dags ./logs ./plugins
+  echo -e "AIRFLOW_UID=$(id -u)" > .env
+
+  # Initialize the database
+  docker-compose up airflow-init
+
+  # Start up all services
+  docker-compose up
+
+After all services have started up, the web UI will be available at: ``http://localhost:8080``. The default account has the username ``airflow`` and the password ``airflow``.
+
+We will also need to create a `connection <https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html>`_ to the postgres db. To create one via the web UI, from the "Admin" menu, select "Connections", then click the Plus sign to "Add a new record" to the list of connections.
+
+Fill in the fields as shown below. Note the Connection Id value, which we'll pass as a parameter for the ``postgres_conn_id`` kwarg.
+
+- Connection Id: tutorial_pg_conn
+- Connection Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
+- Port: 5432
+
+Test your connection and if the test is successful, save your connection.
+
+Table Creation Tasks
+--------------------
+
+We can use the `PostgresOperator <https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html#creating-a-postgres-database-table>`_ to define tasks that create tables in our postgres db.
+
+We'll create one table to facilitate data cleaning steps (``employees_temp``) and another table to store our cleaned data (``employees``).
+
+.. code-block:: python
+
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          CREATE TABLE IF NOT EXISTS employees (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+  create_employees_temp_table = PostgresOperator(
+      task_id="create_employees_temp_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          DROP TABLE IF EXISTS employees_temp;
+          CREATE TABLE employees_temp (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+Optional: Using SQL From Files
+------------------------------
+
+If you want to abstract these SQL statements out of your DAG, you can move the statements into SQL files somewhere within the ``dags/`` directory and pass the SQL file path (relative to ``dags/``) to the ``sql`` kwarg. For example, for ``employees``, create a ``sql`` directory in ``dags/``, put the ``employees`` DDL in ``dags/sql/employees_schema.sql``, and modify the ``PostgresOperator()`` call to:
+
+.. code-block:: python
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="sql/employees_schema.sql",
+  )
+
+and repeat for the ``employees_temp`` table.
+
+Data Retrieval Task
+-------------------
+
+Here we retrieve data, save it to a file on our Airflow instance, and load the data from that file into an intermediate table where we can execute data cleaning steps.
+
+.. code-block:: python
+
+  import os
+  import requests
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def get_data():
+      # NOTE: configure this as appropriate for your airflow environment
+      data_path = "/opt/airflow/dags/files/employees.csv"
+      os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+      url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
+
+      response = requests.request("GET", url)
+
+      with open(data_path, "w") as file:
+          file.write(response.text)
+
+      postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+      conn = postgres_hook.get_conn()
+      cur = conn.cursor()
+      with open(data_path, "r") as file:
+          cur.copy_expert(
+              "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+              file,
+          )
+      conn.commit()
+
+Data Merge Task
+---------------
+
+Here we select the distinct records from the retrieved data, then check whether any employee ``Serial Number`` values are already in the database (if they are, we update those records with the new data).
+
+.. code-block:: python
+
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def merge_data():
+      query = """
+          INSERT INTO employees
+          SELECT *
+          FROM (
+              SELECT DISTINCT *
+              FROM employees_temp
+          )
+          ON CONFLICT ("Serial Number") DO UPDATE
+          SET "Serial Number" = excluded."Serial Number";
+      """
+      try:
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          cur.execute(query)
+          conn.commit()
+          return 0
+      except Exception as e:
+          return 1
+
+
+
+Completing our DAG
+------------------
+
+We've developed our tasks; now we need to wrap them in a DAG, which enables us to define when and how tasks should run and to state any dependencies that tasks have on other tasks. The DAG below is configured to:
+
+* run every day at midnight starting on Jan 1, 2021,
+* only run once in the event that days are missed, and
+* timeout after 60 minutes
+
+And from the last line in the definition of the ``Etl`` DAG, we see:
+
+.. code-block:: python
+
+      [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
+
+* the ``merge_data()`` task depends on the ``get_data()`` task,
+* the ``get_data()`` depends on both the ``create_employees_table`` and ``create_employees_temp_table`` tasks, and
+* the ``create_employees_table`` and ``create_employees_temp_table`` tasks can run independently.
+
+Putting all of the pieces together, we have our completed DAG.
+
+.. code-block:: python
+
+  import datetime
+  import pendulum
+  import os
+
+  import requests
+  from airflow.decorators import dag, task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+
+  @dag(
+      schedule="0 0 * * *",
+      start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+      catchup=False,
+      dagrun_timeout=datetime.timedelta(minutes=60),
+  )
+  def Etl():
+      create_employees_table = PostgresOperator(
+          task_id="create_employees_table",
+          postgres_conn_id="tutorial_pg_conn",
+          sql="""
+              CREATE TABLE IF NOT EXISTS employees (
+                  "Serial Number" NUMERIC PRIMARY KEY,
+                  "Company Name" TEXT,
+                  "Employee Markme" TEXT,
+                  "Description" TEXT,
+                  "Leave" INTEGER
+              );""",
+      )
+
+      create_employees_temp_table = PostgresOperator(
+          task_id="create_employees_temp_table",
+          postgres_conn_id="tutorial_pg_conn",
+          sql="""
+              DROP TABLE IF EXISTS employees_temp;
+              CREATE TABLE employees_temp (
+                  "Serial Number" NUMERIC PRIMARY KEY,
+                  "Company Name" TEXT,
+                  "Employee Markme" TEXT,
+                  "Description" TEXT,
+                  "Leave" INTEGER
+              );""",
+      )
+
+      @task
+      def get_data():
+          # NOTE: configure this as appropriate for your airflow environment
+          data_path = "/opt/airflow/dags/files/employees.csv"
+          os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+          url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
+
+          response = requests.request("GET", url)
+
+          with open(data_path, "w") as file:
+              file.write(response.text)
+
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          with open(data_path, "r") as file:
+              cur.copy_expert(
+                  "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+                  file,
+              )
+          conn.commit()
+
+      @task
+      def merge_data():
+          query = """
+              INSERT INTO employees
+              SELECT *
+              FROM (
+                  SELECT DISTINCT *
+                  FROM employees_temp
+              )
+              ON CONFLICT ("Serial Number") DO UPDATE
+              SET "Serial Number" = excluded."Serial Number";
+          """
+          try:
+              postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+              conn = postgres_hook.get_conn()
+              cur = conn.cursor()
+              cur.execute(query)
+              conn.commit()
+              return 0
+          except Exception as e:
+              return 1
+
+      [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
+
+
+  dag = Etl()
+
+Save this code to a Python file in the ``dags/`` folder (e.g. ``dags/etl.py``) and, after a `brief delay <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-dir-list-interval>`_, the ``Etl`` DAG will be included in the list of available DAGs in the web UI.
+
+.. image:: ../img/new_tutorial-1.png
+
+You can trigger the ``Etl`` DAG by unpausing it (via the slider on the left end) and running it (via the Run button under **Actions**).
+
+.. image:: ../img/new_tutorial-3.png
+
+In the ``Etl`` DAG's **Tree** view, we see that all tasks ran successfully in all executed runs. Success!

Review Comment:
   We should have a "What's Next?" section here as well. 



##########
docs/apache-airflow/tutorial/fundamentals.rst:
##########
@@ -107,7 +107,7 @@ Operators
 
 An operator defines a unit of work for Airflow to complete. Using operators is the classic approach
 to defining work in Airflow. For some use cases, it's better to use the TaskFlow API to define
-work in a Pythonic context as described in :doc:`/tutorial_taskflow_api`. For now, using operators helps to
+work in a Pythonic context as described in :doc:`taskflow`. For now, using operators helps to

Review Comment:
   I think I would use stronger wording here in favour of TaskFlow and make this more of a "branch off" point.
   I believe we should put more effort here into explaining WHEN TaskFlow is better and why people might want to use it - without them having to go and read that tutorial yet. This is - I think - the best place to do that and to give a quick overview of where classic operators are better and where the TaskFlow approach is. The current description does not help the reader make a decision; they have to read both tutorials to decide which one is better, and since "taskflow" is a separate doc they will naturally check it less frequently, without realising that a TaskFlow-first approach might be better for them.
   
   I think when someone gets to this place in the tutorial, they should be presented with two options (broadly speaking - it should likely be formulated better):
   
   1) If you are mostly interested in using the broad set of existing integrations, you are not interested in writing a lot of Python code, and your Python experience is limited, then the "classic" black-box operator approach is likely easier to start with and you should continue reading. But checking out TaskFlow once you get familiar with classic operators should also be recommended, and we should add the TaskFlow tutorial to the "what's next" section at the end.
   
   2) But if you are an experienced Python developer and your goal is to create more customised and flexible pipelines that could also use the integrations but do not fit the black-box operator approach, TaskFlow is for you and you'd do better starting with it rather than continuing with the "classic" approach. (You might still come back to it, but TaskFlow should be the first thing to read, so that you do not imprint the "classic" approach. DAGs written in TaskFlow look very different from the classic ones, and for a number of people, unlearning what they've seen with classic DAGs might be quite difficult.)
   
   I think we should also mention the "classic" vs. "modern" approaches, or be a bit stronger about TaskFlow being more "Pythonic". Not sure about the wording. I think this approach will be much better at promoting TaskFlow to more "Pythonic" users, who might be put off by the "classic" approach (when you look at TaskFlow as an experienced Python developer, the classic approach simply looks terribly ugly - kind of "you don't do that in Python"). Those users should be made aware that they do not have to learn the classic way to use Airflow - we might even discourage them from doing so.
   
   I think the words "black-box" vs. "flexible and Pythonic" describe the difference between the two very well. Both have benefits for different kinds of users, and being able to respond to both of them here by properly redirecting them is crucial.
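   
   To make the contrast concrete, here is a rough sketch (names made up, not taken from the PR) of the same trivial task written both ways:
   
   ```python
   import pendulum
   from airflow import DAG
   from airflow.decorators import task
   from airflow.operators.python import PythonOperator

   with DAG(dag_id="style_comparison", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=None):

       # "Classic" black-box style: wrap a callable in an operator by hand
       # and wire parameters / XComs yourself.
       def _extract():
           return {"value": 1}

       extract_classic = PythonOperator(task_id="extract_classic", python_callable=_extract)

       # TaskFlow style: the decorator turns the function itself into a task,
       # and its return value flows to downstream tasks automatically.
       @task
       def extract_taskflow():
           return {"value": 1}

       extract_taskflow()
   ```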



##########
docs/apache-airflow/tutorial/fundamentals.rst:
##########
@@ -694,7 +403,7 @@ Here's a few things you might want to do next:
     - Take an in-depth tour of the UI - click all the things!
     - Keep reading the docs!
 
-      - Review the :doc:`how-to guides<howto/index>`, which include a guide to writing your own operator
+      - Review the :doc:`how-to guides</howto/index>`, which include a guide to writing your own operator
       - Review the :ref:`Command Line Interface Reference<cli>`
       - Review the :ref:`List of operators <pythonapi:operators>`
       - Review the :ref:`Macros reference<macros>`

Review Comment:
   I think we need to add the "TaskFlow tutorial" here, with an explanation of why you would want to do it.


