Posted to commits@airflow.apache.org by "kasim (Jira)" <ji...@apache.org> on 2019/11/14 02:32:00 UTC

[jira] [Updated] (AIRFLOW-5927) Airflow cache import file or variable

     [ https://issues.apache.org/jira/browse/AIRFLOW-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kasim updated AIRFLOW-5927:
---------------------------
    Description: 
I have a `config.py` that pulls configuration from `Variable` and merges it into the default config:

 
```python
from datetime import datetime

from airflow.models import Variable


class Config:
    version = "V21"

    etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
    forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
    forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
    forecast_result_s3_dir = f's3a://pupumall-dc-tmp/data/dm/sales_forecast/fbprophet/version={version}'

    etl_dir = '/data/dm/sales_forecast/etl'
    feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'

    dag_start_date = datetime(2019, 10, 25)

    etl_start_time = "2019-06-01 00:00:00"
    etl_end_time = "{{ (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') }}"

    train_start_time = "{{ (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') }}"
    train_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"

    predict_start_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_end_time = "{{ (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') }}"

    report_start_date = "{{ (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') }}"
    report_end_date = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"

    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = '30 1 * * *'
    sf_schedule_main_flow = "45 2 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version

sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})

if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
            elif v == 'None':
                setattr(Config, k, None)
            else:
                setattr(Config, k, v)
```
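
For reference, the Variable this script reads holds a JSON object whose keys match `Config` attribute names. A minimal sketch of setting it (the override values here are hypothetical, not from the real deployment):

```python
import json

from airflow.models import Variable

# Hypothetical overrides; any subset of Config attribute names works.
overrides = {
    "dag_start_date": "2019-11-01",   # parsed by strptime('%Y-%m-%d') above
    "sf_schedule_etl": "20 1 * * *",  # replaces the default cron string
}
Variable.set("sf_config_V21", json.dumps(overrides))
```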

 
And I have 5 DAG files that import this Config. They share similar code like:
  
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable

from sf_dags_n.config import Config

default_args = {
    'owner': 'mithril',
    'depends_on_past': False,
    'email': ['mithril'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
}

dag = DAG('dm_sfx_etl_%s' % Config.version,
    start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d'),
    default_args=default_args,
    schedule_interval=Config.sf_schedule_etl,
    user_defined_filters={
        'mod': lambda s, d: s % d,
    },
)

# other codes
```
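
For context, the `mod` filter registered above would be used inside a templated field like this (a hypothetical usage sketch; this task is not part of the real DAG):

```python
from airflow.operators.bash_operator import BashOperator

# Hypothetical task applying the custom 'mod' Jinja filter at render time.
bucket_task = BashOperator(
    task_id='print_bucket',
    bash_command="echo day-bucket {{ execution_date.day | mod(7) }}",
    dag=dag,
)
```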
  
  
The strange thing is:

Changing `sf_schedule_etl` in the Variable took effect several times, but at some point I couldn't change it via the Variable any more, even when I hard-coded it directly:
  
```
dag = DAG('dm_sfx_etl_%s' % Config.version,
    start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d'),
    default_args=default_args,
    schedule_interval='20 1 * * *',
    user_defined_filters={
        'mod': lambda s, d: s % d,
    },
)
```
  
Once this situation occurred, even deleting the DAG file and removing the DAG from the Airflow web UI did not change `schedule_interval`.
  
PS: my DAG files have been running for some days. During that time I may have added some operators or changed an operator's type, and everything was still fine. I think there must be some cache in Airflow that leads to this problem.
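
One way to narrow this down is to parse the file fresh with `DagBag` on the scheduler host and compare the result with what the scheduler actually runs; a diagnostic sketch (the dags folder path is an assumption):

```python
from airflow.models import DagBag

# Re-parse the DAG folder from scratch, bypassing any already-imported module state.
bag = DagBag('/path/to/dags/sf_dags_n')  # assumption: your actual dags folder
dag = bag.get_dag('dm_sfx_etl_V21')
print(dag.schedule_interval)  # what a fresh parse produces
print(bag.import_errors)      # files that failed to import, if any
```

If a fresh parse already shows the new value while the scheduler keeps running the old one, the stale state is on the scheduler/metadata side rather than in the file itself.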
  
  
  
  
  

  was:
I have a `config.py` that pulls configuration from `Variable` and merges it into the default config:

 
```python
from datetime import datetime

from airflow.models import Variable


class Config:
    version = "V21"

    etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
    forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
    forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
    forecast_result_s3_dir = f's3a://pupumall-dc-tmp/data/dm/sales_forecast/fbprophet/version={version}'

    etl_dir = '/data/dm/sales_forecast/etl'
    feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'

    dag_start_date = datetime(2019, 10, 25)

    etl_start_time = "2019-06-01 00:00:00"
    etl_end_time = "{{ (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') }}"

    train_start_time = "{{ (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') }}"
    train_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"

    predict_start_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_end_time = "{{ (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') }}"

    report_start_date = "{{ (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') }}"
    report_end_date = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"

    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = '30 1 * * *'
    sf_schedule_main_flow = "45 2 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version

sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})

if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                print(datetime.strptime(v, '%Y-%m-%d'))
                print(type(datetime.strptime(v, '%Y-%m-%d')))

                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))

                print(Config.dag_start_date)
                print(type(Config.dag_start_date))

            if v == 'None':
                setattr(Config, k, None)
            else:
                setattr(Config, k, v)
```


 
And I have 5 DAG files that import this Config. They share similar code like:
 
```python


from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable

from sf_dags_n.config import Config

default_args = {
    'owner': 'mithril',
    'depends_on_past': False,
    'email': ['mithril'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
}

print(Config.dag_start_date)
print(type(Config.dag_start_date))


dag = DAG('dm_sfx_etl_%s' % Config.version, 
    start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') ,
    default_args=default_args, 
    schedule_interval=Config.sf_schedule_etl,
    user_defined_filters={
        'mod': lambda s, d: s % d,
    },
)

# other codes
```
 
 
The strange thing is:
 
1. At first, in the DAG files that `from sf_dags_n.config import Config`, `Config.dag_start_date` was a datetime. But it became a str several days ago. I checked its type in both places: in config.py `Config.dag_start_date` is still a datetime, but in the DAG file it is a str. As I remember, I originally set the default `Config.dag_start_date` as a str, but changed it to datetime when the problem occurred; then it was fine for some time before it went wrong again (a defensive workaround is sketched below, after point 2).
 
2. Changing `sf_schedule_etl` in the Variable took effect several times, but at some point I couldn't change it via the Variable any more, even when I hard-coded it directly:
 
```
dag = DAG('dm_sfx_etl_%s' % Config.version, 
    start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') ,
    default_args=default_args, 
    schedule_interval='20 1 * * *',
    user_defined_filters={
        'mod': lambda s, d: s % d,
    },
)
```
 
Once this situation occurred, even deleting the DAG file and removing the DAG from the Airflow web UI did not change `schedule_interval`.
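
Regarding point 1, a defensive workaround (a sketch, not taken from the original code) is to normalize the value in the DAG file so it works whether the merge leaves a str or a datetime behind:

```python
from datetime import datetime


def as_datetime(value):
    """Return a datetime whether `value` is already one or a '%Y-%m-%d' string."""
    if isinstance(value, datetime):
        return value
    return datetime.strptime(value, '%Y-%m-%d')

# Used in place of the bare strptime call in the DAG file:
#   start_date=as_datetime(Config.dag_start_date),
```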
 
PS: my DAG files have been running for some days. During that time I may have added some operators or changed an operator's type, and everything was still fine. I think there must be some cache in Airflow that leads to this problem.
 
 
 
 
 


> Airflow cache import file or variable
> -------------------------------------
>
>                 Key: AIRFLOW-5927
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5927
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, database
>    Affects Versions: 1.10.3
>            Reporter: kasim
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)