Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/30 09:51:20 UTC

[GitHub] Fokko closed pull request #4378: AIRFLOW-3573 - Remove DagStat table

Fokko closed pull request #4378: AIRFLOW-3573 - Remove DagStat table
URL: https://github.com/apache/incubator-airflow/pull/4378
 
 
   

This is a pull request merged from a forked repository. Because GitHub
hides the original diff of a foreign (fork) pull request once it is
merged, the diff is reproduced below for the sake of provenance:
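
For anyone applying this change to an existing metadata database: the pull
request ships an Alembic migration (revision a56c9515abdc) that drops the
dag_stats table. Below is a minimal sketch, not part of the diff itself, of
triggering the migrations programmatically; it assumes an Airflow 1.10-era
environment where airflow.utils.db.upgradedb() is available (the same helper
the cli.py hunk below calls), and running `airflow upgradedb` from the
command line should have the same effect.

# Sketch only -- not part of the diff below. Assumes an Airflow
# 1.10-era installation where airflow.utils.db.upgradedb() exists,
# as suggested by the cli.py hunk in this pull request.
from airflow.utils import db as db_utils

# Applies all pending Alembic migrations, including revision
# a56c9515abdc, which drops the dag_stats table.
db_utils.upgradedb()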

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index c8840011e3..f4bee90c52 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -55,7 +55,7 @@
 from airflow import configuration as conf
 from airflow.exceptions import AirflowException, AirflowWebServerTimeout
 from airflow.executors import GetDefaultExecutor
-from airflow.models import DagModel, DagBag, TaskInstance, DagRun, Variable, DagStat, DAG
+from airflow.models import DagModel, DagBag, TaskInstance, DagRun, Variable, DAG
 from airflow.models.connection import Connection
 from airflow.models.dagpickle import DagPickle
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
@@ -1109,18 +1109,6 @@ def upgradedb(args):
     print("DB: " + repr(settings.engine.url))
     db_utils.upgradedb()
 
-    # Populate DagStats table
-    session = settings.Session()
-    ds_rows = session.query(DagStat).count()
-    if not ds_rows:
-        qry = (
-            session.query(DagRun.dag_id, DagRun.state, func.count('*'))
-                   .group_by(DagRun.dag_id, DagRun.state)
-        )
-        for dag_id, state, count in qry:
-            session.add(DagStat(dag_id=dag_id, state=state, count=count))
-        session.commit()
-
 
 @cli_utils.action_logging
 def version(args):
diff --git a/airflow/jobs.py b/airflow/jobs.py
index a82190fc9d..f71fa3cd63 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1446,8 +1446,6 @@ def _process_dags(self, dagbag, dags, tis_out):
             self._process_task_instances(dag, tis_out)
             self.manage_slas(dag)
 
-        models.DagStat.update([d.dag_id for d in dags])
-
     @provide_session
     def _process_executor_events(self, simple_dag_bag, session=None):
         """
@@ -2324,9 +2322,6 @@ def _process_backfill_task_instances(self,
                     ti_status.active_runs.remove(run)
                     executed_run_dates.append(run.execution_date)
 
-                if run.dag.is_paused:
-                    models.DagStat.update([run.dag_id], session=session)
-
             self._log_progress(ti_status)
 
         # return updated status
diff --git a/airflow/migrations/versions/a56c9515abdc_remove_dag_stat_table.py b/airflow/migrations/versions/a56c9515abdc_remove_dag_stat_table.py
new file mode 100644
index 0000000000..4cba5a0fbb
--- /dev/null
+++ b/airflow/migrations/versions/a56c9515abdc_remove_dag_stat_table.py
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Remove dag_stat table
+
+Revision ID: a56c9515abdc
+Revises: c8ffec048a3b
+Create Date: 2018-12-27 10:27:59.715872
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'a56c9515abdc'
+down_revision = 'c8ffec048a3b'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.drop_table("dag_stats")
+
+
+def downgrade():
+    op.create_table('dag_stats',
+                    sa.Column('dag_id', sa.String(length=250), nullable=False),
+                    sa.Column('state', sa.String(length=50), nullable=False),
+                    sa.Column('count', sa.Integer(), nullable=False, default=0),
+                    sa.Column('dirty', sa.Boolean(), nullable=False, default=False),
+                    sa.PrimaryKeyConstraint('dag_id', 'state'))
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index cbcf586ac2..5b24bde81c 100755
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -3718,7 +3718,6 @@ def set_dag_runs_state(
         for dr in drs:
             dr.state = state
             dirty_ids.append(dr.dag_id)
-        DagStat.update(dirty_ids, session=session)
 
     @provide_session
     def clear(
@@ -4149,8 +4148,6 @@ def create_dagrun(self,
         )
         session.add(run)
 
-        DagStat.set_dirty(dag_id=self.dag_id, session=session)
-
         session.commit()
 
         run.dag = self
@@ -4652,122 +4649,6 @@ def delete(cls, xcoms, session=None):
         session.commit()
 
 
-class DagStat(Base):
-    __tablename__ = "dag_stats"
-
-    dag_id = Column(String(ID_LEN), primary_key=True)
-    state = Column(String(50), primary_key=True)
-    count = Column(Integer, default=0, nullable=False)
-    dirty = Column(Boolean, default=False, nullable=False)
-
-    def __init__(self, dag_id, state, count=0, dirty=False):
-        self.dag_id = dag_id
-        self.state = state
-        self.count = count
-        self.dirty = dirty
-
-    @staticmethod
-    @provide_session
-    def set_dirty(dag_id, session=None):
-        """
-        :param dag_id: the dag_id to mark dirty
-        :param session: database session
-        :return:
-        """
-        DagStat.create(dag_id=dag_id, session=session)
-
-        try:
-            stats = session.query(DagStat).filter(
-                DagStat.dag_id == dag_id
-            ).with_for_update().all()
-
-            for stat in stats:
-                stat.dirty = True
-            session.commit()
-        except Exception as e:
-            session.rollback()
-            log = LoggingMixin().log
-            log.warning("Could not update dag stats for %s", dag_id)
-            log.exception(e)
-
-    @staticmethod
-    @provide_session
-    def update(dag_ids=None, dirty_only=True, session=None):
-        """
-        Updates the stats for dirty/out-of-sync dags
-
-        :param dag_ids: dag_ids to be updated
-        :type dag_ids: list
-        :param dirty_only: only updated for marked dirty, defaults to True
-        :type dirty_only: bool
-        :param session: db session to use
-        :type session: Session
-        """
-        try:
-            qry = session.query(DagStat)
-            if dag_ids:
-                qry = qry.filter(DagStat.dag_id.in_(set(dag_ids)))
-            if dirty_only:
-                qry = qry.filter(DagStat.dirty == True) # noqa
-
-            qry = qry.with_for_update().all()
-
-            ids = set([dag_stat.dag_id for dag_stat in qry])
-
-            # avoid querying with an empty IN clause
-            if len(ids) == 0:
-                session.commit()
-                return
-
-            dagstat_states = set(itertools.product(ids, State.dag_states))
-            qry = (
-                session.query(DagRun.dag_id, DagRun.state, func.count('*'))
-                .filter(DagRun.dag_id.in_(ids))
-                .group_by(DagRun.dag_id, DagRun.state)
-            )
-
-            counts = {(dag_id, state): count for dag_id, state, count in qry}
-            for dag_id, state in dagstat_states:
-                count = 0
-                if (dag_id, state) in counts:
-                    count = counts[(dag_id, state)]
-
-                session.merge(
-                    DagStat(dag_id=dag_id, state=state, count=count, dirty=False)
-                )
-
-            session.commit()
-        except Exception as e:
-            session.rollback()
-            log = LoggingMixin().log
-            log.warning("Could not update dag stat table")
-            log.exception(e)
-
-    @staticmethod
-    @provide_session
-    def create(dag_id, session=None):
-        """
-        Creates the missing states the stats table for the dag specified
-
-        :param dag_id: dag id of the dag to create stats for
-        :param session: database session
-        :return:
-        """
-        # unfortunately sqlalchemy does not know upsert
-        qry = session.query(DagStat).filter(DagStat.dag_id == dag_id).all()
-        states = {dag_stat.state for dag_stat in qry}
-        for state in State.dag_states:
-            if state not in states:
-                try:
-                    session.merge(DagStat(dag_id=dag_id, state=state))
-                    session.commit()
-                except Exception as e:
-                    session.rollback()
-                    log = LoggingMixin().log
-                    log.warning("Could not create stat record")
-                    log.exception(e)
-
-
 class DagRun(Base, LoggingMixin):
     """
     DagRun describes an instance of a Dag. It can be created
@@ -4814,13 +4695,6 @@ def set_state(self, state):
             self._state = state
             self.end_date = timezone.utcnow() if self._state in State.finished() else None
 
-            if self.dag_id is not None:
-                # FIXME: Due to the scoped_session factor we we don't get a clean
-                # session here, so something really weird goes on:
-                # if you try to close the session dag runs will end up detached
-                session = settings.Session()
-                DagStat.set_dirty(self.dag_id, session=session)
-
     @declared_attr
     def state(self):
         return synonym('_state',
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 642a63a2e2..5bde8d3ee8 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1872,19 +1872,16 @@ def paused(self, session=None):
     @wwwutils.action_logging
     @provide_session
     def refresh(self, session=None):
-        DagModel = models.DagModel
+        # TODO: Is this method still needed after AIRFLOW-3561?
+        dm = models.DagModel
         dag_id = request.args.get('dag_id')
-        orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == dag_id).first()
+        orm_dag = session.query(dm).filter(dm.dag_id == dag_id).first()
 
         if orm_dag:
             orm_dag.last_expired = timezone.utcnow()
             session.merge(orm_dag)
         session.commit()
 
-        models.DagStat.update([dag_id], session=session, dirty_only=False)
-
-        dagbag.get_dag(dag_id)
         flash("DAG [{}] is now fresh as a daisy".format(dag_id))
         return redirect(request.referrer)
 
@@ -1892,7 +1889,7 @@ def refresh(self, session=None):
     @login_required
     @wwwutils.action_logging
     def refresh_all(self):
-        dagbag.collect_dags(only_if_updated=False)
+        # TODO: Is this method still needed after AIRFLOW-3561?
         flash("All DAGs are now up to date")
         return redirect('/')
 
@@ -2702,7 +2699,6 @@ def action_new_delete(self, ids, session=None):
         dirty_ids = []
         for row in deleted:
             dirty_ids.append(row.dag_id)
-        models.DagStat.update(dirty_ids, dirty_only=False, session=session)
 
     @action('set_running', "Set state to 'running'", None)
     @provide_session
@@ -2716,7 +2712,6 @@ def action_set_running(self, ids, session=None):
                 count += 1
                 dr.state = State.RUNNING
                 dr.start_date = timezone.utcnow()
-            models.DagStat.update(dirty_ids, session=session)
             flash(
                 "{count} dag runs were set to running".format(**locals()))
         except Exception as ex:
@@ -2741,7 +2736,6 @@ def action_set_failed(self, ids, session=None):
                                                 dr.execution_date,
                                                 commit=True,
                                                 session=session)
-            models.DagStat.update(dirty_ids, session=session)
             altered_ti_count = len(altered_tis)
             flash(
                 "{count} dag runs and {altered_ti_count} task instances "
@@ -2768,7 +2762,6 @@ def action_set_success(self, ids, session=None):
                                                  dr.execution_date,
                                                  commit=True,
                                                  session=session)
-            models.DagStat.update(dirty_ids, session=session)
             altered_ti_count = len(altered_tis)
             flash(
                 "{count} dag runs and {altered_ti_count} task instances "
@@ -2802,7 +2795,6 @@ def after_model_change(self, form, dagrun, is_created, session=None):
                 session=session)
 
         altered_ti_count = len(altered_tis)
-        models.DagStat.update([dagrun.dag_id], session=session)
         flash(
             "1 dag run and {altered_ti_count} task instances "
             "were set to '{dagrun.state}'".format(**locals()))
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 42328279a3..5c4476a667 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -1634,8 +1634,6 @@ def refresh(self, session=None):
         # sync dag permission
         appbuilder.sm.sync_perm_for_dag(dag_id)
 
-        models.DagStat.update([dag_id], session=session, dirty_only=False)
-
         dagbag.get_dag(dag_id)
         flash("DAG [{}] is now fresh as a daisy".format(dag_id))
         return redirect(request.referrer)
@@ -2172,7 +2170,6 @@ def action_muldelete(self, items, session=None):
         dirty_ids = []
         for item in items:
             dirty_ids.append(item.dag_id)
-        models.DagStat.update(dirty_ids, dirty_only=False, session=session)
         return redirect(self.get_redirect())
 
     @action('set_running', "Set state to 'running'", '', single=False)
@@ -2188,7 +2185,6 @@ def action_set_running(self, drs, session=None):
                 count += 1
                 dr.start_date = timezone.utcnow()
                 dr.state = State.RUNNING
-            models.DagStat.update(dirty_ids, session=session)
             session.commit()
             flash("{count} dag runs were set to running".format(**locals()))
         except Exception as ex:
@@ -2215,7 +2211,6 @@ def action_set_failed(self, drs, session=None):
                                                 dr.execution_date,
                                                 commit=True,
                                                 session=session)
-            models.DagStat.update(dirty_ids, session=session)
             altered_ti_count = len(altered_tis)
             flash(
                 "{count} dag runs and {altered_ti_count} task instances "
@@ -2243,7 +2238,6 @@ def action_set_success(self, drs, session=None):
                                                  dr.execution_date,
                                                  commit=True,
                                                  session=session)
-            models.DagStat.update(dirty_ids, session=session)
             altered_ti_count = len(altered_tis)
             flash(
                 "{count} dag runs and {altered_ti_count} task instances "
diff --git a/tests/api/common/experimental/test_delete_dag.py b/tests/api/common/experimental/test_delete_dag.py
index a012e5d3d0..70a2692c24 100644
--- a/tests/api/common/experimental/test_delete_dag.py
+++ b/tests/api/common/experimental/test_delete_dag.py
@@ -28,7 +28,6 @@
 from airflow.utils.state import State
 
 DM = models.DagModel
-DS = models.DagStat
 DR = models.DagRun
 TI = models.TaskInstance
 LOG = models.Log
@@ -51,7 +50,7 @@ def test_delete_dag_non_existent_dag(self):
             delete_dag("non-existent DAG")
 
     def test_delete_dag_dag_still_in_dagbag(self):
-        models_to_check = ['DagModel', 'DagStat', 'DagRun', 'TaskInstance']
+        models_to_check = ['DagModel', 'DagRun', 'TaskInstance']
         record_counts = {}
 
         for model_name in models_to_check:
@@ -84,7 +83,6 @@ def setUp(self):
                              owner='airflow')
 
         self.session.add(DM(dag_id=self.key))
-        self.session.add(DS(dag_id=self.key, state=State.SUCCESS))
         self.session.add(DR(dag_id=self.key))
         self.session.add(TI(task=task,
                             execution_date=days_ago(1),
@@ -96,7 +94,6 @@ def setUp(self):
 
     def tearDown(self):
         self.session.query(DM).filter(DM.dag_id == self.key).delete()
-        self.session.query(DS).filter(DS.dag_id == self.key).delete()
         self.session.query(DR).filter(DR.dag_id == self.key).delete()
         self.session.query(TI).filter(TI.dag_id == self.key).delete()
         self.session.query(LOG).filter(LOG.dag_id == self.key).delete()
@@ -107,7 +104,6 @@ def tearDown(self):
     def test_delete_dag_successful_delete(self):
 
         self.assertEqual(self.session.query(DM).filter(DM.dag_id == self.key).count(), 1)
-        self.assertEqual(self.session.query(DS).filter(DS.dag_id == self.key).count(), 1)
         self.assertEqual(self.session.query(DR).filter(DR.dag_id == self.key).count(), 1)
         self.assertEqual(self.session.query(TI).filter(TI.dag_id == self.key).count(), 1)
         self.assertEqual(self.session.query(LOG).filter(LOG.dag_id == self.key).count(), 1)
@@ -115,7 +111,6 @@ def test_delete_dag_successful_delete(self):
         delete_dag(dag_id=self.key)
 
         self.assertEqual(self.session.query(DM).filter(DM.dag_id == self.key).count(), 0)
-        self.assertEqual(self.session.query(DS).filter(DS.dag_id == self.key).count(), 0)
         self.assertEqual(self.session.query(DR).filter(DR.dag_id == self.key).count(), 0)
         self.assertEqual(self.session.query(TI).filter(TI.dag_id == self.key).count(), 0)
         self.assertEqual(self.session.query(LOG).filter(LOG.dag_id == self.key).count(), 1)
@@ -123,7 +118,6 @@ def test_delete_dag_successful_delete(self):
     def test_delete_dag_successful_delete_not_keeping_records_in_log(self):
 
         self.assertEqual(self.session.query(DM).filter(DM.dag_id == self.key).count(), 1)
-        self.assertEqual(self.session.query(DS).filter(DS.dag_id == self.key).count(), 1)
         self.assertEqual(self.session.query(DR).filter(DR.dag_id == self.key).count(), 1)
         self.assertEqual(self.session.query(TI).filter(TI.dag_id == self.key).count(), 1)
         self.assertEqual(self.session.query(LOG).filter(LOG.dag_id == self.key).count(), 1)
@@ -131,7 +125,6 @@ def test_delete_dag_successful_delete_not_keeping_records_in_log(self):
         delete_dag(dag_id=self.key, keep_records_in_log=False)
 
         self.assertEqual(self.session.query(DM).filter(DM.dag_id == self.key).count(), 0)
-        self.assertEqual(self.session.query(DS).filter(DS.dag_id == self.key).count(), 0)
         self.assertEqual(self.session.query(DR).filter(DR.dag_id == self.key).count(), 0)
         self.assertEqual(self.session.query(TI).filter(TI.dag_id == self.key).count(), 0)
         self.assertEqual(self.session.query(LOG).filter(LOG.dag_id == self.key).count(), 0)
diff --git a/tests/api/common/experimental/test_mark_tasks.py b/tests/api/common/experimental/test_mark_tasks.py
index 9afe31c951..0cb8a4e888 100644
--- a/tests/api/common/experimental/test_mark_tasks.py
+++ b/tests/api/common/experimental/test_mark_tasks.py
@@ -515,7 +515,6 @@ def tearDown(self):
 
         self.session.query(models.DagRun).delete()
         self.session.query(models.TaskInstance).delete()
-        self.session.query(models.DagStat).delete()
         self.session.commit()
         self.session.close()
 
diff --git a/tests/core.py b/tests/core.py
index efae7f4b1e..556a0646dd 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -956,77 +956,6 @@ def test_task_fail_duration(self):
         self.assertEqual(1, len(f_fails))
         self.assertGreaterEqual(sum([f.duration for f in f_fails]), 3)
 
-    def test_dag_stats(self):
-        """Correctly sets/dirties/cleans rows of DagStat table"""
-
-        session = settings.Session()
-
-        session.query(models.DagRun).delete()
-        session.query(models.DagStat).delete()
-        session.commit()
-
-        models.DagStat.update([], session=session)
-
-        self.dag_bash.create_dagrun(
-            run_id="run1",
-            execution_date=DEFAULT_DATE,
-            state=State.RUNNING)
-
-        models.DagStat.update([self.dag_bash.dag_id], session=session)
-
-        qry = session.query(models.DagStat).all()
-
-        self.assertEqual(3, len(qry))
-        self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id)
-        for stats in qry:
-            if stats.state == State.RUNNING:
-                self.assertEqual(stats.count, 1)
-            else:
-                self.assertEqual(stats.count, 0)
-            self.assertFalse(stats.dirty)
-
-        self.dag_bash.create_dagrun(
-            run_id="run2",
-            execution_date=DEFAULT_DATE + timedelta(days=1),
-            state=State.RUNNING)
-
-        models.DagStat.update([self.dag_bash.dag_id], session=session)
-
-        qry = session.query(models.DagStat).all()
-
-        self.assertEqual(3, len(qry))
-        self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id)
-        for stats in qry:
-            if stats.state == State.RUNNING:
-                self.assertEqual(stats.count, 2)
-            else:
-                self.assertEqual(stats.count, 0)
-            self.assertFalse(stats.dirty)
-
-        session.query(models.DagRun).first().state = State.SUCCESS
-        session.commit()
-
-        models.DagStat.update([self.dag_bash.dag_id], session=session)
-
-        qry = session.query(models.DagStat).filter(models.DagStat.state == State.SUCCESS).all()
-        self.assertEqual(1, len(qry))
-        self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id)
-        self.assertEqual(State.SUCCESS, qry[0].state)
-        self.assertEqual(1, qry[0].count)
-        self.assertFalse(qry[0].dirty)
-
-        qry = session.query(models.DagStat).filter(models.DagStat.state == State.RUNNING).all()
-        self.assertEqual(1, len(qry))
-        self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id)
-        self.assertEqual(State.RUNNING, qry[0].state)
-        self.assertEqual(1, qry[0].count)
-        self.assertFalse(qry[0].dirty)
-
-        session.query(models.DagRun).delete()
-        session.query(models.DagStat).delete()
-        session.commit()
-        session.close()
-
     def test_run_command(self):
         if six.PY3:
             write = r'sys.stdout.buffer.write("\u1000foo".encode("utf8"))'
diff --git a/tests/models.py b/tests/models.py
index 2da84e6520..2b4e9ef797 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -42,7 +42,7 @@
 from airflow.exceptions import AirflowDagCycleException, AirflowSkipException
 from airflow.jobs import BackfillJob
 from airflow.models import DAG, TaskInstance as TI
-from airflow.models import DagModel, DagRun, DagStat
+from airflow.models import DagModel, DagRun
 from airflow.models import KubeResourceVersion, KubeWorkerIdentifier
 from airflow.models import SkipMixin
 from airflow.models import State as ST
@@ -782,75 +782,6 @@ def test_sync_to_db_default_view(self, mock_now):
         self.assertEqual(orm_dag.get_default_view(), "graph")
 
 
-class DagStatTest(unittest.TestCase):
-    def test_dagstats_crud(self):
-        DagStat.create(dag_id='test_dagstats_crud')
-
-        session = settings.Session()
-        qry = session.query(DagStat).filter(DagStat.dag_id == 'test_dagstats_crud')
-        self.assertEqual(len(qry.all()), len(State.dag_states))
-
-        DagStat.set_dirty(dag_id='test_dagstats_crud')
-        res = qry.all()
-
-        for stat in res:
-            self.assertTrue(stat.dirty)
-
-        # create missing
-        DagStat.set_dirty(dag_id='test_dagstats_crud_2')
-        qry2 = session.query(DagStat).filter(DagStat.dag_id == 'test_dagstats_crud_2')
-        self.assertEqual(len(qry2.all()), len(State.dag_states))
-
-        dag = DAG(
-            'test_dagstats_crud',
-            start_date=DEFAULT_DATE,
-            default_args={'owner': 'owner1'})
-
-        with dag:
-            DummyOperator(task_id='A')
-
-        now = timezone.utcnow()
-        dag.create_dagrun(
-            run_id='manual__' + now.isoformat(),
-            execution_date=now,
-            start_date=now,
-            state=State.FAILED,
-            external_trigger=False,
-        )
-
-        DagStat.update(dag_ids=['test_dagstats_crud'])
-        res = qry.all()
-        for stat in res:
-            if stat.state == State.FAILED:
-                self.assertEqual(stat.count, 1)
-            else:
-                self.assertEqual(stat.count, 0)
-
-        DagStat.update()
-        res = qry2.all()
-        for stat in res:
-            self.assertFalse(stat.dirty)
-
-    def test_update_exception(self):
-        session = Mock()
-        (session.query.return_value
-            .filter.return_value
-            .with_for_update.return_value
-            .all.side_effect) = RuntimeError('it broke')
-        DagStat.update(session=session)
-        session.rollback.assert_called()
-
-    def test_set_dirty_exception(self):
-        session = Mock()
-        session.query.return_value.filter.return_value.all.return_value = []
-        (session.query.return_value
-            .filter.return_value
-            .with_for_update.return_value
-            .all.side_effect) = RuntimeError('it broke')
-        DagStat.set_dirty('dag', session)
-        session.rollback.assert_called()
-
-
 class DagRunTest(unittest.TestCase):
 
     def create_dag_run(self, dag,
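
With the DagStat model and its dag_stats table removed, the per-DAG run
counts it used to cache can be computed on demand from the dag_run table.
The following is an illustrative sketch only (it is not part of the diff
above); it reuses the same DagRun / func.count('*') aggregate that the
removed DagStat.update() code performed, and assumes an Airflow 1.10-era
environment where airflow.settings.Session and airflow.models.DagRun behave
as in the code shown above.

# Sketch only -- not part of this pull request. Assumes an Airflow
# 1.10-era environment, matching the code shown in the diff above.
from sqlalchemy import func

from airflow import settings
from airflow.models import DagRun

session = settings.Session()

# The same aggregate the removed DagStat.update() maintained as a
# cached table: number of dag runs per (dag_id, state).
run_counts = (
    session.query(DagRun.dag_id, DagRun.state, func.count('*'))
           .group_by(DagRun.dag_id, DagRun.state)
           .all()
)

for dag_id, state, count in run_counts:
    print(dag_id, state, count)

session.close()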


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services