Posted to commits@airflow.apache.org by "kasim (Jira)" <ji...@apache.org> on 2019/10/28 04:33:00 UTC

[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Variables

     [ https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kasim updated AIRFLOW-5795:
---------------------------
    Attachment: image-2019-10-28-12-32-55-531.png

> Airflow cached old Code and Variables 
> --------------------------------------
>
>                 Key: AIRFLOW-5795
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5795
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, database
>    Affects Versions: 1.10.3
>            Reporter: kasim
>            Priority: Major
>         Attachments: 1)A]~UL_N@TVU072M)68WHP.png, image-2019-10-28-12-32-55-531.png
>
>
> My DAG's start_date is configured in an Airflow Variable:
>  
> {code:python}
> from datetime import datetime
> 
> from airflow.models import Variable
> 
> 
> class Config(object):
>     # defaults; overridden below by the Airflow Variable
>     version = "V21"
> 
>     dag_start_date = datetime(2019, 10, 25)
>     sf_schedule_report = "30 8 * * *"
>     sf_schedule_etl = "30 1 * * *"
>     sf_schedule_main = "45 3,4,5,6,7,8 * * *"
> 
> 
> CONFIG_KEY = 'sf_config_%s' % Config.version
> sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})
> for k, v in sf_config.items():
>     if hasattr(Config, k):
>         print(f'Overwrite {k} by {v}')
>         if k == 'dag_start_date':
>             # the Variable stores the date as a 'YYYY-MM-DD' string
>             setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
>         else:
>             setattr(Config, k, v)
> 
> print('#### config ### \n\n')
> print(f'CONFIG_KEY: {CONFIG_KEY}')
> print(f'CONFIG DICT: {sf_config}')
> print('#### config end ### \n\n')
> {code}
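> For reference, the Variable can be populated like this; a minimal sketch, with the key and values copied from the CONFIG_KEY / CONFIG DICT lines in the log below:
> {code:python}
> from airflow.models import Variable
> 
> # serialize_json=True stores the dict as JSON, matching the
> # Variable.get(..., deserialize_json=True) call in the DAG file
> Variable.set('sf_config_V21', {
>     'dag_start_date': '2019-10-25',
>     'sf_schedule_report': '30 9 * * *',
>     'sf_schedule_etl': '40 2 * * *',
>     'sf_schedule_main': '45 2,3,4,5,6,7 * * *',
> }, serialize_json=True)
> {code}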
> The DAG is initialized as:
> {code:python}
> dag = DAG(
>     'dm_sf_etl_%s' % Config.version,
>     start_date=Config.dag_start_date,
>     default_args=default_args,
>     schedule_interval=schedule,  # presumably one of the Config.sf_schedule_* values above
>     user_defined_filters={
>         'mod': lambda s, d: s % d,
>     },
> )
> {code}
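> The 'mod' filter would then be available in any templated field of this DAG; a hypothetical usage sketch (the task and its command are illustrative, not from the actual DAG, but match the "divisor: 5" lines in the log):
> {code:python}
> from airflow.operators.bash_operator import BashOperator
> 
> # bucket the execution hour by a divisor via the user-defined 'mod' filter
> t = BashOperator(
>     task_id='example_mod_filter',  # hypothetical task
>     bash_command="echo bucket={{ execution_date.hour | mod(5) }}",
>     dag=dag,
> )
> {code}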
> The Variable is:
> !image-2019-10-28-12-25-56-300.png!
>  
>  
> But the log shows:
>  * Airflow tried the task 4 times
>  * it did not read the Variable on the first attempt
>  * it used a cached start_date `2019-10-17` and schedule `45 1,2,3,4,5,6 * * *` from somewhere (these are the Variable's old settings from 10 days earlier)
>  * it did read the new Variable on the later attempts (see the diagnostic sketch below)
>  
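> To see what a fresh parse of the DAG file actually produces, independent of any scheduler state, a minimal sketch using DagBag, with the path and dag_id as they appear in the logs below:
> {code:python}
> from airflow.models import DagBag
> 
> # parse the DAG folder fresh, the same way a worker fills its DagBag
> db = DagBag('/opt/workflow/airflow/dags/sf_dags_n')
> dag = db.get_dag('dm_sf_main_V21')
> # should reflect the new Variable values, not the 10-day-old ones
> print(dag.start_date, dag.schedule_interval)
> {code}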
> I have tried deleting the files, deleting the DAG in the web UI, and deleting the related task_instance rows in the database, but I still get the same error.
> I suspect there is some cache in the scheduler's memory; I can't restart Airflow, so I can't confirm that. Either way, this shouldn't happen.
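> One thing worth checking (a sketch, assuming direct access to the metadata DB) is whether the dag table still records a stale file location for this dag_id; note that the `-sd` path differs between the attempts in the log below (dm_sf_main.py vs dm_sf_main_V21.py):
> {code:python}
> from airflow import settings
> from airflow.models import DagModel
> 
> session = settings.Session()
> dm = (session.query(DagModel)
>       .filter(DagModel.dag_id == 'dm_sf_main_V21')
>       .one_or_none())
> # fileloc is the DAG file path the scheduler last recorded for this dag_id
> print(dm.fileloc)
> session.close()
> {code}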
>  
> {code}
> *** Reading local file: /data/opt/workflow/airflow/logs/dm_sf_main_V21/branch_task/2019-10-17T01:45:00+08:00/1.log
> [2019-10-17 02:45:15,628] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [queued]>
> [2019-10-17 02:45:15,644] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [queued]>
> [2019-10-17 02:45:15,644] {__init__.py:1353} INFO - 
> --------------------------------------------------------------------------------
> [2019-10-17 02:45:15,645] {__init__.py:1354} INFO - Starting attempt 1 of 1
> [2019-10-17 02:45:15,645] {__init__.py:1355} INFO - 
> --------------------------------------------------------------------------------
> [2019-10-17 02:45:15,702] {__init__.py:1374} INFO - Executing <Task(BranchPythonOperator): branch_task> on 2019-10-17T01:45:00+08:00
> [2019-10-17 02:45:15,702] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', '2019-10-17T01:45:00+08:00', '--job_id', '154142', '--raw', '-sd', 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', '--cfg_path', '/tmp/tmp4p1ml6pq']
> [2019-10-17 02:45:16,180] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,179] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=17786
> [2019-10-17 02:45:16,341] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,340] {__init__.py:51} INFO - Using executor CeleryExecutor
> [2019-10-17 02:45:16,643] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,642] {__init__.py:305} INFO - Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task schedule: 45 1,2,3,4,5,6 * * *
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task divisor: 5
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf !
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     ## Job[sales_forecast_prophet] Command-line :
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     -------
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     spark-submit    xxxxx
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     -------
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     
> [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf !
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     ## Job[sales_forecast_prophet] Command-line :
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     -------
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     spark-submit    xxxxx
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     -------
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task     
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task Config.forecast_output_path: /xxxxx/data/dm/sales_forecast/results/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }}
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task Config.s3_forecast_output_path: s3_path/data/dm/sales_forecast/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }}
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task export HADOOP_USER_NAME=hdfs & kinit -kt /etc/security/hdfs.keytab hdfs & hadoop distcp -update -delete /xxxxx/data/dm/sales_forecast/results/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }} s3_path/data/dm/sales_forecast/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }}
> [2019-10-17 02:45:16,705] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {cli.py:517} INFO - Running <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [running]> on host dc36
> [2019-10-17 02:45:16,728] {python_operator.py:104} INFO - Exporting the following env vars:
> AIRFLOW_CTX_DAG_ID=dm_sf_main_V21
> AIRFLOW_CTX_TASK_ID=branch_task
> AIRFLOW_CTX_EXECUTION_DATE=2019-10-17T01:45:00+08:00
> AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-10-17T01:45:00+08:00
> [2019-10-17 02:45:16,728] {logging_mixin.py:95} INFO - ti.execution_date.hour: 1
> [2019-10-17 02:45:16,728] {python_operator.py:113} INFO - Done. Returned value was: sales_forecast_prophet_V21
> [2019-10-17 02:45:16,728] {python_operator.py:143} INFO - Following branch ['sales_forecast_prophet_V21']
> [2019-10-17 02:45:16,729] {python_operator.py:144} INFO - Marking other directly downstream tasks as skipped
> [2019-10-17 02:45:16,828] {python_operator.py:163} INFO - Done.
> [2019-10-17 02:45:20,634] {logging_mixin.py:95} INFO - [2019-10-17 02:45:20,633] {jobs.py:2562} INFO - Task exited with return code 0
> [2019-10-27 19:16:28,950] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [queued]>
> [2019-10-27 19:16:28,960] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [queued]>
> [2019-10-27 19:16:28,960] {__init__.py:1353} INFO - 
> --------------------------------------------------------------------------------
> [2019-10-27 19:16:28,960] {__init__.py:1354} INFO - Starting attempt 1 of 1
> [2019-10-27 19:16:28,961] {__init__.py:1355} INFO - 
> --------------------------------------------------------------------------------
> [2019-10-27 19:16:29,035] {__init__.py:1374} INFO - Executing <Task(BranchPythonOperator): branch_task> on 2019-10-17T01:45:00+08:00
> [2019-10-27 19:16:29,036] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', '2019-10-17T01:45:00+08:00', '--job_id', '421646', '--raw', '-sd', 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', '--cfg_path', '/tmp/tmpn1ew5nup']
> [2019-10-27 19:16:29,511] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task [2019-10-27 19:16:29,511] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=28949
> [2019-10-27 19:16:29,680] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task [2019-10-27 19:16:29,679] {__init__.py:51} INFO - Using executor CeleryExecutor
> [2019-10-27 19:16:29,961] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task [2019-10-27 19:16:29,959] {__init__.py:305} INFO - Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py
> [2019-10-27 19:16:30,052] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task Overwrite dag_start_date by 2019-10-25
> [2019-10-27 19:16:30,053] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task Overwrite version by V21
> [2019-10-27 19:16:30,053] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task Overwrite sf_schedule_report by 30 9 * * *
> [2019-10-27 19:16:30,053] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task Overwrite sf_schedule_etl by 40 2 * * *
> [2019-10-27 19:16:30,053] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task Overwrite sf_schedule_main by 45 2,3,4,5,6,7 * * *
> [2019-10-27 19:16:30,054] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task #### config ### 
> [2019-10-27 19:16:30,054] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,054] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,054] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task CONFIG_KEY: sf_config_V21
> [2019-10-27 19:16:30,054] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task CONFIG DICT: {'dag_start_date': '2019-10-25', 'version': 'V21', 'sf_schedule_report': '30 9 * * *', 'sf_schedule_etl': '40 2 * * *', 'sf_schedule_main': '45 2,3,4,5,6,7 * * *'}
> [2019-10-27 19:16:30,054] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task #### config end ### 
> [2019-10-27 19:16:30,054] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,054] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task schedule: 45 2,3,4,5,6,7 * * *
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task divisor: 5
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf !
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     ## Job[sales_forecast_prophet] Command-line :
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     -------
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     
> [2019-10-27 19:16:30,055] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     spark-submit      xxxxx
> [2019-10-27 19:16:30,056] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,056] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     -------
> [2019-10-27 19:16:30,056] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     
> [2019-10-27 19:16:30,056] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf !
> [2019-10-27 19:16:30,056] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,056] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     ## Job[sales_forecast_prophet] Command-line :
> [2019-10-27 19:16:30,056] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     -------
> [2019-10-27 19:16:30,056] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     
> [2019-10-27 19:16:30,056] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     spark-submit     xxxxx
> [2019-10-27 19:16:30,057] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task 
> [2019-10-27 19:16:30,057] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     -------
> [2019-10-27 19:16:30,057] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task     
> [2019-10-27 19:16:30,057] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task export HADOOP_USER_NAME=hdfs && kinit -kt /etc/security/hdfs.keytab hdfs && hadoop distcp -update -delete /xxxxx/data/dm/sales_forecast/results/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }} s3_path/data/dm/sales_forecast/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }}
> [2019-10-27 19:16:30,057] {base_task_runner.py:101} INFO - Job 421646: Subtask branch_task [2019-10-27 19:16:30,051] {cli.py:517} INFO - Running <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [running]> on host dc36
> [2019-10-27 19:16:30,086] {python_operator.py:104} INFO - Exporting the following env vars:
> AIRFLOW_CTX_DAG_ID=dm_sf_main_V21
> AIRFLOW_CTX_TASK_ID=branch_task
> AIRFLOW_CTX_EXECUTION_DATE=2019-10-17T01:45:00+08:00
> AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-10-17T01:45:00+08:00
> [2019-10-27 19:16:30,086] {logging_mixin.py:95} INFO - ti.execution_date.hour: 1
> [2019-10-27 19:16:30,086] {python_operator.py:113} INFO - Done. Returned value was: sales_forecast_prophet
> [2019-10-27 19:16:30,086] {python_operator.py:143} INFO - Following branch ['sales_forecast_prophet']
> [2019-10-27 19:16:30,087] {python_operator.py:144} INFO - Marking other directly downstream tasks as skipped
> [2019-10-27 19:16:30,245] {python_operator.py:163} INFO - Done.
> [2019-10-27 19:16:33,935] {logging_mixin.py:95} INFO - [2019-10-27 19:16:33,933] {jobs.py:2562} INFO - Task exited with return code 0
> [2019-10-27 22:59:50,913] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [queued]>
> [2019-10-27 22:59:50,927] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [queued]>
> [2019-10-27 22:59:50,927] {__init__.py:1353} INFO - 
> --------------------------------------------------------------------------------
> [2019-10-27 22:59:50,928] {__init__.py:1354} INFO - Starting attempt 1 of 1
> [2019-10-27 22:59:50,928] {__init__.py:1355} INFO - 
> --------------------------------------------------------------------------------
> [2019-10-27 22:59:50,983] {__init__.py:1374} INFO - Executing <Task(BranchPythonOperator): branch_task> on 2019-10-17T01:45:00+08:00
> [2019-10-27 22:59:50,984] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', '2019-10-17T01:45:00+08:00', '--job_id', '428425', '--raw', '-sd', 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', '--cfg_path', '/tmp/tmperygo54p']
> [2019-10-27 22:59:51,432] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task [2019-10-27 22:59:51,431] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=15555
> [2019-10-27 22:59:51,584] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task [2019-10-27 22:59:51,584] {__init__.py:51} INFO - Using executor CeleryExecutor
> [2019-10-27 22:59:51,868] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task [2019-10-27 22:59:51,866] {__init__.py:305} INFO - Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py
> [2019-10-27 22:59:51,955] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task Overwrite dag_start_date by 2019-10-25
> [2019-10-27 22:59:51,957] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task Overwrite sf_schedule_report by 30 9 * * *
> [2019-10-27 22:59:51,957] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task Overwrite sf_schedule_etl by 40 2 * * *
> [2019-10-27 22:59:51,957] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task Overwrite sf_schedule_main by 45 2,3,4,5,6,7 * * *
> [2019-10-27 22:59:51,957] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task #### config ### 
> [2019-10-27 22:59:51,957] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,957] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task CONFIG_KEY: sf_config_V21
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task CONFIG DICT: {'dag_start_date': '2019-10-25', 'xxxx': 'xxxxxx', 'sf_schedule_report': '30 9 * * *', 'sf_schedule_etl': '40 2 * * *', 'sf_schedule_main': '45 2,3,4,5,6,7 * * *'}
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task #### config end ### 
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task schedule: 45 2,3,4,5,6,7 * * *
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task divisor: 5
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,958] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf !
> [2019-10-27 22:59:51,959] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,959] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     ## Job[sales_forecast_prophet] Command-line :
> [2019-10-27 22:59:51,959] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     -------
> [2019-10-27 22:59:51,959] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     
> [2019-10-27 22:59:51,959] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     spark-submit   xxxxx
> [2019-10-27 22:59:51,959] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,959] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     -------
> [2019-10-27 22:59:51,959] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     
> [2019-10-27 22:59:51,959] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf !
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     ## Job[sales_forecast_prophet] Command-line :
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     -------
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     spark-submit   xxxxxx
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task 
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     -------
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task     
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task export HADOOP_USER_NAME=hdfs && kinit -kt /etc/security/hdfs.keytab hdfs && hadoop distcp -update -delete /xxxxx/data/dm/sales_forecast/results/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }} s3_path/data/dm/sales_forecast/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }}
> [2019-10-27 22:59:51,960] {base_task_runner.py:101} INFO - Job 428425: Subtask branch_task [2019-10-27 22:59:51,955] {cli.py:517} INFO - Running <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [running]> on host dc36
> [2019-10-27 22:59:51,992] {python_operator.py:104} INFO - Exporting the following env vars:
> AIRFLOW_CTX_DAG_ID=dm_sf_main_V21
> AIRFLOW_CTX_TASK_ID=branch_task
> AIRFLOW_CTX_EXECUTION_DATE=2019-10-17T01:45:00+08:00
> AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-10-17T01:45:00+08:00
> [2019-10-27 22:59:51,992] {logging_mixin.py:95} INFO - ti.execution_date.hour: 1
> [2019-10-27 22:59:51,992] {python_operator.py:113} INFO - Done. Returned value was: sales_forecast_prophet
> [2019-10-27 22:59:51,992] {python_operator.py:143} INFO - Following branch ['sales_forecast_prophet']
> [2019-10-27 22:59:51,993] {python_operator.py:144} INFO - Marking other directly downstream tasks as skipped
> [2019-10-27 22:59:52,043] {python_operator.py:163} INFO - Done.
> [2019-10-27 22:59:56,127] {logging_mixin.py:95} INFO - [2019-10-27 22:59:56,125] {jobs.py:2562} INFO - Task exited with return code 0
> [2019-10-28 11:48:58,546] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [queued]>
> [2019-10-28 11:48:58,589] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [queued]>
> [2019-10-28 11:48:58,589] {__init__.py:1353} INFO - 
> --------------------------------------------------------------------------------
> [2019-10-28 11:48:58,590] {__init__.py:1354} INFO - Starting attempt 1 of 1
> [2019-10-28 11:48:58,590] {__init__.py:1355} INFO - 
> --------------------------------------------------------------------------------
> [2019-10-28 11:48:58,646] {__init__.py:1374} INFO - Executing <Task(BranchPythonOperator): branch_task> on 2019-10-17T01:45:00+08:00
> [2019-10-28 11:48:58,646] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', '2019-10-17T01:45:00+08:00', '--job_id', '449843', '--raw', '-sd', 'DAGS_FOLDER/sf_dags_n/dm_sf_main_V21.py', '--cfg_path', '/tmp/tmp088wf7mr']
> [2019-10-28 11:48:59,108] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task [2019-10-28 11:48:59,107] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=1312
> [2019-10-28 11:48:59,262] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task [2019-10-28 11:48:59,261] {__init__.py:51} INFO - Using executor CeleryExecutor
> [2019-10-28 11:48:59,538] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task [2019-10-28 11:48:59,536] {__init__.py:305} INFO - Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main_V21.py
> [2019-10-28 11:48:59,618] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task Overwrite dag_start_date by 2019-10-25
> [2019-10-28 11:48:59,620] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task Overwrite sf_schedule_report by 30 9 * * *
> [2019-10-28 11:48:59,620] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task Overwrite sf_schedule_etl by 40 2 * * *
> [2019-10-28 11:48:59,620] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task Overwrite sf_schedule_main by 45 2,3,4,5,6,7 * * *
> [2019-10-28 11:48:59,620] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task #### config ### 
> [2019-10-28 11:48:59,620] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,620] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,620] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task CONFIG_KEY: sf_config_V21
> [2019-10-28 11:48:59,620] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task CONFIG DICT: {'dag_start_date': '2019-10-25', 'xxxxx': 'xxxxx', 'sf_schedule_report': '30 9 * * *', 'sf_schedule_etl': '40 2 * * *', 'sf_schedule_main': '45 2,3,4,5,6,7 * * *'}
> [2019-10-28 11:48:59,620] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task #### config end ### 
> [2019-10-28 11:48:59,621] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,621] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,621] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,621] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task schedule: 45 2,3,4,5,6,7 * * *
> [2019-10-28 11:48:59,621] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task divisor: 5
> [2019-10-28 11:48:59,621] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,621] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf !
> [2019-10-28 11:48:59,621] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,621] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     ## Job[sales_forecast_prophet] Command-line :
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     -------
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     spark-submit    xxxxxx
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     -------
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf !
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     ## Job[sales_forecast_prophet] Command-line :
> [2019-10-28 11:48:59,622] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     -------
> [2019-10-28 11:48:59,623] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     
> [2019-10-28 11:48:59,623] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     spark-submit      xxxxxxx
> [2019-10-28 11:48:59,623] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task 
> [2019-10-28 11:48:59,623] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     -------
> [2019-10-28 11:48:59,623] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task     
> [2019-10-28 11:48:59,623] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task export HADOOP_USER_NAME=hdfs && kinit -kt /etc/security/hdfs.keytab hdfs && hadoop distcp -update -delete /xxxxx/data/dm/sales_forecast/results/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }} s3_path/data/dm/sales_forecast/fbprophet/version=V21/cur_time={{ execution_date.strftime('%Y%m%d') }}
> [2019-10-28 11:48:59,623] {base_task_runner.py:101} INFO - Job 449843: Subtask branch_task [2019-10-28 11:48:59,618] {cli.py:517} INFO - Running <TaskInstance: dm_sf_main_V21.branch_task 2019-10-17T01:45:00+08:00 [running]> on host dc36
> [2019-10-28 11:48:59,652] {python_operator.py:104} INFO - Exporting the following env vars:
> AIRFLOW_CTX_DAG_ID=dm_sf_main_V21
> AIRFLOW_CTX_TASK_ID=branch_task
> AIRFLOW_CTX_EXECUTION_DATE=2019-10-17T01:45:00+08:00
> AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-10-17T01:45:00+08:00
> [2019-10-28 11:48:59,652] {logging_mixin.py:95} INFO - ti.execution_date.hour: 1
> [2019-10-28 11:48:59,652] {python_operator.py:113} INFO - Done. Returned value was: sales_forecast_prophet
> [2019-10-28 11:48:59,652] {python_operator.py:143} INFO - Following branch ['sales_forecast_prophet']
> [2019-10-28 11:48:59,653] {python_operator.py:144} INFO - Marking other directly downstream tasks as skipped
> [2019-10-28 11:48:59,704] {python_operator.py:163} INFO - Done.
> [2019-10-28 11:49:03,532] {logging_mixin.py:95} INFO - [2019-10-28 11:49:03,530] {jobs.py:2562} INFO - Task exited with return code 0
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)