Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/01/20 14:00:43 UTC
[GitHub] [airflow] anitakar opened a new pull request #7217: [AIRFLOW-NNNN]
Store DAG's source code in the serialized_dag table
anitakar opened a new pull request #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217
Store the DAG's source code in the serialized_dag table and query it from there when the Code view is opened for the DAG.
The webserver then no longer needs access to the DAGs folder on the shared filesystem.
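The approach in this description can be sketched with the standard-library `sqlite3` module: the scheduler upserts each DAG file's text into a `dag_code`-like table, and the webserver reads it back without opening the file. This is a minimal illustration under simplifying assumptions (SQLite, a table keyed by the path itself, hypothetical helpers `create_schema`, `store_source` and `load_source`), not the PR's implementation. Later revisions in this thread use a separate `dag_code` table rather than the serialized_dag table.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional


def create_schema(conn: sqlite3.Connection) -> None:
    # Simplified: keyed by the path itself. The real dag_code table keys on a
    # hash of the path because 2000-character strings exceed index limits.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS dag_code ("
        " fileloc TEXT PRIMARY KEY,"
        " last_updated TEXT NOT NULL,"
        " source_code TEXT NOT NULL)"
    )


def store_source(conn: sqlite3.Connection, fileloc: str, source_code: str) -> None:
    # Scheduler side: insert or overwrite the row for this file, roughly what
    # session.merge() does when a row with the same primary key exists.
    conn.execute(
        "INSERT OR REPLACE INTO dag_code (fileloc, last_updated, source_code) "
        "VALUES (?, ?, ?)",
        (fileloc, datetime.now(timezone.utc).isoformat(), source_code),
    )
    conn.commit()


def load_source(conn: sqlite3.Connection, fileloc: str) -> Optional[str]:
    # Webserver side: only the database is consulted, never the DAGs folder.
    row = conn.execute(
        "SELECT source_code FROM dag_code WHERE fileloc = ?", (fileloc,)
    ).fetchone()
    return row[0] if row else None
```

Storing the same path twice leaves a single row holding the newest text, which is what the Code view needs.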
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/c997cab42d8695ac444e63dfe4b948a7ea82ed89&el=desc) will **decrease** coverage by `0.00%`.
> The diff coverage is `90.40%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7217/graphs/tree.svg?width=650&height=150&src=pr&token=WdLKlKHOAU)](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree)
```diff
@@            Coverage Diff             @@
##           master    #7217      +/-   ##
==========================================
- Coverage   86.93%   86.93%   -0.01%
==========================================
  Files         909      910       +1
  Lines       43975    44066      +91
==========================================
+ Hits        38229    38308      +79
- Misses       5746     5758      +12
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `79.39% <ø> (-0.99%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.02% <33.33%> (-0.70%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `72.72% <60.00%> (-4.20%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.21% <66.66%> (-0.10%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `92.47% <92.47%> (ø)` | |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `92.00% <100.00%> (-0.60%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `100.00% <100.00%> (ø)` | |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.40% <100.00%> (-0.28%)` | :arrow_down: |
| [airflow/utils/file.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9maWxlLnB5) | `88.88% <100.00%> (+1.05%)` | :arrow_up: |
| [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `76.27% <100.00%> (+0.01%)` | :arrow_up: |
| ... and [2 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [c997cab...a7fbf91](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391762832
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.exceptions import AirflowException
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+    """A table for DAGs code.
+
+    dag_code table contains code of DAG files synchronized by scheduler.
+    This feature is controlled by:
+
+    * ``[core] store_serialized_dags = True``: enable this feature
+    * ``[core] store_dag_code = True``: enable this feature
+
+    For details on dag serialization see SerializedDagModel
+    """
+    __tablename__ = 'dag_code'
+
+    fileloc_hash = Column(
+        BigInteger, nullable=False, primary_key=True, autoincrement=False)
+    fileloc = Column(String(2000), nullable=False)
+    # The max length of fileloc exceeds the limit of indexing.
+    last_updated = Column(UtcDateTime, nullable=False)
+    source_code = Column(UnicodeText(), nullable=False)
+
+    def __init__(self, full_filepath: str):
+        self.fileloc = full_filepath
+        self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+        self.last_updated = timezone.utcnow()
+        self.source_code = DagCode._read_code(self.fileloc)
+
+    @classmethod
+    def _read_code(cls, fileloc: str):
+        with open_maybe_zipped(fileloc, 'r') as source:
+            source_code = source.read()
+        return source_code
+
+    @provide_session
+    def sync_to_db(self, session=None):
+        """Writes code into database.
+
+        :param session: ORM Session
+        """
+        old_version = session.query(
+            DagCode.fileloc, DagCode.fileloc_hash, DagCode.last_updated) \
+            .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+            .first()
Review comment:
```suggestion
.one_or_none()
```
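The difference behind this suggestion: SQLAlchemy's `Query.first()` silently returns the first matching row (or `None`), while `Query.one_or_none()` returns the single row or `None` and raises `MultipleResultsFound` when more than one row matches. Since `fileloc_hash` is the primary key here, `one_or_none()` states that expectation explicitly. The stdlib-only sketch below mimics those semantics over a plain list; the functions and the local exception class are illustrative stand-ins, not SQLAlchemy itself.

```python
from typing import List, Optional, TypeVar

T = TypeVar("T")


class MultipleResultsFound(Exception):
    """Local stand-in for sqlalchemy.orm.exc.MultipleResultsFound."""


def first(rows: List[T]) -> Optional[T]:
    # Like Query.first(): any extra rows are silently ignored.
    return rows[0] if rows else None


def one_or_none(rows: List[T]) -> Optional[T]:
    # Like Query.one_or_none(): more than one row is treated as an error.
    if len(rows) > 1:
        raise MultipleResultsFound(f"expected at most one row, got {len(rows)}")
    return rows[0] if rows else None
```

With a primary-key filter both behave the same on valid data; `one_or_none()` only differs by surfacing a broken invariant instead of hiding it.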
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383234120
##########
File path: airflow/models/serialized_dag.py
##########
@@ -72,23 +72,10 @@ class SerializedDagModel(Base):
     def __init__(self, dag: DAG):
         self.dag_id = dag.dag_id
         self.fileloc = dag.full_filepath
-        self.fileloc_hash = self.dag_fileloc_hash(self.fileloc)
+        self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
         self.data = SerializedDAG.to_dict(dag)
         self.last_updated = timezone.utcnow()
 
-    @staticmethod
-    def dag_fileloc_hash(full_filepath: str) -> int:
-        """"Hashing file location for indexing.
-
-        :param full_filepath: full filepath of DAG file
-        :return: hashed full_filepath
-        """
-        # hashing is needed because the length of fileloc is 2000 as an Airflow convention,
-        # which is over the limit of indexing. If we can reduce the length of fileloc, then
-        # hashing is not needed.
-        return int.from_bytes(
-            hashlib.sha1(full_filepath.encode('utf-8')).digest()[-2:], byteorder='big', signed=False)
-
     @classmethod
     @provide_session
     def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session=None):
Review comment:
It is here: https://github.com/apache/airflow/pull/7217/files#diff-40e3d1f11ed05554009ad08fa603a1e8L111
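The moved `dag_fileloc_hash` keeps only the last two bytes of the SHA-1 digest, so its result lies in 0..65535 and distinct paths start colliding once a deployment has a few hundred DAG files. The sketch reproduces that expression and, for comparison, a hypothetical 8-byte variant shifted right by one bit so it fits a signed 64-bit `BigInteger` column. The newer `dag_code` model earlier in this thread declares `fileloc_hash` as `BigInteger` and imports `struct`, but this exact variant is an assumption, not the PR's code.

```python
import hashlib
import struct


def dag_fileloc_hash_16bit(full_filepath: str) -> int:
    # Same expression as the removed SerializedDagModel.dag_fileloc_hash.
    return int.from_bytes(
        hashlib.sha1(full_filepath.encode('utf-8')).digest()[-2:],
        byteorder='big', signed=False)


def dag_fileloc_hash_63bit(full_filepath: str) -> int:
    # Hypothetical wider variant: read the last 8 digest bytes as an unsigned
    # big-endian 64-bit integer, then drop one bit so it fits a signed BIGINT.
    digest = hashlib.sha1(full_filepath.encode('utf-8')).digest()
    return struct.unpack('>Q', digest[-8:])[0] >> 1
```

Hashing a couple of thousand synthetic paths with both functions shows collisions for the 16-bit version and none for the wider one.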
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r392266491
##########
File path: airflow/config_templates/config.yml
##########
@@ -335,6 +335,15 @@
type: string
example: ~
default: "True"
+ - name: store_dag_code
Review comment:
I have moved it just below min_serialized_dag_update_interval because it seemed to me that it should stay just below store_serialized_dags. But I have no strong opinion here.
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383226439
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -40,7 +40,9 @@
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.jobs.base_job import BaseJob
from airflow.models import DAG, DagRun, SlaMiss, errors
+from airflow.models.dagcode import DagCodeModel
Review comment:
```suggestion
from airflow.models.dagcode import DagCode
```
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383196267
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+    """A table for DAGs code.
+
+    dag_code table contains code of DAG files synchronized by scheduler.
+    This feature is controlled by:
+
+    * ``[core] store_serialized_dags = True``: enable this feature
+    * ``[core] store_code = True``: enable this feature
+
+    For details on dag serialization see SerializedDagModel
+    """
+    __tablename__ = 'dag_code'
+
+    fileloc = Column(String(2000), primary_key=True)
+    # The max length of fileloc exceeds the limit of indexing.
+    fileloc_hash = Column(Integer, primary_key=True)
+    last_updated = Column(UtcDateTime, nullable=False)
+    source_code = Column(Text())
+
+    __table_args__ = (
+        Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+    )
+
+    def __init__(self, full_filepath: str):
+        self.fileloc = full_filepath
+        self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+        self.last_updated = timezone.utcnow()
+        self.source_code = DagCodeModel._read_code(self.fileloc)
+
+    @classmethod
+    def _read_code(cls, fileloc: str):
+        try:
+            with open(fileloc, 'r') as source:
+                source_code = source.read()
+        except IOError:
+            source_code = "Couldn't read source file {}.".format(fileloc)
+        return source_code
+
+    @provide_session
+    def write_code(self, session=None):
+        """Writes code into database.
+
+        :param session: ORM Session
+        """
+        session.merge(self)
Review comment:
I'm not a fan of this method -- it's a bit too unobvious and out of line with what we do elsewhere in Airflow. Please just do `session.merge(dagcode)` etc. at the calling locations.
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383200591
##########
File path: airflow/config_templates/config.yml
##########
@@ -323,6 +323,15 @@
type: string
example: ~
default: "True"
+ - name: store_code
+ description: |
+ Whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of
+ trying to access files in a DAG folder.
Review comment:
Actually, that is how I started. But then @mik-laj pointed out that it may have a huge performance impact. I believe it is safer to hide it behind a feature flag for now, so that users can opt to turn it off in case there are problems with it.
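One common way to limit that cost, sketched here as an assumption rather than what this PR does, is to compare the file's modification time with the stored `last_updated` timestamp and rewrite the row only when the file is newer. The later `DagCode` revision in this thread imports `os`, `datetime` and `timedelta` and selects `last_updated` in `sync_to_db`, which suggests a check of this kind, but that part of the method is not shown. `file_mtime_utc` and `needs_update` are hypothetical names.

```python
import os
from datetime import datetime, timezone
from typing import Optional


def file_mtime_utc(path: str) -> datetime:
    # os.path.getmtime returns seconds since the epoch; attach UTC explicitly.
    return datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc)


def needs_update(path: str, last_updated: Optional[datetime]) -> bool:
    # Hypothetical helper: write the row if none exists yet, or if the file
    # was modified after the stored timestamp.
    if last_updated is None:
        return True
    return file_mtime_utc(path) > last_updated
```

A scheduler loop built this way only re-reads and re-writes a file when `needs_update` returns True, so unchanged DAG files cost one `stat` call instead of a full read and write.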
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383225969
##########
File path: airflow/config_templates/config.yml
##########
@@ -323,6 +323,15 @@
type: string
example: ~
default: "True"
+ - name: store_code
+ description: |
+ Whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of
+ trying to access files in a DAG folder.
Review comment:
Can we change it to `store_dag_code`? I just feel `store_code` is quite broad. What do you all think?
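For reference, reading such a flag with a safe default can be sketched with the standard-library `configparser`. Airflow's own `conf.getboolean('core', 'store_dag_code', fallback=False)` call, which appears later in this thread, follows the same pattern. The helper name and the inline config text are illustrative only.

```python
import configparser


def store_dag_code_enabled(config_text: str) -> bool:
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    # fallback=False keeps the feature off when the option or section is missing.
    return parser.getboolean('core', 'store_dag_code', fallback=False)
```

Defaulting to False means existing deployments keep reading code from the DAGs folder until they opt in.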
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391774159
##########
File path: airflow/models/dag.py
##########
@@ -591,6 +593,29 @@ def owner(self) -> str:
         """
         return ", ".join({t.owner for t in self.tasks})
 
+    def code(self):
+        if conf.getboolean('core', 'store_dag_code', fallback=False):
+            return self._get_code_from_db()
+        else:
+            return self._get_code_from_file()
+
+    def _get_code_from_file(self):
+        with open_maybe_zipped(self.fileloc, 'r') as f:
+            code = f.read()
+        return code
+
+    @provide_session
+    def _get_code_from_db(self, session=None):
+        fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
Review comment:
In this case it seems to me that the better would be the enemy of the good enough :)
There is no good place for this in the current architecture. I agree that this solution is better than the previous one. And it seems to me that DagBag will definitely gain functionality, and this code, once proven to work, will not need to change anymore.
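`_get_code_from_file` relies on `open_maybe_zipped` because a DAG file may live inside a `.zip` archive in the DAGs folder. A rough stdlib sketch of that idea follows. It is a hypothetical reimplementation (the name `read_maybe_zipped` and the `archive.zip/member.py` path convention are assumptions), not the code in `airflow.utils.file`.

```python
import io
import re
import zipfile

# Hypothetical path convention: "<archive>.zip/<member path inside the archive>".
ZIP_PATH_RE = re.compile(r'^(.+?\.zip)/(.+)$')


def read_maybe_zipped(fileloc: str) -> str:
    """Return the text of fileloc, looking inside a .zip archive if needed."""
    match = ZIP_PATH_RE.match(fileloc)
    if match and zipfile.is_zipfile(match.group(1)):
        archive, member = match.groups()
        with zipfile.ZipFile(archive) as zf:
            # ZipFile.open yields a binary stream; wrap it to decode UTF-8 text.
            with io.TextIOWrapper(zf.open(member), encoding='utf-8') as f:
                return f.read()
    # Plain file on disk.
    with open(fileloc, 'r', encoding='utf-8') as f:
        return f.read()
```

Storing the text in the database removes the need for this lookup on the webserver side, since the scheduler has already resolved the archive member when it wrote the row.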
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391639849
##########
File path: airflow/www/views.py
##########
@@ -519,17 +519,23 @@ def last_dagruns(self, session=None):
     @has_access
     @provide_session
     def code(self, session=None):
-        dm = models.DagModel
-        dag_id = request.args.get('dag_id')
-        dag = session.query(dm).filter(dm.dag_id == dag_id).first()
+        all_errors = ""
+
         try:
-            with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
-                code = f.read()
+            dag_id = request.args.get('dag_id')
+            dag_orm = DagModel.get_dagmodel(dag_id, session=session)
+            dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
Review comment:
If you move the methods that read the code from DagModel to DagCode, then you can avoid loading the DAG here, because DagModel contains sufficient information.
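This point can be illustrated with two tables: the `dag` table already maps `dag_id` to `fileloc`, so the code view can go from `dag_id` to the stored source with one query and never parse or deserialize the DAG. The sqlite3 schema below is a simplified, hypothetical stand-in for Airflow's `dag` and `dag_code` tables; it joins on `fileloc` directly rather than on a hash to keep the sketch short.

```python
import sqlite3
from typing import Optional


def create_tables(conn: sqlite3.Connection) -> None:
    # Simplified stand-ins: dag (dag_id -> fileloc), dag_code (fileloc -> source).
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE dag_code (fileloc TEXT PRIMARY KEY, source_code TEXT NOT NULL)"
    )


def code_for_dag(conn: sqlite3.Connection, dag_id: str) -> Optional[str]:
    # Only metadata columns are read; no DAG object is built.
    row = conn.execute(
        "SELECT c.source_code FROM dag AS d "
        "JOIN dag_code AS c ON c.fileloc = d.fileloc "
        "WHERE d.dag_id = ?",
        (dag_id,),
    ).fetchone()
    return row[0] if row else None
```

Because several DAGs can be defined in one file, they all resolve to the same `dag_code` row.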
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383202707
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,44 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
     def safe_dag_id(self):
         return self.dag_id.replace('.', '__dot__')
 
+    @property
+    def code(self):
Review comment:
Sure, it is bad practice in any language to have heavy processing in something that looks like a getter.
My question is: can you somehow cache the value, for example as _source_code, in a class that represents a table in SQLAlchemy, or in whatever we have on top of it?
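One answer to the caching question, sketched with `functools.cached_property` (which requires Python 3.8 or later): the expensive read runs on first access and the result is stored on the instance. The `CodeHolder` class and its `loader` callable are hypothetical. The cached value lives only as long as that instance and is not refreshed if the underlying file or row changes later.

```python
from functools import cached_property
from typing import Callable


class CodeHolder:
    """Hypothetical wrapper that reads source code lazily, at most once."""

    def __init__(self, fileloc: str, loader: Callable[[str], str]):
        self.fileloc = fileloc
        self._loader = loader
        self.load_count = 0  # for illustration: how many times the loader ran

    @cached_property
    def code(self) -> str:
        # Runs on first access only; later accesses return the stored value.
        self.load_count += 1
        return self._loader(self.fileloc)
```

Because the cache is per instance, a request-scoped object gets the cheap-getter behaviour without the risk of serving stale code across requests.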
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/3bb60afc7b8319996385d681faac342afe2b3bd2&el=desc) will **decrease** coverage by `0.48%`.
> The diff coverage is `91.83%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7217/graphs/tree.svg?width=650&height=150&src=pr&token=WdLKlKHOAU)](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree)
```diff
@@            Coverage Diff             @@
##           master    #7217      +/-   ##
==========================================
- Coverage   86.89%   86.41%   -0.49%
==========================================
  Files         906      907       +1
  Lines       43906    44018     +112
==========================================
- Hits        38153    38039     -114
- Misses       5753     5979     +226
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `79.39% <ø> (-0.99%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.02% <33.33%> (-0.32%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `72.72% <60.00%> (-4.20%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.21% <66.66%> (-0.10%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `92.47% <92.47%> (ø)` | |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `92.00% <100.00%> (-0.60%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `100.00% <100.00%> (ø)` | |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.40% <100.00%> (-0.28%)` | :arrow_down: |
| [airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5) | `94.97% <100.00%> (+0.16%)` | :arrow_up: |
| [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.55% <100.00%> (+0.35%)` | :arrow_up: |
| ... and [18 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [ccbaf57...50f4874](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#discussion_r382566652
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1598,6 +1604,8 @@ def _execute_helper(self):
)
models.DAG.deactivate_stale_dags(execute_start_time)
+ if STORE_CODE:
Review comment:
It is not valid Python code.
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383340478
##########
File path: airflow/migrations/versions/788fcbd36a03_add_source_code_table.py
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add source code table
+
+Revision ID: 788fcbd36a03
+Revises: a4c2fd67d16b
+Create Date: 2020-02-20 14:40:23.257645
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '788fcbd36a03'
+down_revision = 'a4c2fd67d16b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply add source code table"""
+    op.create_table('dag_code',  # pylint: disable=no-member
+                    sa.Column('fileloc', sa.String(length=2000), nullable=False),
+                    sa.Column('fileloc_hash', sa.Integer(), nullable=False),
+                    sa.Column('source_code', sa.Text(), nullable=False),
+                    sa.Column('last_updated', sa.DateTime(), nullable=False),
+                    sa.PrimaryKeyConstraint('fileloc', 'fileloc_hash'))
+
+    op.create_index(  # pylint: disable=no-member
+        'idx_fileloc_code_hash', 'dag_code', ['fileloc_hash'])
+
+    conn = op.get_bind()  # pylint: disable=no-member
+    if conn.dialect.name == "mysql":
+        conn.execute("SET time_zone = '+00:00'")
+        cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
+        res = cur.fetchall()
+        if res[0][0] == 0:
+            raise Exception(
+                "Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"
+            )
+
+        op.alter_column(  # pylint: disable=no-member
+            table_name="dag_code",
+            column_name="last_updated",
+            type_=mysql.TIMESTAMP(fsp=6),
+            nullable=False,
+        )
+    else:
+        # sqlite and mssql datetime are fine as is. Therefore, not converting
+        if conn.dialect.name in ("sqlite", "mssql"):
+            return
+
+        # we try to be database agnostic, but not every db (e.g. sqlserver)
+        # supports per session time zones
+        if conn.dialect.name == "postgresql":
+            conn.execute("set timezone=UTC")
+
+        op.alter_column(  # pylint: disable=no-member
+            table_name="dag_code",
+            column_name="last_updated",
+            type_=sa.TIMESTAMP(timezone=True),
+        )
Review comment:
Done
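The migration switches `last_updated` to a timezone-aware timestamp type and forces a UTC session time zone on MySQL and PostgreSQL so stored values are unambiguous. The Python side of that convention can be illustrated with a small helper that normalizes aware datetimes to UTC and rejects naive ones. It is a simplified, hypothetical sketch, not Airflow's `UtcDateTime` column type.

```python
from datetime import datetime, timezone


def to_utc_for_storage(value: datetime) -> datetime:
    # Hypothetical helper: refuse naive datetimes, since their zone is unknown.
    if value.tzinfo is None or value.tzinfo.utcoffset(value) is None:
        raise ValueError("naive datetime is not allowed; attach a timezone first")
    # Convert any aware datetime to the equivalent UTC instant.
    return value.astimezone(timezone.utc)
```

Rejecting naive values at the boundary avoids silently storing local times that the UTC-configured database would misinterpret.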
----------------------------------------------------------------
[GitHub] [airflow] boring-cyborg[bot] commented on issue #7217:
[AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576286985
Congratulations on your first Pull Request and welcome to the Apache Airflow community!
If you have any issues or are unsure about any anything please check our
Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
Here are some useful points:
- Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits](
https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks)
will help you with that.
- In case of a new feature add useful documentation (in docstrings or in `docs/` directory).
Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/howto/custom-operator.rst). Consider adding an example DAG that shows
how users should use it.
- Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing
locally. It’s a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
- Be patient and persistent. It might take some time to get a review or get the final approval from
Committers.
Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://apache-airflow-slack.herokuapp.com/
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#discussion_r381449658
##########
File path: airflow/serialization/schema.json
##########
@@ -84,6 +84,7 @@
"catchup": { "type": "boolean" },
"is_subdag": { "type": "boolean" },
"fileloc": { "type" : "string"},
+ "_source_code": { "type": "string" },
Review comment:
Yep, let's not serialize it
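Leaving `_source_code` out of the serialized blob keeps the DAG JSON small, and the code lives only in its own storage. Below is a minimal sketch of that filtering, assuming a plain dict of DAG attributes. `EXCLUDED_FIELDS` and `serialize_dag_attrs` are hypothetical names and not part of Airflow's serializer API.

```python
import json
from typing import Any, Dict, FrozenSet

# Hypothetical: attributes that must never end up in the serialized_dag JSON.
EXCLUDED_FIELDS: FrozenSet[str] = frozenset({"_source_code"})


def serialize_dag_attrs(attrs: Dict[str, Any],
                        excluded: FrozenSet[str] = EXCLUDED_FIELDS) -> str:
    """Drop excluded attributes, then encode the rest as JSON."""
    kept = {key: value for key, value in attrs.items() if key not in excluded}
    return json.dumps(kept, sort_keys=True)


blob = serialize_dag_attrs({
    "dag_id": "example",
    "fileloc": "/dags/example.py",
    "_source_code": "from airflow import DAG\n...",
})
print(blob)
```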
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r392170192
##########
File path: airflow/config_templates/config.yml
##########
@@ -335,6 +335,15 @@
type: string
example: ~
default: "True"
+ - name: store_dag_code
+ description: |
+ Whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of
+ trying to access files in a DAG folder.
+ version_added: 2.0.0
+ type: string
+ example: ~
+ default: "False"
Review comment:
To address Kaxil's comment, I think we can do this:
```suggestion
Whether to persist DAG files code in DB.
If set to True, Webserver reads file contents from DB instead of
trying to access files in a DAG folder. Defaults to same as the
store_serialized_dags setting
version_added: 2.0.0
type: string
example: ~
default: "%(store_serialized_dags)s"
```
This will use the "Basic interpolation" built into configparser: https://docs.python.org/3/library/configparser.html#configparser.BasicInterpolation
Example:
```ini
[Paths]
home_dir: /Users
my_dir: %(home_dir)s/lumberjack
my_pictures: %(my_dir)s/Pictures
```
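With `BasicInterpolation`, `%(name)s` is resolved lazily when the option is read. As a result, `store_dag_code` follows `store_serialized_dags` unless a user sets it explicitly. The stdlib sketch below illustrates this. The two-layer defaults/overrides setup is an assumption made for illustration and does not describe how Airflow's own config loader is structured.

```python
import configparser

# Hypothetical defaults mirroring the suggested config.yml change.
DEFAULTS = """
[core]
store_serialized_dags = False
store_dag_code = %(store_serialized_dags)s
"""


def load_config(user_overrides: str = "") -> configparser.ConfigParser:
    """Read the defaults, then let a user's file override individual keys."""
    cfg = configparser.ConfigParser()  # BasicInterpolation is the default
    cfg.read_string(DEFAULTS)
    if user_overrides:
        cfg.read_string(user_overrides)  # later reads overwrite earlier values
    return cfg


# Only store_serialized_dags is set, so store_dag_code inherits its value.
cfg = load_config("[core]\nstore_serialized_dags = True\n")
print(cfg.getboolean("core", "store_dag_code"))  # True
```

One side effect of interpolation: any literal `%` in a value must then be written as `%%`.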
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217: [WIP]
[AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/057f3ae3a4afedf6d462ecf58b01dd6304d3e135?src=pr&el=desc) will **decrease** coverage by `0.02%`.
> The diff coverage is `61.53%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7217/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
==========================================
- Coverage 85.5% 85.48% -0.03%
==========================================
Files 864 864
Lines 40455 40488 +33
==========================================
+ Hits 34592 34611 +19
- Misses 5863 5877 +14
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `88.09% <100%> (+0.29%)` | :arrow_up: |
| [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `75.53% <44%> (-0.55%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `90.9% <90.9%> (ø)` | :arrow_up: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `87.93% <0%> (-0.2%)` | :arrow_down: |
| [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.37% <0%> (+0.34%)` | :arrow_up: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [057f3ae...901700f](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
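The totals in this report are internally consistent, and the headline coverage figures can be reproduced from them (coverage = hits / lines). Here is a quick arithmetic check. `coverage_pct` is only a local helper.

```python
def coverage_pct(hits: int, lines: int) -> float:
    """Line coverage as a percentage."""
    return 100.0 * hits / lines


# Figures copied from the Coverage Diff table above.
master = {"lines": 40455, "hits": 34592, "misses": 5863}
pr = {"lines": 40488, "hits": 34611, "misses": 5877}

for totals in (master, pr):
    # Every tracked line is either hit or missed.
    assert totals["hits"] + totals["misses"] == totals["lines"]

before = coverage_pct(master["hits"], master["lines"])  # about 85.507
after = coverage_pct(pr["hits"], pr["lines"])           # about 85.485
print("{:.2f} -> {:.2f} ({:+.2f})".format(before, after, after - before))
```

The unrounded change is roughly -0.023 percentage points, which matches the "decrease by 0.02%" headline. The table's -0.03% differs only in the last rounded digit.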
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383045556
##########
File path: airflow/www/views.py
##########
@@ -520,16 +521,60 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
- try:
+
+ def get_serialized_dag(dag_id):
Review comment:
Done
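The reworked `code()` view comes down to three steps: resolve the DAG's file location, fetch the code (from the DB or from the file), and turn any failure into a message on the page instead of an error response. Below is a framework-free sketch of that flow. `get_fileloc` and `get_code` are hypothetical stand-ins for the model lookups, and the error wording is invented.

```python
from typing import Callable, Optional, Tuple


def load_dag_code(dag_id: Optional[str],
                  get_fileloc: Callable[[str], Optional[str]],
                  get_code: Callable[[str], str]) -> Tuple[str, str]:
    """Return (code, all_errors) for the Code view."""
    code = ""
    all_errors = ""
    try:
        if not dag_id:
            raise ValueError("no dag_id given")
        fileloc = get_fileloc(dag_id)
        if fileloc is None:
            raise LookupError("DAG {!r} not found".format(dag_id))
        code = get_code(fileloc)
    except Exception as err:  # broad on purpose: the page should still render
        all_errors += "Failed to load code: {}\n".format(err)
    return code, all_errors


filelocs = {"example": "/dags/example.py"}
sources = {"/dags/example.py": "print('hello')\n"}
print(load_dag_code("example", filelocs.get, sources.__getitem__))
print(load_dag_code("missing", filelocs.get, sources.__getitem__))
```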
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r385374472
##########
File path: docs/dag-serialization.rst
##########
@@ -63,6 +63,8 @@ Add the following settings in ``airflow.cfg``:
If set to True, Webserver reads from DB instead of parsing DAG files
* ``min_serialized_dag_update_interval``: This flag sets the minimum interval (in seconds) after which
the serialized DAG in DB should be updated. This helps in reducing database write rate.
+* ``store_code``: This flag decides whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of trying to access files in a DAG folder.
If you are updating Airflow from <1.10.7, please do not forget to run ``airflow db upgrade``.
Review comment:
I am afraid I do not understand
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#discussion_r368583211
##########
File path: airflow/models/serialized_dag.py
##########
@@ -65,6 +65,7 @@ class SerializedDagModel(Base):
fileloc_hash = Column(Integer, nullable=False)
data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False)
last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(String(1000000))
Review comment:
Shouldn't we use TEXT here?
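Some background on the TEXT question. MySQL caps a VARCHAR at 65,535 bytes per row, so a `String(1000000)` column cannot map to a plain VARCHAR there. Even MySQL's plain TEXT tops out at 65,535 bytes, so very large files would need MEDIUMTEXT or larger. The sqlite3 sketch below only shows a TEXT column round-tripping a 2,000,000-character file. SQLite does not enforce declared lengths at all, so it cannot reproduce the MySQL limits. The table is a simplified stand-in with made-up values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the serialized_dag table: only the columns needed here.
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, source_code TEXT)")

big_source = "# padding\n" * 200_000  # 2,000,000 characters of fake DAG code
conn.execute("INSERT INTO serialized_dag VALUES (?, ?)", ("big_dag", big_source))

(stored,) = conn.execute(
    "SELECT source_code FROM serialized_dag WHERE dag_id = ?", ("big_dag",)
).fetchone()
print(len(stored))  # 2000000
conn.close()
```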
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on issue #7217: [AIRFLOW-5946] Store
source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-598765490
Waiting for the CI to complete and pass :)
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391747198
##########
File path: airflow/www/views.py
##########
@@ -519,17 +519,23 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
+ all_errors = ""
+
try:
- with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
- code = f.read()
+ dag_id = request.args.get('dag_id')
+ dag_orm = DagModel.get_dagmodel(dag_id, session=session)
+ dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
Review comment:
Maybe we should introduce a new class DagCodeBag or something similar?
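Here is one possible shape for the suggested `DagCodeBag`: a small object that hides whether code comes from the database or from the DAG folder, and memoizes results per file. Everything in it is hypothetical, including the class name (taken from the suggestion), the constructor arguments, and the caching policy. A long-lived webserver cache would also need invalidation. Without it, the cache keeps serving stale code after a DAG file changes.

```python
from typing import Callable, Dict, Optional


class DagCodeBag:
    """Hypothetical facade over the two code sources, memoized per fileloc."""

    def __init__(self,
                 store_dag_code: bool,
                 read_from_db: Callable[[str], Optional[str]],
                 read_from_file: Callable[[str], str]) -> None:
        self._store_dag_code = store_dag_code
        self._read_from_db = read_from_db
        self._read_from_file = read_from_file
        self._cache: Dict[str, str] = {}

    def get_code(self, fileloc: str) -> str:
        """Return the source for fileloc, reading it at most once."""
        if fileloc not in self._cache:
            if self._store_dag_code:
                code = self._read_from_db(fileloc)
                if code is None:
                    raise KeyError("no stored code for {}".format(fileloc))
            else:
                code = self._read_from_file(fileloc)
            self._cache[fileloc] = code
        return self._cache[fileloc]


db_rows = {"/dags/example.py": "print('from db')\n"}
calls = []


def fake_db_lookup(fileloc: str) -> Optional[str]:
    calls.append(fileloc)  # record how often the "database" is hit
    return db_rows.get(fileloc)


bag = DagCodeBag(True, fake_db_lookup, read_from_file=lambda path: "unused")
print(bag.get_code("/dags/example.py"))
print(bag.get_code("/dags/example.py"))
print(len(calls))  # 1
```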
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383232936
##########
File path: airflow/migrations/versions/788fcbd36a03_add_source_code_table.py
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add source code table
+
+Revision ID: 788fcbd36a03
+Revises: a4c2fd67d16b
+Create Date: 2020-02-20 14:40:23.257645
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '788fcbd36a03'
+down_revision = 'a4c2fd67d16b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply add source code table"""
+ op.create_table('dag_code', # pylint: disable=no-member
+ sa.Column('fileloc', sa.String(length=2000), nullable=False),
+ sa.Column('fileloc_hash', sa.Integer(), nullable=False),
+ sa.Column('source_code', sa.Text(), nullable=False),
+ sa.Column('last_updated', sa.DateTime(), nullable=False),
+ sa.PrimaryKeyConstraint('fileloc', 'fileloc_hash'))
+
+ op.create_index( # pylint: disable=no-member
+ 'idx_fileloc_code_hash', 'dag_code', ['fileloc_hash'])
+
+ conn = op.get_bind() # pylint: disable=no-member
+ if conn.dialect.name == "mysql":
+ conn.execute("SET time_zone = '+00:00'")
+ cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
+ res = cur.fetchall()
+ if res[0][0] == 0:
+ raise Exception(
+ "Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"
+ )
+
+ op.alter_column( # pylint: disable=no-member
+ table_name="dag_code",
+ column_name="last_updated",
+ type_=mysql.TIMESTAMP(fsp=6),
+ nullable=False,
+ )
+ else:
+ # sqlite and mssql datetime are fine as is. Therefore, not converting
+ if conn.dialect.name in ("sqlite", "mssql"):
+ return
+
+ # we try to be database agnostic, but not every db (e.g. sqlserver)
+ # supports per session time zones
+ if conn.dialect.name == "postgresql":
+ conn.execute("set timezone=UTC")
+
+ op.alter_column( # pylint: disable=no-member
+ table_name="dag_code",
+ column_name="last_updated",
+ type_=sa.TIMESTAMP(timezone=True),
+ )
Review comment:
Yeah, but in https://github.com/apache/airflow/pull/6788 we discussed this and concluded it wasn't required.
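For readers following the migration: the dialect-specific handling of `last_updated` in `upgrade()` reduces to a small decision table. The sketch below has no side effects. The function name and the step strings are illustrative only; they are not Alembic calls.

```python
from typing import List, Optional


def last_updated_steps(dialect: str,
                       explicit_defaults_for_timestamp: Optional[int] = None) -> List[str]:
    """Describe what upgrade() does to dag_code.last_updated on each backend."""
    if dialect == "mysql":
        if explicit_defaults_for_timestamp == 0:
            # upgrade() refuses to continue in this case
            raise RuntimeError(
                "Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")
        return ["SET time_zone = '+00:00'", "ALTER last_updated TO TIMESTAMP(fsp=6) NOT NULL"]
    if dialect in ("sqlite", "mssql"):
        return []  # datetime columns are left as they are
    steps = ["set timezone=UTC"] if dialect == "postgresql" else []
    steps.append("ALTER last_updated TO TIMESTAMP WITH TIME ZONE")
    return steps


for name in ("mysql", "postgresql", "sqlite"):
    print(name, last_updated_steps(name, explicit_defaults_for_timestamp=1))
```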
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383231320
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,43 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
+ def code(self):
+ if conf.getboolean('core', 'store_code', fallback=False):
+ return self._get_code_from_db(self.fileloc)
+ else:
+ return self._get_code_from_file(self.fileloc)
+
+ def _get_code_from_file(self, fileloc):
+ with self._open_maybe_zipped(fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @classmethod
+ def _open_maybe_zipped(cls, f, mode='r'):
+ """
+ Opens the given file. If the path contains a folder with a .zip suffix, then
+ the folder is treated as a zip archive, opening the file inside the archive.
+
+ :return: a file object, as in `open`, or as in `ZipFile.open`.
+ """
+ ZIP_REGEX = re.compile(r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)))
+ _, archive, filename = ZIP_REGEX.search(f).groups()
+ if archive and zipfile.is_zipfile(archive):
+ return zipfile.ZipFile(archive, mode=mode).open(filename)
+ else:
+ return io.open(f, mode=mode)
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ fileloc_hash = DagCodeModel.dag_fileloc_hash(fileloc)
+ code = None
+ dag_code = session.query(DagCodeModel) \
+ .filter(DagCodeModel.fileloc_hash == fileloc_hash) \
Review comment:
```suggestion
.filter(DagCode.fileloc_hash == fileloc_hash) \
```
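A note on `_open_maybe_zipped` as quoted: `ZipFile.open()` always returns a binary stream. As a result, `f.read()` yields `bytes` for zipped DAGs but `str` for plain files. Below is a sketch of a reader that returns text in both cases, reusing the regex from the diff. `read_maybe_zipped` is a hypothetical name, and UTF-8 is assumed. The sketch also assumes that archive member names use `/`, which matches `os.sep` only on POSIX.

```python
import os
import re
import tempfile
import zipfile

# Same pattern as in the diff: optional "<something>.zip<sep>" prefix, then the rest.
ZIP_REGEX = re.compile(r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)))


def read_maybe_zipped(path: str, encoding: str = "utf-8") -> str:
    """Return the text of path, looking inside a .zip archive if the path goes through one."""
    _, archive, filename = ZIP_REGEX.search(path).groups()
    if archive and zipfile.is_zipfile(archive):
        with zipfile.ZipFile(archive) as zf:
            # ZipFile.read returns bytes, so decode explicitly
            return zf.read(filename).decode(encoding)
    with open(path, encoding=encoding) as f:
        return f.read()


with tempfile.TemporaryDirectory() as tmp:
    plain = os.path.join(tmp, "plain_dag.py")
    with open(plain, "w", encoding="utf-8") as f:
        f.write("print('plain')\n")
    archive_path = os.path.join(tmp, "dags.zip")
    with zipfile.ZipFile(archive_path, "w") as zf:
        zf.writestr("zipped_dag.py", "print('zipped')\n")
    plain_text = read_maybe_zipped(plain)
    zipped_text = read_maybe_zipped(os.path.join(archive_path, "zipped_dag.py"))

print(plain_text, zipped_text)
```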
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on issue #7217: [AIRFLOW-5946] Store
source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-598804730
:tada: :cat2:
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391772300
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.exceptions import AirflowException
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText(), nullable=False)
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCode._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ with open_maybe_zipped(fileloc, 'r') as source:
+ source_code = source.read()
+ return source_code
+
+ @provide_session
+ def sync_to_db(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ old_version = session.query(
+ DagCode.fileloc, DagCode.fileloc_hash, DagCode.last_updated) \
+ .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+ .first()
+
+ if old_version and old_version.fileloc != self.fileloc:
+ raise AirflowException(
+ "Filename '{}' causes a hash collision in the database with "
+ "'{}'. Please rename the file.".format(
+ self.fileloc, old_version.fileloc))
+
+ file_modified = datetime.fromtimestamp(
+ os.path.getmtime(correct_maybe_zipped(self.fileloc)), tz=timezone.utc)
+
+ if old_version and (file_modified - timedelta(seconds=120)) < \
+ old_version.last_updated:
+ return
+
+ session.merge(self)
+
+ @classmethod
+ @provide_session
+ def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
+ """Writes code in bulk into database.
+
+ :param filelocs: file paths of DAGs to sync
+ :param session: ORM Session
+ """
+ filelocs = set(filelocs)
+ for file in filelocs:
+ DagCode(file).sync_to_db(session=session)
Review comment:
I believe that as long as it only adds or updates objects in the session inside a transaction, nothing is committed until somebody calls session.commit().
But I am new to Python and SQLAlchemy.
Anyway, I have modified the implementation to do a single query for the old records instead of one query per fileloc.
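That matches how transactions generally behave: pending inserts and updates can be rolled back until someone commits. Below is a sketch of the one-query approach. It uses sqlite3 instead of the SQLAlchemy session so that it runs standalone. Several parts are assumptions: `bulk_sync`, the 8-byte `fileloc_hash`, and `INSERT OR REPLACE`, which is a SQLite-specific stand-in for `session.merge`. Older SQLite builds also limit a statement to 999 bound parameters, so a real implementation may need to chunk the IN list.

```python
import hashlib
import sqlite3
from typing import Dict


def fileloc_hash(fileloc: str) -> int:
    """Hypothetical 64-bit signed hash of a file path."""
    digest = hashlib.sha1(fileloc.encode("utf-8")).digest()
    return int.from_bytes(digest[-8:], byteorder="big", signed=True)


def bulk_sync(conn: sqlite3.Connection, sources: Dict[str, str]) -> None:
    """Upsert code for many files using a single SELECT for existing rows.

    Does not commit: the caller decides when the transaction ends.
    """
    by_hash: Dict[int, str] = {}
    for fileloc in sources:
        key = fileloc_hash(fileloc)
        if key in by_hash:
            raise ValueError("hash collision: {} vs {}".format(fileloc, by_hash[key]))
        by_hash[key] = fileloc
    if not by_hash:
        return

    placeholders = ",".join("?" * len(by_hash))
    existing = dict(conn.execute(
        "SELECT fileloc_hash, fileloc FROM dag_code "
        "WHERE fileloc_hash IN ({})".format(placeholders),
        list(by_hash),
    ).fetchall())

    for key, fileloc in by_hash.items():
        if key in existing and existing[key] != fileloc:
            raise ValueError("hash collision: {} vs {}".format(fileloc, existing[key]))
        conn.execute(
            "INSERT OR REPLACE INTO dag_code (fileloc_hash, fileloc, source_code) "
            "VALUES (?, ?, ?)",
            (key, fileloc, sources[fileloc]),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_code (fileloc_hash INTEGER PRIMARY KEY, "
             "fileloc TEXT NOT NULL, source_code TEXT NOT NULL)")
conn.commit()

bulk_sync(conn, {"/dags/a.py": "print('a')", "/dags/b.py": "print('b')"})
rows_before_rollback = conn.execute("SELECT COUNT(*) FROM dag_code").fetchone()[0]
conn.rollback()  # nothing was committed, so both rows disappear
rows_after_rollback = conn.execute("SELECT COUNT(*) FROM dag_code").fetchone()[0]
print(rows_before_rollback, rows_after_rollback)  # 2 0
```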
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383201934
##########
File path: airflow/migrations/versions/788fcbd36a03_add_source_code_table.py
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add source code table
+
+Revision ID: 788fcbd36a03
+Revises: a4c2fd67d16b
+Create Date: 2020-02-20 14:40:23.257645
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '788fcbd36a03'
+down_revision = 'a4c2fd67d16b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply add source code table"""
+ op.create_table('dag_code', # pylint: disable=no-member
+ sa.Column('fileloc', sa.String(length=2000), nullable=False),
+ sa.Column('fileloc_hash', sa.Integer(), nullable=False),
+ sa.Column('source_code', sa.Text(), nullable=False),
+ sa.Column('last_updated', sa.DateTime(), nullable=False),
+ sa.PrimaryKeyConstraint('fileloc', 'fileloc_hash'))
Review comment:
For now, the only query that reads code from this table filters on the fileloc_hash column in its WHERE clause.
Besides, correct me if I am wrong, but most databases create an index on the primary key by default. Still, it may be better to stay on the safe side. I have no strong opinion here.
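One detail supports keeping the separate index. The primary key here is composite, `(fileloc, fileloc_hash)`, with `fileloc` first, so the index the database builds for it cannot serve a lookup on `fileloc_hash` alone. The sqlite3 sketch below compares query plans with and without `idx_fileloc_code_hash`. The exact plan text varies between SQLite versions, so only keywords are checked.

```python
import sqlite3
from typing import List


def plan_for_hash_lookup(with_hash_index: bool) -> List[str]:
    """Return SQLite's query-plan details for a lookup by fileloc_hash alone."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_code ("
        " fileloc VARCHAR(2000) NOT NULL,"
        " fileloc_hash INTEGER NOT NULL,"
        " source_code TEXT NOT NULL,"
        " last_updated TIMESTAMP NOT NULL,"
        " PRIMARY KEY (fileloc, fileloc_hash))"
    )
    if with_hash_index:
        conn.execute("CREATE INDEX idx_fileloc_code_hash ON dag_code (fileloc_hash)")
    rows = conn.execute(
        "EXPLAIN QUERY PLAN SELECT source_code FROM dag_code WHERE fileloc_hash = ?",
        (42,),
    ).fetchall()
    conn.close()
    # The human-readable description is the last column of each plan row.
    return [row[-1] for row in rows]


print(plan_for_hash_lookup(with_hash_index=False))  # expect a full SCAN of dag_code
print(plan_for_hash_lookup(with_hash_index=True))   # expect a SEARCH via idx_fileloc_code_hash
```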
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391706251
##########
File path: airflow/www/views.py
##########
@@ -519,17 +519,23 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
+ all_errors = ""
+
try:
- with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
- code = f.read()
+ dag_id = request.args.get('dag_id')
+ dag_orm = DagModel.get_dagmodel(dag_id, session=session)
+ dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
Review comment:
Yep, applied.
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383200712
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc = Column(String(2000), primary_key=True)
+ # The max length of fileloc exceeds the limit of indexing.
+ fileloc_hash = Column(Integer, primary_key=True)
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(Text(), nullable=False)
+
+ __table_args__ = (
+ Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+ )
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCodeModel._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ try:
+ with open(fileloc, 'r') as source:
+ source_code = source.read()
+ except IOError:
+ source_code = "Couldn't read source file {}.".format(fileloc)
+ return source_code
+
+ @provide_session
+ def write_code(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ session.merge(self)
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.execute(
+ cls.__table__.delete().where(
+ and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs))))
+
+ @classmethod
+ def dag_fileloc_hash(cls, full_filepath: str) -> int:
+ """"Hashing file location for indexing.
+
+ :param full_filepath: full filepath of DAG file
+ :return: hashed full_filepath
+ """
+ # hashing is needed because the length of fileloc is 2000 as an Airflow convention,
+ # which is over the limit of indexing. If we can reduce the length of fileloc, then
+ # hashing is not needed.
+ import hashlib
+ return int.from_bytes(
+ hashlib.sha1(
+ full_filepath.encode('utf-8')).digest()[-2:],
+ byteorder='big', signed=False)
Review comment:
Since we are using this in a PK, we should keep more than two bytes of the digest to (greatly) reduce the risk of collisions.
```
digest = hashlib.sha1(full_filepath.encode('utf-8')).digest()
return int.from_bytes(
digest[-4:],
byteorder='big', signed=False)
```
(This goes from 0..65,535 (~64k) possible values to 0..4,294,967,295 (~4 billion) possible values.)
But this possibly means we need to migrate the `fileloc_hash` values in the `serialized_dag` table. Dang.
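To put rough numbers on the collision risk, here is a sketch using the standard birthday-bound approximation, assuming SHA-1 output bits are uniformly distributed. `truncated_hash` and `collision_probability` are illustrative helpers, not functions from the PR.

```python
import hashlib
import math


def truncated_hash(path: str, num_bytes: int) -> int:
    """Keep the last `num_bytes` bytes of the SHA-1 digest as an unsigned int."""
    digest = hashlib.sha1(path.encode("utf-8")).digest()
    return int.from_bytes(digest[-num_bytes:], byteorder="big", signed=False)


def collision_probability(num_files: int, num_bytes: int) -> float:
    """Birthday bound: P(any collision) ~= 1 - exp(-n(n-1) / 2N), N = hash space."""
    space = 2 ** (8 * num_bytes)
    return 1 - math.exp(-num_files * (num_files - 1) / (2 * space))


for num_bytes in (2, 4):
    # Size of the hash space and the collision chance for 1,000 DAG files.
    print(num_bytes, 2 ** (8 * num_bytes), collision_probability(1000, num_bytes))
```

With 1,000 DAG files this comes out at roughly 99.95% for a 2-byte hash and roughly 0.01% for a 4-byte hash, which is why widening the key matters once it becomes a primary key.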
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391630973
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.exceptions import AirflowException
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText(), nullable=False)
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCode._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ with open_maybe_zipped(fileloc, 'r') as source:
+ source_code = source.read()
+ return source_code
+
+ @provide_session
+ def sync_to_db(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ old_version = session.query(
+ DagCode.fileloc, DagCode.fileloc_hash, DagCode.last_updated) \
+ .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+ .first()
+
+ if old_version and old_version.fileloc != self.fileloc:
+ raise AirflowException(
+ "Filename '{}' causes a hash collision in the database with "
+ "'{}'. Please rename the file.".format(
+ self.fileloc, old_version.fileloc))
+
+ file_modified = datetime.fromtimestamp(
+ os.path.getmtime(correct_maybe_zipped(self.fileloc)), tz=timezone.utc)
+
+ if old_version and (file_modified - timedelta(seconds=120)) < \
+ old_version.last_updated:
+ return
+
+ session.merge(self)
+
+ @classmethod
+ @provide_session
+ def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
+ """Writes code in bulk into database.
+
+ :param filelocs: file paths of DAGs to sync
+ :param session: ORM Session
+ """
+ filelocs = set(filelocs)
+ for file in filelocs:
+ DagCode(file).sync_to_db(session=session)
Review comment:
The `sync_to_db` method issues one database query per file. I think you can batch this to fetch multiple records at once. Similar PR: https://github.com/apache/airflow/pull/7477/files
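A minimal sketch of the batching idea, using the standard-library `sqlite3` module instead of SQLAlchemy and a simplified, hypothetical `dag_code` schema keyed on `fileloc` rather than `fileloc_hash`. It fetches all existing rows for the batch with one `IN (...)` query, inserts the missing files, and updates only rows whose file changed after the stored copy. `sync_batch` and its `fileloc -> (mtime, source)` input are assumptions for illustration, not Airflow's API.

```python
import sqlite3
from typing import Dict, Tuple


def sync_batch(conn: sqlite3.Connection, files: Dict[str, Tuple[float, str]]) -> None:
    """Upsert DAG source for many files with a single SELECT for the lookup.

    `files` maps fileloc -> (file modification time, source code).
    """
    if not files:
        return
    placeholders = ",".join("?" for _ in files)
    # One round trip to find which files already have a stored copy.
    rows = conn.execute(
        f"SELECT fileloc, last_updated FROM dag_code WHERE fileloc IN ({placeholders})",
        list(files),
    ).fetchall()
    existing = dict(rows)

    for fileloc, (mtime, source) in files.items():
        if fileloc not in existing:
            conn.execute(
                "INSERT INTO dag_code (fileloc, last_updated, source_code) VALUES (?, ?, ?)",
                (fileloc, mtime, source),
            )
        elif mtime > existing[fileloc]:
            # Rewrite only the row looked up for *this* file, and only if it is stale.
            conn.execute(
                "UPDATE dag_code SET last_updated = ?, source_code = ? WHERE fileloc = ?",
                (mtime, source, fileloc),
            )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_code (fileloc TEXT PRIMARY KEY, last_updated REAL, source_code TEXT)"
)
sync_batch(conn, {"/dags/a.py": (1.0, "a v1"), "/dags/b.py": (1.0, "b v1")})
# a.py has a newer mtime, b.py does not, so only a.py is rewritten.
sync_batch(conn, {"/dags/a.py": (2.0, "a v2"), "/dags/b.py": (1.0, "b ignored")})
```

For very large batches the `IN (...)` list may need chunking, since databases cap the number of bound parameters per statement.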
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383231168
##########
File path: airflow/models/dag.py
##########
@@ -44,6 +46,7 @@
from airflow.models.base import ID_LEN, Base
from airflow.models.baseoperator import BaseOperator
from airflow.models.dagbag import DagBag
+from airflow.models.dagcode import DagCodeModel
Review comment:
```suggestion
from airflow.models.dagcode import DagCode
```
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383231287
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,43 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
+ def code(self):
+ if conf.getboolean('core', 'store_code', fallback=False):
+ return self._get_code_from_db(self.fileloc)
+ else:
+ return self._get_code_from_file(self.fileloc)
+
+ def _get_code_from_file(self, fileloc):
+ with self._open_maybe_zipped(fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @classmethod
+ def _open_maybe_zipped(cls, f, mode='r'):
+ """
+ Opens the given file. If the path contains a folder with a .zip suffix, then
+ the folder is treated as a zip archive, opening the file inside the archive.
+
+ :return: a file object, as in `open`, or as in `ZipFile.open`.
+ """
+ ZIP_REGEX = re.compile(r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)))
+ _, archive, filename = ZIP_REGEX.search(f).groups()
+ if archive and zipfile.is_zipfile(archive):
+ return zipfile.ZipFile(archive, mode=mode).open(filename)
+ else:
+ return io.open(f, mode=mode)
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ fileloc_hash = DagCodeModel.dag_fileloc_hash(fileloc)
+ code = None
+ dag_code = session.query(DagCodeModel) \
Review comment:
```suggestion
dag_code = session.query(DagCode) \
```
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r404946932
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException, DagCodeNotFound
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText, nullable=False)
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCode._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ with open_maybe_zipped(fileloc, 'r') as source:
+ source_code = source.read()
+ return source_code
+
+ @provide_session
+ def sync_to_db(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ self.bulk_sync_to_db([self.fileloc], session)
+
+ @classmethod
+ @provide_session
+ def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
+ """Writes code in bulk into database.
+
+ :param filelocs: file paths of DAGs to sync
+ :param session: ORM Session
+ """
+ filelocs = set(filelocs)
+ filelocs_to_hashes = {
+ fileloc: DagCode.dag_fileloc_hash(fileloc) for fileloc in filelocs
+ }
+ existing_orm_dag_codes = (
+ session
+ .query(DagCode)
+ .filter(DagCode.fileloc_hash.in_(filelocs_to_hashes.values()))
+ .with_for_update(of=DagCode)
+ .all()
+ )
+ existing_orm_dag_codes_by_fileloc_hashes = {
+ orm.fileloc_hash: orm for orm in existing_orm_dag_codes
+ }
+ exisitng_orm_filelocs = {
+ orm.fileloc for orm in existing_orm_dag_codes_by_fileloc_hashes.values()
+ }
+ if not exisitng_orm_filelocs.issubset(filelocs):
+ conflicting_filelocs = exisitng_orm_filelocs.difference(filelocs)
+ hashes_to_filelocs = {
+ DagCode.dag_fileloc_hash(fileloc): fileloc for fileloc in filelocs
+ }
+ message = ""
+ for fileloc in conflicting_filelocs:
+ message += ("Filename '{}' causes a hash collision in the " +
+ "database with '{}'. Please rename the file.")\
+ .format(
+ hashes_to_filelocs[DagCode.dag_fileloc_hash(fileloc)],
+ fileloc)
+ raise AirflowException(message)
+
+ existing_filelocs = {
+ dag_code.fileloc for dag_code in existing_orm_dag_codes
+ }
+ missing_filelocs = filelocs.difference(existing_filelocs)
+
+ for fileloc in missing_filelocs:
+ orm_dag_code = DagCode(fileloc)
+ session.add(orm_dag_code)
+
+ for fileloc in existing_filelocs:
+ old_version = existing_orm_dag_codes_by_fileloc_hashes[
+ filelocs_to_hashes[fileloc]
+ ]
+ file_modified = datetime.fromtimestamp(
+ os.path.getmtime(correct_maybe_zipped(fileloc)), tz=timezone.utc)
+
+ if (file_modified - timedelta(seconds=120)) > old_version.last_updated:
+ orm_dag_code.last_updated = timezone.utcnow()
+ orm_dag_code.source_code = DagCode._read_code(orm_dag_code.fileloc)
+ session.update(orm_dag_code)
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.execute(
+ session.query(cls).filter(
+ and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs))).delete())
+
+ @classmethod
+ @provide_session
+ def has_dag(cls, fileloc: str, session=None) -> bool:
+ """Checks a file exist in dag_code table.
+
+ :param fileloc: the file to check
+ :param session: ORM Session
+ """
+ fileloc_hash = cls.dag_fileloc_hash(fileloc)
+ return session.query(exists().where(cls.fileloc_hash == fileloc_hash))\
+ .scalar()
+
+ @classmethod
+ def get_code_by_fileloc(cls, fileloc: str) -> str:
+ """Returns source code for a given fileloc.
+
+ :param fileloc: file path of a DAG
+ :return: source code as string
+ """
+ return DagCode(fileloc).code()
+
+ def code(self) -> str:
+ """Returns source code for this DagCode object.
+
+ :return: source code as string
+ """
+ if conf.getboolean('core', 'store_dag_code', fallback=False):
+ return self._get_code_from_db()
+ else:
+ return self._get_code_from_file()
+
+ def _get_code_from_file(self):
+ with open_maybe_zipped(self.fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ dag_code = session.query(DagCode) \
+ .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+ .first()
+ if not dag_code:
+ raise DagCodeNotFound()
+ else:
+ code = dag_code.source_code
+ return code
+
+ @staticmethod
+ def dag_fileloc_hash(full_filepath: str) -> int:
+ """"Hashing file location for indexing.
+
+ :param full_filepath: full filepath of DAG file
+ :return: hashed full_filepath
+ """
+ # Hashing is needed because the length of fileloc is 2000 as an Airflow convention,
+ # which is over the limit of indexing.
+ import hashlib
+ # Only 7 bytes because MySQL BigInteger can hold only 8 bytes (signed).
+ return struct.unpack('>Q', hashlib.sha1(
+ full_filepath.encode('utf-8')).digest()[-8:])[0] >> 8
Review comment:
@anitakar Did you mean for this construct to ignore the least-significant byte?
Location: `/home/ash/airflow/dags/example.py`
SHA1 (hex): `0x78288e229ef15e32a7a32e1fd123d9fc60b5eae0`
fileloc_hash: `58867689281598954`
fileloc_hash hex: `d123d9fc60b5ea`
i.e. it ignores/removes the `e0` from the end.
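A small sketch of what the shift does, next to an alternative that keeps the final byte. `fileloc_hash_last7` is a hypothetical name, not code from the PR. Both yield 56-bit values, so either fits in a signed 64-bit `BIGINT`; the `>> 8` construct simply keeps `digest[-8:-1]` and discards `digest[-1]`, which is the dropped `e0` in the example above.

```python
import hashlib
import struct


def fileloc_hash_shifted(path: str) -> int:
    """Construct under review: unpack the last 8 digest bytes, then drop the lowest byte."""
    digest = hashlib.sha1(path.encode("utf-8")).digest()
    return struct.unpack(">Q", digest[-8:])[0] >> 8


def fileloc_hash_last7(path: str) -> int:
    """Hypothetical alternative: keep the last 7 bytes, including the final one."""
    digest = hashlib.sha1(path.encode("utf-8")).digest()
    return int.from_bytes(digest[-7:], byteorder="big", signed=False)


path = "/home/ash/airflow/dags/example.py"
digest = hashlib.sha1(path.encode("utf-8")).digest()
shifted = fileloc_hash_shifted(path)
last7 = fileloc_hash_last7(path)
# Both values are below 2**56, comfortably inside the signed 64-bit range.
print(digest.hex(), hex(shifted), hex(last7))
```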
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383231226
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,43 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
+ def code(self):
+ if conf.getboolean('core', 'store_code', fallback=False):
+ return self._get_code_from_db(self.fileloc)
+ else:
+ return self._get_code_from_file(self.fileloc)
+
+ def _get_code_from_file(self, fileloc):
+ with self._open_maybe_zipped(fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @classmethod
+ def _open_maybe_zipped(cls, f, mode='r'):
+ """
+ Opens the given file. If the path contains a folder with a .zip suffix, then
+ the folder is treated as a zip archive, opening the file inside the archive.
+
+ :return: a file object, as in `open`, or as in `ZipFile.open`.
+ """
+ ZIP_REGEX = re.compile(r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)))
+ _, archive, filename = ZIP_REGEX.search(f).groups()
+ if archive and zipfile.is_zipfile(archive):
+ return zipfile.ZipFile(archive, mode=mode).open(filename)
+ else:
+ return io.open(f, mode=mode)
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ fileloc_hash = DagCodeModel.dag_fileloc_hash(fileloc)
Review comment:
```suggestion
fileloc_hash = DagCode.dag_fileloc_hash(fileloc)
```
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#discussion_r382567801
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1598,6 +1604,8 @@ def _execute_helper(self):
)
models.DAG.deactivate_stale_dags(execute_start_time)
+ if STORE_CODE:
Review comment:
Done
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383232048
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc = Column(String(2000), primary_key=True)
+ # The max length of fileloc exceeds the limit of indexing.
+ fileloc_hash = Column(Integer, primary_key=True)
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(Text(), nullable=False)
+
+ __table_args__ = (
+ Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+ )
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCodeModel._read_code(self.fileloc)
Review comment:
Either
```suggestion
self.source_code = self._read_code(self.fileloc)
```
or
```suggestion
self.source_code = DagCode._read_code(self.fileloc)
```
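Inside `__init__` there is no `cls` in scope, so a classmethod has to be reached through the instance or the class name. A minimal sketch with a hypothetical stand-in class; calling through `self` resolves to the instance's class (so a subclass override of `_read_code` would be used), while the explicit class name always calls the base implementation.

```python
class DagCodeExample:
    """Hypothetical stand-in for DagCode, showing classmethod calls from __init__."""

    def __init__(self, fileloc: str):
        self.fileloc = fileloc
        # `cls` is not defined here; use the instance or the class name instead.
        self.via_self = self._read_code(fileloc)
        self.via_class = DagCodeExample._read_code(fileloc)

    @classmethod
    def _read_code(cls, fileloc: str) -> str:
        # Placeholder for reading the file from disk.
        return "source of {}".format(fileloc)


obj = DagCodeExample("/dags/example.py")
```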
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383270251
##########
File path: airflow/api/common/experimental/__init__.py
##########
@@ -29,12 +29,10 @@ def check_and_get_dag(dag_id: str, task_id: Optional[str] = None) -> DagModel:
if dag_model is None:
raise DagNotFound("Dag id {} not found in DagModel".format(dag_id))
- def read_store_serialized_dags():
- from airflow.configuration import conf
- return conf.getboolean('core', 'store_serialized_dags')
+ from airflow.configuration import conf
Review comment:
Nope
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391767693
##########
File path: airflow/www/views.py
##########
@@ -519,17 +519,23 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
+ all_errors = ""
+
try:
- with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
- code = f.read()
+ dag_id = request.args.get('dag_id')
+ dag_orm = DagModel.get_dagmodel(dag_id, session=session)
+ dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
Review comment:
The `dag` object passed to `self.render_template` comes from `DagModel` (line 522),
so we don't need to load the entire DAG here.
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/c0c5f11ad11a5a38e0553c1a36aa75eb83efae51&el=desc) will **decrease** coverage by `0.06%`.
> The diff coverage is `89.60%`.
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
==========================================
- Coverage 87.00% 86.93% -0.07%
==========================================
Files 906 907 +1
Lines 43851 43946 +95
==========================================
+ Hits 38154 38206 +52
- Misses 5697 5740 +43
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `79.39% <ø> (-0.99%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.02% <33.33%> (-0.32%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `72.72% <60.00%> (-4.20%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.21% <66.66%> (-0.10%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `91.39% <91.39%> (ø)` | |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `92.00% <100.00%> (-0.60%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `100.00% <100.00%> (ø)` | |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.40% <100.00%> (-0.28%)` | :arrow_down: |
| [airflow/utils/file.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9maWxlLnB5) | `88.88% <100.00%> (+1.05%)` | :arrow_up: |
| [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `76.17% <100.00%> (+0.01%)` | :arrow_up: |
| ... and [4 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [c0c5f11...f71aea9](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
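The coverage figures can be rechecked from the Hits and Lines rows. This sketch assumes Codecov reports two decimals rounded down (its `codecov.yml` precision/round settings; treating "round down" as the setting in effect here is an assumption). Ordinary rounding would give 86.94% for the PR, while rounding down matches the reported 86.93%. The header's `0.06%` presumably comes from rounding down the unrounded delta of about 0.0698 points.

```python
def coverage_percent(hits: int, lines: int, precision: int = 2) -> float:
    """Coverage as a percentage, rounded *down* to `precision` decimals."""
    scale = 10 ** precision
    # Integer floor division avoids floating-point surprises at the boundary.
    return (hits * 100 * scale // lines) / scale


before = coverage_percent(38154, 43851)  # master
after = coverage_percent(38206, 43946)   # PR #7217
raw_delta = (38206 / 43946 - 38154 / 43851) * 100
print(before, after, round(after - before, 2), raw_delta)
```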
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391756833
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -903,6 +903,7 @@ def process_file(
self.log.info("Creating / updating %s in ORM", ti)
session.merge(ti)
# commit batch
+
Review comment:
```suggestion
```
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383201308
##########
File path: airflow/models/serialized_dag.py
##########
@@ -72,23 +72,10 @@ class SerializedDagModel(Base):
def __init__(self, dag: DAG):
self.dag_id = dag.dag_id
self.fileloc = dag.full_filepath
- self.fileloc_hash = self.dag_fileloc_hash(self.fileloc)
+ self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
self.data = SerializedDAG.to_dict(dag)
self.last_updated = timezone.utcnow()
- @staticmethod
- def dag_fileloc_hash(full_filepath: str) -> int:
- """"Hashing file location for indexing.
-
- :param full_filepath: full filepath of DAG file
- :return: hashed full_filepath
- """
- # hashing is needed because the length of fileloc is 2000 as an Airflow convention,
- # which is over the limit of indexing. If we can reduce the length of fileloc, then
- # hashing is not needed.
- return int.from_bytes(
- hashlib.sha1(full_filepath.encode('utf-8')).digest()[-2:], byteorder='big', signed=False)
-
@classmethod
@provide_session
def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session=None):
Review comment:
I would have expected to see something in here that writes the code...?
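ashb's question is where the source code actually gets written. Below is a minimal sketch of the write path that the `DagCode.bulk_sync_to_db` hunk quoted later in this thread describes. It is not the PR's implementation. Rows are keyed by the file-location hash, and a path that collides with a different stored path is rejected. New files get a row, and an existing row is refreshed only when the file's mtime is more than 120 seconds newer than `last_updated`. A plain dict stands in for the `dag_code` table, and `CodeRow`, `sync_code` and `fileloc_hash` are hypothetical names. One difference from the hunk as quoted: its refresh loop appears to reuse `orm_dag_code` from the insert loop, whereas the sketch replaces the row it just looked up.

```python
import hashlib
import os
import struct
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable


def fileloc_hash(fileloc: str) -> int:
    """Last 8 SHA-1 bytes shifted down to 56 bits, as in the quoted dag_fileloc_hash."""
    digest = hashlib.sha1(fileloc.encode("utf-8")).digest()
    return struct.unpack(">Q", digest[-8:])[0] >> 8


@dataclass
class CodeRow:  # hypothetical stand-in for one dag_code row
    fileloc: str
    source_code: str
    last_updated: datetime


def sync_code(store: Dict[int, CodeRow], filelocs: Iterable[str],
              grace: timedelta = timedelta(seconds=120)) -> None:
    """Insert rows for new files and refresh rows whose file changed since the last sync."""
    now = datetime.now(timezone.utc)
    for fileloc in set(filelocs):
        key = fileloc_hash(fileloc)
        row = store.get(key)
        if row is not None and row.fileloc != fileloc:
            # Two different paths map to the same key: refuse rather than overwrite.
            raise ValueError(f"{fileloc!r} collides with {row.fileloc!r}; rename the file")
        modified = datetime.fromtimestamp(os.path.getmtime(fileloc), tz=timezone.utc)
        if row is None or modified - grace > row.last_updated:
            with open(fileloc, encoding="utf-8") as f:
                store[key] = CodeRow(fileloc, f.read(), now)
```

The grace window means a file edited within two minutes of the last sync is picked up on a later pass rather than immediately. This mirrors the `timedelta(seconds=120)` check in the hunk.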
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r392169180
##########
File path: airflow/config_templates/config.yml
##########
@@ -335,6 +335,15 @@
type: string
example: ~
default: "True"
+ - name: store_dag_code
Review comment:
Let's move this next to `store_serialized_dags`, as the two settings are related.
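Keeping `store_dag_code` next to `store_serialized_dags` also makes it easy to express the dependency between them. Below is a sketch with the standard-library `configparser`. It assumes, without support from this thread, that `store_dag_code` should default to whatever `store_serialized_dags` is set to. `DEFAULTS` and `load_core_flags` are hypothetical names.

```python
import configparser
from typing import Tuple

# Hypothetical defaults; the real config.yml entry may differ.
DEFAULTS = """
[core]
store_serialized_dags = False
# Assumption: follow store_serialized_dags unless set explicitly.
store_dag_code = %(store_serialized_dags)s
"""


def load_core_flags(overrides: str = "") -> Tuple[bool, bool]:
    """Return (store_serialized_dags, store_dag_code) after applying overrides."""
    parser = configparser.ConfigParser()  # default BasicInterpolation expands %(...)s
    parser.read_string(DEFAULTS)
    if overrides:
        parser.read_string(overrides)  # later reads override earlier values
    return (parser.getboolean("core", "store_serialized_dags"),
            parser.getboolean("core", "store_dag_code"))
```

Because interpolation happens when the value is read, enabling only `store_serialized_dags` in an override also turns on `store_dag_code`, unless the override sets it explicitly.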
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383248176
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,44 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
+ @property
+ def code(self):
+ if conf.getboolean('core', 'store_code', fallback=False):
+ return self._get_code_from_db(self.fileloc)
+ else:
+ return self._get_code_from_file(self.fileloc)
+
+ def _get_code_from_file(self, fileloc):
+ with self._open_maybe_zipped(fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @classmethod
+ def _open_maybe_zipped(cls, f, mode='r'):
Review comment:
I definitely agree.
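The `code` property in this hunk chooses between the database copy and the file on disk based on a config flag. Below is a minimal sketch of that dispatch, with a dict standing in for the `dag_code` table. `DagCodeNotFound` is defined locally as a stand-in for the exception imported in the later `dagcode.py` hunk, and `get_code` is a hypothetical name.

```python
from typing import Dict


class DagCodeNotFound(Exception):
    """Local stand-in: the requested file has no row in the code store."""


def get_code(fileloc: str, store_dag_code: bool, code_store: Dict[str, str]) -> str:
    """Return source code from the store when enabled, otherwise from the file."""
    if store_dag_code:
        # Webserver path: no access to the DAGs folder is needed.
        try:
            return code_store[fileloc]
        except KeyError:
            raise DagCodeNotFound(fileloc) from None
    # Legacy path: the DAGs folder must be readable from this machine.
    with open(fileloc, encoding="utf-8") as f:
        return f.read()
```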
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#discussion_r382599573
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1598,6 +1604,8 @@ def _execute_helper(self):
)
models.DAG.deactivate_stale_dags(execute_start_time)
+ if STORE_CODE:
Review comment:
?
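The hunk cuts off after `if STORE_CODE:`, so what the PR ran there is not visible. Given its placement right after `deactivate_stale_dags`, one plausible cleanup is to drop stored code for files that no longer exist, as `DagCode.remove_deleted_code` does in the `dagcode.py` hunk quoted later in this thread. Below is a sketch using the standard-library `sqlite3` module. For brevity it is keyed on `fileloc` alone, whereas the PR also filters on the hash. `remove_deleted_code` here is a hypothetical standalone function.

```python
import sqlite3
from typing import Iterable


def remove_deleted_code(conn: sqlite3.Connection, alive_filelocs: Iterable[str]) -> int:
    """Delete dag_code rows whose file is not in alive_filelocs; return rows removed."""
    alive = list(alive_filelocs)
    if alive:
        placeholders = ",".join("?" for _ in alive)  # values stay parameterized
        cur = conn.execute(
            f"DELETE FROM dag_code WHERE fileloc NOT IN ({placeholders})", alive)
    else:
        cur = conn.execute("DELETE FROM dag_code")  # no live files: clear the table
    conn.commit()
    return cur.rowcount
```

Very large file lists may exceed the database's bound-parameter limit, so a production version would batch them or use a temporary table.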
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/78e48ba46a7f721384417ebf8a798dd320632fa8&el=desc) will **decrease** coverage by `0.48%`.
> The diff coverage is `91.58%`.
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
==========================================
- Coverage 86.99% 86.50% -0.49%
==========================================
Files 906 907 +1
Lines 43806 43878 +72
==========================================
- Hits 38110 37958 -152
- Misses 5696 5920 +224
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.68% <ø> (ø)` | |
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `79.39% <ø> (-0.99%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.02% <33.33%> (-0.70%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `70.00% <50.00%> (-6.93%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.26% <90.00%> (-0.05%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `94.91% <94.91%> (ø)` | |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `92.00% <100.00%> (-0.60%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `100.00% <100.00%> (ø)` | |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.40% <100.00%> (-0.28%)` | :arrow_down: |
| [airflow/utils/file.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9maWxlLnB5) | `88.88% <100.00%> (+1.05%)` | :arrow_up: |
| ... and [19 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [78e48ba...5ac195a](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#discussion_r368585965
##########
File path: airflow/serialization/schema.json
##########
@@ -84,6 +84,7 @@
"catchup": { "type": "boolean" },
"is_subdag": { "type": "boolean" },
"fileloc": { "type" : "string"},
+ "_source_code": { "type": "string" },
Review comment:
This is not part of the DAG; the DAG is created by the DAG file. We should store a reference to the file here instead of saving the file's source code, to avoid duplication.
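The duplication mik-laj mentions arises because one DAG file can define several DAGs, so embedding `_source_code` in every serialized DAG repeats the same file contents. Below is a toy illustration with plain dicts. It does not use Airflow's serialization format, and `split_code_from_dags` is a hypothetical name. Each DAG keeps only a `fileloc` reference, and each file's code is stored once.

```python
from typing import Dict, List, Tuple


def split_code_from_dags(
        dags: List[Tuple[str, str, str]]) -> Tuple[Dict[str, dict], Dict[str, str]]:
    """Split (dag_id, fileloc, source_code) triples into DAG records and per-file code."""
    serialized: Dict[str, dict] = {}
    code_by_file: Dict[str, str] = {}
    for dag_id, fileloc, source in dags:
        serialized[dag_id] = {"dag_id": dag_id, "fileloc": fileloc}  # reference only
        code_by_file.setdefault(fileloc, source)  # one copy per file
    return serialized, code_by_file
```

With two DAGs defined in `etl.py` and one in `report.py`, this yields three DAG records but only two stored copies of source code.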
----------------------------------------------------------------
[GitHub] [airflow] codecov-io commented on issue #7217: [WIP] [AIRFLOW-NNNN]
Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/057f3ae3a4afedf6d462ecf58b01dd6304d3e135?src=pr&el=desc) will **decrease** coverage by `0.02%`.
> The diff coverage is `61.53%`.
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
==========================================
- Coverage 85.5% 85.48% -0.03%
==========================================
Files 864 864
Lines 40455 40488 +33
==========================================
+ Hits 34592 34611 +19
- Misses 5863 5877 +14
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `88.09% <100%> (+0.29%)` | :arrow_up: |
| [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `75.53% <44%> (-0.55%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `90.9% <90.9%> (ø)` | :arrow_up: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `87.93% <0%> (-0.2%)` | :arrow_down: |
| [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.37% <0%> (+0.34%)` | :arrow_up: |
------
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [057f3ae...901700f](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383986117
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc = Column(String(2000), primary_key=True)
+ # The max length of fileloc exceeds the limit of indexing.
+ fileloc_hash = Column(Integer, primary_key=True)
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(Text())
+
+ __table_args__ = (
+ Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+ )
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCodeModel._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ try:
+ with open(fileloc, 'r') as source:
+ source_code = source.read()
+ except IOError:
+ source_code = "Couldn't read source file {}.".format(fileloc)
Review comment:
I agree
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391774655
##########
File path: airflow/www/views.py
##########
@@ -519,17 +519,23 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
+ all_errors = ""
+
try:
- with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
- code = f.read()
+ dag_id = request.args.get('dag_id')
+ dag_orm = DagModel.get_dagmodel(dag_id, session=session)
+ dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
Review comment:
OK, I'll pass the DAG model then and test whether it works in the webserver.
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391711605
##########
File path: airflow/www/views.py
##########
@@ -519,17 +519,23 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
+ all_errors = ""
+
try:
- with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
- code = f.read()
+ dag_id = request.args.get('dag_id')
+ dag_orm = DagModel.get_dagmodel(dag_id, session=session)
+ dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
Review comment:
The DAG is needed to render the page anyway.
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on issue #7217: [WIP] [AIRFLOW-NNNN]
Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576680236
Hi Anita, I would appreciate it if you could hold off on this PR until I get the TaskInstance and template rendering PR in.
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r405339772
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException, DagCodeNotFound
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText, nullable=False)
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCode._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ with open_maybe_zipped(fileloc, 'r') as source:
+ source_code = source.read()
+ return source_code
+
+ @provide_session
+ def sync_to_db(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ self.bulk_sync_to_db([self.fileloc], session)
+
+ @classmethod
+ @provide_session
+ def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
+ """Writes code in bulk into database.
+
+ :param filelocs: file paths of DAGs to sync
+ :param session: ORM Session
+ """
+ filelocs = set(filelocs)
+ filelocs_to_hashes = {
+ fileloc: DagCode.dag_fileloc_hash(fileloc) for fileloc in filelocs
+ }
+ existing_orm_dag_codes = (
+ session
+ .query(DagCode)
+ .filter(DagCode.fileloc_hash.in_(filelocs_to_hashes.values()))
+ .with_for_update(of=DagCode)
+ .all()
+ )
+ existing_orm_dag_codes_by_fileloc_hashes = {
+ orm.fileloc_hash: orm for orm in existing_orm_dag_codes
+ }
+ exisitng_orm_filelocs = {
+ orm.fileloc for orm in existing_orm_dag_codes_by_fileloc_hashes.values()
+ }
+ if not exisitng_orm_filelocs.issubset(filelocs):
+ conflicting_filelocs = exisitng_orm_filelocs.difference(filelocs)
+ hashes_to_filelocs = {
+ DagCode.dag_fileloc_hash(fileloc): fileloc for fileloc in filelocs
+ }
+ message = ""
+ for fileloc in conflicting_filelocs:
+ message += ("Filename '{}' causes a hash collision in the " +
+ "database with '{}'. Please rename the file.")\
+ .format(
+ hashes_to_filelocs[DagCode.dag_fileloc_hash(fileloc)],
+ fileloc)
+ raise AirflowException(message)
+
+ existing_filelocs = {
+ dag_code.fileloc for dag_code in existing_orm_dag_codes
+ }
+ missing_filelocs = filelocs.difference(existing_filelocs)
+
+ for fileloc in missing_filelocs:
+ orm_dag_code = DagCode(fileloc)
+ session.add(orm_dag_code)
+
+ for fileloc in existing_filelocs:
+ old_version = existing_orm_dag_codes_by_fileloc_hashes[
+ filelocs_to_hashes[fileloc]
+ ]
+ file_modified = datetime.fromtimestamp(
+ os.path.getmtime(correct_maybe_zipped(fileloc)), tz=timezone.utc)
+
+ if (file_modified - timedelta(seconds=120)) > old_version.last_updated:
+ orm_dag_code.last_updated = timezone.utcnow()
+ orm_dag_code.source_code = DagCode._read_code(orm_dag_code.fileloc)
+ session.update(orm_dag_code)
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.execute(
+ session.query(cls).filter(
+ and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs))).delete())
+
+ @classmethod
+ @provide_session
+ def has_dag(cls, fileloc: str, session=None) -> bool:
+ """Checks a file exist in dag_code table.
+
+ :param fileloc: the file to check
+ :param session: ORM Session
+ """
+ fileloc_hash = cls.dag_fileloc_hash(fileloc)
+ return session.query(exists().where(cls.fileloc_hash == fileloc_hash))\
+ .scalar()
+
+ @classmethod
+ def get_code_by_fileloc(cls, fileloc: str) -> str:
+ """Returns source code for a given fileloc.
+
+ :param fileloc: file path of a DAG
+ :return: source code as string
+ """
+ return DagCode(fileloc).code()
+
+ def code(self) -> str:
+ """Returns source code for this DagCode object.
+
+ :return: source code as string
+ """
+ if conf.getboolean('core', 'store_dag_code', fallback=False):
+ return self._get_code_from_db()
+ else:
+ return self._get_code_from_file()
+
+ def _get_code_from_file(self):
+ with open_maybe_zipped(self.fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ dag_code = session.query(DagCode) \
+ .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+ .first()
+ if not dag_code:
+ raise DagCodeNotFound()
+ else:
+ code = dag_code.source_code
+ return code
+
+ @staticmethod
+ def dag_fileloc_hash(full_filepath: str) -> int:
+ """"Hashing file location for indexing.
+
+ :param full_filepath: full filepath of DAG file
+ :return: hashed full_filepath
+ """
+ # Hashing is needed because the length of fileloc is 2000 as an Airflow convention,
+ # which is over the limit of indexing.
+ import hashlib
+ # Only 7 bytes because MySQL BigInteger can hold only 8 bytes (signed).
+ return struct.unpack('>Q', hashlib.sha1(
+ full_filepath.encode('utf-8')).digest()[-8:])[0] >> 8
Review comment:
To be honest, I wanted to use all 8 bytes of MySQL's BIGINT.
I used a different construct before, but it did not work with Python 2, so I changed it to use Python's `struct` module. However, I should have used a signed long long (`>q`) instead of an unsigned long long (`>Q`).
By the time I noticed, the code was too close to release, and 7 bytes is enough anyway (https://cloud.google.com/composer/docs/release-notes).
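The trade-off described here can be checked directly with `struct`. Unpacking the last 8 digest bytes as `>Q` gives an unsigned value of up to 2^64 - 1, which overflows MySQL's signed BIGINT range of -2^63 to 2^63 - 1. Hence the right shift by 8 bits, which leaves a 56-bit value. Unpacking the same bytes as `>q` gives a signed value that fits as-is. The sketch below uses hypothetical function names.

```python
import hashlib
import struct


def fileloc_hash_7_bytes(path: str) -> int:
    # As quoted: unsigned 64-bit ('>Q') from the last 8 digest bytes, then drop one byte.
    digest = hashlib.sha1(path.encode("utf-8")).digest()
    return struct.unpack(">Q", digest[-8:])[0] >> 8


def fileloc_hash_signed_8_bytes(path: str) -> int:
    # Alternative: signed 64-bit ('>q') keeps all 8 bytes within BIGINT's range.
    digest = hashlib.sha1(path.encode("utf-8")).digest()
    return struct.unpack(">q", digest[-8:])[0]
```

Both variants come from the same digest bytes. The signed one simply keeps 2^8 times as many distinct values, which lowers the chance of the filename collisions that `bulk_sync_to_db` guards against.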
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#discussion_r380316629
##########
File path: airflow/www/views.py
##########
@@ -520,16 +521,60 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
- try:
+
+ def get_serialized_dag(dag_id):
Review comment:
These functions look very complex. Can you move them to another module? That would also let other classes, e.g. the API, use them.
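One way to act on this suggestion is to keep the lookup framework-agnostic, so that a view and an API endpoint both call it. The view then adds only its own error presentation, which is what the `all_errors` string in the later `views.py` hunks suggests. Below is a sketch with injected callables in place of database access. `get_dag_code`, `code_view` and the local `DagNotFound` are hypothetical names and are not taken from the PR.

```python
from typing import Callable, Optional, Tuple


class DagNotFound(Exception):
    """Local stand-in for an 'unknown dag_id' error."""


def get_dag_code(dag_id: str,
                 find_fileloc: Callable[[str], Optional[str]],
                 read_code: Callable[[str], str]) -> str:
    """Shared lookup: resolve the DAG's file, then read its code."""
    fileloc = find_fileloc(dag_id)
    if fileloc is None:
        raise DagNotFound(f"Dag id {dag_id} not found")
    return read_code(fileloc)


def code_view(dag_id: str,
              find_fileloc: Callable[[str], Optional[str]],
              read_code: Callable[[str], str]) -> Tuple[str, str]:
    """View-side wrapper: return (code, error message) instead of raising."""
    try:
        return get_dag_code(dag_id, find_fileloc, read_code), ""
    except Exception as err:  # show any failure on the page rather than a 500
        return "", str(err)
```

An API endpoint would call `get_dag_code` directly and map `DagNotFound` to its own error response.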
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on issue #7217: [AIRFLOW-NNNN] Store
DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576325375
Hi @anitakar, thank you for your PR. There is an open Jira issue that I am working on as part of DAG Serialization: https://issues.apache.org/jira/browse/AIRFLOW-5946
We still need to render templates, which we are working on now; the PR for it is open: https://github.com/apache/airflow/pull/6788
I am going to raise PRs for Code View this week.
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383234691
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,44 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
+ @property
+ def code(self):
+ if conf.getboolean('core', 'store_code', fallback=False):
+ return self._get_code_from_db(self.fileloc)
+ else:
+ return self._get_code_from_file(self.fileloc)
+
+ def _get_code_from_file(self, fileloc):
+ with self._open_maybe_zipped(fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @classmethod
+ def _open_maybe_zipped(cls, f, mode='r'):
Review comment:
Don't we already have `wwwutils.open_maybe_zipped`? Maybe that should be moved to `airflow.utils.dag_processing`.
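The `dagcode.py` hunk in this thread ends up importing `open_maybe_zipped` from `airflow.utils.file` rather than adding a private copy to `DAG`. For readers unfamiliar with zipped DAGs, below is a standalone sketch of what such a helper does. It is an assumption about the behaviour, not a copy of Airflow's code. The helper opens either a plain file or a member of a `.zip` archive addressed as `archive.zip/inner/path.py`, and it assumes POSIX path separators.

```python
import io
import os
import re
import zipfile
from typing import IO

# Splits "<archive>.zip<sep><member>" from a plain path; group 2 is None for plain files.
_ZIP_REGEX = re.compile(r"((.*\.zip){})?(.*)".format(re.escape(os.sep)))


def open_maybe_zipped(fileloc: str, mode: str = "r") -> IO[str]:
    """Open a plain file, or read a member of a .zip archive into a text buffer."""
    match = _ZIP_REGEX.search(fileloc)
    archive, member = match.group(2), match.group(3)
    if archive and zipfile.is_zipfile(archive):
        # Read eagerly so the archive handle is closed before returning.
        with zipfile.ZipFile(archive) as zf:
            return io.StringIO(zf.read(member).decode("utf-8"))
    return open(fileloc, mode)
```

Reading the member into a `StringIO` keeps the sketch free of lingering archive handles, at the cost of loading the whole file into memory.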
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/3bb60afc7b8319996385d681faac342afe2b3bd2&el=desc) will **decrease** coverage by `0.48%`.
> The diff coverage is `91.83%`.
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
==========================================
- Coverage 86.89% 86.41% -0.49%
==========================================
Files 906 907 +1
Lines 43906 44018 +112
==========================================
- Hits 38153 38039 -114
- Misses 5753 5979 +226
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `79.39% <ø> (-0.99%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.02% <33.33%> (-0.32%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `72.72% <60.00%> (-4.20%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.21% <66.66%> (-0.10%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `92.47% <92.47%> (ø)` | |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `92.00% <100.00%> (-0.60%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `100.00% <100.00%> (ø)` | |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.40% <100.00%> (-0.28%)` | :arrow_down: |
| [airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5) | `94.97% <100.00%> (+0.16%)` | :arrow_up: |
| [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.55% <100.00%> (+0.35%)` | :arrow_up: |
| ... and [18 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [ccbaf57...50f4874](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#discussion_r368638185
##########
File path: airflow/models/serialized_dag.py
##########
@@ -65,6 +65,7 @@ class SerializedDagModel(Base):
fileloc_hash = Column(Integer, nullable=False)
data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False)
last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(String(1000000))
Review comment:
We should
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on issue #7217: [WIP] [AIRFLOW-NNNN]
Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576684107
> Sure. Kamil's database design makes much more sense. We avoid storing the same file's source code multiple times if a file contains multiple DAGs.
>
> I kind of jumped in with this PR without filing a bug or seeing what is happening in the community when it comes to DAG serialization. @kaxil I shall wait for your commit then.
Thanks Anita, appreciate it.
----------------------------------------------------------------
[GitHub] [airflow] kaxil edited a comment on issue #7217: [AIRFLOW-NNNN]
Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on issue #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576343258
Yup, that is exactly what we discussed initially, as mentioned in https://issues.apache.org/jira/browse/AIRFLOW-5946 :)
For the webserver to work without the DAG files, we need another way to get the code to display in the Code View:
1) Store it in a lazy-loaded column in the SerializedDag table.
2) Save it in a new table keyed by DAG_id and store versions as well, with a limit of the last 10 versions (configurable). Only the Code View needs it, so storing it in a new table is not a problem.
I also lean towards Kamil's approach, and the PR will be available soon too.
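Option 2 can be sketched with the standard-library `sqlite3` module. The table name `dag_code_version`, its columns, and the `store_version`/`latest_code` helpers are hypothetical. Rows are keyed by file path rather than DAG id (an assumption, in line with the earlier reply about not duplicating code for files that define several DAGs), and only the newest `max_versions` versions per file are kept.

```python
import sqlite3
import time

SCHEMA = """
CREATE TABLE IF NOT EXISTS dag_code_version (
    fileloc     TEXT    NOT NULL,
    version     INTEGER NOT NULL,
    source_code TEXT    NOT NULL,
    stored_at   REAL    NOT NULL,
    PRIMARY KEY (fileloc, version)
)
"""


def store_version(conn, fileloc, source_code, max_versions=10):
    """Append a new version of a file's code and keep only the newest max_versions."""
    with conn:  # insert and prune in a single transaction
        latest = conn.execute(
            "SELECT version, source_code FROM dag_code_version "
            "WHERE fileloc = ? ORDER BY version DESC LIMIT 1",
            (fileloc,),
        ).fetchone()
        if latest is not None and latest[1] == source_code:
            return latest[0]  # file unchanged: do not store a duplicate version
        version = 1 if latest is None else latest[0] + 1
        conn.execute(
            "INSERT INTO dag_code_version VALUES (?, ?, ?, ?)",
            (fileloc, version, source_code, time.time()),
        )
        # Versions are consecutive per file, so this keeps exactly the newest ones.
        conn.execute(
            "DELETE FROM dag_code_version WHERE fileloc = ? AND version <= ?",
            (fileloc, version - max_versions),
        )
    return version


def latest_code(conn, fileloc):
    """Return the newest stored code for a file, or None if nothing is stored."""
    row = conn.execute(
        "SELECT source_code FROM dag_code_version "
        "WHERE fileloc = ? ORDER BY version DESC LIMIT 1",
        (fileloc,),
    ).fetchone()
    return None if row is None else row[0]
```

Since only the Code View reads this table, the read path is a single lookup of the newest row per file.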
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383197191
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc = Column(String(2000), primary_key=True)
+ # The max length of fileloc exceeds the limit of indexing.
+ fileloc_hash = Column(Integer, primary_key=True)
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(Text())
+
+ __table_args__ = (
+ Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+ )
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCodeModel._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ try:
+ with open(fileloc, 'r') as source:
+ source_code = source.read()
+ except IOError:
+ source_code = "Couldn't read source file {}.".format(fileloc)
+ return source_code
+
+ @provide_session
+ def write_code(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ session.merge(self)
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.execute(
+ cls.__table__.delete().where(
+ and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs))))
+
+ @classmethod
Review comment:
```suggestion
@staticmethod
```
Since we don't look at `cls` inside :)
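To illustrate the suggestion: a helper that only hashes its argument touches neither `cls` nor `self`, so `@staticmethod` states that intent while still allowing calls on the class or on an instance. The diff does not show the method body here, so the SHA-1-based hash below (sized to fit a signed 64-bit BIGINT column) and the `DagCodeSketch` class name are assumptions for illustration only.

```python
import hashlib
import struct


class DagCodeSketch:
    """Hypothetical stand-in for the model class; only the hash helper is shown."""

    @staticmethod
    def dag_fileloc_hash(full_filepath: str) -> int:
        """Map a (possibly very long) file path to an integer that is cheap to index.

        Uses neither ``cls`` nor ``self``, hence ``@staticmethod``.
        """
        digest = hashlib.sha1(full_filepath.encode("utf-8")).digest()
        # Take the last 8 bytes as an unsigned 64-bit int, then drop the low byte
        # so the value also fits in a signed 64-bit BIGINT column.
        return struct.unpack(">Q", digest[-8:])[0] >> 8
```

Both `DagCodeSketch.dag_fileloc_hash(path)` and `DagCodeSketch().dag_fileloc_hash(path)` work and return the same value.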
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217: [WIP]
[AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/58c3542ed25061320ce61dbe0adf451a44c738dd?src=pr&el=desc) will **decrease** coverage by `0.73%`.
> The diff coverage is `62.5%`.
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
=========================================
- Coverage 86.53% 85.8% -0.74%
=========================================
Files 874 874
Lines 40868 40901 +33
=========================================
- Hits 35365 35094 -271
- Misses 5503 5807 +304
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/api/experimental/endpoints.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvYXBpL2V4cGVyaW1lbnRhbC9lbmRwb2ludHMucHk=) | `89.81% <100%> (ø)` | :arrow_up: |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.85% <100%> (+0.17%)` | :arrow_up: |
| [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `75.67% <44%> (-0.55%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `90.9% <90.9%> (ø)` | :arrow_up: |
| [...flow/providers/apache/cassandra/hooks/cassandra.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2Nhc3NhbmRyYS9ob29rcy9jYXNzYW5kcmEucHk=) | `21.51% <0%> (-72.16%)` | :arrow_down: |
| [...w/providers/apache/hive/operators/mysql\_to\_hive.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2hpdmUvb3BlcmF0b3JzL215c3FsX3RvX2hpdmUucHk=) | `35.84% <0%> (-64.16%)` | :arrow_down: |
| [airflow/operators/generic\_transfer.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvZ2VuZXJpY190cmFuc2Zlci5weQ==) | `39.28% <0%> (-60.72%)` | :arrow_down: |
| [airflow/api/auth/backend/kerberos\_auth.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvYXV0aC9iYWNrZW5kL2tlcmJlcm9zX2F1dGgucHk=) | `28.16% <0%> (-54.93%)` | :arrow_down: |
| [airflow/providers/redis/operators/redis\_publish.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcmVkaXMvb3BlcmF0b3JzL3JlZGlzX3B1Ymxpc2gucHk=) | `50% <0%> (-50%)` | :arrow_down: |
| [airflow/providers/mongo/sensors/mongo.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbW9uZ28vc2Vuc29ycy9tb25nby5weQ==) | `53.33% <0%> (-46.67%)` | :arrow_down: |
| ... and [13 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [58c3542...662fab1](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/c0c5f11ad11a5a38e0553c1a36aa75eb83efae51&el=desc) will **decrease** coverage by `27.14%`.
> The diff coverage is `89.60%`.
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
===========================================
- Coverage 87.00% 59.85% -27.15%
===========================================
Files 906 907 +1
Lines 43851 43946 +95
===========================================
- Hits 38154 26306 -11848
- Misses 5697 17640 +11943
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `79.39% <ø> (-0.99%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `83.07% <33.33%> (-5.26%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `72.72% <60.00%> (-4.20%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.21% <66.66%> (-0.10%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `91.39% <91.39%> (ø)` | |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `92.00% <100.00%> (-0.60%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `100.00% <100.00%> (ø)` | |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.40% <100.00%> (-0.28%)` | :arrow_down: |
| [airflow/utils/file.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9maWxlLnB5) | `88.88% <100.00%> (+1.05%)` | :arrow_up: |
| [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `76.17% <100.00%> (+0.01%)` | :arrow_up: |
| ... and [313 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [c0c5f11...f71aea9](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383356331
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc = Column(String(2000), primary_key=True)
+ # The max length of fileloc exceeds the limit of indexing.
+ fileloc_hash = Column(Integer, primary_key=True)
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(Text())
+
+ __table_args__ = (
+ Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+ )
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCodeModel._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ try:
+ with open(fileloc, 'r') as source:
+ source_code = source.read()
+ except IOError:
+ source_code = "Couldn't read source file {}.".format(fileloc)
+ return source_code
+
+ @provide_session
+ def write_code(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ session.merge(self)
Review comment:
I have renamed it to sync_to_db.
I have applied your suggestion in the production code.
Unfortunately, it was trickier in the test code, which is where I noticed that we already have sync_to_db for DAG.
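The later revision of `dagcode.py` quoted further down in this thread has `sync_to_db` delegate to `bulk_sync_to_db`. A minimal stdlib sketch of that shape follows, assuming a simplified `dag_code` table keyed by `fileloc` and using SQLite's `INSERT OR REPLACE` as a stand-in for SQLAlchemy's `session.merge`. The schema and function signatures are hypothetical.

```python
import sqlite3
import time
from typing import Dict

DAG_CODE_SCHEMA = """
CREATE TABLE IF NOT EXISTS dag_code (
    fileloc      TEXT PRIMARY KEY,
    source_code  TEXT NOT NULL,
    last_updated REAL NOT NULL
)
"""


def bulk_sync_to_db(conn: sqlite3.Connection, sources: Dict[str, str]) -> None:
    """Insert or update the code of many files in one transaction."""
    now = time.time()
    with conn:
        # INSERT OR REPLACE stands in for an ORM merge: same key -> row is replaced.
        conn.executemany(
            "INSERT OR REPLACE INTO dag_code (fileloc, source_code, last_updated) "
            "VALUES (?, ?, ?)",
            [(fileloc, code, now) for fileloc, code in sources.items()],
        )


def sync_to_db(conn: sqlite3.Connection, fileloc: str, source_code: str) -> None:
    """Single-file variant: delegate to the bulk path so there is one write path."""
    bulk_sync_to_db(conn, {fileloc: source_code})
```

Keeping a single write path means tests that exercise `sync_to_db` also cover the bulk code used by the scheduler.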
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383231993
##########
File path: airflow/migrations/versions/788fcbd36a03_add_source_code_table.py
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add source code table
+
+Revision ID: 788fcbd36a03
+Revises: a4c2fd67d16b
+Create Date: 2020-02-20 14:40:23.257645
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '788fcbd36a03'
+down_revision = 'a4c2fd67d16b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply add source code table"""
+ op.create_table('dag_code', # pylint: disable=no-member
+ sa.Column('fileloc', sa.String(length=2000), nullable=False),
+ sa.Column('fileloc_hash', sa.Integer(), nullable=False),
+ sa.Column('source_code', sa.Text(), nullable=False),
+ sa.Column('last_updated', sa.DateTime(), nullable=False),
+ sa.PrimaryKeyConstraint('fileloc', 'fileloc_hash'))
+
+ op.create_index( # pylint: disable=no-member
+ 'idx_fileloc_code_hash', 'dag_code', ['fileloc_hash'])
+
+ conn = op.get_bind() # pylint: disable=no-member
+ if conn.dialect.name == "mysql":
+ conn.execute("SET time_zone = '+00:00'")
+ cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
+ res = cur.fetchall()
+ if res[0][0] == 0:
+ raise Exception(
+ "Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"
+ )
+
+ op.alter_column( # pylint: disable=no-member
+ table_name="dag_code",
+ column_name="last_updated",
+ type_=mysql.TIMESTAMP(fsp=6),
+ nullable=False,
+ )
+ else:
+ # sqlite and mssql datetime are fine as is. Therefore, not converting
+ if conn.dialect.name in ("sqlite", "mssql"):
+ return
+
+ # we try to be database agnostic, but not every db (e.g. sqlserver)
+ # supports per session time zones
+ if conn.dialect.name == "postgresql":
+ conn.execute("set timezone=UTC")
+
+ op.alter_column( # pylint: disable=no-member
+ table_name="dag_code",
+ column_name="last_updated",
+ type_=sa.TIMESTAMP(timezone=True),
+ )
Review comment:
This was copied from your migration to add the serialized_dag table, wasn't it?
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383233216
##########
File path: airflow/migrations/versions/788fcbd36a03_add_source_code_table.py
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add source code table
+
+Revision ID: 788fcbd36a03
+Revises: a4c2fd67d16b
+Create Date: 2020-02-20 14:40:23.257645
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '788fcbd36a03'
+down_revision = 'a4c2fd67d16b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply add source code table"""
+ op.create_table('dag_code', # pylint: disable=no-member
+ sa.Column('fileloc', sa.String(length=2000), nullable=False),
+ sa.Column('fileloc_hash', sa.Integer(), nullable=False),
+ sa.Column('source_code', sa.Text(), nullable=False),
+ sa.Column('last_updated', sa.DateTime(), nullable=False),
+ sa.PrimaryKeyConstraint('fileloc', 'fileloc_hash'))
+
+ op.create_index( # pylint: disable=no-member
+ 'idx_fileloc_code_hash', 'dag_code', ['fileloc_hash'])
+
+ conn = op.get_bind() # pylint: disable=no-member
+ if conn.dialect.name == "mysql":
+ conn.execute("SET time_zone = '+00:00'")
+ cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
+ res = cur.fetchall()
+ if res[0][0] == 0:
+ raise Exception(
+ "Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"
+ )
+
+ op.alter_column( # pylint: disable=no-member
+ table_name="dag_code",
+ column_name="last_updated",
+ type_=mysql.TIMESTAMP(fsp=6),
+ nullable=False,
+ )
+ else:
+ # sqlite and mssql datetime are fine as is. Therefore, not converting
+ if conn.dialect.name in ("sqlite", "mssql"):
+ return
+
+ # we try to be database agnostic, but not every db (e.g. sqlserver)
+ # supports per session time zones
+ if conn.dialect.name == "postgresql":
+ conn.execute("set timezone=UTC")
+
+ op.alter_column( # pylint: disable=no-member
+ table_name="dag_code",
+ column_name="last_updated",
+ type_=sa.TIMESTAMP(timezone=True),
+ )
Review comment:
`explicit_defaults_for_timestamp` would already have been handled by the previous migration.
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383192657
##########
File path: airflow/config_templates/config.yml
##########
@@ -323,6 +323,15 @@
type: string
example: ~
default: "True"
+ - name: store_code
+ description: |
+ Whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of
+ trying to access files in a DAG folder.
Review comment:
I don't think this should be a separate config setting. If serialized DAGs are on, then this should be on too, with no separate option.
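Under the reviewer's proposal the decision reduces to reading `store_serialized_dags`. If a separate switch were kept anyway (the later revision quoted further down lists `[core] store_dag_code`), one option is to let it default to following serialization, as in this hypothetical helper. The function name and the fallback behaviour are assumptions, not Airflow's configuration API.

```python
import configparser


def should_store_dag_code(cfg: configparser.ConfigParser) -> bool:
    """Decide whether DAG source code is written to the database.

    Code storage only matters together with DAG serialization: without it,
    the webserver needs the DAG files on disk anyway.
    """
    if not cfg.getboolean("core", "store_serialized_dags", fallback=False):
        return False
    # Hypothetical override: defaults to on whenever serialization is on.
    return cfg.getboolean("core", "store_dag_code", fallback=True)
```

Dropping the override line entirely gives exactly the reviewer's "no option" behaviour.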
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r404946932
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException, DagCodeNotFound
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText, nullable=False)
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCode._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ with open_maybe_zipped(fileloc, 'r') as source:
+ source_code = source.read()
+ return source_code
+
+ @provide_session
+ def sync_to_db(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ self.bulk_sync_to_db([self.fileloc], session)
+
+ @classmethod
+ @provide_session
+ def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
+ """Writes code in bulk into database.
+
+ :param filelocs: file paths of DAGs to sync
+ :param session: ORM Session
+ """
+ filelocs = set(filelocs)
+ filelocs_to_hashes = {
+ fileloc: DagCode.dag_fileloc_hash(fileloc) for fileloc in filelocs
+ }
+ existing_orm_dag_codes = (
+ session
+ .query(DagCode)
+ .filter(DagCode.fileloc_hash.in_(filelocs_to_hashes.values()))
+ .with_for_update(of=DagCode)
+ .all()
+ )
+ existing_orm_dag_codes_by_fileloc_hashes = {
+ orm.fileloc_hash: orm for orm in existing_orm_dag_codes
+ }
+ existing_orm_filelocs = {
+ orm.fileloc for orm in existing_orm_dag_codes_by_fileloc_hashes.values()
+ }
+ if not existing_orm_filelocs.issubset(filelocs):
+ conflicting_filelocs = existing_orm_filelocs.difference(filelocs)
+ hashes_to_filelocs = {
+ DagCode.dag_fileloc_hash(fileloc): fileloc for fileloc in filelocs
+ }
+ message = ""
+ for fileloc in conflicting_filelocs:
+ message += ("Filename '{}' causes a hash collision in the " +
+ "database with '{}'. Please rename the file.")\
+ .format(
+ hashes_to_filelocs[DagCode.dag_fileloc_hash(fileloc)],
+ fileloc)
+ raise AirflowException(message)
+
+ existing_filelocs = {
+ dag_code.fileloc for dag_code in existing_orm_dag_codes
+ }
+ missing_filelocs = filelocs.difference(existing_filelocs)
+
+ for fileloc in missing_filelocs:
+ orm_dag_code = DagCode(fileloc)
+ session.add(orm_dag_code)
+
+ for fileloc in existing_filelocs:
+ old_version = existing_orm_dag_codes_by_fileloc_hashes[
+ filelocs_to_hashes[fileloc]
+ ]
+ file_modified = datetime.fromtimestamp(
+ os.path.getmtime(correct_maybe_zipped(fileloc)), tz=timezone.utc)
+
+ if (file_modified - timedelta(seconds=120)) > old_version.last_updated:
+ old_version.last_updated = timezone.utcnow()
+ old_version.source_code = DagCode._read_code(old_version.fileloc)
+ session.merge(old_version)
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.query(cls).filter(
+ and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs))).delete(synchronize_session='fetch')
+
+ @classmethod
+ @provide_session
+ def has_dag(cls, fileloc: str, session=None) -> bool:
+ """Checks a file exist in dag_code table.
+
+ :param fileloc: the file to check
+ :param session: ORM Session
+ """
+ fileloc_hash = cls.dag_fileloc_hash(fileloc)
+ return session.query(exists().where(cls.fileloc_hash == fileloc_hash))\
+ .scalar()
+
+ @classmethod
+ def get_code_by_fileloc(cls, fileloc: str) -> str:
+ """Returns source code for a given fileloc.
+
+ :param fileloc: file path of a DAG
+ :return: source code as string
+ """
+ return DagCode(fileloc).code()
+
+ def code(self) -> str:
+ """Returns source code for this DagCode object.
+
+ :return: source code as string
+ """
+ if conf.getboolean('core', 'store_dag_code', fallback=False):
+ return self._get_code_from_db()
+ else:
+ return self._get_code_from_file()
+
+ def _get_code_from_file(self):
+ with open_maybe_zipped(self.fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ dag_code = session.query(DagCode) \
+ .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+ .first()
+ if not dag_code:
+ raise DagCodeNotFound()
+ else:
+ code = dag_code.source_code
+ return code
+
+ @staticmethod
+ def dag_fileloc_hash(full_filepath: str) -> int:
+ """Hashing file location for indexing.
+
+ :param full_filepath: full filepath of DAG file
+ :return: hashed full_filepath
+ """
+ # Hashing is needed because the length of fileloc is 2000 as an Airflow convention,
+ # which is over the limit of indexing.
+ import hashlib
+ # Only 7 bytes because MySQL BigInteger can hold only 8 bytes (signed).
+ return struct.unpack('>Q', hashlib.sha1(
+ full_filepath.encode('utf-8')).digest()[-8:])[0] >> 8
Review comment:
@anitakar Did you mean for this construct to ignore the least-significant byte?
Location: `/home/ash/airflow/dags/example.py`
SHA1 (hex) `0x78288e229ef15e32a7a32e1fd123d9fc60b5eae0`
fileloc_hash: `58867689281598954`
fileloc_hash hex: `d123d9fc60b5ea`
i.e. it ignores/removes the `e0` from the end.
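The following standalone sketch reproduces the construct from the hunk and compares it with a variant that keeps the last seven digest bytes instead. Both results fit in 56 bits, so either fits a signed 8-byte BIGINT; the function names are illustrative only, not Airflow's.

```python
import hashlib
import struct


def hash_drop_lsb(full_filepath: str) -> int:
    """Construct from the hunk: last 8 digest bytes, shifted right by one byte."""
    digest = hashlib.sha1(full_filepath.encode('utf-8')).digest()
    # '>Q' reads 8 bytes as a big-endian unsigned 64-bit integer;
    # '>> 8' then discards the least-significant byte (digest[-1]).
    return struct.unpack('>Q', digest[-8:])[0] >> 8


def hash_keep_lsb(full_filepath: str) -> int:
    """Hypothetical alternative: use the last 7 digest bytes directly."""
    digest = hashlib.sha1(full_filepath.encode('utf-8')).digest()
    return int.from_bytes(digest[-7:], byteorder='big')


path = '/home/ash/airflow/dags/example.py'
digest = hashlib.sha1(path.encode('utf-8')).digest()
# The shifted value is exactly digest bytes -8..-2: the final byte is ignored.
assert hash_drop_lsb(path) == int.from_bytes(digest[-8:-1], byteorder='big')
print(hex(hash_drop_lsb(path)), hex(hash_keep_lsb(path)))
```

Both variants use 7 bytes of the digest; the difference is only which 7, so the choice does not change collision resistance, but taking `digest[-7:]` states the intent more directly.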
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on issue #7217: [WIP] [AIRFLOW-NNNN] Store
DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576762585
How about calling the table dag_source? It's possible that in the future we'd have an API endpoint to submit a DAG, at which point there is no "file".
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/78e48ba46a7f721384417ebf8a798dd320632fa8&el=desc) will **decrease** coverage by `0.48%`.
> The diff coverage is `91.58%`.
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
==========================================
- Coverage 86.99% 86.50% -0.49%
==========================================
Files 906 907 +1
Lines 43806 43878 +72
==========================================
- Hits 38110 37958 -152
- Misses 5696 5920 +224
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.68% <ø> (ø)` | |
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `79.39% <ø> (-0.99%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.02% <33.33%> (-0.70%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `70.00% <50.00%> (-6.93%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.26% <90.00%> (-0.05%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `94.91% <94.91%> (ø)` | |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `92.00% <100.00%> (-0.60%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `100.00% <100.00%> (ø)` | |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.40% <100.00%> (-0.28%)` | :arrow_down: |
| [airflow/utils/file.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9maWxlLnB5) | `88.88% <100.00%> (+1.05%)` | :arrow_up: |
| ... and [19 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [78e48ba...5ac195a](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383231052
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,44 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
+ @property
+ def code(self):
+ if conf.getboolean('core', 'store_code', fallback=False):
+ return self._get_code_from_db(self.fileloc)
+ else:
+ return self._get_code_from_file(self.fileloc)
+
+ def _get_code_from_file(self, fileloc):
+ with self._open_maybe_zipped(fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @classmethod
+ def _open_maybe_zipped(cls, f, mode='r'):
Review comment:
Yes, I agree. It should live alongside `correct_maybe_zipped`.
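For illustration, a helper of this kind could look like the sketch below. It assumes a zipped DAG path has the form `<archive>.zip/<member>`; the regex and the decision to read zipped members fully into memory are assumptions of this sketch, not Airflow's actual implementation.

```python
import io
import os
import re
import zipfile

# Assumed path shape: "<something>.zip" + separator + member path inside the archive.
_ZIP_PATH = re.compile(r'^(.*?\.zip){}(.+)$'.format(re.escape(os.sep)))


def open_maybe_zipped(fileloc: str, mode: str = 'r'):
    """Open fileloc as text; if it points inside a .zip archive, read that member.

    Sketch only: zipped members are read fully into memory and only
    reading is supported for them.
    """
    match = _ZIP_PATH.match(fileloc)
    if match and zipfile.is_zipfile(match.group(1)):
        archive, member = match.groups()
        # Member names inside a zip always use '/' as the separator.
        member = member.replace(os.sep, '/')
        with zipfile.ZipFile(archive) as zf:
            return io.StringIO(zf.read(member).decode('utf-8'))
    return open(fileloc, mode)
```

Callers then use it identically in both cases, e.g. `with open_maybe_zipped(dag.fileloc) as f: code = f.read()`, which is what makes it a natural neighbour of `correct_maybe_zipped` in a shared file utility module.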
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391747198
##########
File path: airflow/www/views.py
##########
@@ -519,17 +519,23 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
+ all_errors = ""
+
try:
- with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
- code = f.read()
+ dag_id = request.args.get('dag_id')
+ dag_orm = DagModel.get_dagmodel(dag_id, session=session)
+ dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
Review comment:
Maybe we should introduce a new class DagCodeBag or something similar?
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383986936
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc = Column(String(2000), primary_key=True)
+ # The max length of fileloc exceeds the limit of indexing.
+ fileloc_hash = Column(Integer, primary_key=True)
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(Text(), nullable=False)
+
+ __table_args__ = (
+ Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+ )
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCodeModel._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ try:
+ with open(fileloc, 'r') as source:
+ source_code = source.read()
+ except IOError:
+ source_code = "Couldn't read source file {}.".format(fileloc)
+ return source_code
+
+ @provide_session
+ def write_code(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ session.merge(self)
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.execute(
+ cls.__table__.delete().where(
+ and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs))))
+
+ @classmethod
+ def dag_fileloc_hash(cls, full_filepath: str) -> int:
+ """Hashing file location for indexing.
+
+ :param full_filepath: full filepath of DAG file
+ :return: hashed full_filepath
+ """
+ # hashing is needed because the length of fileloc is 2000 as an Airflow convention,
+ # which is over the limit of indexing. If we can reduce the length of fileloc, then
+ # hashing is not needed.
+ import hashlib
+ return int.from_bytes(
+ hashlib.sha1(
+ full_filepath.encode('utf-8')).digest()[-2:],
+ byteorder='big', signed=False)
Review comment:
I agree. I shall create this migration
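For a sense of scale: this revision keeps only the last two bytes of the SHA-1 digest, so `fileloc_hash` can take just 65,536 distinct values. A standard birthday-bound estimate, assuming the hash behaves like a uniform random function, gives the probability that at least two of n DAG files share a hash. The 56-bit row corresponds to the seven-byte hash stored in a BigInteger in the later revision of `dagcode.py` quoted earlier in this thread.

```python
import math


def collision_probability(n_files: int, hash_bits: int) -> float:
    """Birthday-bound approximation: P(collision) ~ 1 - exp(-n(n-1) / (2 * 2**bits))."""
    buckets = 2 ** hash_bits
    exponent = n_files * (n_files - 1) / (2.0 * buckets)
    # -expm1(-x) equals 1 - exp(-x) but stays accurate when x is tiny.
    return -math.expm1(-exponent)


for bits in (16, 56):
    for n in (100, 300, 1000):
        print(f"{bits}-bit hash, {n} files: {collision_probability(n, bits):.3e}")
```

With a 16-bit hash, a DAG folder of roughly 300 files already has about even odds of a collision, whereas a 56-bit hash keeps the probability negligible for any realistic number of files.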
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383357113
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc = Column(String(2000), primary_key=True)
+ # The max length of fileloc exceeds the limit of indexing.
+ fileloc_hash = Column(Integer, primary_key=True)
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(Text(), nullable=False)
+
+ __table_args__ = (
+ Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+ )
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCodeModel._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ try:
+ with open(fileloc, 'r') as source:
+ source_code = source.read()
+ except IOError:
+ source_code = "Couldn't read source file {}.".format(fileloc)
+ return source_code
+
+ @provide_session
+ def write_code(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ session.merge(self)
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.execute(
+ cls.__table__.delete().where(
+ and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs))))
+
+ @classmethod
+ def dag_fileloc_hash(cls, full_filepath: str) -> int:
+ """Hashing file location for indexing.
+
+ :param full_filepath: full filepath of DAG file
+ :return: hashed full_filepath
+ """
+ # hashing is needed because the length of fileloc is 2000 as an Airflow convention,
+ # which is over the limit of indexing. If we can reduce the length of fileloc, then
+ # hashing is not needed.
+ import hashlib
+ return int.from_bytes(
+ hashlib.sha1(
+ full_filepath.encode('utf-8')).digest()[-2:],
+ byteorder='big', signed=False)
Review comment:
I have decided to add an id column and create a uniqueness constraint on fileloc instead.
It sounds like a valid suggestion though.
Maybe in another PR?
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383045579
##########
File path: airflow/models/__init__.py
##########
@@ -43,3 +43,19 @@
# Must be loaded after loading DAG model.
# noinspection PyUnresolvedReferences
import airflow.jobs # noqa: F401 isort # isort:skip
+
+
+def dag_fileloc_hash(full_filepath: str) -> int:
Review comment:
Done
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383225969
##########
File path: airflow/config_templates/config.yml
##########
@@ -323,6 +323,15 @@
type: string
example: ~
default: "True"
+ - name: store_code
+ description: |
+ Whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of
+ trying to access files in a DAG folder.
Review comment:
Can we change it to `store_dag_code`?
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on issue #7217: [WIP] [AIRFLOW-NNNN]
Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
anitakar commented on issue #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576681041
Sure. Kamil's database design makes much more sense: we avoid storing the same file's source code multiple times when a file contains multiple DAGs.
I kind of jumped in with this PR without creating a bug or checking what is happening in the community around DAG serialization. @kaxil I shall wait for your commit then.
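The design point, one source-code row per file rather than one per DAG, can be illustrated with a toy SQLite schema. The table and column names below are simplified assumptions for this sketch, not the exact Airflow schema, and the hash value `42` is a placeholder.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE dag_code (
        fileloc_hash INTEGER PRIMARY KEY,
        fileloc      TEXT NOT NULL,
        source_code  TEXT NOT NULL
    );
    CREATE TABLE serialized_dag (
        dag_id       TEXT PRIMARY KEY,
        fileloc_hash INTEGER NOT NULL REFERENCES dag_code (fileloc_hash)
    );
""")

# One file defining two DAGs: its source is stored exactly once...
conn.execute("INSERT INTO dag_code VALUES (?, ?, ?)",
             (42, '/dags/etl.py', 'dag_a = DAG(...)\ndag_b = DAG(...)\n'))
# ...and both DAG rows reference it through the (placeholder) file hash.
conn.executemany("INSERT INTO serialized_dag VALUES (?, ?)",
                 [('dag_a', 42), ('dag_b', 42)])

rows = conn.execute("""
    SELECT s.dag_id, c.fileloc
    FROM serialized_dag AS s JOIN dag_code AS c USING (fileloc_hash)
    ORDER BY s.dag_id
""").fetchall()
print(rows)  # [('dag_a', '/dags/etl.py'), ('dag_b', '/dags/etl.py')]
```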
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383226483
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -869,6 +871,10 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
self.log.info("Creating / updating %s in ORM", ti)
session.merge(ti)
# commit batch
+
+ if STORE_CODE:
+ DagCodeModel(file_path).write_code(session=session)
Review comment:
```suggestion
DagCode(file_path).write_code(session=session)
```
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r392275176
##########
File path: airflow/config_templates/config.yml
##########
@@ -335,6 +335,15 @@
type: string
example: ~
default: "True"
+ - name: store_dag_code
+ description: |
+ Whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of
+ trying to access files in a DAG folder.
+ version_added: 2.0.0
+ type: string
+ example: ~
+ default: "False"
Review comment:
Thanks. I was too quick with committing your first suggestion so that I could not accept your first suggestion anymore. But I have manually added it.
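For reference, the branch that `DagCode.code()` takes on this option can be mirrored with the standard library. In this sketch `configparser` stands in for Airflow's `conf` object, the `AIRFLOW_CFG` fragment is illustrative, and `code_source` is a hypothetical helper name; the `fallback=False` default matches the `default: "False"` in the hunk above.

```python
import configparser

# Illustrative airflow.cfg fragment; both options live in the [core] section.
AIRFLOW_CFG = """
[core]
store_serialized_dags = True
store_dag_code = True
"""


def code_source(cfg: configparser.ConfigParser) -> str:
    """Return 'db' when store_dag_code is enabled, otherwise 'file'."""
    # A missing section or option falls back to False, i.e. read from the DAG file.
    if cfg.getboolean('core', 'store_dag_code', fallback=False):
        return 'db'
    return 'file'


cfg = configparser.ConfigParser()
cfg.read_string(AIRFLOW_CFG)
print(code_source(cfg))  # db
```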
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383227214
##########
File path: airflow/migrations/versions/788fcbd36a03_add_source_code_table.py
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add source code table
+
+Revision ID: 788fcbd36a03
+Revises: a4c2fd67d16b
+Create Date: 2020-02-20 14:40:23.257645
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '788fcbd36a03'
+down_revision = 'a4c2fd67d16b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply add source code table"""
+ op.create_table('dag_code', # pylint: disable=no-member
+ sa.Column('fileloc', sa.String(length=2000), nullable=False),
+ sa.Column('fileloc_hash', sa.Integer(), nullable=False),
+ sa.Column('source_code', sa.Text(), nullable=False),
+ sa.Column('last_updated', sa.DateTime(), nullable=False),
+ sa.PrimaryKeyConstraint('fileloc', 'fileloc_hash'))
Review comment:
Yes, an index would be created for the primary key columns.
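This can be checked quickly with SQLite, which reports the index it creates for a composite primary key of the same shape as the migration. This is only an illustration: SQLite does not enforce the index key-length limits that motivated the hash column in the first place, and other backends may name or implement the index differently.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
# Same shape as the migration: composite primary key on (fileloc, fileloc_hash).
conn.execute("""
    CREATE TABLE dag_code (
        fileloc      VARCHAR(2000) NOT NULL,
        fileloc_hash INTEGER NOT NULL,
        source_code  TEXT NOT NULL,
        last_updated DATETIME NOT NULL,
        PRIMARY KEY (fileloc, fileloc_hash)
    )
""")

# PRAGMA index_list rows are (seq, name, unique, origin, partial);
# origin 'pk' marks the index created automatically for the primary key.
pk_indexes = [row for row in conn.execute("PRAGMA index_list('dag_code')")
              if row[3] == 'pk']
index_name = pk_indexes[0][1]
# PRAGMA index_info rows are (seqno, cid, name), in key order.
columns = [row[2] for row in conn.execute(f"PRAGMA index_info('{index_name}')")]
print(index_name, columns)
```

Note that `fileloc` is the leading key column, so a lookup by `fileloc_hash` alone cannot use this index efficiently; that is the case the separate `idx_fileloc_code_hash` index in the model hunk covers.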
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/3bb60afc7b8319996385d681faac342afe2b3bd2&el=desc) will **decrease** coverage by `0.48%`.
> The diff coverage is `91.83%`.
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
==========================================
- Coverage 86.89% 86.41% -0.49%
==========================================
Files 906 907 +1
Lines 43906 44018 +112
==========================================
- Hits 38153 38039 -114
- Misses 5753 5979 +226
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `79.39% <ø> (-0.99%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.02% <33.33%> (-0.32%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `72.72% <60.00%> (-4.20%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.21% <66.66%> (-0.10%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `92.47% <92.47%> (ø)` | |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `92.00% <100.00%> (-0.60%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `100.00% <100.00%> (ø)` | |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.40% <100.00%> (-0.28%)` | :arrow_down: |
| [airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5) | `94.97% <100.00%> (+0.16%)` | :arrow_up: |
| [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.55% <100.00%> (+0.35%)` | :arrow_up: |
| ... and [18 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [ccbaf57...50f4874](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391738524
##########
File path: airflow/models/dag.py
##########
@@ -591,6 +593,29 @@ def owner(self) -> str:
"""
return ", ".join({t.owner for t in self.tasks})
+ def code(self):
+ if conf.getboolean('core', 'store_dag_code', fallback=False):
+ return self._get_code_from_db()
+ else:
+ return self._get_code_from_file()
+
+ def _get_code_from_file(self):
+ with open_maybe_zipped(self.fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
Review comment:
We should move it elsewhere, but Airflow's architecture is problematic at the moment. It will be hard to change it so that database objects contain no logic. Currently they often contain very complex functions, SchedulerJob in particular, but that is a separate topic for discussion.
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391701019
##########
File path: airflow/models/dag.py
##########
@@ -591,6 +593,29 @@ def owner(self) -> str:
"""
return ", ".join({t.owner for t in self.tasks})
+ def code(self):
+ if conf.getboolean('core', 'store_dag_code', fallback=False):
+ return self._get_code_from_db()
+ else:
+ return self._get_code_from_file()
+
+ def _get_code_from_file(self):
+ with open_maybe_zipped(self.fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
Review comment:
I have moved them there, although I have doubts, since what is the `_get_code_from_file(self)` method doing there?
But avoiding cyclic dependencies seems like a good reason to move them.
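A minimal sketch of the zip-aware opener as a standalone utility, so that neither the DAG model nor `DagCode` has to own it. It mirrors the `_open_maybe_zipped` classmethod from an earlier revision of `dag.py`; the module-level name `open_maybe_zipped` matches the import in the later `dagcode.py` diff, but this body is an illustrative assumption, not the merged code. One deliberate difference: `ZipFile.open` returns a binary stream, so the sketch wraps it in a `TextIOWrapper` (assuming UTF-8) to make both branches return text.

```python
import io
import os
import re
import zipfile

# Optional "<anything>.zip<sep>" prefix, then the path of the file inside it.
ZIP_REGEX = re.compile(r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)))


def open_maybe_zipped(fileloc, mode='r'):
    """Open fileloc; if the path runs through a .zip archive, open the member inside it."""
    _, archive, filename = ZIP_REGEX.search(fileloc).groups()
    if archive and zipfile.is_zipfile(archive):
        # ZipFile.open yields bytes; decode so callers always get str.
        member = zipfile.ZipFile(archive, mode='r').open(filename)
        return io.TextIOWrapper(member, encoding='utf-8')
    return io.open(fileloc, mode=mode)
```

Callers can then use `with open_maybe_zipped(path) as f: code = f.read()` regardless of whether the DAG file is packaged in a zip.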
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217: [WIP]
[AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/057f3ae3a4afedf6d462ecf58b01dd6304d3e135?src=pr&el=desc) will **decrease** coverage by `0.02%`.
> The diff coverage is `61.53%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7217/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
==========================================
- Coverage 85.5% 85.48% -0.03%
==========================================
Files 864 864
Lines 40455 40488 +33
==========================================
+ Hits 34592 34611 +19
- Misses 5863 5877 +14
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `88.09% <100%> (+0.29%)` | :arrow_up: |
| [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `75.53% <44%> (-0.55%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `90.9% <90.9%> (ø)` | :arrow_up: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `87.93% <0%> (-0.2%)` | :arrow_down: |
| [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.37% <0%> (+0.34%)` | :arrow_up: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [057f3ae...901700f](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383194773
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,44 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
+ @property
+ def code(self):
+ if conf.getboolean('core', 'store_code', fallback=False):
+ return self._get_code_from_db(self.fileloc)
+ else:
+ return self._get_code_from_file(self.fileloc)
+
+ def _get_code_from_file(self, fileloc):
+ with self._open_maybe_zipped(fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @classmethod
+ def _open_maybe_zipped(cls, f, mode='r'):
+ """
+ Opens the given file. If the path contains a folder with a .zip suffix, then
+ the folder is treated as a zip archive, opening the file inside the archive.
+
+ :return: a file object, as in `open`, or as in `ZipFile.open`.
+ """
+ ZIP_REGEX = re.compile(r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)))
+ _, archive, filename = ZIP_REGEX.search(f).groups()
+ if archive and zipfile.is_zipfile(archive):
+ return zipfile.ZipFile(archive, mode=mode).open(filename)
+ else:
+ return io.open(f, mode=mode)
+
+ @provide_session
+ def _get_code_from_db(self, fileloc, session=None):
Review comment:
```suggestion
def _get_code_from_db(self, session=None):
```
Since this is a method on `self`, we shouldn't pass in the property but should instead just read `self.fileloc`.
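A minimal sketch of what the suggestion amounts to: both helpers read `self.fileloc` rather than receiving it as a parameter. `DagSketch` is a hypothetical stand-in for the DAG class; the boolean flag stands in for `conf.getboolean('core', 'store_dag_code')` and a plain dict stands in for the `dag_code` table, so no Airflow or database is needed.

```python
from typing import Dict


class DagSketch:
    """Hypothetical stand-in for DAG, showing only the code-lookup path."""

    def __init__(self, fileloc: str, store_dag_code: bool, code_table: Dict[str, str]):
        self.fileloc = fileloc
        self.store_dag_code = store_dag_code  # stands in for the [core] config flag
        self._code_table = code_table  # stands in for the dag_code table, keyed by path

    @property
    def code(self) -> str:
        # Neither helper takes fileloc as an argument; both use self.fileloc.
        if self.store_dag_code:
            return self._get_code_from_db()
        return self._get_code_from_file()

    def _get_code_from_file(self) -> str:
        with open(self.fileloc) as f:
            return f.read()

    def _get_code_from_db(self) -> str:
        return self._code_table[self.fileloc]
```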
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383225298
##########
File path: airflow/api/common/experimental/__init__.py
##########
@@ -29,12 +29,10 @@ def check_and_get_dag(dag_id: str, task_id: Optional[str] = None) -> DagModel:
if dag_model is None:
raise DagNotFound("Dag id {} not found in DagModel".format(dag_id))
- def read_store_serialized_dags():
- from airflow.configuration import conf
- return conf.getboolean('core', 'store_serialized_dags')
+ from airflow.configuration import conf
Review comment:
Any reason this import is inside the function?
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383193240
##########
File path: airflow/migrations/versions/788fcbd36a03_add_source_code_table.py
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add source code table
+
+Revision ID: 788fcbd36a03
+Revises: a4c2fd67d16b
+Create Date: 2020-02-20 14:40:23.257645
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '788fcbd36a03'
+down_revision = 'a4c2fd67d16b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply add source code table"""
+ op.create_table('dag_code', # pylint: disable=no-member
+ sa.Column('fileloc', sa.String(length=2000), nullable=False),
+ sa.Column('fileloc_hash', sa.Integer(), nullable=False),
+ sa.Column('source_code', sa.Text(), nullable=False),
+ sa.Column('last_updated', sa.DateTime(), nullable=False),
+ sa.PrimaryKeyConstraint('fileloc', 'fileloc_hash'))
Review comment:
Is it worth also adding an index on `fileloc`?
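A minimal sketch of the alternative that the later revision of `dagcode.py` adopts: index (here, key) on the short `fileloc_hash` column instead of the 2000-character `fileloc`, and compare `fileloc` after the lookup to catch hash collisions. It uses the standard-library `sqlite3` module rather than Alembic/SQLAlchemy, and `path_hash` is a hypothetical stand-in (CRC-32) for the real hash function.

```python
import sqlite3
import zlib


def path_hash(fileloc: str) -> int:
    # Hypothetical stand-in; any stable integer hash of the path works here.
    return zlib.crc32(fileloc.encode('utf-8'))


conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE dag_code ('
    ' fileloc_hash INTEGER PRIMARY KEY,'  # short key, cheap to index
    ' fileloc VARCHAR(2000) NOT NULL,'    # long path, stored but not indexed
    ' source_code TEXT NOT NULL)'
)


def store(fileloc: str, source_code: str) -> None:
    conn.execute(
        'INSERT OR REPLACE INTO dag_code VALUES (?, ?, ?)',
        (path_hash(fileloc), fileloc, source_code),
    )


def load(fileloc: str) -> str:
    row = conn.execute(
        'SELECT fileloc, source_code FROM dag_code WHERE fileloc_hash = ?',
        (path_hash(fileloc),),
    ).fetchone()
    if row is None or row[0] != fileloc:
        # No row, or a different path that happens to share the same hash.
        raise KeyError(fileloc)
    return row[1]
```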
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383196969
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
Review comment:
```suggestion
class DagCode(Base):
```
The `Model` suffix is not needed here. We only have it on `DagModel` to distinguish between the `DAG` class and the ORM/database record type.
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383336903
##########
File path: airflow/config_templates/config.yml
##########
@@ -323,6 +323,15 @@
type: string
example: ~
default: "True"
+ - name: store_code
+ description: |
+ Whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of
+ trying to access files in a DAG folder.
Review comment:
Sure
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [WIP] [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#discussion_r382630100
##########
File path: airflow/models/__init__.py
##########
@@ -43,3 +43,19 @@
# Must be loaded after loading DAG model.
# noinspection PyUnresolvedReferences
import airflow.jobs # noqa: F401 isort # isort:skip
+
+
+def dag_fileloc_hash(full_filepath: str) -> int:
Review comment:
We don't put code in `__init__.py`. We used to, and now we have serious problems with cyclic imports.
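A minimal sketch of `dag_fileloc_hash` as a plain function in its own module (hypothetically next to the `DagCode` model) rather than in `airflow/models/__init__.py`. The hashing scheme is an assumption for illustration, suggested by the `struct` import and the `BigInteger` column in the later `dagcode.py` revision: take the last 8 bytes of a SHA-1 digest and drop the low 8 bits, so the value always fits a signed 64-bit column.

```python
import hashlib
import struct


def dag_fileloc_hash(full_filepath: str) -> int:
    """Hash a DAG file path into a non-negative int below 2**56.

    Sketch only: the last 8 digest bytes are read as an unsigned
    big-endian integer, then shifted right by 8 bits.
    """
    digest = hashlib.sha1(full_filepath.encode('utf-8')).digest()
    return struct.unpack('>Q', digest[-8:])[0] >> 8
```

Because the function depends only on the standard library, importing it cannot pull `airflow.models` into an import cycle.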
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391761310
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.exceptions import AirflowException
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText(), nullable=False)
Review comment:
```suggestion
source_code = Column(UnicodeText, nullable=False)
```
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-598799705
Good work @anitakar 🎉
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383195827
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc = Column(String(2000), primary_key=True)
+ # The max length of fileloc exceeds the limit of indexing.
+ fileloc_hash = Column(Integer, primary_key=True)
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(Text())
+
+ __table_args__ = (
+ Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+ )
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCodeModel._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ try:
+ with open(fileloc, 'r') as source:
+ source_code = source.read()
+ except IOError:
+ source_code = "Couldn't read source file {}.".format(fileloc)
Review comment:
I'm not sure we should store this as the code; I'd rather we didn't store anything and exposed an error somewhere instead.
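A minimal sketch of the behaviour suggested here: the reader lets I/O errors propagate instead of returning a placeholder string, and a caller decides how to surface the failure. `read_code_or_none` is a hypothetical caller that logs the error and stores nothing; where the error should actually be exposed (logs, UI, import errors) is left open in this thread.

```python
import logging
from typing import Optional

log = logging.getLogger(__name__)


def read_code(fileloc: str) -> str:
    # No try/except here: an unreadable file is an error, not fake source code.
    with open(fileloc, 'r') as source:
        return source.read()


def read_code_or_none(fileloc: str) -> Optional[str]:
    """Hypothetical caller: log the failure and skip storing code for this file."""
    try:
        return read_code(fileloc)
    except OSError:  # IOError is an alias of OSError in Python 3
        log.exception("Couldn't read source file %s", fileloc)
        return None
```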
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391772583
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.exceptions import AirflowException
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText(), nullable=False)
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCode._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ with open_maybe_zipped(fileloc, 'r') as source:
+ source_code = source.read()
+ return source_code
+
+ @provide_session
+ def sync_to_db(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ old_version = session.query(
+ DagCode.fileloc, DagCode.fileloc_hash, DagCode.last_updated) \
+ .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+ .first()
+
+ if old_version and old_version.fileloc != self.fileloc:
+ raise AirflowException(
+ "Filename '{}' causes a hash collision in the database with "
+ "'{}'. Please rename the file.".format(
+ self.fileloc, old_version.fileloc))
+
+ file_modified = datetime.fromtimestamp(
+ os.path.getmtime(correct_maybe_zipped(self.fileloc)), tz=timezone.utc)
+
+ if old_version and (file_modified - timedelta(seconds=120)) < \
+ old_version.last_updated:
+ return
+
+ session.merge(self)
+
+ @classmethod
+ @provide_session
+ def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
+ """Writes code in bulk into database.
+
+ :param filelocs: file paths of DAGs to sync
+ :param session: ORM Session
+ """
+ filelocs = set(filelocs)
+ for file in filelocs:
+ DagCode(file).sync_to_db(session=session)
Review comment:
Resolve this if you believe the new batch update/insert implementation is correct.
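A minimal sketch of the decision logic a batched `bulk_sync_to_db` needs, written as a pure function so it can be checked without a database. It applies the same rules as `sync_to_db` in the diff (raise on a hash collision; skip a file unless it was modified at least 120 seconds after the stored `last_updated`). `plan_bulk_sync`, `StoredCode`, and the injected `hash_fn`/`mtime_fn` callables are hypothetical names, and `ValueError` stands in for `AirflowException`.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, Iterable, List, NamedTuple, Tuple


class StoredCode(NamedTuple):
    fileloc: str
    last_updated: datetime


def plan_bulk_sync(
    filelocs: Iterable[str],
    stored: Dict[int, StoredCode],
    hash_fn: Callable[[str], int],
    mtime_fn: Callable[[str], datetime],
) -> Tuple[List[str], List[str]]:
    """Split files into (to_insert, to_update); raise on hash collisions."""
    to_insert, to_update = [], []
    for fileloc in sorted(set(filelocs)):
        old = stored.get(hash_fn(fileloc))
        if old is None:
            to_insert.append(fileloc)
            continue
        if old.fileloc != fileloc:
            raise ValueError(
                "Filename '{}' causes a hash collision in the database with "
                "'{}'. Please rename the file.".format(fileloc, old.fileloc))
        # Update only if the file changed at least 120 s after the stored copy.
        if mtime_fn(fileloc) - timedelta(seconds=120) >= old.last_updated:
            to_update.append(fileloc)
    return to_insert, to_update
```

In a real implementation, `stored` would ideally come from a single query over all hashes (for example with SQLAlchemy's `in_`) rather than one query per file.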
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r387325391
##########
File path: docs/dag-serialization.rst
##########
@@ -63,6 +63,8 @@ Add the following settings in ``airflow.cfg``:
If set to True, Webserver reads from DB instead of parsing DAG files
* ``min_serialized_dag_update_interval``: This flag sets the minimum interval (in seconds) after which
the serialized DAG in DB should be updated. This helps in reducing database write rate.
+* ``store_code``: This flag decides whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of trying to access files in a DAG folder.
If you are updating Airflow from <1.10.7, please do not forget to run ``airflow db upgrade``.
Review comment:
We can now remove lines 79 and 80, which describe the following limitation:
```
* **Code View** will read the DAG File & show it using Pygments.
However, it does not need to Parse the Python file so it is still a small operation.
```
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/78e48ba46a7f721384417ebf8a798dd320632fa8?src=pr&el=desc) will **decrease** coverage by `22.48%`.
> The diff coverage is `48.59%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7217/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
===========================================
- Coverage 86.99% 64.51% -22.49%
===========================================
Files 906 906
Lines 43806 43865 +59
===========================================
- Hits 38110 28299 -9811
- Misses 5696 15566 +9870
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `41.7% <ø> (-38.68%)` | :arrow_down: |
| [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `13.68% <ø> (-77.01%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `25.66% <0%> (-63.06%)` | :arrow_down: |
| [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `25.71% <0%> (-50.45%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `96.55% <100%> (-3.45%)` | :arrow_down: |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `32% <100%> (-60.6%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `40% <25%> (-36.93%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `57.4% <35%> (-33.91%)` | :arrow_down: |
| [airflow/utils/file.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9maWxlLnB5) | `74.07% <50%> (-13.77%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `59.32% <59.32%> (ø)` | |
| ... and [499 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [78e48ba...5ac195a](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r387324460
##########
File path: tests/www/api/experimental/test_endpoints.py
##########
@@ -347,12 +347,12 @@ def test_dagrun_status(self):
@parameterized_class([
- {"dag_serialzation": "False"},
- {"dag_serialzation": "True"},
+ {"dag_serialIzation": "False"},
+ {"dag_serialIzation": "True"},
Review comment:
```suggestion
{"dag_serialization": "False"},
{"dag_serialization": "True"},
```
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391694536
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.exceptions import AirflowException
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText(), nullable=False)
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCode._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ with open_maybe_zipped(fileloc, 'r') as source:
+ source_code = source.read()
+ return source_code
+
+ @provide_session
+ def sync_to_db(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ old_version = session.query(
+ DagCode.fileloc, DagCode.fileloc_hash, DagCode.last_updated) \
+ .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+ .first()
+
+ if old_version and old_version.fileloc != self.fileloc:
+ raise AirflowException(
+ "Filename '{}' causes a hash collision in the database with "
+ "'{}'. Please rename the file.".format(
+ self.fileloc, old_version.fileloc))
+
+ file_modified = datetime.fromtimestamp(
+ os.path.getmtime(correct_maybe_zipped(self.fileloc)), tz=timezone.utc)
+
+ if old_version and (file_modified - timedelta(seconds=120)) < \
+ old_version.last_updated:
+ return
+
+ session.merge(self)
+
+ @classmethod
+ @provide_session
+ def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
+ """Writes code in bulk into database.
+
+ :param filelocs: file paths of DAGs to sync
+ :param session: ORM Session
+ """
+ filelocs = set(filelocs)
+ for file in filelocs:
+ DagCode(file).sync_to_db(session=session)
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.execute(
+ cls.__table__.delete().where(
Review comment:
I have changed it to what you suggested, but Stack Overflow seems to suggest the previous solution was faster.
Here I first run a query and then execute `DELETE ... WHERE id IN (...)`,
whereas the previous solution would have issued a single `DELETE FROM ... WHERE ...`.
https://stackoverflow.com/questions/39773560/sqlalchemy-how-do-you-delete-multiple-rows-without-querying
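The two delete strategies discussed here can be compared with stdlib `sqlite3`. This is an assumption: the PR itself goes through SQLAlchemy. The sketch shows the SQL each strategy roughly corresponds to. The first is a single `DELETE ... WHERE fileloc_hash NOT IN (...)`. The second is a SELECT of the stale keys followed by `DELETE ... WHERE fileloc_hash IN (...)`. Both remove the same rows, but the second costs an extra round trip. With SQLAlchemy 1.3's `Query.delete()`, whether a preliminary SELECT is issued depends on the `synchronize_session` argument (`'fetch'` selects first). The table contents are made up.

```python
import sqlite3
from typing import List


def make_table() -> sqlite3.Connection:
    # Hypothetical three-row dag_code table keyed by fileloc_hash.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag_code (fileloc_hash INTEGER PRIMARY KEY, fileloc TEXT)")
    conn.executemany("INSERT INTO dag_code VALUES (?, ?)",
                     [(1, "/dags/a.py"), (2, "/dags/b.py"), (3, "/dags/c.py")])
    return conn


def delete_in_one_statement(conn: sqlite3.Connection, alive: List[int]) -> int:
    # One statement: the database filters and deletes in a single pass.
    marks = ",".join("?" * len(alive))
    cur = conn.execute(f"DELETE FROM dag_code WHERE fileloc_hash NOT IN ({marks})", alive)
    return cur.rowcount


def select_then_delete(conn: sqlite3.Connection, alive: List[int]) -> int:
    # Two statements: fetch the stale keys first, then delete them by key.
    marks = ",".join("?" * len(alive))
    stale = [row[0] for row in conn.execute(
        f"SELECT fileloc_hash FROM dag_code WHERE fileloc_hash NOT IN ({marks})", alive)]
    if not stale:
        return 0
    stale_marks = ",".join("?" * len(stale))
    cur = conn.execute(f"DELETE FROM dag_code WHERE fileloc_hash IN ({stale_marks})", stale)
    return cur.rowcount


for strategy in (delete_in_one_statement, select_then_delete):
    conn = make_table()
    removed = strategy(conn, [1, 2])
    remaining = [r[0] for r in conn.execute("SELECT fileloc_hash FROM dag_code ORDER BY 1")]
    print(strategy.__name__, removed, remaining)
# delete_in_one_statement 1 [1, 2]
# select_then_delete 1 [1, 2]
```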
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r405349916
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException, DagCodeNotFound
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText, nullable=False)
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCode._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ with open_maybe_zipped(fileloc, 'r') as source:
+ source_code = source.read()
+ return source_code
+
+ @provide_session
+ def sync_to_db(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ self.bulk_sync_to_db([self.fileloc], session)
+
+ @classmethod
+ @provide_session
+ def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
+ """Writes code in bulk into database.
+
+ :param filelocs: file paths of DAGs to sync
+ :param session: ORM Session
+ """
+ filelocs = set(filelocs)
+ filelocs_to_hashes = {
+ fileloc: DagCode.dag_fileloc_hash(fileloc) for fileloc in filelocs
+ }
+ existing_orm_dag_codes = (
+ session
+ .query(DagCode)
+ .filter(DagCode.fileloc_hash.in_(filelocs_to_hashes.values()))
+ .with_for_update(of=DagCode)
+ .all()
+ )
+ existing_orm_dag_codes_by_fileloc_hashes = {
+ orm.fileloc_hash: orm for orm in existing_orm_dag_codes
+ }
+ existing_orm_filelocs = {
+ orm.fileloc for orm in existing_orm_dag_codes_by_fileloc_hashes.values()
+ }
+ if not existing_orm_filelocs.issubset(filelocs):
+ conflicting_filelocs = existing_orm_filelocs.difference(filelocs)
+ hashes_to_filelocs = {
+ DagCode.dag_fileloc_hash(fileloc): fileloc for fileloc in filelocs
+ }
+ message = ""
+ for fileloc in conflicting_filelocs:
+ message += ("Filename '{}' causes a hash collision in the " +
+ "database with '{}'. Please rename the file.")\
+ .format(
+ hashes_to_filelocs[DagCode.dag_fileloc_hash(fileloc)],
+ fileloc)
+ raise AirflowException(message)
+
+ existing_filelocs = {
+ dag_code.fileloc for dag_code in existing_orm_dag_codes
+ }
+ missing_filelocs = filelocs.difference(existing_filelocs)
+
+ for fileloc in missing_filelocs:
+ orm_dag_code = DagCode(fileloc)
+ session.add(orm_dag_code)
+
+ for fileloc in existing_filelocs:
+ old_version = existing_orm_dag_codes_by_fileloc_hashes[
+ filelocs_to_hashes[fileloc]
+ ]
+ file_modified = datetime.fromtimestamp(
+ os.path.getmtime(correct_maybe_zipped(fileloc)), tz=timezone.utc)
+
+ if (file_modified - timedelta(seconds=120)) > old_version.last_updated:
+ old_version.last_updated = timezone.utcnow()
+ old_version.source_code = DagCode._read_code(old_version.fileloc)
+ # old_version is already attached to the session, so no explicit update call is needed.
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.query(cls).filter(
+ and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs))
+ ).delete(synchronize_session='fetch')
+
+ @classmethod
+ @provide_session
+ def has_dag(cls, fileloc: str, session=None) -> bool:
+ """Checks a file exist in dag_code table.
+
+ :param fileloc: the file to check
+ :param session: ORM Session
+ """
+ fileloc_hash = cls.dag_fileloc_hash(fileloc)
+ return session.query(exists().where(cls.fileloc_hash == fileloc_hash))\
+ .scalar()
+
+ @classmethod
+ def get_code_by_fileloc(cls, fileloc: str) -> str:
+ """Returns source code for a given fileloc.
+
+ :param fileloc: file path of a DAG
+ :return: source code as string
+ """
+ return DagCode(fileloc).code()
+
+ def code(self) -> str:
+ """Returns source code for this DagCode object.
+
+ :return: source code as string
+ """
+ if conf.getboolean('core', 'store_dag_code', fallback=False):
+ return self._get_code_from_db()
+ else:
+ return self._get_code_from_file()
+
+ def _get_code_from_file(self):
+ with open_maybe_zipped(self.fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ dag_code = session.query(DagCode) \
+ .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+ .first()
+ if not dag_code:
+ raise DagCodeNotFound()
+ else:
+ code = dag_code.source_code
+ return code
+
+ @staticmethod
+ def dag_fileloc_hash(full_filepath: str) -> int:
+ """Hashing file location for indexing.
+
+ :param full_filepath: full filepath of DAG file
+ :return: hashed full_filepath
+ """
+ # Hashing is needed because the length of fileloc is 2000 as an Airflow convention,
+ # which is over the limit of indexing.
+ import hashlib
+ # Only 7 bytes because MySQL BigInteger can hold only 8 bytes (signed).
+ return struct.unpack('>Q', hashlib.sha1(
+ full_filepath.encode('utf-8')).digest()[-8:])[0] >> 8
Review comment:
Figured it was something like that, just wanted to check if there was a deeper reason. Thanks.
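The reasoning behind the shift can be checked directly. The snippet below reproduces `dag_fileloc_hash` from the diff as a standalone function outside Airflow. It checks that the result stays below 2**56, well inside a signed 64-bit BIGINT (maximum 2**63 - 1). Without the `>> 8`, the unsigned 64-bit value read with `'>Q'` would exceed that maximum whenever its top bit is set, which happens for roughly half of all paths. The example path is arbitrary.

```python
import hashlib
import struct

SIGNED_BIGINT_MAX = 2**63 - 1  # largest value a signed 64-bit column can hold


def dag_fileloc_hash(full_filepath: str) -> int:
    # Last 8 bytes of the SHA-1 digest as a big-endian unsigned 64-bit integer...
    raw = struct.unpack(">Q", hashlib.sha1(full_filepath.encode("utf-8")).digest()[-8:])[0]
    # ...shifted right by one byte, leaving 7 bytes (56 bits) of hash.
    return raw >> 8


h = dag_fileloc_hash("/usr/local/airflow/dags/example_dag.py")
print(0 <= h < 2**56 <= SIGNED_BIGINT_MAX)  # True
```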
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391632828
##########
File path: airflow/models/dagcode.py
##########
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import struct
+from datetime import datetime, timedelta
+from typing import Iterable, List
+
+from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+
+from airflow.exceptions import AirflowException
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCode(Base):
+ """A table for DAGs code.
+
+ dag_code table contains code of DAG files synchronized by scheduler.
+ This feature is controlled by:
+
+ * ``[core] store_serialized_dags = True``: enable this feature
+ * ``[core] store_dag_code = True``: enable this feature
+
+ For details on dag serialization see SerializedDagModel
+ """
+ __tablename__ = 'dag_code'
+
+ fileloc_hash = Column(
+ BigInteger, nullable=False, primary_key=True, autoincrement=False)
+ fileloc = Column(String(2000), nullable=False)
+ # The max length of fileloc exceeds the limit of indexing.
+ last_updated = Column(UtcDateTime, nullable=False)
+ source_code = Column(UnicodeText(), nullable=False)
+
+ def __init__(self, full_filepath: str):
+ self.fileloc = full_filepath
+ self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
+ self.last_updated = timezone.utcnow()
+ self.source_code = DagCode._read_code(self.fileloc)
+
+ @classmethod
+ def _read_code(cls, fileloc: str):
+ with open_maybe_zipped(fileloc, 'r') as source:
+ source_code = source.read()
+ return source_code
+
+ @provide_session
+ def sync_to_db(self, session=None):
+ """Writes code into database.
+
+ :param session: ORM Session
+ """
+ old_version = session.query(
+ DagCode.fileloc, DagCode.fileloc_hash, DagCode.last_updated) \
+ .filter(DagCode.fileloc_hash == self.fileloc_hash) \
+ .first()
+
+ if old_version and old_version.fileloc != self.fileloc:
+ raise AirflowException(
+ "Filename '{}' causes a hash collision in the database with "
+ "'{}'. Please rename the file.".format(
+ self.fileloc, old_version.fileloc))
+
+ file_modified = datetime.fromtimestamp(
+ os.path.getmtime(correct_maybe_zipped(self.fileloc)), tz=timezone.utc)
+
+ if old_version and (file_modified - timedelta(seconds=120)) < \
+ old_version.last_updated:
+ return
+
+ session.merge(self)
+
+ @classmethod
+ @provide_session
+ def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
+ """Writes code in bulk into database.
+
+ :param filelocs: file paths of DAGs to sync
+ :param session: ORM Session
+ """
+ filelocs = set(filelocs)
+ for file in filelocs:
+ DagCode(file).sync_to_db(session=session)
+
+ @classmethod
+ @provide_session
+ def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+ """Deletes code not included in alive_dag_filelocs.
+
+ :param alive_dag_filelocs: file paths of alive DAGs
+ :param session: ORM Session
+ """
+ alive_fileloc_hashes = [
+ cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+ log.debug("Deleting code from %s table ", cls.__tablename__)
+
+ session.execute(
+ cls.__table__.delete().where(
Review comment:
```suggestion
session.query(cls).filter(
```
This way you don't need to use magic methods.
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391739979
##########
File path: airflow/www/views.py
##########
@@ -519,17 +519,23 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
+ all_errors = ""
+
try:
- with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
- code = f.read()
+ dag_id = request.args.get('dag_id')
+ dag_orm = DagModel.get_dagmodel(dag_id, session=session)
+ dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
Review comment:
Why do we need the DAG here? Isn't DagModel enough? In the `dag_code.html` file we only read the dag_id attribute.
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383230521
##########
File path: airflow/migrations/versions/788fcbd36a03_add_source_code_table.py
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add source code table
+
+Revision ID: 788fcbd36a03
+Revises: a4c2fd67d16b
+Create Date: 2020-02-20 14:40:23.257645
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '788fcbd36a03'
+down_revision = 'a4c2fd67d16b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply add source code table"""
+ op.create_table('dag_code', # pylint: disable=no-member
+ sa.Column('fileloc', sa.String(length=2000), nullable=False),
+ sa.Column('fileloc_hash', sa.Integer(), nullable=False),
+ sa.Column('source_code', sa.Text(), nullable=False),
+ sa.Column('last_updated', sa.DateTime(), nullable=False),
+ sa.PrimaryKeyConstraint('fileloc', 'fileloc_hash'))
+
+ op.create_index( # pylint: disable=no-member
+ 'idx_fileloc_code_hash', 'dag_code', ['fileloc_hash'])
+
+ conn = op.get_bind() # pylint: disable=no-member
+ if conn.dialect.name == "mysql":
+ conn.execute("SET time_zone = '+00:00'")
+ cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
+ res = cur.fetchall()
+ if res[0][0] == 0:
+ raise Exception(
+ "Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"
+ )
+
+ op.alter_column( # pylint: disable=no-member
+ table_name="dag_code",
+ column_name="last_updated",
+ type_=mysql.TIMESTAMP(fsp=6),
+ nullable=False,
+ )
+ else:
+ # sqlite and mssql datetime are fine as is. Therefore, not converting
+ if conn.dialect.name in ("sqlite", "mssql"):
+ return
+
+ # we try to be database agnostic, but not every db (e.g. sqlserver)
+ # supports per session time zones
+ if conn.dialect.name == "postgresql":
+ conn.execute("set timezone=UTC")
+
+ op.alter_column( # pylint: disable=no-member
+ table_name="dag_code",
+ column_name="last_updated",
+ type_=sa.TIMESTAMP(timezone=True),
+ )
Review comment:
I think this can be simplified by setting `sa.Column('last_updated', sa.TIMESTAMP(timezone=True), nullable=False),` directly. @ashb?
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383198519
##########
File path: airflow/config_templates/config.yml
##########
@@ -323,6 +323,15 @@
type: string
example: ~
default: "True"
+ - name: store_code
+ description: |
+ Whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of
+ trying to access files in a DAG folder.
Review comment:
Some users have their DAG files deployed by CI/CD. In that case they want to use the version on the local disk. They do not need a copy in the database, because reading the file from disk is faster and puts less load on the database.
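The trade-off can be illustrated with a sketch of the lookup branch. It mirrors the `store_dag_code` check in `DagCode.code()` from the dagcode.py diff (the docs and config hunks call the option `store_code`). A plain dict stands in for the `dag_code` table and an injected `read_file` callable stands in for `open_maybe_zipped`. `get_dag_code` is a hypothetical name. With the flag unset, which suits the CI/CD case, the database is never touched.

```python
import configparser
from typing import Callable, Dict


def get_dag_code(fileloc: str,
                 conf: configparser.ConfigParser,
                 stored_code: Dict[str, str],
                 read_file: Callable[[str], str]) -> str:
    # Default to the local file when [core] store_dag_code is absent or False.
    if conf.getboolean("core", "store_dag_code", fallback=False):
        try:
            return stored_code[fileloc]
        except KeyError:
            raise LookupError(f"no stored code for {fileloc}") from None
    return read_file(fileloc)


disk = {"/dags/a.py": "# from disk"}
db = {"/dags/a.py": "# from db"}

config = configparser.ConfigParser()
print(get_dag_code("/dags/a.py", config, db, disk.__getitem__))  # # from disk

config.read_string("[core]\nstore_dag_code = True\n")
print(get_dag_code("/dags/a.py", config, db, disk.__getitem__))  # # from db
```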
[GitHub] [airflow] kaxil commented on issue #7217: [AIRFLOW-NNNN] Store
DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576343258
Yup, that is exactly what we discussed initially, as mentioned in https://issues.apache.org/jira/browse/AIRFLOW-5946 :)
To make the webserver work without DAG files, we need another way to get the code displayed in Code View:
- Store it in a lazy-loaded column in the SerializedDag table
- Save it in a new table with dag_id and store versions as well, keeping only the last 10 versions (configurable). Only Code View needs this, so storing it in a new table is not a problem
I will be creating a PR soon.
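The second option (a separate table with a bounded history per DAG) could look roughly like this. This is a minimal sketch with stdlib `sqlite3`, and the `dag_code_version` table, its columns, and `add_version` are hypothetical. Each insert is followed by a DELETE that keeps only the newest `keep` rows for that `dag_id`. MySQL does not accept `LIMIT` inside an `IN` subquery, so a MySQL version would need a different form (commonly an extra derived table).

```python
import sqlite3


def make_table() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_code_version ("
        " version INTEGER PRIMARY KEY AUTOINCREMENT,"
        " dag_id TEXT NOT NULL,"
        " source_code TEXT NOT NULL)"
    )
    return conn


def add_version(conn: sqlite3.Connection, dag_id: str, source: str, keep: int = 10) -> None:
    conn.execute("INSERT INTO dag_code_version (dag_id, source_code) VALUES (?, ?)",
                 (dag_id, source))
    # Prune everything but the newest `keep` versions of this DAG.
    conn.execute(
        "DELETE FROM dag_code_version WHERE dag_id = ? AND version NOT IN ("
        " SELECT version FROM dag_code_version WHERE dag_id = ?"
        " ORDER BY version DESC LIMIT ?)",
        (dag_id, dag_id, keep),
    )


conn = make_table()
for i in range(12):
    add_version(conn, "example_dag", f"# v{i}")
add_version(conn, "other_dag", "# only version")

rows = conn.execute(
    "SELECT source_code FROM dag_code_version WHERE dag_id = 'example_dag' ORDER BY version"
).fetchall()
print(len(rows), rows[0][0], rows[-1][0])  # 10 # v2 # v11
```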
[GitHub] [airflow] ashb commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383194247
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,44 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
+ @property
+ def code(self):
+ if conf.getboolean('core', 'store_code', fallback=False):
+ return self._get_code_from_db(self.fileloc)
+ else:
+ return self._get_code_from_file(self.fileloc)
+
+ def _get_code_from_file(self, fileloc):
+ with self._open_maybe_zipped(fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @classmethod
+ def _open_maybe_zipped(cls, f, mode='r'):
Review comment:
Should this live in airflow.utils.file alongside `correct_maybe_zipped`, or in airflow.utils.dag_processing? This feels otherwise unrelated to the DAG model.
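If the helper moves to `airflow.utils.file`, it might look roughly like this. A minimal standalone sketch, assuming a DAG inside an archive is addressed as `<archive>.zip/<member>`; it is not necessarily Airflow's exact implementation, and it reads zip members fully into memory, which is reasonable only because DAG files are usually small.

```python
import io
import os
import re
import zipfile

# Splits "<archive>.zip<sep><member>" into the archive path and the member path.
# Assumption: zipped DAGs are addressed as e.g. "/dags/bundle.zip/my_dag.py".
ZIP_REGEX = re.compile(r"((.*\.zip)" + re.escape(os.sep) + r")?(.*)")


def open_maybe_zipped(fileloc, mode="r"):
    """Open a plain file, or a file inside a zip archive, for reading text."""
    _, archive, filename = ZIP_REGEX.search(fileloc).groups()
    if archive and zipfile.is_zipfile(archive):
        # Zip members are treated as read-only: read the member, expose it as text.
        with zipfile.ZipFile(archive) as zf:
            return io.StringIO(zf.read(filename).decode("utf-8"))
    return open(fileloc, mode=mode)
```

Both branches return a context-manager-compatible text stream, so callers can keep using `with open_maybe_zipped(fileloc) as f: code = f.read()`.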
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383046137
##########
File path: airflow/models/dag.py
##########
@@ -1753,6 +1756,44 @@ def get_last_dagrun(self, session=None, include_externally_triggered=False):
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
+ @property
+ def code(self):
Review comment:
A property that invokes a database query is very dangerous: it looks like cheap attribute access, so a mistake can have a big impact on performance.
Here is an example:
https://github.com/apache/airflow/pull/7476
A property that called the database made the scheduler run 33% slower, and we never spotted this problem while reviewing the code.
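A toy illustration of the concern (not Airflow code): the `Dag`, `CountingDB`, and `get_code_by_fileloc` names are hypothetical, and an in-memory `sqlite3` database stands in for the metadata DB. The property version issues one query per attribute read, while the explicit function makes the round trip visible and easy to hoist out of a loop.

```python
import sqlite3


class CountingDB:
    """Wraps a sqlite3 connection and counts executed statements (illustration only)."""

    def __init__(self, conn):
        self.conn = conn
        self.queries = 0

    def execute(self, sql, params=()):
        self.queries += 1
        return self.conn.execute(sql, params)


class Dag:
    """Toy stand-in for a DAG object."""

    def __init__(self, fileloc, db):
        self.fileloc = fileloc
        self._db = db

    @property
    def code(self):
        # Reads like plain attribute access, but every access runs a query.
        row = self._db.execute(
            "SELECT source_code FROM dag_code WHERE fileloc = ?", (self.fileloc,)
        ).fetchone()
        return row[0] if row else None


def get_code_by_fileloc(db, fileloc):
    """Explicit function: the database round trip is visible at the call site."""
    row = db.execute(
        "SELECT source_code FROM dag_code WHERE fileloc = ?", (fileloc,)
    ).fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_code (fileloc TEXT PRIMARY KEY, source_code TEXT)")
conn.execute("INSERT INTO dag_code VALUES ('/dags/a.py', 'print(1)')")
db = CountingDB(conn)
dag = Dag("/dags/a.py", db)

# Innocent-looking loop: three attribute reads, three queries.
lengths = [len(dag.code) for _ in range(3)]
queries_via_property = db.queries

# Explicit call: fetched once, then reused.
db.queries = 0
code = get_code_by_fileloc(db, dag.fileloc)
lengths_explicit = [len(code) for _ in range(3)]
queries_via_function = db.queries
```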
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391637180
##########
File path: airflow/models/dag.py
##########
@@ -591,6 +593,29 @@ def owner(self) -> str:
"""
return ", ".join({t.owner for t in self.tasks})
+ def code(self):
+ if conf.getboolean('core', 'store_dag_code', fallback=False):
+ return self._get_code_from_db()
+ else:
+ return self._get_code_from_file()
+
+ def _get_code_from_file(self):
+ with open_maybe_zipped(self.fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
Review comment:
This creates cyclic imports. These 3 methods should be in the DagCode class. I don't see why the caller cannot call ``DagCode.get_by_fileloc(dag.fileloc)`` directly.
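Moving the lookup onto the DagCode class might look like the sketch below, so that the DAG model no longer needs to import DagCode and callers fetch code explicitly. The method names follow the review comment, but the signatures are hypothetical, a `sqlite3` connection stands in for a SQLAlchemy session, and the 31-bit SHA-1 truncation in `dag_fileloc_hash` is an assumption, not necessarily what the PR uses.

```python
import hashlib
import sqlite3
from datetime import datetime, timezone


class DagCode:
    """Sketch: code lookups live here, so the DAG model needn't import this class."""

    @staticmethod
    def dag_fileloc_hash(fileloc):
        # hash() is salted per process, so use a stable digest instead.
        # Assumption: keep 31 bits so the value fits a signed 32-bit INTEGER column.
        digest = hashlib.sha1(fileloc.encode("utf-8")).digest()
        return int.from_bytes(digest[-4:], "big") >> 1

    @staticmethod
    def create_table(conn):
        conn.execute(
            "CREATE TABLE IF NOT EXISTS dag_code ("
            " fileloc VARCHAR(2000) NOT NULL,"
            " fileloc_hash INTEGER NOT NULL,"
            " source_code TEXT NOT NULL,"
            " last_updated TIMESTAMP NOT NULL,"
            " PRIMARY KEY (fileloc, fileloc_hash))"
        )

    @classmethod
    def write(cls, conn, fileloc, source_code):
        # INSERT OR REPLACE is SQLite-specific upsert syntax.
        conn.execute(
            "INSERT OR REPLACE INTO dag_code VALUES (?, ?, ?, ?)",
            (fileloc, cls.dag_fileloc_hash(fileloc), source_code,
             datetime.now(timezone.utc).isoformat()),
        )
        conn.commit()

    @classmethod
    def get_by_fileloc(cls, conn, fileloc):
        # Filter on the hash (indexable) and the full path (guards against collisions).
        row = conn.execute(
            "SELECT source_code FROM dag_code WHERE fileloc_hash = ? AND fileloc = ?",
            (cls.dag_fileloc_hash(fileloc), fileloc),
        ).fetchone()
        return row[0] if row else None


conn = sqlite3.connect(":memory:")
DagCode.create_table(conn)
DagCode.write(conn, "/dags/example_dag.py", "print('v1')\n")
DagCode.write(conn, "/dags/example_dag.py", "print('v2')\n")
code = DagCode.get_by_fileloc(conn, "/dags/example_dag.py")
```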
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on issue #7217: [AIRFLOW-NNNN] Store
DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576314662
I'm not sure storing the source code in DagModel is a good idea. One file can contain many DAG definitions, and the file can be very large. I think it is worth adding support for storing DAG source code, but with a different data model. I will try to prepare a document describing the proposed solutions.
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on issue #7217: [AIRFLOW-NNNN] Store
DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576324576
> The webserver no longer needs access to the dags folder in the shared filesystem.
This will still be required in several cases:
* To get the DAG structure, if DAG serialization is turned off
* The **Rendered Template** tab will still have to parse the Python file, as it needs details such as the execution date and even the data passed by upstream tasks via XCom.
I think this feature should be optional. Many Airflow instances do not need to store the source code in a database, and doing so can hurt their performance.
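Keeping the feature opt-in could amount to a boolean option that defaults to off. A minimal sketch using the standard-library `configparser`, whose `getboolean(section, option, fallback=...)` mirrors the call in the diff; `store_dag_code` is the option name from that diff, while `should_read_code_from_db`, `get_code`, and the loader callables are hypothetical.

```python
import configparser

# Hypothetical airflow.cfg fragment; only the option name comes from the diff.
CFG = """
[core]
store_dag_code = False
"""


def should_read_code_from_db(config):
    # Default to off: deployments that ship DAG files via CI/CD keep reading
    # from the local disk and pay no extra database cost.
    return config.getboolean("core", "store_dag_code", fallback=False)


def get_code(config, fileloc, load_from_db, load_from_file):
    """Pick the code source based on the flag; the loaders are injected callables."""
    loader = load_from_db if should_read_code_from_db(config) else load_from_file
    return loader(fileloc)


config = configparser.ConfigParser()
config.read_string(CFG)
```

With `fallback=False`, a config that lacks the option (or the whole `[core]` section) behaves as if the feature were disabled.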
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-579802747
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=h1) Report
> Merging [#7217](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/3bb60afc7b8319996385d681faac342afe2b3bd2&el=desc) will **decrease** coverage by `0.48%`.
> The diff coverage is `91.83%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7217/graphs/tree.svg?width=650&height=150&src=pr&token=WdLKlKHOAU)](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #7217 +/- ##
==========================================
- Coverage 86.89% 86.41% -0.49%
==========================================
Files 906 907 +1
Lines 43906 44018 +112
==========================================
- Hits 38153 38039 -114
- Misses 5753 5979 +226
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/utils.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdXRpbHMucHk=) | `79.39% <ø> (-0.99%)` | :arrow_down: |
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.02% <33.33%> (-0.32%)` | :arrow_down: |
| [airflow/api/common/experimental/get\_code.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfY29kZS5weQ==) | `72.72% <60.00%> (-4.20%)` | :arrow_down: |
| [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.21% <66.66%> (-0.10%)` | :arrow_down: |
| [airflow/models/dagcode.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnY29kZS5weQ==) | `92.47% <92.47%> (ø)` | |
| [airflow/api/common/experimental/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9fX2luaXRfXy5weQ==) | `92.00% <100.00%> (-0.60%)` | :arrow_down: |
| [airflow/exceptions.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9leGNlcHRpb25zLnB5) | `100.00% <100.00%> (ø)` | |
| [airflow/models/serialized\_dag.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2VyaWFsaXplZF9kYWcucHk=) | `92.40% <100.00%> (-0.28%)` | :arrow_down: |
| [airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5) | `94.97% <100.00%> (+0.16%)` | :arrow_up: |
| [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.55% <100.00%> (+0.35%)` | :arrow_up: |
| ... and [18 more](https://codecov.io/gh/apache/airflow/pull/7217/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=footer). Last update [ccbaf57...50f4874](https://codecov.io/gh/apache/airflow/pull/7217?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391739979
##########
File path: airflow/www/views.py
##########
@@ -519,17 +519,23 @@ def last_dagruns(self, session=None):
@has_access
@provide_session
def code(self, session=None):
- dm = models.DagModel
- dag_id = request.args.get('dag_id')
- dag = session.query(dm).filter(dm.dag_id == dag_id).first()
+ all_errors = ""
+
try:
- with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
- code = f.read()
+ dag_id = request.args.get('dag_id')
+ dag_orm = DagModel.get_dagmodel(dag_id, session=session)
+ dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
Review comment:
Why do we need the DAG object here? Isn't DagModel enough? The `dag_code.html` template reads only dag_id.
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383201934
##########
File path: airflow/migrations/versions/788fcbd36a03_add_source_code_table.py
##########
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add source code table
+
+Revision ID: 788fcbd36a03
+Revises: a4c2fd67d16b
+Create Date: 2020-02-20 14:40:23.257645
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '788fcbd36a03'
+down_revision = 'a4c2fd67d16b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply add source code table"""
+ op.create_table('dag_code', # pylint: disable=no-member
+ sa.Column('fileloc', sa.String(length=2000), nullable=False),
+ sa.Column('fileloc_hash', sa.Integer(), nullable=False),
+ sa.Column('source_code', sa.Text(), nullable=False),
+ sa.Column('last_updated', sa.DateTime(), nullable=False),
+ sa.PrimaryKeyConstraint('fileloc', 'fileloc_hash'))
Review comment:
For now, the only query that reads code from this table uses the fileloc_hash column in its WHERE clause.
Besides, correct me if I am wrong, most databases create an index on the primary key by default. But maybe it is better to stay on the safe side. I have no strong opinion here.
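One detail worth checking: a composite primary key on (fileloc, fileloc_hash) is indexed with fileloc as the leading column, so, as a general rule for B-tree indexes, it does not help a lookup that filters only on fileloc_hash. The sketch below shows this with SQLite's `EXPLAIN QUERY PLAN`; the index name `idx_dag_code_fileloc_hash` is hypothetical, and other databases' planners may behave somewhat differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Same shape as the migration above: composite primary key (fileloc, fileloc_hash).
conn.execute(
    """
    CREATE TABLE dag_code (
        fileloc      VARCHAR(2000) NOT NULL,
        fileloc_hash INTEGER       NOT NULL,
        source_code  TEXT          NOT NULL,
        last_updated TIMESTAMP     NOT NULL,
        PRIMARY KEY (fileloc, fileloc_hash)
    )
    """
)

QUERY = "SELECT source_code FROM dag_code WHERE fileloc_hash = ?"


def plan(sql):
    # The last column of each EXPLAIN QUERY PLAN row is the human-readable detail.
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, (0,)).fetchall()
    return " | ".join(str(row[-1]) for row in rows)


# The PK index leads with fileloc, so this lookup is expected to scan the table.
before = plan(QUERY)

conn.execute("CREATE INDEX idx_dag_code_fileloc_hash ON dag_code (fileloc_hash)")
# With a dedicated index, the planner can search it directly.
after = plan(QUERY)

print(before)
print(after)
```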
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on issue #7217: [AIRFLOW-5946] Store
source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-598186239
@kaxil @ashb @mik-laj The build is finally green. I believe I have addressed all your comments. Would you mind having another look?
----------------------------------------------------------------
[GitHub] [airflow] kaxil merged pull request #7217: [AIRFLOW-5946] Store
source code in db
Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383235423
##########
File path: docs/dag-serialization.rst
##########
@@ -63,6 +63,8 @@ Add the following settings in ``airflow.cfg``:
If set to True, Webserver reads from DB instead of parsing DAG files
* ``min_serialized_dag_update_interval``: This flag sets the minimum interval (in seconds) after which
the serialized DAG in DB should be updated. This helps in reducing database write rate.
+* ``store_code``: This flag decides whether to persist DAG files code in DB.
+ If set to True, Webserver reads file contents from DB instead of trying to access files in a DAG folder.
If you are updating Airflow from <1.10.7, please do not forget to run ``airflow db upgrade``.
Review comment:
We can remove the limitation at https://github.com/apache/airflow/pull/7217/files#diff-e2bf1cfc6c93c9cada2dad85c28e4612R79-R80
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391747361
##########
File path: airflow/models/dag.py
##########
@@ -591,6 +593,29 @@ def owner(self) -> str:
"""
return ", ".join({t.owner for t in self.tasks})
+ def code(self):
+ if conf.getboolean('core', 'store_dag_code', fallback=False):
+ return self._get_code_from_db()
+ else:
+ return self._get_code_from_file()
+
+ def _get_code_from_file(self):
+ with open_maybe_zipped(self.fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
Review comment:
Maybe we should introduce a new class DagCodeBag or something similar?
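One possible reading of the DagCodeBag idea, purely hypothetical: an object that loads source code per file, caches it, and can prefetch many files in one query. Caching by `fileloc` rather than `dag_id` reflects that several DAGs can live in one file; `sqlite3` again stands in for the metadata DB, and the class and method names are illustrative only.

```python
import sqlite3


class DagCodeBag:
    """Hypothetical container that loads DAG source code once per file and caches it."""

    def __init__(self, conn):
        self._conn = conn
        self._cache = {}

    def get(self, fileloc):
        # Several DAGs can come from one file, so cache by fileloc, not dag_id.
        if fileloc not in self._cache:
            row = self._conn.execute(
                "SELECT source_code FROM dag_code WHERE fileloc = ?", (fileloc,)
            ).fetchone()
            self._cache[fileloc] = row[0] if row else None
        return self._cache[fileloc]

    def prefetch(self, filelocs):
        """Load many files in a single query, e.g. before rendering a list of DAGs."""
        missing = [f for f in set(filelocs) if f not in self._cache]
        if not missing:
            return
        placeholders = ", ".join("?" for _ in missing)
        rows = self._conn.execute(
            f"SELECT fileloc, source_code FROM dag_code WHERE fileloc IN ({placeholders})",
            missing,
        ).fetchall()
        # Remember misses too, so absent files are not re-queried.
        for fileloc in missing:
            self._cache[fileloc] = None
        self._cache.update(dict(rows))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_code (fileloc TEXT PRIMARY KEY, source_code TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO dag_code VALUES (?, ?)", [("/dags/a.py", "a"), ("/dags/b.py", "b")]
)
bag = DagCodeBag(conn)
bag.prefetch(["/dags/a.py", "/dags/b.py", "/dags/a.py"])
```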
----------------------------------------------------------------
[GitHub] [airflow] potiuk commented on issue #7217: [AIRFLOW-5946] Store
source code in db
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#issuecomment-598810193
:tada:
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on issue #7217: [AIRFLOW-NNNN] Store
DAG's source code in the serialized_dag table
Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7217: [AIRFLOW-NNNN] Store DAG's source code in the serialized_dag table
URL: https://github.com/apache/airflow/pull/7217#issuecomment-576335959
# Database schema
These considerations focus on a few tables, so the diagrams below show only the following tables:
* dag
* serialized_dag
# Current schema
<img width="803" alt="Screenshot 2020-01-20 at 16 20 41" src="https://user-images.githubusercontent.com/12058428/72740623-54b55000-3ba6-11ea-8da3-55630b0e036f.png">
## Schema changes proposed by anitakar
<img width="803" alt="Screenshot 2020-01-20 at 16 20 41" src="https://user-images.githubusercontent.com/12058428/72740538-26377500-3ba6-11ea-870b-3f951682677c.png">
Anita suggests adding a new `source_code` field to the `serialized_dag` table.
# My proposition
<img width="438" alt="Screenshot 2020-01-20 at 16 56 23" src="https://user-images.githubusercontent.com/12058428/72740548-2df71980-3ba6-11ea-91e4-7056df323fc1.png">
I think we should add a new `dag_file` table to avoid duplicating source code. The new table has fileloc and fileloc_hash as its primary key. The dag table currently contains only the fileloc field, but I think it would also be helpful to add fileloc_hash. I also used a blob type, because we don't need to process this code with text functions in the database.
Migration script for PostgreSQL
```sql
create table dag_file
(
fileloc varchar(2000) not null,
fileloc_hash integer not null,
last_updated timestamp with time zone not null,
source_code BYTEA NOT NULL,
PRIMARY KEY (fileloc, fileloc_hash)
);
alter table dag add fileloc_hash integer not null DEFAULT 0;
alter table dag alter column fileloc_hash drop default;
```
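The proposed layout can be tried out without PostgreSQL. A minimal sketch that mirrors the migration above in SQLite (BLOB instead of BYTEA) and shows two DAGs from one file sharing a single `dag_file` row; the simplified `dag` columns, the `fileloc_hash` helper (a truncated SHA-1), and `code_for_dag` are assumptions for illustration.

```python
import hashlib
import sqlite3
from datetime import datetime, timezone


def fileloc_hash(fileloc):
    # Stable across processes (unlike hash()); 31 bits to fit an integer column.
    return int.from_bytes(hashlib.sha1(fileloc.encode("utf-8")).digest()[-4:], "big") >> 1


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag_file (
        fileloc      VARCHAR(2000) NOT NULL,
        fileloc_hash INTEGER       NOT NULL,
        last_updated TIMESTAMP     NOT NULL,
        source_code  BLOB          NOT NULL,
        PRIMARY KEY (fileloc, fileloc_hash)
    );
    -- Simplified stand-in for the dag table, plus the proposed fileloc_hash column.
    CREATE TABLE dag (
        dag_id       VARCHAR(250)  PRIMARY KEY,
        fileloc      VARCHAR(2000) NOT NULL,
        fileloc_hash INTEGER       NOT NULL
    );
    """
)


def code_for_dag(conn, dag_id):
    """Fetch the source of the file that defines dag_id, or None."""
    row = conn.execute(
        """
        SELECT f.source_code FROM dag d
        JOIN dag_file f ON f.fileloc_hash = d.fileloc_hash AND f.fileloc = d.fileloc
        WHERE d.dag_id = ?
        """,
        (dag_id,),
    ).fetchone()
    return bytes(row[0]).decode("utf-8") if row else None


path = "/dags/etl.py"
h = fileloc_hash(path)
now = datetime.now(timezone.utc).isoformat()
# One row per file, however many DAGs it defines.
conn.execute(
    "INSERT INTO dag_file VALUES (?, ?, ?, ?)",
    (path, h, now, b"# two DAGs are defined in this one file\n"),
)
conn.executemany(
    "INSERT INTO dag VALUES (?, ?, ?)",
    [("etl_daily", path, h), ("etl_hourly", path, h)],
)
```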
----------------------------------------------------------------
[GitHub] [airflow] anitakar commented on a change in pull request #7217:
[AIRFLOW-5946] Store source code in db
Posted by GitBox <gi...@apache.org>.
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r391805786
##########
File path: airflow/models/dag.py
##########
@@ -591,6 +593,29 @@ def owner(self) -> str:
"""
return ", ".join({t.owner for t in self.tasks})
+ def code(self):
+ if conf.getboolean('core', 'store_dag_code', fallback=False):
+ return self._get_code_from_db()
+ else:
+ return self._get_code_from_file()
+
+ def _get_code_from_file(self):
+ with open_maybe_zipped(self.fileloc, 'r') as f:
+ code = f.read()
+ return code
+
+ @provide_session
+ def _get_code_from_db(self, session=None):
+ fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
Review comment:
Done
----------------------------------------------------------------