You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/08/01 14:49:56 UTC

incubator-airflow git commit: [AIRFLOW-383] Cleanup example qubole operator dag

Repository: incubator-airflow
Updated Branches:
  refs/heads/master fdb7e9491 -> eb989dcb5


[AIRFLOW-383] Cleanup example qubole operator dag

Closes #1698 from yogesh2021/AIRFLOW-383


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb989dcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb989dcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb989dcb

Branch: refs/heads/master
Commit: eb989dcb54387c218683de9dff0a12950dce147b
Parents: fdb7e94
Author: Yogesh Garg <yo...@qubole.com>
Authored: Mon Aug 1 07:49:54 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Mon Aug 1 07:49:54 2016 -0700

----------------------------------------------------------------------
 .../example_dags/example_qubole_operator.py     | 23 +++++++++-----------
 1 file changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb989dcb/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py
index 63cccd3..ff363e6 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -32,7 +32,9 @@ default_args = {
     'email_on_retry': False
 }
 
-dag = DAG('example_qubole_operator', default_args=default_args)
+# NOTE: This is only an example DAG to highlight usage of QuboleOperator in various scenarios;
+# some of the tasks may or may not work based on your QDS account setup
+dag = DAG('example_qubole_operator', default_args=default_args, schedule_interval='@daily')
 
 def compare_result(ds, **kwargs):
     ti = kwargs['ti']
@@ -45,14 +47,15 @@ t1 = QuboleOperator(
     command_type='hivecmd',
     query='show tables',
     cluster_label='default',
-    fetch_logs=True,
-    tags='aiflow_example_run',
+    fetch_logs=True,  # If True, fetch the Qubole command logs and append them to the corresponding Airflow task log
+    tags='aiflow_example_run',  # Attach tags to the Qubole command; three tags (dag_id, task_id, run_id) are attached automatically
+    qubole_conn_id='qubole_default',  # Connection id used to submit commands to QDS; defaults to "qubole_default" if not set
     dag=dag)
 
 t2 = QuboleOperator(
     task_id='hive_s3_location',
     command_type="hivecmd",
-    script_location="s3n://dev.canopydata.com/airflow/show_table.hql",
+    script_location="s3n://public-qubole/qbol-library/scripts/show_table.hql",
     notfiy=True,
     tags=['tag1', 'tag2'],
     trigger_rule="all_done",
@@ -76,14 +79,12 @@ branching = BranchPythonOperator(
     dag=dag)
 branching.set_upstream(t3)
 
-
 join = DummyOperator(
     task_id='join',
     trigger_rule='one_success',
     dag=dag
 )
 
-
 t4 = QuboleOperator(
     task_id='hadoop_jar_cmd',
     command_type='hadoopcmd',
@@ -95,7 +96,7 @@ t4 = QuboleOperator(
 t5 = QuboleOperator(
     task_id='pig_cmd',
     command_type="pigcmd",
-    script_location="s3://paid-qubole/PigAPIDemo/scripts/script1-hadoop-s3-small.pig",
+    script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
     parameters="key1=value1 key2=value2",
     trigger_rule="all_done",
     dag=dag)
@@ -104,7 +105,6 @@ t4.set_upstream(branching)
 t5.set_upstream(t4)
 t5.set_downstream(join)
 
-
 t6 = QuboleOperator(
     task_id='presto_cmd',
     command_type='prestocmd',
@@ -114,7 +114,7 @@ t6 = QuboleOperator(
 t7 = QuboleOperator(
     task_id='shell_cmd',
     command_type="shellcmd",
-    script_location="s3://paid-qubole/ShellDemo/data/excite-small.sh",
+    script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
     parameters="param1 param2",
     trigger_rule="all_done",
     dag=dag)
@@ -123,7 +123,6 @@ t6.set_upstream(branching)
 t7.set_upstream(t6)
 t7.set_downstream(join)
 
-
 t8 = QuboleOperator(
     task_id='db_query',
     command_type='dbtapquerycmd',
@@ -146,7 +145,6 @@ t8.set_upstream(branching)
 t9.set_upstream(t8)
 t9.set_downstream(join)
 
-
 t10 = QuboleOperator(
     task_id='db_import',
     command_type='dbimportcmd',
@@ -186,7 +184,7 @@ t11 = QuboleOperator(
     task_id='spark_cmd',
     command_type="sparkcmd",
     program=prog,
-    language='python',
+    language='scala',
     arguments='--class SparkPi',
     tags='aiflow_example_run',
     dag=dag)
@@ -194,4 +192,3 @@ t11 = QuboleOperator(
 t11.set_upstream(branching)
 t11.set_downstream(t10)
 t10.set_downstream(join)
-