You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:20:17 UTC

[GitHub] stale[bot] closed pull request #1942: [AIRFLOW-697] Add exclusion of tasks.

stale[bot] closed pull request #1942: [AIRFLOW-697] Add exclusion of tasks.
URL: https://github.com/apache/incubator-airflow/pull/1942
 
 
   

This is a PR from a forked repository.
As GitHub hides the original diff once such a PR is merged or closed, it is
displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/jobs.py b/airflow/jobs.py
index a2d94e30cd..7321025cae 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -43,7 +43,7 @@
 from airflow import executors, models, settings
 from airflow import configuration as conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagRun
+from airflow.models import DagRun, TaskExclusion
 from airflow.settings import Stats
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
 from airflow.utils.state import State
@@ -844,8 +844,15 @@ def _process_task_instances(self, dag, queue):
                 if ti.are_dependencies_met(
                         dep_context=DepContext(flag_upstream_failed=True),
                         session=session):
-                    self.logger.debug('Queuing task: {}'.format(ti))
-                    queue.append(ti.key)
+                    if TaskExclusion.should_exclude_task(
+                            dag_id=ti.dag_id,
+                            task_id=ti.task_id,
+                            execution_date=ti.execution_date):
+                        self.logger.debug('Excluding task: {}'.format(ti))
+                        ti.set_state(State.EXCLUDED, session)  # marked instead of queued
+                    else:
+                        self.logger.debug('Queuing task: {}'.format(ti))
+                        queue.append(ti.key)
 
         session.close()
 
@@ -1733,7 +1740,7 @@ def get_task_instances_for_dag_run(dag_run):
                                       .format(ti, ti.state))
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
-                    if ti.state == State.SUCCESS:
+                    if ti.state_for_dependents() == State.SUCCESS:  # EXCLUDED counts too
                         succeeded.add(key)
                         self.logger.debug("Task instance {} succeeded. "
                                           "Don't rerun.".format(ti))
@@ -1831,7 +1838,7 @@ def get_task_instances_for_dag_run(dag_run):
                     elif state == State.SUCCESS:
 
                         # task reports success
-                        if ti.state == State.SUCCESS:
+                        if ti.state_for_dependents() == State.SUCCESS:  # EXCLUDED counts too
                             self.logger.info(
                                 'Task instance {} succeeded'.format(ti))
                             succeeded.add(key)
diff --git a/airflow/migrations/versions/bbb79aef5cac_create_task_exclusion_table.py b/airflow/migrations/versions/bbb79aef5cac_create_task_exclusion_table.py
new file mode 100644
index 0000000000..85e957ea72
--- /dev/null
+++ b/airflow/migrations/versions/bbb79aef5cac_create_task_exclusion_table.py
@@ -0,0 +1,61 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""create task_exclusion table
+
+Revision ID: bbb79aef5cac
+Revises: f2ca10b85618
+Create Date: 2016-11-18 13:38:34.653202
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'bbb79aef5cac'
+down_revision = 'f2ca10b85618'
+branch_labels = None
+depends_on = None
+
+from alembic import op, context
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+
+def _datetime_type():
+    """
+    Return the column type for this table's datetime columns: MySQL gets
+    DATETIME(fsp=6) so that microseconds are kept, every other backend
+    gets the generic sa.DateTime().
+    """
+    if context.config.get_main_option('sqlalchemy.url').startswith('mysql'):
+        return mysql.DATETIME(fsp=6)
+    return sa.DateTime()
+
+
+def upgrade():
+    # The exclusion dates are nullable to match the TaskExclusion model:
+    # INDEFINITE exclusions are stored with NULL start and end dates.
+    op.create_table(
+        'task_exclusion',
+        sa.Column('id', sa.Integer(), nullable=False),
+        sa.Column('dag_id', sa.String(length=250), nullable=False),
+        sa.Column('task_id', sa.String(length=250), nullable=False),
+        sa.Column('exclusion_type', sa.String(length=32), nullable=False),
+        sa.Column('exclusion_start_date', _datetime_type(), nullable=True),
+        sa.Column('exclusion_end_date', _datetime_type(), nullable=True),
+        sa.Column('created_by', sa.String(length=256), nullable=False),
+        sa.Column('created_on', _datetime_type(), nullable=False),
+        sa.PrimaryKeyConstraint('id'))
+
+
+def downgrade():
+    op.drop_table('task_exclusion')
diff --git a/airflow/models.py b/airflow/models.py
index f46a3523b3..1314776283 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -761,6 +761,19 @@ def init_on_load(self):
         """ Initialize the attributes that aren't stored in the DB. """
         self.test_mode = False  # can be changed when calling 'run'
 
+    def state_for_dependents(self):
+        """
+        Return the state this task instance presents to its dependents.
+        EXCLUDED is reported as SUCCESS so that dependency checks treat an
+        excluded task like a successful one; every other state is returned
+        unchanged.
+        :return: the effective state of the task instance.
+        """
+        if self.state == State.EXCLUDED:
+            return State.SUCCESS
+        else:
+            return self.state
+
     def command(
             self,
             mark_success=False,
@@ -1013,7 +1026,7 @@ def are_dependents_done(self, session=None):
             TaskInstance.dag_id == self.dag_id,
             TaskInstance.task_id.in_(task.downstream_task_ids),
             TaskInstance.execution_date == self.execution_date,
-            TaskInstance.state == State.SUCCESS,
+            TaskInstance.state.in_([State.SUCCESS, State.EXCLUDED]),  # EXCLUDED counts as done
         )
         count = ti[0][0]
         return count == len(task.downstream_task_ids)
@@ -1183,8 +1196,9 @@ def run(
         self.hostname = socket.getfqdn()
         self.operator = task.__class__.__name__
 
-        if not ignore_all_deps and not ignore_ti_state and self.state == State.SUCCESS:
-            Stats.incr('previously_succeeded', 1, 1)
+        if not ignore_all_deps and not ignore_ti_state:
+            if self.state_for_dependents() == State.SUCCESS:  # incl. EXCLUDED
+                Stats.incr('previously_succeeded', 1, 1)
 
         queue_dep_context = DepContext(
             deps=QUEUE_DEPS,
@@ -4044,3 +4058,222 @@ class ImportError(Base):
     timestamp = Column(DateTime)
     filename = Column(String(1024))
     stacktrace = Column(Text)
+
+
+class TaskExclusionType(object):
+    """
+    This class is used to define the different types of circumstances under
+    which to exclude tasks from execution. It should be used only for
+    interaction with the TaskExclusion class.
+    """
+
+    # SINGLE_DATE exclusion will prevent a task from executing only in the
+    # DagRun with execution_date matching the given datetime.
+    SINGLE_DATE = 'single_date'
+
+    # DATE_RANGE exclusion will prevent a task from executing in any DagRun
+    # with an execution_date that is between the start and end dates of the
+    # exclusion. These boundaries are inclusive.
+    DATE_RANGE = 'date_range'
+
+    # INDEFINITE exclusion will prevent a task from executing in any DagRun
+    # while the exclusion is in place.
+    INDEFINITE = 'indefinite'
+
+
+class TaskExclusion(Base):
+    """
+    A rule that stops a given task in a given DAG from being run for some
+    set of execution dates (see TaskExclusionType). Rules are stored in the
+    task_exclusion table; the scheduler calls should_exclude_task() and
+    marks matching task instances EXCLUDED instead of queuing them.
+
+    Class methods are provided to create (set), delete (remove) and look up
+    (should_exclude_task) exclusions. INDEFINITE exclusions are stored with
+    NULL start and end dates.
+    """
+
+    __tablename__ = "task_exclusion"
+
+    id = Column(Integer(), primary_key=True)
+    dag_id = Column(String(ID_LEN), nullable=False)
+    task_id = Column(String(ID_LEN), nullable=False)
+    exclusion_type = Column(String(32), nullable=False)
+    # Both NULL for INDEFINITE; both equal for SINGLE_DATE.
+    exclusion_start_date = Column(DateTime(), nullable=True)
+    exclusion_end_date = Column(DateTime(), nullable=True)
+    created_by = Column(String(256), nullable=False)
+    created_on = Column(DateTime(), nullable=False)
+
+    @staticmethod
+    def _normalise_dates(exclusion_type, exclusion_start_date,
+                         exclusion_end_date):
+        """
+        Validate the dates given for an exclusion and return the
+        (start, end) pair that is stored for it: SINGLE_DATE uses the start
+        date for both ends and INDEFINITE uses (None, None).
+        :raises AirflowException: if the exclusion_type is not recognised or
+         the dates it requires are missing or out of order.
+        """
+        if exclusion_type == TaskExclusionType.SINGLE_DATE:
+            if exclusion_start_date is None:
+                raise AirflowException(
+                    "A single_date exclusion requires an exclusion_start_date"
+                )
+            return exclusion_start_date, exclusion_start_date
+        if exclusion_type == TaskExclusionType.DATE_RANGE:
+            if exclusion_start_date is None or exclusion_end_date is None:
+                raise AirflowException(
+                    "A date_range exclusion requires both an "
+                    "exclusion_start_date and an exclusion_end_date"
+                )
+            if exclusion_start_date > exclusion_end_date:
+                raise AirflowException(
+                    "The exclusion_start_date is after the exclusion_end_date"
+                )
+            return exclusion_start_date, exclusion_end_date
+        if exclusion_type == TaskExclusionType.INDEFINITE:
+            return None, None
+        raise AirflowException(
+            "The exclusion_type, {}, is not recognised."
+            .format(exclusion_type)
+        )
+
+    @classmethod
+    def _query_matching(cls, session, dag_id, task_id, exclusion_type,
+                        exclusion_start_date, exclusion_end_date):
+        """
+        Return a query for exclusions identical to the given definition.
+        A None date compares as IS NULL, which matches INDEFINITE rows.
+        """
+        return session.query(cls).filter(
+            cls.dag_id == dag_id,
+            cls.task_id == task_id,
+            cls.exclusion_type == exclusion_type,
+            cls.exclusion_start_date == exclusion_start_date,
+            cls.exclusion_end_date == exclusion_end_date
+        )
+
+    @classmethod
+    @provide_session
+    def set(
+            cls,
+            dag_id,
+            task_id,
+            exclusion_type,
+            exclusion_start_date,
+            exclusion_end_date,
+            created_by,
+            session=None):
+        """
+        Add a task exclusion to prevent a task running under certain
+        circumstances. An identical existing exclusion is replaced.
+        :param dag_id: The dag_id of the DAG containing the task to exclude
+         from execution.
+        :param task_id: The task_id of the task to exclude from execution.
+        :param exclusion_type: The type of circumstances to exclude the task
+         from execution under. See the TaskExclusionType class for more detail.
+        :param exclusion_start_date: The execution_date to start excluding on.
+         This will be ignored if the exclusion_type is INDEFINITE.
+        :param exclusion_end_date: The execution_date to stop excluding on.
+         This will be ignored if the exclusion_type is INDEFINITE or
+         SINGLE_DATE.
+        :param created_by: Who is creating this exclusion. Stored with the
+         exclusion record for auditing/debugging purposes.
+        :raises AirflowException: if the exclusion_type is not recognised or
+         the dates it requires are missing or out of order.
+        :return: None.
+        """
+        exclusion_start_date, exclusion_end_date = cls._normalise_dates(
+            exclusion_type, exclusion_start_date, exclusion_end_date)
+
+        # Remove any duplicate of this exclusion before inserting it.
+        cls._query_matching(
+            session, dag_id, task_id, exclusion_type,
+            exclusion_start_date, exclusion_end_date).delete()
+
+        session.add(cls(
+            dag_id=dag_id,
+            task_id=task_id,
+            exclusion_type=exclusion_type,
+            exclusion_start_date=exclusion_start_date,
+            exclusion_end_date=exclusion_end_date,
+            created_by=created_by,
+            created_on=datetime.now()))
+        session.commit()
+
+    @classmethod
+    @provide_session
+    def remove(
+            cls,
+            dag_id,
+            task_id,
+            exclusion_type,
+            exclusion_start_date,
+            exclusion_end_date,
+            session=None):
+        """
+        Remove a task exclusion that would prevent a task running under certain
+        circumstances.
+        :param dag_id: The dag_id of the DAG containing the task that would be
+         excluded from execution.
+        :param task_id: The task_id of the task that would be excluded from
+         execution.
+        :param exclusion_type: The type of circumstances that the task would be
+         excluded from execution under. See the TaskExclusionType class for
+         more detail.
+        :param exclusion_start_date: The execution_date that the exclusion
+         starts on. This will be ignored if the exclusion_type is INDEFINITE.
+        :param exclusion_end_date: The execution_date that the exclusion ends
+         on. This will be ignored if the exclusion_type is INDEFINITE or
+         SINGLE_DATE.
+        :raises AirflowException: if the exclusion_type is not recognised or
+         the dates it requires are missing or out of order.
+        :return: None.
+        """
+        exclusion_start_date, exclusion_end_date = cls._normalise_dates(
+            exclusion_type, exclusion_start_date, exclusion_end_date)
+
+        cls._query_matching(
+            session, dag_id, task_id, exclusion_type,
+            exclusion_start_date, exclusion_end_date).delete()
+        session.commit()
+
+    @classmethod
+    @provide_session
+    def should_exclude_task(
+            cls,
+            dag_id,
+            task_id,
+            execution_date,
+            session=None):
+        """
+        Identify whether any exclusions exist that apply to the given task in
+        the given DAG for the given execution date.
+        :param dag_id: The dag_id of the DAG containing the task instance to
+         check for exclusions for.
+        :param task_id: The task_id of the task instance to check for
+         exclusions for.
+        :param execution_date: The execution_date of the task instance to check
+         for exclusions for.
+        :return: True if an exclusion exists for the given task instance. False
+         otherwise.
+        """
+        task_exclusions = session.query(cls).filter(
+            cls.dag_id == dag_id,
+            cls.task_id == task_id,
+        )
+        # One query per exclusion type, stopping at the first match.
+        criteria = (
+            # INDEFINITE exclusions apply to every execution_date.
+            (cls.exclusion_type == TaskExclusionType.INDEFINITE,),
+            # SINGLE_DATE exclusions store the excluded date as the start.
+            (cls.exclusion_type == TaskExclusionType.SINGLE_DATE,
+             cls.exclusion_start_date == execution_date),
+            # DATE_RANGE boundaries are inclusive.
+            (cls.exclusion_type == TaskExclusionType.DATE_RANGE,
+             cls.exclusion_start_date <= execution_date,
+             cls.exclusion_end_date >= execution_date),
+        )
+        return any(task_exclusions.filter(*criterion).first() is not None
+                   for criterion in criteria)
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py
index 82355ec2fa..0b89e0e15e 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -51,7 +51,7 @@ def _get_dep_statuses(self, ti, session, dep_context):
                        "task instance has not run yet.")
             raise StopIteration
 
-        if previous_ti.state not in {State.SKIPPED, State.SUCCESS}:
+        if previous_ti.state_for_dependents() not in {State.SKIPPED, State.SUCCESS}:
             yield self._failing_status(
                 reason="depends_on_past is true for this task, but the previous task "
                        "instance {0} is in the state '{1}' which is not a successful "
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 281ed51bf6..812936ea39 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -51,6 +51,8 @@ def _get_dep_statuses(self, ti, session, dep_context):
             .query(
                 func.coalesce(func.sum(
                     case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
+                func.coalesce(func.sum(
+                    case([(TI.state == State.EXCLUDED, 1)], else_=0)), 0),
                 func.coalesce(func.sum(
                     case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
                 func.coalesce(func.sum(
@@ -64,12 +66,19 @@ def _get_dep_statuses(self, ti, session, dep_context):
                 TI.task_id.in_(ti.task.upstream_task_ids),
                 TI.execution_date == ti.execution_date,
                 TI.state.in_([
-                    State.SUCCESS, State.FAILED,
+                    State.SUCCESS, State.FAILED, State.EXCLUDED,
                     State.UPSTREAM_FAILED, State.SKIPPED]),
             )
         )
 
-        successes, skipped, failed, upstream_failed, done = qry.first()
+        successes, excluded, skipped, failed, upstream_failed, done = qry.first()
+
+        # Count EXCLUDED upstream tasks as successes: for trigger rules they
+        # are equivalent (TaskInstance.state_for_dependents applies the same
+        # mapping, but it is a Python method and cannot be used inside this
+        # SQL aggregate, so the EXCLUDED count is summed separately above).
+        successes = successes + excluded
+
         for dep_status in self._evaluate_trigger_rule(
                 ti=ti,
                 successes=successes,
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index 4a1dfb620a..5ee9a56ca0 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -40,6 +40,7 @@ class State(object):
     UP_FOR_RETRY = "up_for_retry"
     UPSTREAM_FAILED = "upstream_failed"
     SKIPPED = "skipped"
+    EXCLUDED = 'excluded'  # set by the scheduler when a TaskExclusion applies
 
     task_states = (
         SUCCESS,
@@ -48,6 +49,7 @@ class State(object):
         UPSTREAM_FAILED,
         UP_FOR_RETRY,
         QUEUED,
+        EXCLUDED,
     )
 
     dag_states = (
@@ -67,6 +69,7 @@ class State(object):
         SKIPPED: 'pink',
         REMOVED: 'lightgrey',
         SCHEDULED: 'white',
+        EXCLUDED: 'purple',  # same colour as the excluded rules in graph.css/tree.css
     }
 
     @classmethod
@@ -96,6 +99,7 @@ def finished(cls):
             cls.SHUTDOWN,
             cls.FAILED,
             cls.SKIPPED,
+            cls.EXCLUDED,
         ]
 
     @classmethod
diff --git a/airflow/www/static/graph.css b/airflow/www/static/graph.css
index e724b7af74..396589b779 100644
--- a/airflow/www/static/graph.css
+++ b/airflow/www/static/graph.css
@@ -36,6 +36,10 @@ g.node.queued rect {
     stroke: grey;
 }
 
+g.node.excluded rect {
+    stroke: purple; /* State.EXCLUDED colour in airflow/utils/state.py */
+}
+
 g.node.running rect{
     stroke: lime;
 }
diff --git a/airflow/www/static/tree.css b/airflow/www/static/tree.css
index 18182505e2..49a927a6d7 100644
--- a/airflow/www/static/tree.css
+++ b/airflow/www/static/tree.css
@@ -41,6 +41,9 @@ rect.state {
 rect.null, rect.undefined {
     fill: white;
 }
+rect.excluded {
+    fill: purple; /* State.EXCLUDED colour in airflow/utils/state.py */
+}
 rect.success {
     fill: green;
 }
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index b9b1afa18e..84457ad586 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -211,6 +211,16 @@ <h4 class="modal-title" id="myModalLabel">
               Recursive
             </button>
           </span>
+          <hr/>
+          <button id="btn_exclude" type="button" class="btn btn-primary">
+            Exclude from this DAG Run
+          </button>
+          <span class="btn-group">
+            <button id="btn_exclude_clear"
+              type="button" class="btn" data-toggle="button">
+              Clear Exclusion
+            </button>
+          </span>
         </div>
         <div class="modal-footer">
           <button type="button" class="btn btn-default" data-dismiss="modal">
@@ -347,6 +357,17 @@ <h4 class="modal-title" id="myModalLabel">
       window.location = url;
     });
 
+    $("#btn_exclude").click(function(){
+      url = "{{ url_for('airflow.exclude') }}" +
+        "?task_id=" + encodeURIComponent(task_id) +
+        "&dag_id=" + encodeURIComponent(dag_id) +
+        "&execution_date=" + encodeURIComponent(execution_date) +
+        "&clear=" + $('#btn_exclude_clear').hasClass('active') +
+        "&origin=" + encodeURIComponent(window.location);
+
+      window.location = url;
+    });
+
     $("#btn_gantt").click(function(){
       url = "{{ url_for('airflow.gantt') }}" +
         "?dag_id=" + dag_id +
diff --git a/airflow/www/templates/airflow/graph.html b/airflow/www/templates/airflow/graph.html
index 24fc508027..763817c91f 100644
--- a/airflow/www/templates/airflow/graph.html
+++ b/airflow/www/templates/airflow/graph.html
@@ -62,6 +62,7 @@
 
     <div style"background-color: blue;">
     <div class="legend_item state" style="border-color:white;">no status</div>
+    <div class="legend_item state" style="border-color:purple;">excluded</div> <!-- State.EXCLUDED -->
     <div class="legend_item state" style="border-color:grey;">queued</div>
     <div class="legend_item state" style="border-color:gold;">retry</div>
     <div class="legend_item state" style="border-color:pink;">skipped</div>
diff --git a/airflow/www/templates/airflow/tree.html b/airflow/www/templates/airflow/tree.html
index be3655a860..def7467a98 100644
--- a/airflow/www/templates/airflow/tree.html
+++ b/airflow/www/templates/airflow/tree.html
@@ -1,13 +1,13 @@
-{# 
+{#
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-  
+
     http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,6 +43,8 @@
 <div>
     <div class="legend_item" style="border: none;">no status</div>
     <div class="square" style="background: white;"></div>
+    <div class="legend_item" style="border: none;">excluded</div> <!-- State.EXCLUDED -->
+    <div class="square" style="background: purple;"></div>
     <div class="legend_item" style="border: none;">queued</div>
     <div class="square" style="background: grey;"></div>
     <div class="legend_item" style="border: none;">retry</div>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 7134264536..379e0bdbba 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -42,7 +42,7 @@
 from flask_admin.actions import action
 from flask_admin.babel import lazy_gettext
 from flask_admin.tools import iterdecode
-from flask_login import flash
+from flask_login import flash, current_user as flask_user
 from flask._compat import PY2
 
 import jinja2
@@ -61,7 +61,7 @@
 from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.settings import Session
-from airflow.models import XCom, DagRun
+from airflow.models import XCom, DagRun, TaskExclusion, TaskExclusionType
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 
 from airflow.models import BaseOperator
@@ -1199,6 +1199,78 @@ def success(self):
                     details=details,)
             return response
 
+    @expose('/exclude')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def exclude(self):
+        """
+        Add a SINGLE_DATE TaskExclusion for task_id in dag_id at
+        execution_date (or remove it when clear=true), flash the outcome and
+        redirect back to origin.
+        """
+        try:
+            from urllib.parse import urlparse
+        except ImportError:  # Python 2
+            from urlparse import urlparse
+
+        # Get values from arguments
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        execution_date = request.args.get('execution_date')
+        clear = request.args.get('clear') == "true"
+        exclusion_type = TaskExclusionType.SINGLE_DATE
+
+        # Only redirect back to this host so the endpoint cannot be used as
+        # an open redirect; otherwise fall back to the index page.
+        if not origin or urlparse(origin).netloc not in ('', request.host):
+            origin = url_for('admin.index')
+
+        if not (dag_id and task_id and execution_date):
+            flash("dag_id, task_id and execution_date are required to "
+                  "exclude a task.", "error")
+            return redirect(origin)
+
+        # Convert execution_date to Datetime object.
+        try:
+            execution_date = dateutil.parser.parse(execution_date)
+        except (ValueError, OverflowError):
+            flash("Could not parse execution_date.", "error")
+            return redirect(origin)
+
+        # Get current user; not every user object has a username.
+        username = getattr(flask_user, 'username', None)
+        if not username:
+            username = 'Username not found.'
+
+        if clear:
+            TaskExclusion.remove(dag_id,
+                                 task_id,
+                                 exclusion_type,
+                                 execution_date,
+                                 execution_date)
+
+            flash("Removed task exclusion for task {} in DAG {} for execution "
+                  "date {}".format(task_id,
+                                   dag_id,
+                                   execution_date.isoformat()))
+
+        else:
+            TaskExclusion.set(dag_id,
+                              task_id,
+                              exclusion_type,
+                              execution_date,
+                              execution_date,
+                              username)
+
+            flash("Added task exclusion for task {} in DAG {} for execution "
+                  "date {}".format(task_id,
+                                   dag_id,
+                                   execution_date.isoformat()))
+
+        return redirect(origin)
+
     @expose('/tree')
     @login_required
     @wwwutils.gzipped
diff --git a/tests/jobs.py b/tests/jobs.py
index 62e88e5b63..43c7d92229 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -600,6 +600,38 @@ def test_scheduler_add_new_task(self):
         tis = dr.get_task_instances()
         self.assertEquals(len(tis), 2)
 
+    def test_scheduler_does_not_run_excluded(self):
+        dag = DAG(
+            dag_id='test_scheduler_does_not_run_excluded',
+            start_date=DEFAULT_DATE)
+        DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+
+        tis = dr.get_task_instances(session=session)
+        for ti in tis:
+            ti.state = State.EXCLUDED
+
+        session.commit()
+        session.close()
+
+        queue = mock.Mock()
+        scheduler._process_task_instances(dag, queue=queue)
+
+        queue.append.assert_not_called()  # the scheduler queues via append()
+
     def test_scheduler_verify_max_active_runs(self):
         """
         Test if a a dagrun will not be scheduled if max_dag_runs has been reached
diff --git a/tests/models.py b/tests/models.py
index 74103fea85..a3f3c71804 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -24,7 +24,7 @@
 
 from airflow import models, settings, AirflowException
 from airflow.exceptions import AirflowSkipException
-from airflow.models import DAG, TaskInstance as TI
+from airflow.models import DAG, TaskExclusion, TaskExclusionType, TaskInstance as TI
 from airflow.models import State as ST
 from airflow.models import DagModel
 from airflow.operators.dummy_operator import DummyOperator
@@ -35,6 +35,7 @@
 from mock import patch
 from nose_parameterized import parameterized
 
+
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')
@@ -623,3 +624,104 @@ def test_xcom_pull_different_execution_date(self):
                                       key=key,
                                       include_prior_dates=True),
                          value)
+
+
+class TaskExclusionTest(unittest.TestCase):
+    exec_date = datetime.datetime(2016, 1, 1, 1, 1, 1, 111111)
+    dag_id = 'test_task_exclude'
+    task_id = 'test_task_exclude'
+
+    def setUp(self):
+        self.session = settings.Session()
+        # Snapshot pre-existing exclusions as plain column dicts so that
+        # tearDown can re-insert them after the table has been emptied.
+        self.saved_rows = [
+            {col.name: getattr(row, col.name)
+             for col in TaskExclusion.__table__.columns}
+            for row in self.session.query(TaskExclusion)]
+        # Start every test from an empty task_exclusion table.
+        self.session.query(TaskExclusion).delete()
+        self.session.commit()
+        self.session.expunge_all()
+
+    def tearDown(self):
+        self.session.query(TaskExclusion).delete()
+        self.session.add_all(
+            [TaskExclusion(**row) for row in self.saved_rows])
+        self.session.commit()
+        self.session.close()
+
+    def test_set_exclusion(self):
+        TaskExclusion.set(dag_id=self.dag_id,
+                          task_id=self.task_id,
+                          exclusion_type=TaskExclusionType.SINGLE_DATE,
+                          exclusion_start_date=self.exec_date,
+                          exclusion_end_date=self.exec_date,
+                          created_by='airflow',
+                          session=self.session)
+
+        exclusion = self.session.query(TaskExclusion).first()
+
+        self.assertEqual(exclusion.dag_id, self.dag_id)
+        self.assertEqual(exclusion.task_id, self.task_id)
+        self.assertEqual(exclusion.exclusion_type,
+                         TaskExclusionType.SINGLE_DATE)
+        self.assertEqual(exclusion.exclusion_start_date, self.exec_date)
+        self.assertEqual(exclusion.exclusion_end_date, self.exec_date)
+        self.assertEqual(exclusion.created_by, 'airflow')
+
+    def test_remove_exclusion(self):
+        self.session.add(TaskExclusion(
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            exclusion_type=TaskExclusionType.SINGLE_DATE,
+            exclusion_start_date=self.exec_date,
+            exclusion_end_date=self.exec_date,
+            created_by='airflow',
+            created_on=self.exec_date)
+        )
+
+        self.session.commit()
+
+        exclusion = self.session.query(TaskExclusion).first()
+
+        self.assertTrue(exclusion)
+
+        TaskExclusion.remove(dag_id=self.dag_id,
+                             task_id=self.task_id,
+                             exclusion_type=TaskExclusionType.SINGLE_DATE,
+                             exclusion_start_date=self.exec_date,
+                             exclusion_end_date=self.exec_date,
+                             session=self.session)
+
+        exclusion = self.session.query(TaskExclusion).first()
+
+        self.assertFalse(exclusion)
+
+    def test_should_exclude_task(self):
+        self.session.add(TaskExclusion(
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            exclusion_type=TaskExclusionType.SINGLE_DATE,
+            exclusion_start_date=self.exec_date,
+            exclusion_end_date=self.exec_date,
+            created_by='airflow',
+            created_on=self.exec_date)
+        )
+
+        self.session.commit()
+
+        self.assertTrue(TaskExclusion.should_exclude_task(
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            execution_date=self.exec_date))
+
+    def test_should_not_exclude_task(self):
+        self.session.query(TaskExclusion).delete()
+        self.session.commit()
+        should_exclude = TaskExclusion.should_exclude_task(
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            execution_date=self.exec_date)
+
+        self.assertFalse(should_exclude)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services