Posted to commits@airflow.apache.org by "David Klosowski (JIRA)" <ji...@apache.org> on 2016/08/04 20:09:20 UTC

[jira] [Commented] (AIRFLOW-392) DAG runs on strange schedule in the past when deployed

    [ https://issues.apache.org/jira/browse/AIRFLOW-392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408413#comment-15408413 ] 

David Klosowski commented on AIRFLOW-392:
-----------------------------------------

OK, we fixed the issue.  I'll be quite honest: I'm not 100% sure what fixed the problem. Our working theory is inconsistent dag_run state in the database, which was resolved by clearing the db and re-deploying the DAG (with a changed start_date).  We also adjusted the creation of {{ExternalTaskSensor}} shown above by changing the delta to {{delta = relativedelta(weekday=weekday(1))}} (from -1), so it looks forward instead of backward.  The interesting thing is that the behavior we were seeing is consistent with the existing code in version 1.7.1.3 (which we are running), in this block of {{jobs.py}}'s {{schedule_dag}} method:

{code}
            if dag.schedule_interval == '@once' and not last_scheduled_run:
                next_run_date = datetime.now()
            elif not last_scheduled_run:
                # First run
                TI = models.TaskInstance
                latest_run = (
                    session.query(func.max(TI.execution_date))
                    .filter_by(dag_id=dag.dag_id)
                    .scalar()
                )
                if latest_run:
                    # Migrating from previous version
                    # make the past 5 runs active
                    next_run_date = dag.date_range(latest_run, -5)[0]
                else:
                    task_start_dates = [t.start_date for t in dag.tasks]
                    if task_start_dates:
                        next_run_date = min(task_start_dates)
                    else:
                        next_run_date = None
{code}

Notice the block:

{code}
                    # Migrating from previous version
                    # make the past 5 runs active
                    next_run_date = dag.date_range(latest_run, -5)[0]
{code}

We were getting DAG runs in the past.  My guess is that the {{latest_run}} condition would be false when there are no {{TaskInstance}}s, and thus no {{execution_date}}s for them, on the first pass over a new DAG (new DAG -> first DagRun); on subsequent invocations, however, it would create a {{DagRun}} 5 schedule intervals in the past.  Not sure why this didn't happen again, though.
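
To make the effect of that branch concrete, here is a rough standalone sketch of what {{dag.date_range(latest_run, -5)[0]}} works out to for a weekly schedule, assuming a hypothetical {{latest_run}} of 2016-08-08 (this is an approximation for illustration, not Airflow's actual {{date_range}} code):

{code}
# Approximation of "take the earliest of the past 5 run dates" for a weekly
# schedule_interval; an illustration only, not Airflow's date_range() implementation.
from datetime import datetime, timedelta

latest_run = datetime(2016, 8, 8)            # hypothetical most recent execution_date
schedule_interval = timedelta(days=7)

candidates = sorted(latest_run - n * schedule_interval for n in range(5))
next_run_date = candidates[0]

print(next_run_date)  # 2016-07-11 00:00:00 -- roughly a month in the past
{code}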

However, that logic has since changed with AIRFLOW-168 (on master):

{code}
            if not last_scheduled_run:
                # First run
                task_start_dates = [t.start_date for t in dag.tasks]
                if task_start_dates:
                    next_run_date = min(task_start_dates)
            else:
                next_run_date = dag.following_schedule(last_scheduled_run)
{code}
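
As a side note on the {{ExternalTaskSensor}} change mentioned above, here is a small standalone illustration (with arbitrary dates) of how {{relativedelta}}'s weekday argument resolves backward vs. forward:

{code}
# Illustration of weekday(-1) vs weekday(1) with dateutil's relativedelta.
# The dates below are arbitrary examples, not taken from our DAGs.
from datetime import date
from dateutil.relativedelta import relativedelta, MO

wednesday = date(2016, 8, 10)

# weekday=MO(-1): the previous Monday, or the date itself if it already is a Monday
print(wednesday + relativedelta(weekday=MO(-1)))          # 2016-08-08
print(date(2016, 8, 8) + relativedelta(weekday=MO(-1)))   # 2016-08-08 (unchanged)

# weekday=MO(1): the next Monday, or the date itself if it already is a Monday
print(wednesday + relativedelta(weekday=MO(1)))           # 2016-08-15
{code}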


> DAG runs on strange schedule in the past when deployed
> ------------------------------------------------------
>
>                 Key: AIRFLOW-392
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-392
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.7.1.3
>         Environment: AWS ElasticBeanstalk as a Docker application running in an Ubuntu-based container
>            Reporter: David Klosowski
>            Assignee: Norman Mu
>
> Just deployed a new DAG ('weekly-no-track') that depends on 7 task runs of another DAG ('daily-no-track').  When the DAG is deployed, the scheduler schedules and runs multiple DAG runs in the past (yesterday it ran for 6/12/2016 and 6/05/2016), despite the start date being set to the deployment date.
> It would be a bit difficult to include all the code used in the DAG, since it references multiple Python libraries we've built that we want to eventually open source.  I've included some of the code here.  Let me know if this is all clear and what I can do to help, or if any insight can be provided as to why this is occurring and how we might fix it.
> {code}
> from __future__ import division, print_function
> from airflow.models import DAG
> from airflow.operators import DummyOperator, ExternalTaskSensor, TimeDeltaSensor
> from tn_etl_tools.aws.emr import EmrConfig, HiveConfig, read_cluster_templates
> from tn_etl_tools.aws.emr import EmrService, EmrServiceWrapper, HiveStepBuilder
> from tn_etl_tools.datesupport import ts_add
> from tn_etl_tools.hive import HivePartitions
> from tn_etl_tools.yaml import YamlLoader
> from datetime import datetime, timedelta
> from dateutil.relativedelta import relativedelta, SU, MO, TU, WE, TH, FR, SA
> from common_args import merge_dicts, CommonHiveParams
> from operator_builders import add_tasks, emr_hive_operator
> import os
> # === configs
> config_dir = os.getenv('DAG_CONFIG_DIR', '/usr/local/airflow/config')
> alert_email = os.getenv('AIRFLOW_TO_EMAIL')
> app_properties = YamlLoader.load_yaml(config_dir + '/app.yml')
> emr_cluster_properties = YamlLoader.load_yaml(config_dir + '/emr_clusters.yml')
> emr_config = EmrConfig.load(STAGE=app_properties['STAGE'], **app_properties['EMR'])
> hive_config = HiveConfig.load(STAGE=app_properties['STAGE'], **app_properties['HIVE'])
> emr_cluster_templates = read_cluster_templates(emr_cluster_properties)
> # === /configs
> # TODO: force execution_date = sunday?
> run_for_date = datetime(2016, 8, 8)
> emr_service = EmrService()
> emr_service_wrapper = EmrServiceWrapper(emr_service=emr_service,
>                                         emr_config=emr_config, cluster_templates=emr_cluster_templates)
> hive_step_builder = HiveStepBuilder(hive_config=hive_config)
> hive_params = CommonHiveParams(app_properties_hive=app_properties['HIVE'])
> args = {'owner': 'airflow',
>         'depends_on_past': False,
>         'start_date': run_for_date,
>         'email': [alert_email],
>         'email_on_failure': True,
>         'email_on_retry': False,
>         'retries': 1,
>         'trigger_rule' : 'all_success',
>         'emr_service_wrapper': emr_service_wrapper,
>         'hive_step_builder': hive_step_builder}
> user_defined_macros = {'hive_partitions': HivePartitions,
>                        'ts_add': ts_add}
> params = {'stage': app_properties['STAGE']}
> dag = DAG(dag_id='weekly_no_track', default_args=args, user_defined_macros=user_defined_macros, params=params,
>           schedule_interval=timedelta(days=7),
>           max_active_runs=1)
> # === task definitions
> task_definitions = {
>     'wait-for-dailies': {
>         'operator_type': 'dummy_operator', # hub for custom defined dependencies
>         'operator_args': {},
>         'depends_on': []
>     },
>     'weekly-no-track': {
>         'operator_type': 'emr_hive_operator',
>         'operator_args': {
>             'hive_step': {
>                 'script': 'weekly-no-track-airflow',  # temporary modified script with separate output path
>                 'cluster_name': 'geoprofile',
>                 'script_vars': merge_dicts(hive_params.default_params(), hive_params.rundate_params(), {
>                     'PARTITIONS': '{{hive_partitions.by_day(ts_add(ts, days=-6), ts_add(ts, days=1))}}',
>                 }),
>             }
>         },
>         'depends_on': ['wait-for-dailies']
>     }
> }
> # === /task definitions
> operator_builders = {'emr_hive_operator': emr_hive_operator,
>                      'time_delta_sensor': TimeDeltaSensor,
>                      'dummy_operator': DummyOperator}
> add_tasks(task_definitions, dag=dag, operator_builders=operator_builders)
> # === custom tasks
> downstream_task = dag.get_task('wait-for-dailies')
> for weekday in [MO, TU, WE, TH, FR, SA, SU]:
>     task_id = 'wait-for-daily-{day}'.format(day=weekday)
>     # weekday(-1) subtracts 1 relative week from the given weekday, however if the calculated date is already Monday,
>     # for example, -1 won't change the day.
>     delta = relativedelta(weekday=weekday(-1))
>     sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
>                                 external_dag_id='daily_no_track', external_task_id='daily-no-track',
>                                 execution_delta=delta, timeout=86400)  # 86400 = 24 hours
>     sensor.set_downstream(downstream_task)
> # === /custom tasks
> {code}
> Some referenced code
> {{common_args.py}}
> {code}
> from __future__ import division, print_function
> from copy import copy
> class CommonHiveParams(object):
>     def __init__(self, app_properties_hive):
>         super(CommonHiveParams, self).__init__()
>         # TODO: this should be part of a config object
>         self.app_properties_hive = app_properties_hive
>     def default_params(self):
>         return {
>             'HIVE_LIBS_BUCKET': self.app_properties_hive['S3_HIVE_LIB_BUCKET'],
>             'STAGE': '{{params.stage}}',
>         }
>     @staticmethod
>     def rundate_params():
>         return {
>             'YEAR': '{{execution_date.strftime("%Y")}}',
>             'MONTH': '{{execution_date.strftime("%m")}}',
>             'DAY': '{{execution_date.strftime("%d")}}',
>             'HOUR': '{{execution_date.strftime("%H")}}',
>             'MINUTE': '{{execution_date.strftime("%M")}}',
>         }
> def merge_dicts(*dicts):
>     """ Merge provided dicts without modification.
>     Duplicate keys are overwritten with values from the rightmost applicable dict.
>     """
>     if len(dicts) == 0:
>         return {}
>     result = copy(dicts[0])
>     for d in dicts[1:]:
>         result.update(d)
>     return result
> {code}
> {{operator_builders.py}}
> {code}
> """Functions for building operators from dict property definitions."""
> from __future__ import division, print_function
> from tn_airflow_components.operators.emr import EmrHiveOperator, create_emr_operator_with_step_sensor
> # TODO: this should not be a single package. Not every DAG needs EMR as a dependency, for example.
> def emr_hive_operator(task_id, dag, hive_step, **kwargs):
>     return create_emr_operator_with_step_sensor(task_id=task_id, dag=dag,
>                                                 main_operator_class=EmrHiveOperator, main_operator_kwargs=hive_step,
>                                                 **kwargs)
> def add_tasks(task_definitions, dag, operator_builders):
>     """Add tasks from dict definitions
>     :param task_definitions: dict of task definitions. Keys in the top-level dict are used as the task IDs
>     :type task_definitions: dict
>     :param dag: the DAG in which to define the tasks
>     :type dag: airflow.models.DAG
>     :param operator_builders: mapping of str 'operator_type' values to operator builder functions
>     :type operator_builders: dict
>     """
>     for task_id in task_definitions.keys():
>         task_definition = task_definitions[task_id]
>         operator_type = task_definition['operator_type']
>         operator = operator_builders[operator_type](task_id=task_id, dag=dag, **task_definition['operator_args'])
>         if task_definition['depends_on']:
>             for dependency in task_definition['depends_on']:
>                 operator.set_upstream(dag.get_task(dependency))
> {code}


