Posted to commits@airflow.apache.org by sa...@apache.org on 2016/05/09 22:06:15 UTC

incubator-airflow git commit: [AIRFLOW-80] Move example_twitter dag to contrib/example_dags as it requires hive

Repository: incubator-airflow
Updated Branches:
  refs/heads/master eb0960975 -> 61f35782f


[AIRFLOW-80] Move example_twitter dag to contrib/example_dags as it requires hive


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/61f35782
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/61f35782
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/61f35782

Branch: refs/heads/master
Commit: 61f35782fa75e13ebad0c3692de5475e3664d93b
Parents: eb09609
Author: Stephen Cattaneo <sc...@agari.com>
Authored: Mon May 9 11:43:06 2016 -0700
Committer: Stephen Cattaneo <sc...@agari.com>
Committed: Mon May 9 12:06:08 2016 -0700

----------------------------------------------------------------------
 .../example_dags/example_twitter_README.md      |  36 ++++
 .../contrib/example_dags/example_twitter_dag.py | 183 +++++++++++++++++++
 airflow/example_dags/example_twitter_README.md  |  36 ----
 airflow/example_dags/example_twitter_dag.py     | 183 -------------------
 tests/core.py                                   |   2 +-
 5 files changed, 220 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61f35782/airflow/contrib/example_dags/example_twitter_README.md
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_twitter_README.md b/airflow/contrib/example_dags/example_twitter_README.md
new file mode 100644
index 0000000..03ecc66
--- /dev/null
+++ b/airflow/contrib/example_dags/example_twitter_README.md
@@ -0,0 +1,36 @@
+# Example Twitter DAG
+
+***Introduction:*** This example DAG depicts a typical ETL process, and it is an ideal automation scenario for Airflow. Please note that the main scripts associated with the tasks simply return None. The purpose of this DAG is to demonstrate how to write a functional DAG within Airflow.
+
+**Background:** Twitter is a social networking platform where users send or broadcast short messages (140 characters). A user has a user ID, e.g. JohnDoe, also known as a Twitter handle. A short message, or tweet, can either be directed at another user with the @ symbol (e.g. @JohnDoe) or broadcast with a hashtag (#) followed by the topic name. *Because most of the data on Twitter is public, and Twitter provides a generous API to retrieve it, Twitter is often called a gold mine for text-mining-based data analytics.* This example DAG grew out of a real use case in which we used Twitter's Search API to retrieve the previous day's tweets. The DAG is scheduled to run each day and therefore works in an ETL fashion.
+
+***Overview:*** First, we need tasks that fetch the tweets we are interested in and save them to disk. Next, we need tasks that clean and analyze these tweets. We then want to store the resulting files in HDFS and load them into a data warehousing platform such as Hive or HBase. Hive is chosen here mainly because it offers a familiar SQL-like interface and makes writing the different queries much easier. Finally, the DAG needs to store a summarized result in a traditional database, e.g. MySQL or PostgreSQL, which is used by a reporting or business intelligence application. In other words, we want to achieve the following steps (a minimal dependency sketch follows the list):
+
+1. Fetch Tweets
+2. Clean Tweets
+3. Analyze Tweets
+4. Put Tweets to HDFS
+5. Load data to Hive
+6. Save Summary to MySQL
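+
+These six steps map directly onto Airflow task dependencies. As a rough sketch (the task variable names are taken from the DAG file below, where the HDFS and Hive tasks are actually created per channel inside for loops):
+
+```
+fetch_tweets.set_downstream(clean_tweets)      # 1 -> 2
+clean_tweets.set_downstream(analyze_tweets)    # 2 -> 3
+analyze_tweets.set_downstream(load_to_hdfs)    # 3 -> 4
+load_to_hdfs.set_downstream(load_to_hive)      # 4 -> 5
+load_to_hive.set_downstream(hive_to_mysql)     # 5 -> 6
+```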
+
+***Screenshot:***
+<img src="http://i.imgur.com/rRpSO12.png" width="99%"/>
+
+***Example Structure:*** In this example DAG, we collect tweets for four user accounts, or Twitter handles. Each Twitter handle has two channels: incoming tweets and outgoing tweets. Hence, running the fetch_tweet task should produce eight output files. For easier management, each of the eight output files should be saved with yesterday's date (we are collecting yesterday's tweets), e.g. toTwitter_A_2016-03-21.csv. We use three kinds of operators: PythonOperator, BashOperator, and HiveOperator. However, for this example only the Python scripts are stored externally, so the example itself consists of just the DAG file and this README.
+
+The Python functions here are just placeholders. If you want to make this DAG fully functional, start by filling out the scripts as separate files and importing them into the DAG with absolute or relative imports. My approach was to first store the retrieved data in memory using a Pandas DataFrame, and then use its built-in method to save the CSV file to disk, roughly as sketched below.
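+
+A minimal sketch of that pattern (the Twitter Search API call is left as a placeholder, and the column names simply mirror the Hive table shown further down):
+
+```
+import pandas as pd
+from datetime import date, timedelta
+
+
+def fetchtweets():
+    # Placeholder: call the Twitter Search API here and collect the results
+    # as a list of dicts, one dict per tweet.
+    tweets = []
+    df = pd.DataFrame(tweets, columns=['id', 'id_str', 'created_at', 'text'])
+    dt = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
+    # One CSV per channel; /tmp/ matches local_dir in the DAG.
+    df.to_csv("/tmp/toTwitter_A_" + dt + ".csv", index=False)
+```
+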
+The eight CSV files are then put into eight different folders within HDFS. Each newly inserted file is then loaded into a corresponding Hive table. Hive tables can be external or internal; in this case, we insert the data directly into the tables, so we make them internal. Each file is loaded into the Hive table named after its Twitter channel, e.g. toTwitter_A or fromTwitter_A. It is also important to note that when we created the tables, we set up partitioning by date using the variable dt and declared comma as the field delimiter. Partitioning is very handy and helps keep query execution time stable even as the volume of data grows.
+Since these folders and Hive tables most probably do not exist on your system, the corresponding tasks in the DAG will fail. If you rebuild a functional DAG from this example, make sure those folders and Hive tables exist first; the HQL below covers the table, and a small folder-creation helper follows it. When you create the tables, keep the table partitioning and the comma field delimiter in mind. Furthermore, you may also need to skip the header row on each read and ensure that the user running Airflow has the necessary permissions. Below is a sample HQL snippet for creating such a table:
+```
+CREATE TABLE toTwitter_A(id BIGINT, id_str STRING,
+                         created_at STRING, text STRING)
+PARTITIONED BY (dt STRING)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+STORED AS TEXTFILE;
+
+ALTER TABLE toTwitter_A SET TBLPROPERTIES ('skip.header.line.count' = '1');
+```
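+
+The HDFS target folders need to exist as well. A quick way to create them (assuming the /tmp/ hdfs_dir used in the DAG below and a hadoop client on the PATH) is a small helper like this:
+
+```
+import subprocess
+
+channels = ['fromTwitter_A', 'fromTwitter_B', 'fromTwitter_C', 'fromTwitter_D',
+            'toTwitter_A', 'toTwitter_B', 'toTwitter_C', 'toTwitter_D']
+
+for channel in channels:
+    # Create one HDFS folder per channel, e.g. /tmp/toTwitter_A
+    subprocess.check_call(["hadoop", "fs", "-mkdir", "-p", "/tmp/" + channel])
+```
+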
+When you review the code for the DAG, you will notice that these tasks are generated using for loops. The two for loops could be combined into one. However, in most cases you will be running different analyses on your incoming and outgoing tweets, and hence they are kept separate in this example.
+The final step runs the broker script, brokerapi.py, which executes queries in Hive and, in our case, stores the summarized data in MySQL. To connect to Hive, the pyhs2 library is very useful and easy to use. To insert data into MySQL from Python, SQLAlchemy is also a good choice. A rough sketch of such a broker script follows.
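+
+This is only a sketch of what such a broker script might look like; the connection details, the summary query, and the MySQL table name are hypothetical and depend on your environment:
+
+```
+import pandas as pd
+import pyhs2
+from sqlalchemy import create_engine
+
+
+def transfertodb():
+    # Run a summary query against one of the Hive tables (hypothetical query).
+    with pyhs2.connect(host='localhost', port=10000, authMechanism='PLAIN',
+                       user='hive', database='default') as conn:
+        with conn.cursor() as cur:
+            cur.execute("SELECT dt, COUNT(*) FROM toTwitter_A GROUP BY dt")
+            rows = cur.fetch()
+
+    # Store the summary in MySQL (hypothetical connection string and table).
+    engine = create_engine("mysql://user:password@localhost/twitter")
+    summary = pd.DataFrame(rows, columns=['dt', 'tweet_count'])
+    summary.to_sql('twitter_summary', engine, if_exists='append', index=False)
+```
+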
+I hope you find this tutorial useful. If you have questions, feel free to ask me on [Twitter](https://twitter.com/EkhtiarSyed) or via the live Airflow chatroom on [Gitter](https://gitter.im/airbnb/airflow).
+-Ekhtiar Syed
+Last Update: 8-April-2016

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61f35782/airflow/contrib/example_dags/example_twitter_dag.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py
new file mode 100644
index 0000000..d7fffd8
--- /dev/null
+++ b/airflow/contrib/example_dags/example_twitter_dag.py
@@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# --------------------------------------------------------------------------------
+# Written By: Ekhtiar Syed
+# Last Update: 8th April 2016
+# Caveat: This DAG will not run because of missing scripts.
+# The purpose of this is to give you a sample of a real-world example DAG!
+# --------------------------------------------------------------------------------
+
+# --------------------------------------------------------------------------------
+# Load The Dependencies
+# --------------------------------------------------------------------------------
+
+from airflow import DAG
+from airflow.operators import BashOperator, HiveOperator, PythonOperator
+from datetime import datetime, date, timedelta
+
+# --------------------------------------------------------------------------------
+# Create a few placeholder scripts. In practice these would be different python
+# script files, which are imported in this section with absolute or relative imports
+# --------------------------------------------------------------------------------
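+
+# A hypothetical example of what those imports might look like once the scripts
+# exist as separate files (the module name here is made up):
+#     from twitter_scripts import fetchtweets, cleantweets, analyzetweets, transfertodb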
+
+
+def fetchtweets():
+    return None
+
+
+def cleantweets():
+    return None
+
+
+def analyzetweets():
+    return None
+
+
+def transfertodb():
+    return None
+
+
+# --------------------------------------------------------------------------------
+# set default arguments
+# --------------------------------------------------------------------------------
+
+default_args = {
+    'owner': 'Ekhtiar',
+    'depends_on_past': False,
+    'start_date': datetime(2016, 3, 13),
+    'email': ['airflow@airflow.com'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+    # 'queue': 'bash_queue',
+    # 'pool': 'backfill',
+    # 'priority_weight': 10,
+    # 'end_date': datetime(2016, 1, 1),
+}
+
+dag = DAG(
+    'example_twitter_dag', default_args=default_args,
+    schedule_interval="@daily")
+
+# --------------------------------------------------------------------------------
+# This task should call the Twitter API and retrieve yesterday's tweets sent from and
+# to the four Twitter users (Twitter_A, ..., Twitter_D). There should be eight CSV
+# output files generated by this task, named following the convention
+# direction(from or to)_twitterHandle_date.csv
+# --------------------------------------------------------------------------------
+
+fetch_tweets = PythonOperator(
+    task_id='fetch_tweets',
+    python_callable=fetchtweets,
+    dag=dag)
+
+# --------------------------------------------------------------------------------
+# Clean the eight files. In this step you can get rid of or cherry pick columns
+# and different parts of the text
+# --------------------------------------------------------------------------------
+
+clean_tweets = PythonOperator(
+    task_id='clean_tweets',
+    python_callable=cleantweets,
+    dag=dag)
+
+clean_tweets.set_upstream(fetch_tweets)
+
+# --------------------------------------------------------------------------------
+# In this section you can use a script to analyze the Twitter data. This could simply
+# be sentiment analysis using algorithms like bag-of-words, or something more
+# complicated. You could also look at web services to perform such tasks.
+# --------------------------------------------------------------------------------
+
+analyze_tweets = PythonOperator(
+    task_id='analyze_tweets',
+    python_callable=analyzetweets,
+    dag=dag)
+
+analyze_tweets.set_upstream(clean_tweets)
+
+# --------------------------------------------------------------------------------
+# Although this is the last task, we need to declare it before the next tasks because
+# we will use set_downstream. This task will extract a summary from the Hive data and
+# store it in MySQL.
+# --------------------------------------------------------------------------------
+
+hive_to_mysql = PythonOperator(
+    task_id='hive_to_mysql',
+    python_callable=transfertodb,
+    dag=dag)
+
+# --------------------------------------------------------------------------------
+# The following tasks are generated using for loops. The first task puts the eight
+# csv files into HDFS. The second task loads these files from HDFS into the
+# corresponding Hive tables. These two for loops could be combined into one. However,
+# in most cases you will be running different analyses on your incoming and outgoing
+# tweets, and hence they are kept separate in this example.
+# --------------------------------------------------------------------------------
+
+from_channels = ['fromTwitter_A', 'fromTwitter_B', 'fromTwitter_C', 'fromTwitter_D']
+to_channels = ['toTwitter_A', 'toTwitter_B', 'toTwitter_C', 'toTwitter_D']
+yesterday = date.today() - timedelta(days=1)
+dt = yesterday.strftime("%Y-%m-%d")
+# define where you want to store the tweets csv files in your local directory
+local_dir = "/tmp/"
+# define the location in HDFS where you want to store the files
+hdfs_dir = "/tmp/"
+
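+# For the channel 'toTwitter_A', the generated bash command will look roughly like:
+#   HADOOP_USER_NAME=hdfs hadoop fs -put -f /tmp/toTwitter_A_2016-03-21.csv /tmp/toTwitter_A/
+# and the generated HQL will look roughly like:
+#   LOAD DATA INPATH '/tmp/toTwitter_A/toTwitter_A_2016-03-21.csv'
+#   INTO TABLE toTwitter_A PARTITION(dt='2016-03-21')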
+for channel in to_channels:
+
+    file_name = channel + "_" + dt + ".csv"  # e.g. toTwitter_A_2016-03-21.csv
+
+    load_to_hdfs = BashOperator(
+        task_id="put_" + channel + "_to_hdfs",
+        bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f " +
+                     local_dir + file_name + " " +
+                     hdfs_dir + channel + "/",
+        dag=dag)
+
+    load_to_hdfs.set_upstream(analyze_tweets)
+
+    load_to_hive = HiveOperator(
+        task_id="load_" + channel + "_to_hive",
+        hql="LOAD DATA INPATH '" +
+            hdfs_dir + channel + "/" + file_name + "' "
+            "INTO TABLE " + channel + " "
+            "PARTITION(dt='" + dt + "')",
+        dag=dag)
+    load_to_hive.set_upstream(load_to_hdfs)
+    load_to_hive.set_downstream(hive_to_mysql)
+
+for channel in from_channels:
+    file_name = channel + "_" + dt + ".csv"  # e.g. fromTwitter_A_2016-03-21.csv
+    load_to_hdfs = BashOperator(
+        task_id="put_" + channel + "_to_hdfs",
+        bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f " +
+                     local_dir + file_name + " " +
+                     hdfs_dir + channel + "/",
+        dag=dag)
+
+    load_to_hdfs.set_upstream(analyze_tweets)
+
+    load_to_hive = HiveOperator(
+        task_id="load_" + channel + "_to_hive",
+        hql="LOAD DATA INPATH '" +
+            hdfs_dir + channel + "/" + file_name + "' "
+            "INTO TABLE " + channel + " "
+            "PARTITION(dt='" + dt + "')",
+        dag=dag)
+
+    load_to_hive.set_upstream(load_to_hdfs)
+    load_to_hive.set_downstream(hive_to_mysql)
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61f35782/airflow/example_dags/example_twitter_README.md
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_twitter_README.md b/airflow/example_dags/example_twitter_README.md
deleted file mode 100644
index 03ecc66..0000000
--- a/airflow/example_dags/example_twitter_README.md
+++ /dev/null
@@ -1,36 +0,0 @@
-# Example Twitter DAG
-
-***Introduction:*** This example DAG depicts a typical ETL process, and it is an ideal automation scenario for Airflow. Please note that the main scripts associated with the tasks simply return None. The purpose of this DAG is to demonstrate how to write a functional DAG within Airflow.
-
-**Background:** Twitter is a social networking platform where users send or broadcast short messages (140 characters). A user has a user ID, e.g. JohnDoe, also known as a Twitter handle. A short message, or tweet, can either be directed at another user with the @ symbol (e.g. @JohnDoe) or broadcast with a hashtag (#) followed by the topic name. *Because most of the data on Twitter is public, and Twitter provides a generous API to retrieve it, Twitter is often called a gold mine for text-mining-based data analytics.* This example DAG grew out of a real use case in which we used Twitter's Search API to retrieve the previous day's tweets. The DAG is scheduled to run each day and therefore works in an ETL fashion.
-
-***Overview:*** First, we need tasks that fetch the tweets we are interested in and save them to disk. Next, we need tasks that clean and analyze these tweets. We then want to store the resulting files in HDFS and load them into a data warehousing platform such as Hive or HBase. Hive is chosen here mainly because it offers a familiar SQL-like interface and makes writing the different queries much easier. Finally, the DAG needs to store a summarized result in a traditional database, e.g. MySQL or PostgreSQL, which is used by a reporting or business intelligence application. In other words, we want to achieve the following steps:
-
-1. Fetch Tweets
-2. Clean Tweets
-3. Analyze Tweets
-4. Put Tweets to HDFS
-5. Load data to Hive
-6. Save Summary to MySQL
-
-***Screenshot:***
-<img src="http://i.imgur.com/rRpSO12.png" width="99%"/>
-
-***Example Structure:*** In this example DAG, we collect tweets for four user accounts, or Twitter handles. Each Twitter handle has two channels: incoming tweets and outgoing tweets. Hence, running the fetch_tweet task should produce eight output files. For easier management, each of the eight output files should be saved with yesterday's date (we are collecting yesterday's tweets), e.g. toTwitter_A_2016-03-21.csv. We use three kinds of operators: PythonOperator, BashOperator, and HiveOperator. However, for this example only the Python scripts are stored externally, so the example itself consists of just the DAG file and this README.
-
-The Python functions here are just placeholders. If you want to make this DAG fully functional, start by filling out the scripts as separate files and importing them into the DAG with absolute or relative imports. My approach was to first store the retrieved data in memory using a Pandas DataFrame, and then use its built-in method to save the CSV file to disk.
-The eight CSV files are then put into eight different folders within HDFS. Each newly inserted file is then loaded into a corresponding Hive table. Hive tables can be external or internal; in this case, we insert the data directly into the tables, so we make them internal. Each file is loaded into the Hive table named after its Twitter channel, e.g. toTwitter_A or fromTwitter_A. It is also important to note that when we created the tables, we set up partitioning by date using the variable dt and declared comma as the field delimiter. Partitioning is very handy and helps keep query execution time stable even as the volume of data grows.
-Since these folders and Hive tables most probably do not exist on your system, the corresponding tasks in the DAG will fail. If you rebuild a functional DAG from this example, make sure those folders and Hive tables exist first. When you create the tables, keep the table partitioning and the comma field delimiter in mind. Furthermore, you may also need to skip the header row on each read and ensure that the user running Airflow has the necessary permissions. Below is a sample HQL snippet for creating such a table:
-```
-CREATE TABLE toTwitter_A(id BIGINT, id_str STRING,
-                         created_at STRING, text STRING)
-PARTITIONED BY (dt STRING)
-ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
-STORED AS TEXTFILE;
-
-ALTER TABLE toTwitter_A SET TBLPROPERTIES ('skip.header.line.count' = '1');
-```
-When you review the code for the DAG, you will notice that these tasks are generated using for loops. The two for loops could be combined into one. However, in most cases you will be running different analyses on your incoming and outgoing tweets, and hence they are kept separate in this example.
-The final step runs the broker script, brokerapi.py, which executes queries in Hive and, in our case, stores the summarized data in MySQL. To connect to Hive, the pyhs2 library is very useful and easy to use. To insert data into MySQL from Python, SQLAlchemy is also a good choice.
-I hope you find this tutorial useful. If you have questions, feel free to ask me on [Twitter](https://twitter.com/EkhtiarSyed) or via the live Airflow chatroom on [Gitter](https://gitter.im/airbnb/airflow).
--Ekhtiar Syed
-Last Update: 8-April-2016

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61f35782/airflow/example_dags/example_twitter_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_twitter_dag.py b/airflow/example_dags/example_twitter_dag.py
deleted file mode 100644
index d7fffd8..0000000
--- a/airflow/example_dags/example_twitter_dag.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# --------------------------------------------------------------------------------
-# Written By: Ekhtiar Syed
-# Last Update: 8th April 2016
-# Caveat: This DAG will not run because of missing scripts.
-# The purpose of this is to give you a sample of a real-world example DAG!
-# --------------------------------------------------------------------------------
-
-# --------------------------------------------------------------------------------
-# Load The Dependencies
-# --------------------------------------------------------------------------------
-
-from airflow import DAG
-from airflow.operators import BashOperator, HiveOperator, PythonOperator
-from datetime import datetime, date, timedelta
-
-# --------------------------------------------------------------------------------
-# Create a few placeholder scripts. In practice these would be different python
-# script files, which are imported in this section with absolute or relative imports
-# --------------------------------------------------------------------------------
-
-
-def fetchtweets():
-    return None
-
-
-def cleantweets():
-    return None
-
-
-def analyzetweets():
-    return None
-
-
-def transfertodb():
-    return None
-
-
-# --------------------------------------------------------------------------------
-# set default arguments
-# --------------------------------------------------------------------------------
-
-default_args = {
-    'owner': 'Ekhtiar',
-    'depends_on_past': False,
-    'start_date': datetime(2016, 3, 13),
-    'email': ['airflow@airflow.com'],
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=5),
-    # 'queue': 'bash_queue',
-    # 'pool': 'backfill',
-    # 'priority_weight': 10,
-    # 'end_date': datetime(2016, 1, 1),
-}
-
-dag = DAG(
-    'example_twitter_dag', default_args=default_args,
-    schedule_interval="@daily")
-
-# --------------------------------------------------------------------------------
-# This task should call the Twitter API and retrieve yesterday's tweets sent from and
-# to the four Twitter users (Twitter_A, ..., Twitter_D). There should be eight CSV
-# output files generated by this task, named following the convention
-# direction(from or to)_twitterHandle_date.csv
-# --------------------------------------------------------------------------------
-
-fetch_tweets = PythonOperator(
-    task_id='fetch_tweets',
-    python_callable=fetchtweets,
-    dag=dag)
-
-# --------------------------------------------------------------------------------
-# Clean the eight files. In this step you can get rid of or cherry pick columns
-# and different parts of the text
-# --------------------------------------------------------------------------------
-
-clean_tweets = PythonOperator(
-    task_id='clean_tweets',
-    python_callable=cleantweets,
-    dag=dag)
-
-clean_tweets.set_upstream(fetch_tweets)
-
-# --------------------------------------------------------------------------------
-# In this section you can use a script to analyze the Twitter data. This could simply
-# be sentiment analysis using algorithms like bag-of-words, or something more
-# complicated. You could also look at web services to perform such tasks.
-# --------------------------------------------------------------------------------
-
-analyze_tweets = PythonOperator(
-    task_id='analyze_tweets',
-    python_callable=analyzetweets,
-    dag=dag)
-
-analyze_tweets.set_upstream(clean_tweets)
-
-# --------------------------------------------------------------------------------
-# Although this is the last task, we need to declare it before the next tasks because
-# we will use set_downstream. This task will extract a summary from the Hive data and
-# store it in MySQL.
-# --------------------------------------------------------------------------------
-
-hive_to_mysql = PythonOperator(
-    task_id='hive_to_mysql',
-    python_callable=transfertodb,
-    dag=dag)
-
-# --------------------------------------------------------------------------------
-# The following tasks are generated using for loops. The first task puts the eight
-# csv files into HDFS. The second task loads these files from HDFS into the
-# corresponding Hive tables. These two for loops could be combined into one. However,
-# in most cases you will be running different analyses on your incoming and outgoing
-# tweets, and hence they are kept separate in this example.
-# --------------------------------------------------------------------------------
-
-from_channels = ['fromTwitter_A', 'fromTwitter_B', 'fromTwitter_C', 'fromTwitter_D']
-to_channels = ['toTwitter_A', 'toTwitter_B', 'toTwitter_C', 'toTwitter_D']
-yesterday = date.today() - timedelta(days=1)
-dt = yesterday.strftime("%Y-%m-%d")
-# define where you want to store the tweets csv files in your local directory
-local_dir = "/tmp/"
-# define the location in HDFS where you want to store the files
-hdfs_dir = "/tmp/"
-
-for channel in to_channels:
-
-    file_name = channel + "_" + dt + ".csv"  # e.g. toTwitter_A_2016-03-21.csv
-
-    load_to_hdfs = BashOperator(
-        task_id="put_" + channel + "_to_hdfs",
-        bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f " +
-                     local_dir + file_name + " " +
-                     hdfs_dir + channel + "/",
-        dag=dag)
-
-    load_to_hdfs.set_upstream(analyze_tweets)
-
-    load_to_hive = HiveOperator(
-        task_id="load_" + channel + "_to_hive",
-        hql="LOAD DATA INPATH '" +
-            hdfs_dir + channel + "/" + file_name + "' "
-            "INTO TABLE " + channel + " "
-            "PARTITION(dt='" + dt + "')",
-        dag=dag)
-    load_to_hive.set_upstream(load_to_hdfs)
-    load_to_hive.set_downstream(hive_to_mysql)
-
-for channel in from_channels:
-    file_name = channel + "_" + dt + ".csv"  # e.g. fromTwitter_A_2016-03-21.csv
-    load_to_hdfs = BashOperator(
-        task_id="put_" + channel + "_to_hdfs",
-        bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f " +
-                     local_dir + file_name + " " +
-                     hdfs_dir + channel + "/",
-        dag=dag)
-
-    load_to_hdfs.set_upstream(analyze_tweets)
-
-    load_to_hive = HiveOperator(
-        task_id="load_" + channel + "_to_hive",
-        hql="LOAD DATA INPATH '" +
-            hdfs_dir + channel + "/" + file_name + "' "
-            "INTO TABLE " + channel + " "
-            "PARTITION(dt='" + dt + "')",
-        dag=dag)
-
-    load_to_hive.set_upstream(load_to_hdfs)
-    load_to_hive.set_downstream(hive_to_mysql)
-

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61f35782/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 651ee8b..8c33de2 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -48,7 +48,7 @@ from airflow.configuration import AirflowConfigException
 
 import six
 
-NUM_EXAMPLE_DAGS = 16
+NUM_EXAMPLE_DAGS = 15
 DEV_NULL = '/dev/null'
 TEST_DAG_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')