You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/18 22:30:32 UTC
incubator-airflow git commit: [AIRFLOW-2429] Make Airflow flake8 compliant
Repository: incubator-airflow
Updated Branches:
refs/heads/master 76b68b82d -> 06aec8ea6
[AIRFLOW-2429] Make Airflow flake8 compliant
Closes #3342 from feng-tao/airflow-2429
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/06aec8ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/06aec8ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/06aec8ea
Branch: refs/heads/master
Commit: 06aec8ea6b4d8f551a1360f79a1a58114f614753
Parents: 76b68b8
Author: Tao feng <tf...@lyft.com>
Authored: Sat May 19 00:29:59 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sat May 19 00:30:05 2018 +0200
----------------------------------------------------------------------
airflow/configuration.py | 24 +++----
airflow/models.py | 145 ++++++++++++++++++++++--------------------
airflow/version.py | 4 +-
3 files changed, 87 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06aec8ea/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 20ef067..e19a8b1 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -22,31 +22,27 @@ from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
+from builtins import str
+from collections import OrderedDict
import copy
import errno
+from future import standard_library
import os
-import subprocess
-import warnings
import shlex
-import sys
-
-from future import standard_library
-
import six
from six import iteritems
+import subprocess
+import sys
+import warnings
+
from backports.configparser import ConfigParser
from zope.deprecation import deprecated as _deprecated
+from airflow.exceptions import AirflowConfigException
from airflow.utils.log.logging_mixin import LoggingMixin
standard_library.install_aliases()
-from builtins import str
-from collections import OrderedDict
-
-from airflow.exceptions import AirflowConfigException
-
-
log = LoggingMixin().log
# show Airflow's deprecation warnings
@@ -323,8 +319,8 @@ class AirflowConfigParser(ConfigParser):
opt = None
if opt:
if (
- not display_sensitive
- and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
+ not display_sensitive and
+ ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
opt = '< hidden >'
if display_source:
opt = (opt, 'env var')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06aec8ea/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7aab4b5..4c1be8e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -28,6 +28,7 @@ from builtins import str
from builtins import object, bytes
import copy
from collections import namedtuple, defaultdict
+import cryptography
from datetime import timedelta
import dill
@@ -59,7 +60,6 @@ from sqlalchemy import (
Index, Float, LargeBinary)
from sqlalchemy import func, or_, and_, true as sqltrue
from sqlalchemy.ext.declarative import declarative_base, declared_attr
-from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import reconstructor, relationship, synonym
from sqlalchemy_utc import UtcDateTime
@@ -77,7 +77,6 @@ from airflow.lineage import apply_lineage, prepare_lineage
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
-from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
from airflow.utils import timezone
@@ -86,7 +85,7 @@ from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.email import send_email
from airflow.utils.helpers import (
- as_tuple, is_container, is_in, validate_key, pprinttable)
+ as_tuple, is_container, validate_key, pprinttable)
from airflow.utils.operator_resources import Resources
from airflow.utils.state import State
from airflow.utils.timeout import timeout
@@ -103,6 +102,7 @@ XCOM_RETURN_KEY = 'return_value'
Stats = settings.Stats
+
def get_fernet():
"""
Deferred load of Fernet key.
@@ -115,7 +115,7 @@ def get_fernet():
"""
try:
from cryptography.fernet import Fernet
- except:
+ except ImportError:
raise AirflowException('Failed to import Fernet, it may not be installed')
try:
return Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8'))
@@ -126,6 +126,7 @@ def get_fernet():
# Used by DAG context_managers
_CONTEXT_MANAGER_DAG = None
+
def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
"""
Clears a set of task instances, but makes sure the running ones
@@ -247,7 +248,7 @@ class DagBag(BaseDagBag, LoggingMixin):
filepath=orm_dag.fileloc, only_if_updated=False)
# If the source file no longer exports `dag_id`, delete it from self.dags
- if found_dags and dag_id in [dag.dag_id for dag in found_dags]:
+ if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
return self.dags[dag_id]
elif dag_id in self.dags:
del self.dags[dag_id]
@@ -354,7 +355,6 @@ class DagBag(BaseDagBag, LoggingMixin):
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
-
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags
@@ -429,7 +429,6 @@ class DagBag(BaseDagBag, LoggingMixin):
del self.dags[subdag.dag_id]
raise cycle_exception
-
def collect_dags(
self,
dag_folder=None,
@@ -644,7 +643,7 @@ class Connection(Base, LoggingMixin):
if self._password and self.is_encrypted:
try:
fernet = get_fernet()
- except:
+ except AirflowException:
raise AirflowException(
"Can't decrypt encrypted password for login={}, \
FERNET_KEY configuration is missing".format(self.login))
@@ -660,7 +659,7 @@ class Connection(Base, LoggingMixin):
self.is_encrypted = True
except AirflowException:
self.log.exception("Failed to load fernet while encrypting value, "
- "using non-encrypted value.")
+ "using non-encrypted value.")
self._password = value
self.is_encrypted = False
@@ -673,7 +672,7 @@ class Connection(Base, LoggingMixin):
if self._extra and self.is_extra_encrypted:
try:
fernet = get_fernet()
- except:
+ except AirflowException:
raise AirflowException(
"Can't decrypt `extra` params for login={},\
FERNET_KEY configuration is missing".format(self.login))
@@ -689,7 +688,7 @@ class Connection(Base, LoggingMixin):
self.is_extra_encrypted = True
except AirflowException:
self.log.exception("Failed to load fernet while encrypting value, "
- "using non-encrypted value.")
+ "using non-encrypted value.")
self._extra = value
self.is_extra_encrypted = False
else:
@@ -757,7 +756,7 @@ class Connection(Base, LoggingMixin):
elif self.conn_type == 'cassandra':
from airflow.contrib.hooks.cassandra_hook import CassandraHook
return CassandraHook(cassandra_conn_id=self.conn_id)
- except:
+ except Exception:
pass
def __repr__(self):
@@ -1330,8 +1329,11 @@ class TaskInstance(Base, LoggingMixin):
if self.task.retry_exponential_backoff:
min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2)))
# deterministic per task instance
- hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id,
- self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16)
+ hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,
+ self.task_id,
+ self.execution_date,
+ self.try_number)
+ .encode('utf-8')).hexdigest(), 16)
# between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number)
modded_hash = min_backoff + hash % min_backoff
# timedelta has a maximum representable value. The exponentiation
@@ -1453,7 +1455,7 @@ class TaskInstance(Base, LoggingMixin):
session.commit()
return False
- #TODO: Logging needs cleanup, not clear what is being printed
+ # TODO: Logging needs cleanup, not clear what is being printed
hr = "\n" + ("-" * 80) + "\n" # Line break
# For reporting purposes, we report based on 1-indexed,
@@ -1518,7 +1520,8 @@ class TaskInstance(Base, LoggingMixin):
settings.engine.dispose()
if verbose:
if mark_success:
- msg = "Marking success for {} on {}".format(self.task, self.execution_date)
+ msg = "Marking success for {} on {}".format(self.task,
+ self.execution_date)
self.log.info(msg)
else:
msg = "Executing {} on {}".format(self.task, self.execution_date)
@@ -1661,23 +1664,23 @@ class TaskInstance(Base, LoggingMixin):
pool=None,
session=None):
res = self._check_and_change_state_before_execution(
- verbose=verbose,
- ignore_all_deps=ignore_all_deps,
- ignore_depends_on_past=ignore_depends_on_past,
- ignore_task_deps=ignore_task_deps,
- ignore_ti_state=ignore_ti_state,
+ verbose=verbose,
+ ignore_all_deps=ignore_all_deps,
+ ignore_depends_on_past=ignore_depends_on_past,
+ ignore_task_deps=ignore_task_deps,
+ ignore_ti_state=ignore_ti_state,
+ mark_success=mark_success,
+ test_mode=test_mode,
+ job_id=job_id,
+ pool=pool,
+ session=session)
+ if res:
+ self._run_raw_task(
mark_success=mark_success,
test_mode=test_mode,
job_id=job_id,
pool=pool,
session=session)
- if res:
- self._run_raw_task(
- mark_success=mark_success,
- test_mode=test_mode,
- job_id=job_id,
- pool=pool,
- session=session)
def dry_run(self):
task = self.task
@@ -2074,7 +2077,7 @@ class SkipMixin(LoggingMixin):
TaskInstance.dag_id == dag_run.dag_id,
TaskInstance.execution_date == dag_run.execution_date,
TaskInstance.task_id.in_(task_ids)
- ).update({TaskInstance.state : State.SKIPPED,
+ ).update({TaskInstance.state: State.SKIPPED,
TaskInstance.start_date: now,
TaskInstance.end_date: now},
synchronize_session=False)
@@ -2732,8 +2735,7 @@ class BaseOperator(LoggingMixin):
return self._downstream_task_ids
@provide_session
- def clear(
- self,
+ def clear(self,
start_date=None,
end_date=None,
upstream=False,
@@ -2810,7 +2812,6 @@ class BaseOperator(LoggingMixin):
return list(map(lambda task_id: self._dag.task_dict[task_id],
self.get_flat_relative_ids(upstream)))
-
def run(
self,
start_date=None,
@@ -2970,8 +2971,9 @@ class DagModel(Base):
dag_id = Column(String(ID_LEN), primary_key=True)
# A DAG can be paused from the UI / DB
# Set this default value of is_paused based on a configuration value!
- is_paused_at_creation = configuration.conf.getboolean('core',
- 'dags_are_paused_at_creation')
+ is_paused_at_creation = configuration.conf\
+ .getboolean('core',
+ 'dags_are_paused_at_creation')
is_paused = Column(Boolean, default=is_paused_at_creation)
# Whether the DAG is a subdag
is_subdag = Column(Boolean, default=False)
@@ -3072,7 +3074,8 @@ class DAG(BaseDag, LoggingMixin):
:param sla_miss_callback: specify a function to call when reporting SLA
timeouts.
:type sla_miss_callback: types.FunctionType
- :param default_view: Specify DAG default view (tree, graph, duration, gantt, landing_times)
+ :param default_view: Specify DAG default view (tree, graph, duration,
+ gantt, landing_times)
:type default_view: string
:param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT)
:type orientation: string
@@ -3539,14 +3542,14 @@ class DAG(BaseDag, LoggingMixin):
# Check SubDag for class but don't check class directly, see
# https://github.com/airbnb/airflow/issues/1168
from airflow.operators.subdag_operator import SubDagOperator
- l = []
+ subdag_lst = []
for task in self.tasks:
if (isinstance(task, SubDagOperator) or
- #TODO remove in Airflow 2.0
- type(task).__name__ == 'SubDagOperator'):
- l.append(task.subdag)
- l += task.subdag.subdags
- return l
+ # TODO remove in Airflow 2.0
+ type(task).__name__ == 'SubDagOperator'):
+ subdag_lst.append(task.subdag)
+ subdag_lst += task.subdag.subdags
+ return subdag_lst
def resolve_template_files(self):
for t in self.tasks:
@@ -4276,13 +4279,13 @@ class Variable(Base, LoggingMixin):
if self._val and self.is_encrypted:
try:
fernet = get_fernet()
- except:
+ except Exception:
log.error("Can't decrypt _val for key={}, FERNET_KEY "
"configuration missing".format(self.key))
return None
try:
return fernet.decrypt(bytes(self._val, 'utf-8')).decode()
- except:
+ except cryptography.fernet.InvalidToken:
log.error("Can't decrypt _val for key={}, invalid token "
"or value".format(self.key))
return None
@@ -4297,7 +4300,8 @@ class Variable(Base, LoggingMixin):
self.is_encrypted = True
except AirflowException:
self.log.exception(
- "Failed to load fernet while encrypting value, using non-encrypted value."
+ "Failed to load fernet while encrypting value, "
+ "using non-encrypted value."
)
self._val = value
self.is_encrypted = False
@@ -4323,7 +4327,8 @@ class Variable(Base, LoggingMixin):
:return: Mixed
"""
default_sentinel = object()
- obj = Variable.get(key, default_var=default_sentinel, deserialize_json=deserialize_json)
+ obj = Variable.get(key, default_var=default_sentinel,
+ deserialize_json=deserialize_json)
if obj is default_sentinel:
if default is not None:
Variable.set(key, default, serialize_json=deserialize_json)
@@ -4449,8 +4454,7 @@ class XCom(Base, LoggingMixin):
@classmethod
@provide_session
- def get_one(
- cls,
+ def get_one(cls,
execution_date,
key=None,
task_id=None,
@@ -4460,9 +4464,11 @@ class XCom(Base, LoggingMixin):
session=None):
"""
Retrieve an XCom value, optionally meeting certain criteria.
- TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be removed in Airflow 2.0.
+ TODO: "pickling" has been deprecated and JSON is preferred.
+ "pickling" will be removed in Airflow 2.0.
- :param enable_pickling: If pickling is not enabled, the XCOM value will be parsed to JSON instead.
+ :param enable_pickling: If pickling is not enabled,
+ the XCOM value will be parsed to JSON instead.
:return: XCom value
"""
filters = []
@@ -4478,9 +4484,8 @@ class XCom(Base, LoggingMixin):
filters.append(cls.execution_date == execution_date)
query = (
- session.query(cls.value)
- .filter(and_(*filters))
- .order_by(cls.execution_date.desc(), cls.timestamp.desc()))
+ session.query(cls.value).filter(and_(*filters))
+ .order_by(cls.execution_date.desc(), cls.timestamp.desc()))
result = query.first()
if result:
@@ -4504,19 +4509,19 @@ class XCom(Base, LoggingMixin):
@classmethod
@provide_session
- def get_many(
- cls,
- execution_date,
- key=None,
- task_ids=None,
- dag_ids=None,
- include_prior_dates=False,
- limit=100,
- enable_pickling=None,
- session=None):
+ def get_many(cls,
+ execution_date,
+ key=None,
+ task_ids=None,
+ dag_ids=None,
+ include_prior_dates=False,
+ limit=100,
+ enable_pickling=None,
+ session=None):
"""
Retrieve an XCom value, optionally meeting certain criteria
- TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be removed in Airflow 2.0.
+ TODO: "pickling" has been deprecated and JSON is preferred.
+ "pickling" will be removed in Airflow 2.0.
"""
filters = []
if key:
@@ -4531,10 +4536,9 @@ class XCom(Base, LoggingMixin):
filters.append(cls.execution_date == execution_date)
query = (
- session.query(cls)
- .filter(and_(*filters))
- .order_by(cls.execution_date.desc(), cls.timestamp.desc())
- .limit(limit))
+ session.query(cls).filter(and_(*filters))
+ .order_by(cls.execution_date.desc(), cls.timestamp.desc())
+ .limit(limit))
results = query.all()
if enable_pickling is None:
enable_pickling = configuration.conf.getboolean(
@@ -4625,7 +4629,7 @@ class DagStat(Base):
if dag_ids:
qry = qry.filter(DagStat.dag_id.in_(set(dag_ids)))
if dirty_only:
- qry = qry.filter(DagStat.dirty == True)
+ qry = qry.filter(DagStat.dirty == True) # noqa
qry = qry.with_for_update().all()
@@ -4919,7 +4923,8 @@ class DagRun(Base, LoggingMixin):
session=session
)
none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
- none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
+ none_task_concurrency = all(t.task.task_concurrency is None
+ for t in unfinished_tasks)
# small speed up
if unfinished_tasks and none_depends_on_past and none_task_concurrency:
# todo: this can actually get pretty slow: one task costs between 0.01-015s
@@ -5036,7 +5041,7 @@ class DagRun(Base, LoggingMixin):
"""
qry = session.query(DagRun).filter(
DagRun.dag_id == dag_id,
- DagRun.external_trigger == False,
+ DagRun.external_trigger == False, # noqa
DagRun.execution_date == execution_date,
)
return qry.first()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06aec8ea/airflow/version.py
----------------------------------------------------------------------
diff --git a/airflow/version.py b/airflow/version.py
index 750da36..d11d766 100644
--- a/airflow/version.py
+++ b/airflow/version.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY