Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/26 20:12:28 UTC

[GitHub] Fokko closed pull request #4374: [AIRFLOW-3459] Move DagPickle to separate file

Fokko closed pull request #4374: [AIRFLOW-3459] Move DagPickle to separate file
URL: https://github.com/apache/incubator-airflow/pull/4374

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/airflow/api/common/experimental/delete_dag.py b/airflow/api/common/experimental/delete_dag.py
index e7d15772c9..d04355f940 100644
--- a/airflow/api/common/experimental/delete_dag.py
+++ b/airflow/api/common/experimental/delete_dag.py
@@ -47,7 +47,7 @@ def delete_dag(dag_id, keep_records_in_log=True):
     count = 0
 
     # noinspection PyUnresolvedReferences,PyProtectedMember
-    for m in models.Base._decl_class_registry.values():
+    for m in models.base.Base._decl_class_registry.values():
         if hasattr(m, "dag_id"):
             if keep_records_in_log and m.__name__ == 'Log':
                 continue
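
For context on the hunk above: the loop walks SQLAlchemy's declarative class
registry to find every mapped class that carries a dag_id column. A minimal
sketch of that pattern, assuming SQLAlchemy < 1.4 (later releases drop
_decl_class_registry in favour of Base.registry):

    from sqlalchemy import Column, Integer, String
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class Log(Base):
        __tablename__ = 'log'
        id = Column(Integer, primary_key=True)
        dag_id = Column(String(250))  # any model with dag_id is a candidate

    # noinspection PyProtectedMember
    for m in Base._decl_class_registry.values():
        if hasattr(m, 'dag_id'):      # filters out non-class registry entries
            print(m.__name__)         # -> Log
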
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 143e2b34aa..c8840011e3 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -34,7 +34,6 @@
 from builtins import input
 from collections import namedtuple
 
-from airflow.models.connection import Connection
 from airflow.utils.timezone import parse as parsedate
 import json
 from tabulate import tabulate
@@ -56,9 +55,9 @@
 from airflow import configuration as conf
 from airflow.exceptions import AirflowException, AirflowWebServerTimeout
 from airflow.executors import GetDefaultExecutor
-from airflow.models import (DagModel, DagBag, TaskInstance,
-                            DagPickle, DagRun, Variable, DagStat, DAG)
-
+from airflow.models import DagModel, DagBag, TaskInstance, DagRun, Variable, DagStat, DAG
+from airflow.models.connection import Connection
+from airflow.models.dagpickle import DagPickle
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import cli as cli_utils
 from airflow.utils import db as db_utils
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8472ecd383..a82190fc9d 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -43,6 +43,7 @@
 from airflow import executors, models, settings
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun
+from airflow.models.dagpickle import DagPickle
 from airflow.settings import Stats
 from airflow.task.task_runner import get_task_runner
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
@@ -61,7 +62,7 @@
 from airflow.utils.sqlalchemy import UtcDateTime
 from airflow.utils.state import State
 
-Base = models.Base
+Base = models.base.Base
 ID_LEN = models.ID_LEN
 
 
@@ -2428,7 +2429,7 @@ def _execute(self, session=None):
         pickle_id = None
         if not self.donot_pickle and self.executor.__class__ not in (
                 executors.LocalExecutor, executors.SequentialExecutor):
-            pickle = models.DagPickle(self.dag)
+            pickle = DagPickle(self.dag)
             session.add(pickle)
             session.commit()
             pickle_id = pickle.id
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index 4e1977635a..76c2eb329b 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -36,7 +36,7 @@
 # for 'autogenerate' support
 # from myapp import mymodel
 # target_metadata = mymodel.Base.metadata
-target_metadata = models.Base.metadata
+target_metadata = models.base.Base.metadata
 
 # other values from the config, defined by the needs of env.py,
 # can be acquired:
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index aa93f5cb88..fcd23fe1fb 100755
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -28,6 +28,8 @@
 from builtins import ImportError as BuiltinImportError, bytes, object, str
 from future.standard_library import install_aliases
 
+from airflow.models.base import Base
+
 try:
     # Fix Python > 3.7 deprecation
     from collections.abc import Hashable
@@ -64,10 +66,10 @@
 
 from sqlalchemy import (
     Boolean, Column, DateTime, Float, ForeignKey, ForeignKeyConstraint, Index,
-    Integer, LargeBinary, PickleType, String, Text, UniqueConstraint, MetaData,
-    and_, asc, func, or_, true as sqltrue
+    Integer, LargeBinary, PickleType, String, Text, UniqueConstraint, and_, asc,
+    func, or_, true as sqltrue
 )
-from sqlalchemy.ext.declarative import declarative_base, declared_attr
+from sqlalchemy.ext.declarative import declared_attr
 from sqlalchemy.orm import reconstructor, relationship, synonym
 
 from croniter import (
@@ -84,6 +86,7 @@
 )
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.lineage import apply_lineage, prepare_lineage
+from airflow.models.dagpickle import DagPickle
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
@@ -108,13 +111,6 @@
 
 install_aliases()
 
-SQL_ALCHEMY_SCHEMA = configuration.get('core', 'SQL_ALCHEMY_SCHEMA')
-
-if not SQL_ALCHEMY_SCHEMA or SQL_ALCHEMY_SCHEMA.isspace():
-    Base = declarative_base()
-else:
-    Base = declarative_base(metadata=MetaData(schema=SQL_ALCHEMY_SCHEMA))
-
 ID_LEN = 250
 XCOM_RETURN_KEY = 'return_value'
 
@@ -613,32 +609,6 @@ def is_superuser(self):
         return self.superuser
 
 
-class DagPickle(Base):
-    """
-    Dags can originate from different places (user repos, master repo, ...)
-    and also get executed in different places (different executors). This
-    object represents a version of a DAG and becomes a source of truth for
-    a BackfillJob execution. A pickle is a native python serialized object,
-    and in this case gets stored in the database for the duration of the job.
-
-    The executors pick up the DagPickle id and read the dag definition from
-    the database.
-    """
-    id = Column(Integer, primary_key=True)
-    pickle = Column(PickleType(pickler=dill))
-    created_dttm = Column(UtcDateTime, default=timezone.utcnow)
-    pickle_hash = Column(Text)
-
-    __tablename__ = "dag_pickle"
-
-    def __init__(self, dag):
-        self.dag_id = dag.dag_id
-        if hasattr(dag, 'template_env'):
-            dag.template_env = None
-        self.pickle_hash = hash(dag)
-        self.pickle = dag
-
-
 class TaskInstance(Base, LoggingMixin):
     """
     Task instances store the state of a task instance. This table is the
diff --git a/airflow/models/base.py b/airflow/models/base.py
new file mode 100644
index 0000000000..1a06b0b625
--- /dev/null
+++ b/airflow/models/base.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import MetaData
+from sqlalchemy.ext.declarative import declarative_base
+
+import airflow
+
+SQL_ALCHEMY_SCHEMA = airflow.configuration.get("core", "SQL_ALCHEMY_SCHEMA")
+
+if not SQL_ALCHEMY_SCHEMA or SQL_ALCHEMY_SCHEMA.isspace():
+    Base = declarative_base()
+else:
+    Base = declarative_base(metadata=MetaData(schema=SQL_ALCHEMY_SCHEMA))
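
A sketch of what the schema switch above buys (the schema name 'airflow'
below is a hypothetical example, not taken from the patch): models declared
against a Base built with MetaData(schema=...) get schema-qualified tables.

    from sqlalchemy import Column, Integer, MetaData
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base(metadata=MetaData(schema='airflow'))

    class Example(Base):
        __tablename__ = 'dag_pickle'
        id = Column(Integer, primary_key=True)

    print(Example.__table__.fullname)  # -> airflow.dag_pickle
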
diff --git a/airflow/models/dagpickle.py b/airflow/models/dagpickle.py
new file mode 100644
index 0000000000..e261ab1ac0
--- /dev/null
+++ b/airflow/models/dagpickle.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import dill
+from sqlalchemy import Column, Integer, PickleType, Text
+
+from airflow.models.base import Base
+from airflow.utils import timezone
+from airflow.utils.sqlalchemy import UtcDateTime
+
+
+class DagPickle(Base):
+    """
+    Dags can originate from different places (user repos, master repo, ...)
+    and also get executed in different places (different executors). This
+    object represents a version of a DAG and becomes a source of truth for
+    a BackfillJob execution. A pickle is a native python serialized object,
+    and in this case gets stored in the database for the duration of the job.
+
+    The executors pick up the DagPickle id and read the dag definition from
+    the database.
+    """
+
+    id = Column(Integer, primary_key=True)
+    pickle = Column(PickleType(pickler=dill))
+    created_dttm = Column(UtcDateTime, default=timezone.utcnow)
+    pickle_hash = Column(Text)
+
+    __tablename__ = "dag_pickle"
+
+    def __init__(self, dag):
+        self.dag_id = dag.dag_id
+        if hasattr(dag, 'template_env'):
+            dag.template_env = None
+        self.pickle_hash = hash(dag)
+        self.pickle = dag
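
A hedged usage sketch of the relocated class, modelled on the jobs.py hunk
earlier in this diff; the 'dag' variable is assumed to be a DAG instance
loaded elsewhere (e.g. from a DagBag), and is not shown here:

    from airflow import settings
    from airflow.models.dagpickle import DagPickle

    session = settings.Session()
    pickle = DagPickle(dag)   # 'dag' is an assumed, already-loaded DAG
    session.add(pickle)
    session.commit()          # the database assigns pickle.id
    pickle_id = pickle.id

    # An executor can later rehydrate the DAG definition from the database:
    stored = session.query(DagPickle).filter(DagPickle.id == pickle_id).first()
    dag_from_db = stored.pickle
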
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 7bd67ce533..be459cf4eb 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -358,7 +358,7 @@ def resetdb(rbac):
 
     log.info("Dropping tables that exist")
 
-    models.Base.metadata.drop_all(settings.engine)
+    models.base.Base.metadata.drop_all(settings.engine)
     mc = MigrationContext.configure(settings.engine)
     if mc._version.exists(settings.engine):
         mc._version.drop(settings.engine)
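
The resetdb change above relies on the shared metadata object: every table
registered against models.base.Base can be created or dropped in one call.
A minimal sketch, assuming a throwaway in-memory engine rather than the
settings.engine that resetdb itself uses:

    from sqlalchemy import create_engine
    from airflow.models.base import Base

    engine = create_engine('sqlite://')  # in-memory engine for illustration
    Base.metadata.create_all(engine)     # create every registered table
    Base.metadata.drop_all(engine)       # drop them all, as resetdb does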


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services