Posted to commits@airflow.apache.org by "Krishna Garapati (Jira)" <ji...@apache.org> on 2019/09/04 14:04:00 UTC
[jira] [Commented] (AIRFLOW-3873) Issue with DAG dependency using ExternalTaskSensor
[ https://issues.apache.org/jira/browse/AIRFLOW-3873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16922514#comment-16922514 ]
Krishna Garapati commented on AIRFLOW-3873:
-------------------------------------------
No, I didn't use snakebite.
> Issue with DAG dependency using ExternalTaskSensor
> --------------------------------------------------
>
> Key: AIRFLOW-3873
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3873
> Project: Apache Airflow
> Issue Type: Bug
> Components: DAG
> Affects Versions: 1.10.1
> Environment: Running on a Red Hat Linux box on which Airflow is installed.
> Reporter: Krishna Garapati
> Priority: Critical
>
> I have created two DAGs and want to set a dependency between them using ExternalTaskSensor, as shown below. I am getting the error "Broken DAG: [/data1/airflow/dags/testdagdependency.py] No module named snakebite.client". Please help me with this.
> ==================================================================
> *DAG 1:*
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> from datetime import datetime, timedelta
>
> default_args = {
>     'owner': 'Krishna Garapati',
>     'depends_on_past': False,
>     'start_date': datetime(2019, 2, 9),
>     'email': ['krishna.garapati@transamerica.com'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5)
>     # 'queue': 'finance-ingestion',
>     # 'run_as_user': 'sptfinactmodel'
>     # 'pool': 'backfill',
>     # 'priority_weight': 10,
>     # 'end_date': datetime(2016, 1, 1),
> }
>
> dag = DAG('pythontest', default_args=default_args, schedule_interval='27 2 * * *')
>
> # t1 and t2 are examples of tasks created by instantiating operators
> t1 = BashOperator(
>     task_id='print_date',
>     bash_command='date',
>     dag=dag)
>
> t2 = BashOperator(
>     task_id='pythontest',
>     bash_command='{{"python /preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}',
>     dag=dag)
>
> t2.set_upstream(t1)
>
> ==========================================================
>
> *DAG 2 ( Keeping dependency on DAG1)*
>
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> from datetime import datetime, timedelta
> from airflow.operators.sensors import ExternalTaskSensor
>
> default_args = {
>     'owner': 'Krishna Garapati',
>     'depends_on_past': False,
>     'start_date': datetime(2019, 2, 11),
>     'email': ['krishna.garapati@transamerica.com'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5)
>     # 'queue': 'finance-ingestion',
>     # 'run_as_user': 'sptfinactmodel'
>     # 'pool': 'backfill',
>     # 'priority_weight': 10,
>     # 'end_date': datetime(2016, 1, 1),
> }
>
> dag = DAG('testdagdependency', default_args=default_args, schedule_interval='27 15 * * *')
>
> wait_for_pythontest = ExternalTaskSensor(
>     task_id='wait_for_pythontest',
>     external_dag_id='pythontest',
>     external_task_id='pythontest',
>     execution_delta=None, # Same day as today
>     dag=dag)
>
> # t1 and t2 are examples of tasks created by instantiating operators
> t1 = BashOperator(
>     task_id='print_date',
>     bash_command='date',
>     dag=dag)
>
> t2 = BashOperator(
>     task_id='testdependency',
>     bash_command='{{"python /preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}',
>     dag=dag)
> wait_for_pythontest >> t1
> t2.set_upstream(t1)
>
> =====================================================
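A side note on the execution_delta=None setting in DAG 2 above, separate from the snakebite import error itself. As far as I understand ExternalTaskSensor, leaving execution_delta unset makes the sensor look for a run of the external DAG with the same execution date as its own run. The two schedules in the report ('27 2 * * *' and '27 15 * * *') would then never line up. Below is a minimal sketch, using only the standard library, of how the offset between the two schedules could be computed. The helper name daily_offset is hypothetical (not part of Airflow), and it assumes plain daily cron expressions with literal minute and hour fields.

```python
from datetime import timedelta


def daily_offset(cron_expr):
    """Offset from midnight for a daily cron expression such as '27 2 * * *'.

    Hypothetical helper for illustration only; it reads just the minute
    and hour fields and assumes both are plain integers.
    """
    minute, hour = cron_expr.split()[:2]
    return timedelta(hours=int(hour), minutes=int(minute))


# Schedules copied from the two DAG definitions in the report.
upstream = daily_offset('27 2 * * *')      # DAG 'pythontest'
downstream = daily_offset('27 15 * * *')   # DAG 'testdagdependency'

# How far the downstream schedule trails the upstream one on the same day.
execution_delta = downstream - upstream
print(execution_delta)  # 13:00:00
```

If that reading of the sensor is right, passing execution_delta=timedelta(hours=13) to ExternalTaskSensor instead of None would be one option to try once the import problem is resolved.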
--
This message was sent by Atlassian Jira
(v8.3.2#803003)