Posted to commits@airflow.apache.org by "Krishna Garapati (Jira)" <ji...@apache.org> on 2019/09/04 14:04:00 UTC

[jira] [Commented] (AIRFLOW-3873) Issue with DAG dependency using ExternalTaskSensor

    [ https://issues.apache.org/jira/browse/AIRFLOW-3873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16922514#comment-16922514 ] 

Krishna Garapati commented on AIRFLOW-3873:
-------------------------------------------

No, I didn't use snakebite.

> Issue with DAG dependency using ExternalTaskSensor
> --------------------------------------------------
>
>                 Key: AIRFLOW-3873
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3873
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG
>    Affects Versions: 1.10.1
>         Environment: Running on a Red Hat Linux box on which Airflow is installed.
>            Reporter: Krishna Garapati
>            Priority: Critical
>
> I have created two DAGs and want to set a dependency between them using ExternalTaskSensor, as shown below. I am getting the error "Broken DAG: [/data1/airflow/dags/testdagdependency.py] No module named snakebite.client". Please help me with this.
> ==================================================================
> *DAG 1:* 
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> from datetime import datetime, timedelta
> default_args = {
>  'owner': 'Krishna Garapati',
>  'depends_on_past': False,
>  'start_date': datetime(2019, 2, 9),
>  'email': ['krishna.garapati@transamerica.com'],
>  'email_on_failure': True,
>  'email_on_retry': True,
>  'retries': 0,
>  'retry_delay': timedelta(minutes=5)
>  #'queue': 'finance-ingestion',
>  # 'run_as_user': 'sptfinactmodel'
>  # 'pool': 'backfill',
>  # 'priority_weight': 10,
>  # 'end_date': datetime(2016, 1, 1),
> }
> dag = DAG('pythontest', default_args=default_args, schedule_interval='27 2 * * *')
> # t1 and t2 are examples of tasks created by instantiating operators
> t1 = BashOperator(
>  task_id='print_date',
>  bash_command='date',
>  dag=dag)
> t2 = BashOperator(
>  task_id='pythontest',
>  bash_command='{{"python /preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}',
>  dag=dag)
> t2.set_upstream(t1)
>  
> ==========================================================
>  
> *DAG 2 ( Keeping dependency on DAG1)*
>  
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> from datetime import datetime, timedelta
> from airflow.operators.sensors import ExternalTaskSensor
> default_args = {
>  'owner': 'Krishna Garapati',
>  'depends_on_past': False,
>  'start_date': datetime(2019, 2, 11),
>  'email': ['krishna.garapati@transamerica.com'],
>  'email_on_failure': True,
>  'email_on_retry': True,
>  'retries': 0,
>  'retry_delay': timedelta(minutes=5)
>  #'queue': 'finance-ingestion',
>  # 'run_as_user': 'sptfinactmodel'
>  # 'pool': 'backfill',
>  # 'priority_weight': 10,
>  # 'end_date': datetime(2016, 1, 1),
> }
> dag = DAG('testdagdependency', default_args=default_args, schedule_interval='27 15 * * *')
> wait_for_pythontest = ExternalTaskSensor(
>  task_id='wait_for_pythontest',
>  external_dag_id='pythontest',
>  external_task_id='pythontest',
>  execution_delta=None,  # None: wait for the external task with the same execution_date as this run
>  dag=dag)
> # t1 and t2 are examples of tasks created by instantiating operators
> t1 = BashOperator(
>  task_id='print_date',
>  bash_command='date',
>  dag=dag)
> t2 = BashOperator(
>  task_id='testdependency',
>  bash_command='{{"python /preprod/finance/financedatastagedev/scripts/airflowtest/hive/test.py"}}',
>  dag=dag)
> wait_for_pythontest >> t2
> t2.set_upstream(t1)
>  
> =====================================================
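
A side note on the two schedules quoted above, separate from the snakebite import error: DAG 1 runs at '27 2 * * *' and DAG 2 at '27 15 * * *', and with execution_delta=None the ExternalTaskSensor waits for an upstream task instance with the same execution date as its own run. Since the two execution dates differ by several hours, the sensor would probably never find a match. The sketch below uses only the standard library to work out that offset; the helper name schedule_offset is hypothetical and not part of Airflow, and it assumes both DAGs run once a day at a fixed time.

```python
from datetime import datetime, timedelta


def schedule_offset(upstream_hhmm, downstream_hhmm):
    """Return how long the downstream daily run trails the upstream daily run.

    Both arguments are "HH:MM" strings taken from the two cron schedules.
    Hypothetical helper for illustration only; not an Airflow API.
    """
    fmt = "%H:%M"
    upstream = datetime.strptime(upstream_hhmm, fmt)
    downstream = datetime.strptime(downstream_hhmm, fmt)
    delta = downstream - upstream
    # If the downstream run happens earlier in the day than the upstream one,
    # the most recent upstream run is the one from the previous day.
    if delta < timedelta(0):
        delta += timedelta(days=1)
    return delta


# DAG 1 ('pythontest') runs at 02:27, DAG 2 ('testdagdependency') at 15:27.
execution_delta = schedule_offset("02:27", "15:27")
print(execution_delta)  # 13:00:00
```

The resulting timedelta (13 hours here) could then be passed as execution_delta to the ExternalTaskSensor in DAG 2 in place of None, so that the sensor looks at the 02:27 run of 'pythontest'.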



--
This message was sent by Atlassian Jira
(v8.3.2#803003)