Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/20 07:13:43 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #16084: Added new pipeline example for the tutorial docs (Issue #11208)

mik-laj commented on a change in pull request #16084:
URL: https://github.com/apache/airflow/pull/16084#discussion_r654889481



##########
File path: docs/apache-airflow/tutorial.rst
##########
@@ -376,3 +376,165 @@ Here's a few things you might want to do next:
       - Review the :ref:`List of operators <pythonapi:operators>`
       - Review the :ref:`Macros reference<macros>`
     - Write your first pipeline!
+
+Let's look at another example: we need to get some data from a file hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
+
+Initial setup
+''''''''''''''''''''
+We need to have Docker and Postgres installed.
+We will be using the `docker-compose file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_ from the official guide.
+Follow its instructions to set up Airflow.
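+
+For reference, the linked guide boils down to roughly the following commands; check the guide itself for the authoritative steps, as they may change between versions:
+
+.. code-block:: bash
+
+  # Fetch the official docker-compose file and initialize the Airflow containers
+  curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
+  docker-compose up airflow-init
+  docker-compose up -d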
+
+Create the ``Employees`` and ``Employees_temp`` tables in Postgres:
+
+.. code-block:: sql
+
+  create table "Employees"
+  (
+      "Serial Number" numeric not null
+   constraint employees_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+  create table "Employees_temp"
+  (
+      "Serial Number" numeric not null
+   constraint employees_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+
+Let's break this down into three steps: get data, insert data, merge data.
+
+.. code-block:: python
+
+  @task
+  def get_data():
+      url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
+
+      response = requests.get(url)
+
+      # Save the downloaded CSV on the Airflow instance
+      with open("/usr/local/airflow/dags/files/employees.csv", "w") as file:
+          file.write(response.text)
+
+Here we are sending a ``GET`` request to fetch the data from the URL and saving it in the ``employees.csv`` file on our Airflow instance.
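+
+The CSV is expected to have a header row matching the columns of the ``Employees`` table; based on the table schema above (illustrative, not taken from the actual file), it should look something like:
+
+.. code-block:: text
+
+  Serial Number,Company Name,Employee Markme,Description,Leave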
+
+.. code-block:: python
+
+  @task
+  def insert_data():
+      engine = create_engine(
+          "postgresql+psycopg2://postgres:password@localhost:5432/postgres"
+      )
+      df = pd.read_csv("/usr/local/airflow/dags/files/employees.csv")
+      # index=False keeps the temp table's columns aligned with "Employees"
+      df.to_sql("Employees_temp", con=engine, if_exists="replace", index=False, chunksize=1000)
+
+Here we are dumping the file into a temporary table before merging the data into the final ``Employees`` table.
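+
+If you want to verify the load, a quick read-back (a sketch, reusing the same ``engine``) could look like this:
+
+.. code-block:: python
+
+  # Print a few rows from the temporary table to confirm the insert worked
+  print(pd.read_sql('SELECT * FROM "Employees_temp" LIMIT 5', con=engine))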
+
+.. code-block:: python
+
+  @task
+  def merge_data():
+      query = """
+          delete
+          from "Employees" e using "Employees_temp" et
+          where e."Serial Number" = et."Serial Number";
+
+          insert into "Employees"
+          select *
+          from "Employees_temp";
+      """
+      try:
+          engine = create_engine(
+              "postgresql+psycopg2://postgres:password@localhost:5432/postgres"
+          )
+          session = sessionmaker(bind=engine)()
+          session.execute(query)
+          session.commit()
+          return 0
+      except Exception:
+          return 1
+
+Here we first delete from the final table any rows whose ``Serial Number`` already exists in the temporary table, then insert the new rows, so duplicates are removed before the insert.
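+
+As an aside, Postgres can collapse this delete-then-insert into a single upsert with ``INSERT ... ON CONFLICT``; a sketch of that alternative (not what we use here):
+
+.. code-block:: sql
+
+  insert into "Employees"
+  select * from "Employees_temp"
+  on conflict ("Serial Number") do update
+  set "Company Name"    = excluded."Company Name",
+      "Employee Markme" = excluded."Employee Markme",
+      "Description"     = excluded."Description",
+      "Leave"           = excluded."Leave";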
+
+
+Let's look at our DAG:
+
+.. code-block:: python
+
+  # Imports needed to make this example self-contained
+  from datetime import datetime, timedelta
+
+  import pandas as pd
+  import requests
+  from sqlalchemy import create_engine
+  from sqlalchemy.orm import sessionmaker
+
+  from airflow.decorators import dag, task
+
+
+  @dag(
+      schedule_interval="0 0 * * *",
+      start_date=datetime.today() - timedelta(days=2),
+      dagrun_timeout=timedelta(minutes=60),
+  )
+  def Etl():
+      @task
+      def get_data():
+          url = "https://docs.google.com/uc?export=download&id=1a0RGUW2oYxyhIQYuezG_u8cxgUaAQtZw"
+
+          response = requests.request("GET", url)
+
+          with open("employees.csv", "w") as file:
+              for row in response.text.split("\n"):
+                  file.write(row)
+
+      @task
+      def insert_data():
+          engine = create_engine(
+              "postgresql+psycopg2://postgres:password@localhost:5432/postgres"
+          )
+          df = pd.read_csv("employees.csv")
+          df.to_sql("Employees_temp", con=engine, if_exists="replace", chunksize=1000)
+
+      @task
+      def merge_data():
+          query = """
+      delete
+      from "Employees" e using "Employees_temp" et
+      where e."Serial Number" = et."Serial Number";
+
+      insert into "Employees"
+      select *
+      from "Employees_temp";
+      """
+          try:
+              engine = create_engine(
+                  "postgresql+psycopg2://postgres:password@localhost:5432/postgres"
+              )
+              session = sessionmaker(bind=engine)()
+              session.execute(query)
+              session.commit()
+              return 0
+          except Exception:
+              return 1
+
+      get_data() >> insert_data() >> merge_data()
+
+
+  dag = Etl()
+
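+Before scheduling it, you can smoke-test the DAG from any environment where Airflow and the dependencies above are installed; with the ``@dag`` decorator the DAG id defaults to the function name, ``Etl``:
+
+.. code-block:: bash
+
+  # Run a one-off test instance of the DAG for a given execution date
+  airflow dags test Etl 2021-01-01
+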
+This DAG runs daily at 00:00.
+Add this Python file to the ``airflow/dags`` folder, go back to the main folder, and run:
+
+.. code-block:: bash
+
+  docker-compose -f docker-compose-LocalExecutor.yml up -d

Review comment:
       Where can I find this file? I think we only have the file for CeleryExecutor.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org