You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this plain-text rendering.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/08/01 14:49:56 UTC
incubator-airflow git commit: [AIRFLOW-383] Cleanup example qubole operator dag
Repository: incubator-airflow
Updated Branches:
refs/heads/master fdb7e9491 -> eb989dcb5
[AIRFLOW-383] Cleanup example qubole operator dag
Closes #1698 from yogesh2021/AIRFLOW-383
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb989dcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb989dcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb989dcb
Branch: refs/heads/master
Commit: eb989dcb54387c218683de9dff0a12950dce147b
Parents: fdb7e94
Author: Yogesh Garg <yo...@qubole.com>
Authored: Mon Aug 1 07:49:54 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Mon Aug 1 07:49:54 2016 -0700
----------------------------------------------------------------------
.../example_dags/example_qubole_operator.py | 23 +++++++++-----------
1 file changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb989dcb/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py
index 63cccd3..ff363e6 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -32,7 +32,9 @@ default_args = {
'email_on_retry': False
}
-dag = DAG('example_qubole_operator', default_args=default_args)
+# NOTE:: This is only an example DAG to highlight usage of QuboleOperator in various scenarios,
+# some of the tasks may or may not work based on your QDS account setup
+dag = DAG('example_qubole_operator', default_args=default_args, schedule_interval='@daily')
def compare_result(ds, **kwargs):
ti = kwargs['ti']
@@ -45,14 +47,15 @@ t1 = QuboleOperator(
command_type='hivecmd',
query='show tables',
cluster_label='default',
- fetch_logs=True,
- tags='aiflow_example_run',
+ fetch_logs=True, # If true, will fetch qubole command logs and concatenate them into corresponding airflow task logs
+ tags='aiflow_example_run', # To attach tags to qubole command, auto attach 3 tags - dag_id, task_id, run_id
+ qubole_conn_id='qubole_default', # Connection id to submit commands inside QDS, if not set "qubole_default" is used
dag=dag)
t2 = QuboleOperator(
task_id='hive_s3_location',
command_type="hivecmd",
- script_location="s3n://dev.canopydata.com/airflow/show_table.hql",
+ script_location="s3n://public-qubole/qbol-library/scripts/show_table.hql",
notfiy=True,
tags=['tag1', 'tag2'],
trigger_rule="all_done",
@@ -76,14 +79,12 @@ branching = BranchPythonOperator(
dag=dag)
branching.set_upstream(t3)
-
join = DummyOperator(
task_id='join',
trigger_rule='one_success',
dag=dag
)
-
t4 = QuboleOperator(
task_id='hadoop_jar_cmd',
command_type='hadoopcmd',
@@ -95,7 +96,7 @@ t4 = QuboleOperator(
t5 = QuboleOperator(
task_id='pig_cmd',
command_type="pigcmd",
- script_location="s3://paid-qubole/PigAPIDemo/scripts/script1-hadoop-s3-small.pig",
+ script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
parameters="key1=value1 key2=value2",
trigger_rule="all_done",
dag=dag)
@@ -104,7 +105,6 @@ t4.set_upstream(branching)
t5.set_upstream(t4)
t5.set_downstream(join)
-
t6 = QuboleOperator(
task_id='presto_cmd',
command_type='prestocmd',
@@ -114,7 +114,7 @@ t6 = QuboleOperator(
t7 = QuboleOperator(
task_id='shell_cmd',
command_type="shellcmd",
- script_location="s3://paid-qubole/ShellDemo/data/excite-small.sh",
+ script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
parameters="param1 param2",
trigger_rule="all_done",
dag=dag)
@@ -123,7 +123,6 @@ t6.set_upstream(branching)
t7.set_upstream(t6)
t7.set_downstream(join)
-
t8 = QuboleOperator(
task_id='db_query',
command_type='dbtapquerycmd',
@@ -146,7 +145,6 @@ t8.set_upstream(branching)
t9.set_upstream(t8)
t9.set_downstream(join)
-
t10 = QuboleOperator(
task_id='db_import',
command_type='dbimportcmd',
@@ -186,7 +184,7 @@ t11 = QuboleOperator(
task_id='spark_cmd',
command_type="sparkcmd",
program=prog,
- language='python',
+ language='scala',
arguments='--class SparkPi',
tags='aiflow_example_run',
dag=dag)
@@ -194,4 +192,3 @@ t11 = QuboleOperator(
t11.set_upstream(branching)
t11.set_downstream(t10)
t10.set_downstream(join)
-