You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/07/22 11:04:26 UTC
incubator-airflow git commit: [AIRFLOW-348] Fix code style warnings
Repository: incubator-airflow
Updated Branches:
refs/heads/master b6e609824 -> 189e6b887
[AIRFLOW-348] Fix code style warnings
Closes #1672 from skudriashev/airflow-348
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/189e6b88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/189e6b88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/189e6b88
Branch: refs/heads/master
Commit: 189e6b88742ace8c46e72d59d7662284e34b7a2e
Parents: b6e6098
Author: Stanislav Kudriashev <st...@gmail.com>
Authored: Fri Jul 22 13:03:45 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jul 22 13:04:03 2016 +0200
----------------------------------------------------------------------
airflow/configuration.py | 7 ++---
airflow/contrib/hooks/qubole_hook.py | 1 -
airflow/example_dags/example_python_operator.py | 11 +++----
airflow/hooks/__init__.py | 1 -
airflow/hooks/dbapi_hook.py | 15 +++++-----
airflow/hooks/hdfs_hook.py | 13 ++++-----
airflow/hooks/jdbc_hook.py | 8 ++----
airflow/hooks/webhdfs_hook.py | 16 +++++------
airflow/macros/__init__.py | 1 -
airflow/models.py | 4 +--
airflow/operators/__init__.py | 1 -
airflow/operators/dagrun_operator.py | 1 +
airflow/settings.py | 5 +++-
airflow/utils/file.py | 12 ++++----
airflow/utils/helpers.py | 2 ++
airflow/www/views.py | 30 +++++++++-----------
16 files changed, 58 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index e03b713..5a380ae 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -661,10 +661,9 @@ def mkdir_p(path):
else:
raise AirflowConfigException('Had trouble creating a directory')
-"""
-Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
-"~/airflow" and "~/airflow/airflow.cfg" respectively as defaults.
-"""
+
+# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
+# "~/airflow" and "~/airflow/airflow.cfg" respectively as defaults.
if 'AIRFLOW_HOME' not in os.environ:
AIRFLOW_HOME = expand_env_var('~/airflow')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 57d00b5..694b12f 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -84,7 +84,6 @@ class QuboleHook(BaseHook):
if self.cmd.status != 'done':
raise AirflowException('Command Id: {0} failed with Status: {1}'.format(self.cmd.id, self.cmd.status))
-
def kill(self, ti):
"""
Kill (cancel) a Qubole commmand
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/example_dags/example_python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 6c0b93f..c5d7193 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -34,7 +34,7 @@ dag = DAG(
def my_sleeping_function(random_base):
- '''This is a function that will run within the DAG execution'''
+ """This is a function that will run within the DAG execution"""
time.sleep(random_base)
@@ -49,15 +49,12 @@ run_this = PythonOperator(
python_callable=print_context,
dag=dag)
+# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):
- '''
- Generating 10 sleeping task, sleeping from 0 to 9 seconds
- respectively
- '''
task = PythonOperator(
- task_id='sleep_for_'+str(i),
+ task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
- op_kwargs={'random_base': float(i)/10},
+ op_kwargs={'random_base': float(i) / 10},
dag=dag)
task.set_upstream(run_this)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 4c1891d..883018d 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -70,7 +70,6 @@ def _integrate_plugins():
sys.modules[_hook_module.__name__] = _hook_module
globals()[_hook_module._name] = _hook_module
-
##########################################################
# TODO FIXME Remove in Airflow 2.0
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/dbapi_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py
index 55f2d95..04af16e 100644
--- a/airflow/hooks/dbapi_hook.py
+++ b/airflow/hooks/dbapi_hook.py
@@ -56,9 +56,8 @@ class DbApiHook(BaseHook):
username=db.login,
schema=db.schema)
-
def get_pandas_df(self, sql, parameters=None):
- '''
+ """
Executes the sql and returns a pandas dataframe
:param sql: the sql statement to be executed (str) or a list of
@@ -66,7 +65,7 @@ class DbApiHook(BaseHook):
:type sql: str or list
:param parameters: The parameters to render the SQL query with.
:type parameters: mapping or iterable
- '''
+ """
if sys.version_info[0] < 3:
sql = sql.encode('utf-8')
import pandas.io.sql as psql
@@ -76,7 +75,7 @@ class DbApiHook(BaseHook):
return df
def get_records(self, sql, parameters=None):
- '''
+ """
Executes the sql and returns a set of records.
:param sql: the sql statement to be executed (str) or a list of
@@ -84,7 +83,7 @@ class DbApiHook(BaseHook):
:type sql: str or list
:param parameters: The parameters to render the SQL query with.
:type parameters: mapping or iterable
- '''
+ """
if sys.version_info[0] < 3:
sql = sql.encode('utf-8')
conn = self.get_conn()
@@ -99,7 +98,7 @@ class DbApiHook(BaseHook):
return rows
def get_first(self, sql, parameters=None):
- '''
+ """
Executes the sql and returns the first resulting row.
:param sql: the sql statement to be executed (str) or a list of
@@ -107,7 +106,7 @@ class DbApiHook(BaseHook):
:type sql: str or list
:param parameters: The parameters to render the SQL query with.
:type parameters: mapping or iterable
- '''
+ """
if sys.version_info[0] < 3:
sql = sql.encode('utf-8')
conn = self.get_conn()
@@ -141,7 +140,7 @@ class DbApiHook(BaseHook):
sql = [sql]
if self.supports_autocommit:
- self.set_autocommit(conn, autocommit)
+ self.set_autocommit(conn, autocommit)
cur = conn.cursor()
for s in sql:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/hdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index e84595c..69dccd0 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -29,9 +29,9 @@ class HDFSHookException(AirflowException):
class HDFSHook(BaseHook):
- '''
+ """
Interact with HDFS. This class is a wrapper around the snakebite library.
- '''
+ """
def __init__(self, hdfs_conn_id='hdfs_default', proxy_user=None):
if not snakebite_imported:
raise ImportError(
@@ -43,18 +43,15 @@ class HDFSHook(BaseHook):
self.proxy_user = proxy_user
def get_conn(self):
- '''
+ """
Returns a snakebite HDFSClient object.
- '''
+ """
connections = self.get_connections(self.hdfs_conn_id)
-
use_sasl = False
if configuration.get('core', 'security') == 'kerberos':
use_sasl = True
- client = None
-
- ''' When using HAClient, proxy_user must be the same, so is ok to always take the first '''
+ # When using HAClient, proxy_user must be the same, so is ok to always take the first.
effective_user = self.proxy_user or connections[0].login
if len(connections) == 1:
autoconfig = connections[0].extra_dejson.get('autoconfig', False)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/jdbc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/jdbc_hook.py b/airflow/hooks/jdbc_hook.py
index d0091e1..bc1f352 100644
--- a/airflow/hooks/jdbc_hook.py
+++ b/airflow/hooks/jdbc_hook.py
@@ -27,8 +27,6 @@ class JdbcHook(DbApiHook):
Raises an airflow error if the given connection id doesn't exist.
Otherwise host, port, schema, username and password can be specified on the fly.
-
-
:param jdbc_url: jdbc connection url
:type jdbc_url: string
:param jdbc_driver_name: jdbc driver name
@@ -42,7 +40,6 @@ class JdbcHook(DbApiHook):
a '.sql' extensions.
"""
-
conn_name_attr = 'jdbc_conn_id'
default_conn_name = 'jdbc_default'
supports_autocommit = True
@@ -56,13 +53,14 @@ class JdbcHook(DbApiHook):
jdbc_driver_name = conn.extra_dejson.get('extra__jdbc__drv_clsname')
conn = jaydebeapi.connect(jdbc_driver_name,
- [str(host), str(login), str(psw)],
+ [str(host), str(login), str(psw)],
jdbc_driver_loc,)
return conn
def set_autocommit(self, conn, autocommit):
"""
- Enable or disable autocommit for the given connection
+ Enable or disable autocommit for the given connection.
+
:param conn: The connection
:return:
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/webhdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index f808865..5e2a28d 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -20,11 +20,11 @@ from hdfs import InsecureClient, HdfsError
_kerberos_security_mode = configuration.get("core", "security") == "kerberos"
if _kerberos_security_mode:
- try:
- from hdfs.ext.kerberos import KerberosClient
- except ImportError:
- logging.error("Could not load the Kerberos extension for the WebHDFSHook.")
- raise
+ try:
+ from hdfs.ext.kerberos import KerberosClient
+ except ImportError:
+ logging.error("Could not load the Kerberos extension for the WebHDFSHook.")
+ raise
from airflow.exceptions import AirflowException
@@ -50,10 +50,10 @@ class WebHDFSHook(BaseHook):
logging.debug('Trying namenode {}'.format(nn.host))
connection_str = 'http://{nn.host}:{nn.port}'.format(nn=nn)
if _kerberos_security_mode:
- client = KerberosClient(connection_str)
+ client = KerberosClient(connection_str)
else:
- proxy_user = self.proxy_user or nn.login
- client = InsecureClient(connection_str, user=proxy_user)
+ proxy_user = self.proxy_user or nn.login
+ client = InsecureClient(connection_str, user=proxy_user)
client.status('/')
logging.debug('Using namenode {} for hook'.format(nn.host))
return client
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/macros/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/macros/__init__.py b/airflow/macros/__init__.py
index da4eb3d..0e1ba70 100644
--- a/airflow/macros/__init__.py
+++ b/airflow/macros/__init__.py
@@ -70,7 +70,6 @@ def _integrate_plugins():
sys.modules[_macro_module.__name__] = _macro_module
globals()[_macro_module._name] = _macro_module
-
##########################################################
# TODO FIXME Remove in Airflow 2.0
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7193765..4e6eb0f 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1160,7 +1160,6 @@ class TaskInstance(Base):
delay = min(self.task.max_retry_delay, delay)
return self.end_date + delay
-
def ready_for_retry(self):
"""
Checks on whether the task instance is in the right state and timeframe
@@ -2843,11 +2842,11 @@ class DAG(LoggingMixin):
include_subdags=True,
reset_dag_runs=True,
dry_run=False):
- session = settings.Session()
"""
Clears a set of task instances associated with the current dag for
a specified date range.
"""
+ session = settings.Session()
TI = TaskInstance
tis = session.query(TI)
if include_subdags:
@@ -3224,6 +3223,7 @@ class Variable(Base):
def val(cls):
return synonym('_val',
descriptor=property(cls.get_val, cls.set_val))
+
@classmethod
@provide_session
def get(cls, key, default_var=None, deserialize_json=False, session=None):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 71acfcd..67c4db9 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -106,7 +106,6 @@ def _integrate_plugins():
sys.modules[_operator_module.__name__] = _operator_module
globals()[_operator_module._name] = _operator_module
-
##########################################################
# TODO FIXME Remove in Airflow 2.0
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index dc42d67..d514acd 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -46,6 +46,7 @@ class TriggerDagRunOperator(BaseOperator):
template_fields = tuple()
template_ext = tuple()
ui_color = '#ffefeb'
+
@apply_defaults
def __init__(
self,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index b35205a..9f9bb14 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -28,15 +28,19 @@ from airflow import configuration as conf
class DummyStatsLogger(object):
+
@classmethod
def incr(cls, stat, count=1, rate=1):
pass
+
@classmethod
def decr(cls, stat, count=1, rate=1):
pass
+
@classmethod
def gauge(cls, stat, value, rate=1, delta=False):
pass
+
@classmethod
def timing(cls, stat, dt):
pass
@@ -54,7 +58,6 @@ else:
Stats = DummyStatsLogger
-
HEADER = """\
____________ _____________
____ |__( )_________ __/__ /________ __
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/utils/file.py
----------------------------------------------------------------------
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 183c834..d4526e9 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -28,9 +28,9 @@ def TemporaryDirectory(suffix='', prefix=None, dir=None):
try:
yield name
finally:
- try:
- shutil.rmtree(name)
- except OSError as e:
- # ENOENT - no such file or directory
- if e.errno != errno.ENOENT:
- raise e
+ try:
+ shutil.rmtree(name)
+ except OSError as e:
+ # ENOENT - no such file or directory
+ if e.errno != errno.ENOENT:
+ raise e
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index bddf0cb..517d5f1 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -164,8 +164,10 @@ def pprinttable(rows):
s += separator + '\n'
s += (hpattern % tuple(headers)) + '\n'
s += separator + '\n'
+
def f(t):
return "{}".format(t) if isinstance(t, basestring) else t
+
for line in rows:
s += pattern % tuple(f(t) for t in line) + '\n'
s += separator + '\n'
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 122f0e0..aff6b54 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -304,7 +304,6 @@ class Airflow(BaseView):
session.commit()
session.close()
-
payload = {}
payload['state'] = 'ERROR'
payload['error'] = ''
@@ -424,7 +423,6 @@ class Airflow(BaseView):
except Exception as e:
payload['error'] = str(e)
-
payload['state'] = 'SUCCESS'
payload['request_dict'] = request_dict
return wwwutils.json_response(payload)
@@ -546,7 +544,6 @@ class Airflow(BaseView):
payload[dag.safe_dag_id].append(d)
return wwwutils.json_response(payload)
-
@expose('/code')
@login_required
def code(self):
@@ -857,7 +854,7 @@ class Airflow(BaseView):
task_id=task_id,
execution_date=execution_date,
form=form,
- dag=dag, title=title)\
+ dag=dag, title=title)
@expose('/run')
@login_required
@@ -1924,19 +1921,18 @@ class KnowEventView(wwwutils.DataProfilingMixin, AirflowModelView):
class KnowEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView):
pass
-'''
-# For debugging / troubleshooting
-mv = KnowEventTypeView(
- models.KnownEventType,
- Session, name="Known Event Types", category="Manage")
-admin.add_view(mv)
-class DagPickleView(SuperUserMixin, ModelView):
- pass
-mv = DagPickleView(
- models.DagPickle,
- Session, name="Pickles", category="Manage")
-admin.add_view(mv)
-'''
+
+# NOTE: For debugging / troubleshooting
+# mv = KnowEventTypeView(
+# models.KnownEventType,
+# Session, name="Known Event Types", category="Manage")
+# admin.add_view(mv)
+# class DagPickleView(SuperUserMixin, ModelView):
+# pass
+# mv = DagPickleView(
+# models.DagPickle,
+# Session, name="Pickles", category="Manage")
+# admin.add_view(mv)
class VariableView(wwwutils.LoginMixin, AirflowModelView):