Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/28 11:40:55 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #16084: Added new pipeline example for the tutorial docs (Issue #11208)

kaxil commented on a change in pull request #16084:
URL: https://github.com/apache/airflow/pull/16084#discussion_r641481316



##########
File path: docs/apache-airflow/tutorial.rst
##########
@@ -376,3 +376,133 @@ Here's a few things you might want to do next:
       - Review the :ref:`List of operators <pythonapi:operators>`
       - Review the :ref:`Macros reference<macros>`
     - Write your first pipeline!
+
+
+Let's look at another example: we need to get some data from a file hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
+
+Initial setup
+'''''''''''''
+We need to have Docker and Postgres installed.
+We will be using this `docker-compose file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_.
+Follow the instructions there to set up Airflow.
+
+Create the ``Employees`` and ``Employees_temp`` tables in Postgres using this:
+
+.. code-block:: sql
+
+  create table "Employees"
+  (
+      "Serial Number" numeric not null
+   constraint employees_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+  create table "Employees_temp"
+  (
+      "Serial Number" numeric not null
+   constraint employees_temp_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+I have broken this down into 3 steps: get data, insert data, merge data. Let's look at the code:

Review comment:
       ```suggestion
   Let's break this down into 3 steps: get data, insert data, merge data:
   ```

##########
File path: docs/apache-airflow/tutorial.rst
##########
@@ -376,3 +376,133 @@ Here's a few things you might want to do next:
       - Review the :ref:`List of operators <pythonapi:operators>`
       - Review the :ref:`Macros reference<macros>`
     - Write your first pipeline!
+
+
+Let's look at another example: we need to get some data from a file hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
+
+Initial setup
+'''''''''''''
+We need to have Docker and Postgres installed.
+We will be using this `docker-compose file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_.
+Follow the instructions there to set up Airflow.
+
+Create the ``Employees`` and ``Employees_temp`` tables in Postgres using this:
+
+.. code-block:: sql
+
+  create table "Employees"
+  (
+      "Serial Number" numeric not null
+   constraint employees_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+  create table "Employees_temp"
+  (
+      "Serial Number" numeric not null
+   constraint employees_temp_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+I have broken this down into 3 steps: get data, insert data, merge data. Let's look at the code:
+
+.. code-block:: python
+
+  import requests
+
+  def get_data():
+      url = "https://docs.google.com/uc?export=download&id=1a0RGUW2oYxyhIQYuezG_u8cxgUaAQtZw"
+
+      payload = {}
+      headers = {}
+
+      response = requests.request("GET", url, headers=headers, data=payload)
+
+      with open('/usr/local/airflow/dags/files/employees.csv', 'w') as file:
+          for row in response.text.split('\n'):
+              file.write(row)
+
+Here we are passing a ``GET`` request to fetch the data from the URL and saving it to the ``employees.csv`` file on our Airflow instance.

Review comment:
       ```suggestion
    Here we are passing a `GET` request to get the data from the URL and save it in the `employees.csv` file on our Airflow instance.
   ```

##########
File path: docs/apache-airflow/tutorial.rst
##########
@@ -376,3 +376,133 @@ Here's a few things you might want to do next:
       - Review the :ref:`List of operators <pythonapi:operators>`
       - Review the :ref:`Macros reference<macros>`
     - Write your first pipeline!
+
+
+Let's look at another example: we need to get some data from a file hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
+
+Initial setup
+'''''''''''''
+We need to have Docker and Postgres installed.
+We will be using this `docker-compose file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_.
+Follow the instructions there to set up Airflow.
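+
+In short, that guide has you download the compose file and initialize the environment, roughly as below (a sketch; check the guide for the exact, version-specific commands):
+
+.. code-block:: bash
+
+  curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
+  docker-compose up airflow-init
+  docker-compose up -d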
+
+Create the ``Employees`` and ``Employees_temp`` tables in Postgres using this:
+
+.. code-block:: sql
+
+  create table "Employees"
+  (
+      "Serial Number" numeric not null
+   constraint employees_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+  create table "Employees_temp"
+  (
+      "Serial Number" numeric not null
+   constraint employees_temp_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+I have broken this down into 3 steps: get data, insert data, merge data. Let's look at the code:
+
+.. code-block:: python
+
+  import requests
+
+  def get_data():
+      url = "https://docs.google.com/uc?export=download&id=1a0RGUW2oYxyhIQYuezG_u8cxgUaAQtZw"
+
+      payload = {}
+      headers = {}
+
+      response = requests.request("GET", url, headers=headers, data=payload)
+
+      with open('/usr/local/airflow/dags/files/employees.csv', 'w') as file:
+          for row in response.text.split('\n'):
+              file.write(row)
+
+Here we are passing a ``GET`` request to fetch the data from the URL and saving it to the ``employees.csv`` file on our Airflow instance.
+
+.. code-block:: python
+
+  import pandas as pd
+  from sqlalchemy import create_engine
+
+  def insert_data():
+      engine = create_engine("postgresql+psycopg2://postgres:password@localhost:5432/postgres")
+      df = pd.read_csv('/usr/local/airflow/dags/files/employees.csv')
+      # index=False stops pandas from writing its index as an extra column
+      df.to_sql('Employees_temp', con=engine, if_exists='replace', index=False)
+
+Here we load the file into a temporary staging table before merging the data into the final ``Employees`` table.
+
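+To sanity-check the load before merging, you can read the staging table back with the same engine (a small sketch, not part of the pipeline itself):
+
+.. code-block:: python
+
+  import pandas as pd
+  from sqlalchemy import create_engine
+
+  engine = create_engine("postgresql+psycopg2://postgres:password@localhost:5432/postgres")
+  # Confirm the staging table row count matches the CSV
+  print(pd.read_sql('select count(*) from "Employees_temp"', con=engine))
+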
+.. code-block:: python
+
+  from sqlalchemy import create_engine
+  from sqlalchemy.orm import sessionmaker
+
+  def merge_data():
+      query = """
+          delete
+          from "Employees" e using "Employees_temp" et
+          where e."Serial Number" = et."Serial Number";
+
+          insert into "Employees"
+          select *
+          from "Employees_temp";
+      """
+      try:
+          engine = create_engine("postgresql+psycopg2://postgres:password@localhost:5432/postgres")
+          session = sessionmaker(bind=engine)()
+          session.execute(query)
+          session.commit()
+          return 0
+      except Exception:
+          return 1
+
+Here we first delete any rows in ``Employees`` whose ``Serial Number`` already appears in the staging table, then insert everything from ``Employees_temp``, so re-running the pipeline does not create duplicates.
+
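+On Postgres, the delete-then-insert above could also be written as a single atomic upsert (a sketch, relying on the ``Serial Number`` primary key defined earlier):
+
+.. code-block:: sql
+
+  insert into "Employees"
+  select * from "Employees_temp"
+  on conflict ("Serial Number") do update
+  set "Company Name"    = excluded."Company Name",
+      "Employee Markme" = excluded."Employee Markme",
+      "Description"     = excluded."Description",
+      "Leave"           = excluded."Leave";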
+
+Let's look at our DAG:
+
+.. code-block:: python
+
+  from datetime import datetime, timedelta
+
+  from airflow import DAG
+  from airflow.operators.python import PythonOperator
+
+  with DAG(
+          dag_id='AirflowExample',
+          schedule_interval='0 0 * * *',
+          start_date=datetime.today() - timedelta(days=2),
+          dagrun_timeout=timedelta(minutes=60)
+  ) as dag:
+      t1 = PythonOperator(
+          task_id='get_data',
+          python_callable=get_data,
+      )
+
+      t2 = PythonOperator(
+          task_id='insert_data',
+          python_callable=insert_data,
+      )
+
+      t3 = PythonOperator(
+          task_id='merge_data',
+          python_callable=merge_data,
+      )
+
+      t1 >> t2 >> t3
+
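+Because ``start_date`` is two days in the past, Airflow will by default backfill the missed daily runs. If you only want runs from now on, you can disable catchup (a sketch; ``catchup`` defaults to ``True`` unless overridden in your Airflow config):
+
+.. code-block:: python
+
+  with DAG(
+          dag_id='AirflowExample',
+          schedule_interval='0 0 * * *',
+          start_date=datetime.today() - timedelta(days=2),
+          dagrun_timeout=timedelta(minutes=60),
+          catchup=False,  # skip the runs between start_date and now
+  ) as dag:
+      ...
+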
+This DAG runs daily at 00:00.
+Add this Python file to the ``airflow/dags`` folder, go back to the main folder, and run:
+
+.. code-block:: bash
+
+  docker-compose -f docker-compose-LocalExecutor.yml up -d
+
+Open http://localhost:8080/admin/ in your browser and trigger your ``AirflowExample`` DAG.
+
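+If you prefer the command line, you can also trigger it with the Airflow CLI inside the container (a sketch; the ``webserver`` service name is an assumption and depends on your compose file):
+
+.. code-block:: bash
+
+  docker-compose -f docker-compose-LocalExecutor.yml exec webserver airflow dags trigger AirflowExample
+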
+.. image:: https://user-images.githubusercontent.com/35194828/119649317-1d148300-be40-11eb-9525-33ecf7eb6181.png
+
+
+.. image:: https://user-images.githubusercontent.com/35194828/119649304-1b4abf80-be40-11eb-8632-64f0d2c7dbb2.png
+

Review comment:
       Let's include these images in the PR and Airflow repo: https://github.com/apache/airflow/tree/master/docs/apache-airflow/img
   
   Otherwise when those images are deleted for whatever reason, this will be broken

##########
File path: docs/apache-airflow/tutorial.rst
##########
@@ -376,3 +376,133 @@ Here's a few things you might want to do next:
       - Review the :ref:`List of operators <pythonapi:operators>`
       - Review the :ref:`Macros reference<macros>`
     - Write your first pipeline!
+
+
+Let's look at another example: we need to get some data from a file hosted online and insert it into our local database. We also need to remove duplicate rows while inserting.
+
+Initial setup
+'''''''''''''
+We need to have Docker and Postgres installed.
+We will be using this `docker-compose file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_.
+Follow the instructions there to set up Airflow.
+
+Create the ``Employees`` and ``Employees_temp`` tables in Postgres using this:
+
+.. code-block:: sql
+
+  create table "Employees"
+  (
+      "Serial Number" numeric not null
+   constraint employees_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+  create table "Employees_temp"
+  (
+      "Serial Number" numeric not null
+   constraint employees_temp_pk
+              primary key,
+      "Company Name" text,
+      "Employee Markme" text,
+      "Description" text,
+      "Leave" integer
+  );
+
+I have broken this down into 3 steps: get data, insert data, merge data. Let's look at the code:
+
+.. code-block:: python
+
+  import requests
+
+  def get_data():
+      url = "https://docs.google.com/uc?export=download&id=1a0RGUW2oYxyhIQYuezG_u8cxgUaAQtZw"
+
+      payload = {}
+      headers = {}
+
+      response = requests.request("GET", url, headers=headers, data=payload)
+
+      with open('/usr/local/airflow/dags/files/employees.csv', 'w') as file:
+          for row in response.text.split('\n'):
+              file.write(row)

Review comment:
       ```suggestion
         response = requests.request("GET", url)
   
         with open('/usr/local/airflow/dags/files/employees.csv', 'w') as file:
             for row in response.text.split('\n'):
                 file.write(row)
   ```
   
   Since we are not passing them




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org