Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/11 10:19:29 UTC

[GitHub] Fokko closed pull request #4383: [AIRFLOW-3475] Move ImportError out of models.py

Fokko closed pull request #4383: [AIRFLOW-3475] Move ImportError out of models.py
URL: https://github.com/apache/airflow/pull/4383
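
In short, the diff below moves the ImportError model out of airflow/models/__init__.py into a new airflow/models/errors.py module and updates all call sites. A minimal illustrative sketch of the resulting import path (not part of the diff; it assumes a configured Airflow metadata database and the create_session helper from airflow.utils.db):

# Before this change the model was defined in airflow/models/__init__.py:
#     from airflow import models
#     broken = session.query(models.ImportError).all()
#
# After this change it lives in airflow/models/errors.py:
from airflow.models import errors
from airflow.utils.db import create_session

with create_session() as session:
    for ie in session.query(errors.ImportError).all():
        print("Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie))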
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8771405c48..0fde7e732c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -42,7 +42,7 @@
 from airflow import configuration as conf
 from airflow import executors, models, settings
 from airflow.exceptions import AirflowException
-from airflow.models import DAG, DagRun
+from airflow.models import DAG, DagRun, errors
 from airflow.models.dagpickle import DagPickle
 from airflow.settings import Stats
 from airflow.task.task_runner import get_task_runner
@@ -755,13 +755,13 @@ def update_import_errors(session, dagbag):
         """
         # Clear the errors of the processed files
         for dagbag_file in dagbag.file_last_changed:
-            session.query(models.ImportError).filter(
-                models.ImportError.filename == dagbag_file
+            session.query(errors.ImportError).filter(
+                errors.ImportError.filename == dagbag_file
             ).delete()
 
         # Add the errors of the processed files
         for filename, stacktrace in six.iteritems(dagbag.import_errors):
-            session.add(models.ImportError(
+            session.add(errors.ImportError(
                 filename=filename,
                 stacktrace=stacktrace))
         session.commit()
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index b2561e2fdb..51f7f057a1 100755
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -5166,14 +5166,6 @@ def __repr__(self):
             self.dag_id, self.task_id, self.execution_date.isoformat()))
 
 
-class ImportError(Base):
-    __tablename__ = "import_error"
-    id = Column(Integer, primary_key=True)
-    timestamp = Column(UtcDateTime)
-    filename = Column(String(1024))
-    stacktrace = Column(Text)
-
-
 class KubeResourceVersion(Base):
     __tablename__ = "kube_resource_version"
     one_row_id = Column(Boolean, server_default=sqltrue(), primary_key=True)
diff --git a/airflow/models/errors.py b/airflow/models/errors.py
new file mode 100644
index 0000000000..6a3797ca3d
--- /dev/null
+++ b/airflow/models/errors.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Integer, Column, String, Text
+
+from airflow.models.base import Base
+from airflow.utils.sqlalchemy import UtcDateTime
+
+
+class ImportError(Base):
+    __tablename__ = "import_error"
+    id = Column(Integer, primary_key=True)
+    timestamp = Column(UtcDateTime)
+    filename = Column(String(1024))
+    stacktrace = Column(Text)
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 6425f77745..63a21277fe 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -46,6 +46,7 @@
 from airflow import configuration as conf
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
+from airflow.models import errors
 from airflow.settings import logging_class_path
 from airflow.utils import timezone
 from airflow.utils.db import provide_session
@@ -929,10 +930,10 @@ def clear_nonexistent_import_errors(self, session):
         :param session: session for ORM operations
         :type session: sqlalchemy.orm.session.Session
         """
-        query = session.query(airflow.models.ImportError)
+        query = session.query(errors.ImportError)
         if self._file_paths:
             query = query.filter(
-                ~airflow.models.ImportError.filename.in_(self._file_paths)
+                ~errors.ImportError.filename.in_(self._file_paths)
             )
         query.delete(synchronize_session='fetch')
         session.commit()
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 29b5931bce..d88c6c850a 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -67,7 +67,7 @@
                                                         set_dag_run_state_to_success,
                                                         set_dag_run_state_to_failed)
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator, errors
 from airflow.models import XCom, DagRun
 from airflow.models.connection import Connection
 from airflow.operators.subdag_operator import SubDagOperator
@@ -2144,7 +2144,7 @@ def get_int_arg(value, default=0):
                     in sql_query
                     .all()}
 
-        import_errors = session.query(models.ImportError).all()
+        import_errors = session.query(errors.ImportError).all()
         for ie in import_errors:
             flash(
                 "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie),
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 55e160bc55..26a0db84c3 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -54,7 +54,7 @@
 from airflow import settings
 from airflow.api.common.experimental.mark_tasks import (set_dag_run_state_to_success,
                                                         set_dag_run_state_to_failed)
-from airflow.models import XCom, DagRun
+from airflow.models import XCom, DagRun, errors
 from airflow.models.connection import Connection
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 from airflow.utils import timezone
@@ -189,7 +189,7 @@ def get_int_arg(value, default=0):
         if hide_paused:
             sql_query = sql_query.filter(~DM.is_paused)
 
-        import_errors = session.query(models.ImportError).all()
+        import_errors = session.query(errors.ImportError).all()
         for ie in import_errors:
             flash(
                 "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie),
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index 9d67753c35..7cb497c807 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -45,7 +45,8 @@
 import airflow.example_dags
 from airflow.executors import BaseExecutor, SequentialExecutor
 from airflow.jobs import BaseJob, BackfillJob, SchedulerJob, LocalTaskJob
-from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI, \
+    errors
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
@@ -1262,7 +1263,7 @@ def setUp(self):
         self.dagbag = DagBag()
         with create_session() as session:
             session.query(models.DagRun).delete()
-            session.query(models.ImportError).delete()
+            session.query(errors.ImportError).delete()
             session.commit()
 
     @staticmethod
@@ -3229,7 +3230,7 @@ def test_add_unparseable_file_before_sched_start_creates_import_error(self):
             shutil.rmtree(dags_folder)
 
         with create_session() as session:
-            import_errors = session.query(models.ImportError).all()
+            import_errors = session.query(errors.ImportError).all()
 
         self.assertEqual(len(import_errors), 1)
         import_error = import_errors[0]
@@ -3251,7 +3252,7 @@ def test_add_unparseable_file_after_sched_start_creates_import_error(self):
             shutil.rmtree(dags_folder)
 
         with create_session() as session:
-            import_errors = session.query(models.ImportError).all()
+            import_errors = session.query(errors.ImportError).all()
 
         self.assertEqual(len(import_errors), 1)
         import_error = import_errors[0]
@@ -3272,7 +3273,7 @@ def test_no_import_errors_with_parseable_dag(self):
             shutil.rmtree(dags_folder)
 
         with create_session() as session:
-            import_errors = session.query(models.ImportError).all()
+            import_errors = session.query(errors.ImportError).all()
 
         self.assertEqual(len(import_errors), 0)
 
@@ -3297,7 +3298,7 @@ def test_new_import_error_replaces_old(self):
             shutil.rmtree(dags_folder)
 
         session = settings.Session()
-        import_errors = session.query(models.ImportError).all()
+        import_errors = session.query(errors.ImportError).all()
 
         self.assertEqual(len(import_errors), 1)
         import_error = import_errors[0]
@@ -3325,7 +3326,7 @@ def test_remove_error_clears_import_error(self):
             shutil.rmtree(dags_folder)
 
         session = settings.Session()
-        import_errors = session.query(models.ImportError).all()
+        import_errors = session.query(errors.ImportError).all()
 
         self.assertEqual(len(import_errors), 0)
 
@@ -3345,7 +3346,7 @@ def test_remove_file_clears_import_error(self):
         self.run_single_scheduler_loop_with_no_dags(dags_folder)
 
         with create_session() as session:
-            import_errors = session.query(models.ImportError).all()
+            import_errors = session.query(errors.ImportError).all()
 
         self.assertEqual(len(import_errors), 0)
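
The hunks above in airflow/jobs.py keep the existing clear-then-insert pattern for recording broken DAG files, just against the relocated model. A minimal sketch of that pattern (not part of this PR; record_import_error is a hypothetical helper name, and it assumes create_session from airflow.utils.db and timezone.utcnow() from airflow.utils, both of which appear elsewhere in this diff):

from airflow.models import errors
from airflow.utils import timezone
from airflow.utils.db import create_session


def record_import_error(filename, stacktrace):
    # Hypothetical helper mirroring update_import_errors in airflow/jobs.py.
    with create_session() as session:
        # Drop any stale record for this file...
        session.query(errors.ImportError).filter(
            errors.ImportError.filename == filename
        ).delete()
        # ...then record the current failure; the timestamp column comes from
        # the ImportError model defined in airflow/models/errors.py above.
        session.add(errors.ImportError(
            timestamp=timezone.utcnow(),
            filename=filename,
            stacktrace=stacktrace))
        # create_session commits on exit, so no explicit commit is needed here.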
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services