You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "kasim (Jira)" <ji...@apache.org> on 2019/11/14 02:32:00 UTC
[jira] [Updated] (AIRFLOW-5927) Airflow cache import file or
variable
[ https://issues.apache.org/jira/browse/AIRFLOW-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
kasim updated AIRFLOW-5927:
---------------------------
Description:
I have a `config.py` pull configure from `Variable` and merge into default config :
```python
from datetime import datetime
from airflow.models import Variable
class Config:
version = "V21"
etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
forecast_result_s3_dir = f's3a://pupumall-dc-tmp/data/dm/sales_forecast/fbprophet/version={version}'
etl_dir = '/data/dm/sales_forecast/etl'
feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'
dag_start_date = datetime(2019, 10, 25)
etl_start_time = "2019-06-01 00:00:00"
etl_end_time = "{{ (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') }}"
train_start_time = "{{ (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') }}"
train_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
predict_start_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
predict_end_time = "{{ (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') }}"
report_start_date = "{{ (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') }}"
report_end_date = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
sf_schedule_report = "30 8 * * *"
sf_schedule_etl = '30 1 * * *'
sf_schedule_main_flow = "45 2 * * *"
CONFIG_KEY = 'sf_config_%s' % Config.version
sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})
if sf_config:
for k, v in sf_config.items():
print(f'Overwrite {k} by {v}')
if hasattr(Config, k):
if k == 'dag_start_date':
setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') )
elif v == 'None':
setattr(Config, k, None)
else:
setattr(Config, k, v)```
And I have 5 dag file import this Config . they have some similar code like
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable
from sf_dags_n.config import Config
default_args =
{ 'owner': 'mithril', 'depends_on_past': False, 'email': ['mithril'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 2, }
dag = DAG('dm_sfx_etl_%s' % Config.version,
start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') ,
default_args=default_args,
schedule_interval=Config.sf_schedule_etl,
user_defined_filters=
{ 'mod' : lambda s, d:s%d }
,
)
# other codes
```
The strange thing is:
Changing `sf_schedule_etl` via Variable took effect several times, but at some point I could no longer change it from the variable, even when I hard-coded it directly:
```
dag = DAG('dm_sfx_etl_%s' % Config.version,
start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') ,
default_args=default_args,
schedule_interval='20 1 * * *',
user_defined_filters={ 'mod' : lambda s, d:s%d }
,
)
```
If this situation occurred, even deleting the dag file and removing it from the Airflow web UI did not change `schedule_interval`.
PS: my dag files have been running for some days; during that time I may have added some operators or changed an operator's type, and everything was still fine. I think there must be some cache in Airflow that leads to this problem.
was:
I have a `config.py` pull configure from `Variable` and merge into default config :
```python
from datetime import datetime
from airflow.models import Variable
class Config:
version = "V21"
etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
forecast_result_s3_dir = f's3a://pupumall-dc-tmp/data/dm/sales_forecast/fbprophet/version={version}'
etl_dir = '/data/dm/sales_forecast/etl'
feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'
dag_start_date = datetime(2019, 10, 25)
etl_start_time = "2019-06-01 00:00:00"
etl_end_time = "{{ (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') }}"
train_start_time = "{{ (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') }}"
train_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
predict_start_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
predict_end_time = "{{ (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') }}"
report_start_date = "{{ (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') }}"
report_end_date = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
sf_schedule_report = "30 8 * * *"
sf_schedule_etl = '30 1 * * *'
sf_schedule_main_flow = "45 2 * * *"
CONFIG_KEY = 'sf_config_%s' % Config.version
sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})
if sf_config:
for k, v in sf_config.items():
print(f'Overwrite {k} by {v}')
if hasattr(Config, k):
if k == 'dag_start_date':
print(datetime.strptime(v, '%Y-%m-%d'))
print(type(datetime.strptime(v, '%Y-%m-%d')))
setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') )
print(Config.dag_start_date)
print(type(Config.dag_start_date))
if v == 'None':
setattr(Config, k, None)
else:
setattr(Config, k, v)```
And I have 5 dag file import this Config . they have some similar code like
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable
from sf_dags_n.config import Config
default_args = {
'owner': 'mithril',
'depends_on_past': False,
'email': ['mithril'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
}
print(Config.dag_start_date)
print(type(Config.dag_start_date))
dag = DAG('dm_sfx_etl_%s' % Config.version,
start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') ,
default_args=default_args,
schedule_interval=Config.sf_schedule_etl,
user_defined_filters={
'mod' : lambda s, d:s%d
},
)
# other codes
```
The strange thing is:
1. At first, in the dag file which does `from sf_dags_n.config import Config`, Config.dag_start_date was of datetime type. But it became str several days ago; I checked its type in config.py and in the dag file — Config.dag_start_date is still datetime in config.py, but in the dag file it is str. As I remember, I originally set the default Config.dag_start_date as a str, but changed it when the problem occurred. Then it was fine for some time before it went wrong again.
2. Changing `sf_schedule_etl` via Variable took effect several times, but at some point I could no longer change it from the variable, even when I hard-coded it directly:
```
dag = DAG('dm_sfx_etl_%s' % Config.version,
start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') ,
default_args=default_args,
schedule_interval='20 1 * * *',
user_defined_filters={
'mod' : lambda s, d:s%d
},
)
```
If this situation occurred, even deleting the dag file and removing it from the Airflow web UI did not change `schedule_interval`.
PS: my dag files have been running for some days; during that time I may have added some operators or changed an operator's type, and everything was still fine. I think there must be some cache in Airflow that leads to this problem.
> Airflow cache import file or variable
> -------------------------------------
>
> Key: AIRFLOW-5927
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5927
> Project: Apache Airflow
> Issue Type: Bug
> Components: DAG, database
> Affects Versions: 1.10.3
> Reporter: kasim
> Priority: Major
>
> I have a `config.py` pull configure from `Variable` and merge into default config :
>
> ```python
> from datetime import datetime
> from airflow.models import Variable
> class Config:
> version = "V21"
> etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
> forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
> forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
> forecast_result_s3_dir = f's3a://pupumall-dc-tmp/data/dm/sales_forecast/fbprophet/version={version}'
>
> etl_dir = '/data/dm/sales_forecast/etl'
> feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'
> dag_start_date = datetime(2019, 10, 25)
> etl_start_time = "2019-06-01 00:00:00"
> etl_end_time = "{{ (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') }}"
> train_start_time = "{{ (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') }}"
> train_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
> predict_start_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
> predict_end_time = "{{ (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') }}"
> report_start_date = "{{ (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') }}"
> report_end_date = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
> sf_schedule_report = "30 8 * * *"
> sf_schedule_etl = '30 1 * * *'
> sf_schedule_main_flow = "45 2 * * *"
> CONFIG_KEY = 'sf_config_%s' % Config.version
> sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})
> if sf_config:
> for k, v in sf_config.items():
> print(f'Overwrite {k} by {v}')
> if hasattr(Config, k):
> if k == 'dag_start_date':
> setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') )
> elif v == 'None':
> setattr(Config, k, None)
> else:
> setattr(Config, k, v)```
>
> And I have 5 dag file import this Config . they have some similar code like
>
> ```python
> from datetime import datetime, timedelta
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dagrun_operator import TriggerDagRunOperator
> from airflow.models import Variable
> from sf_dags_n.config import Config
> default_args =
> { 'owner': 'mithril', 'depends_on_past': False, 'email': ['mithril'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 2, }
> dag = DAG('dm_sfx_etl_%s' % Config.version,
> start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') ,
> default_args=default_args,
> schedule_interval=Config.sf_schedule_etl,
> user_defined_filters=
> { 'mod' : lambda s, d:s%d }
> ,
> )
> # other codes
> ```
>
>
> The strange thing is:
>
> Changing `sf_schedule_etl` via Variable took effect several times, but at some point I could no longer change it from the variable, even when I hard-coded it directly:
>
> ```
> dag = DAG('dm_sfx_etl_%s' % Config.version,
> start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') ,
> default_args=default_args,
> schedule_interval='20 1 * * *',
> user_defined_filters={ 'mod' : lambda s, d:s%d }
> ,
> )
> ```
>
> If this situation occurred, even deleting the dag file and removing it from the Airflow web UI did not change `schedule_interval`.
>
> PS: my dag files have been running for some days; during that time I may have added some operators or changed an operator's type, and everything was still fine. I think there must be some cache in Airflow that leads to this problem.
>
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)