Posted to commits@airflow.apache.org by bo...@apache.org on 2016/07/22 11:04:26 UTC

incubator-airflow git commit: [AIRFLOW-348] Fix code style warnings

Repository: incubator-airflow
Updated Branches:
  refs/heads/master b6e609824 -> 189e6b887


[AIRFLOW-348] Fix code style warnings

Closes #1672 from skudriashev/airflow-348


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/189e6b88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/189e6b88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/189e6b88

Branch: refs/heads/master
Commit: 189e6b88742ace8c46e72d59d7662284e34b7a2e
Parents: b6e6098
Author: Stanislav Kudriashev <st...@gmail.com>
Authored: Fri Jul 22 13:03:45 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jul 22 13:04:03 2016 +0200

----------------------------------------------------------------------
 airflow/configuration.py                        |  7 ++---
 airflow/contrib/hooks/qubole_hook.py            |  1 -
 airflow/example_dags/example_python_operator.py | 11 +++----
 airflow/hooks/__init__.py                       |  1 -
 airflow/hooks/dbapi_hook.py                     | 15 +++++-----
 airflow/hooks/hdfs_hook.py                      | 13 ++++-----
 airflow/hooks/jdbc_hook.py                      |  8 ++----
 airflow/hooks/webhdfs_hook.py                   | 16 +++++------
 airflow/macros/__init__.py                      |  1 -
 airflow/models.py                               |  4 +--
 airflow/operators/__init__.py                   |  1 -
 airflow/operators/dagrun_operator.py            |  1 +
 airflow/settings.py                             |  5 +++-
 airflow/utils/file.py                           | 12 ++++----
 airflow/utils/helpers.py                        |  2 ++
 airflow/www/views.py                            | 30 +++++++++-----------
 16 files changed, 58 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index e03b713..5a380ae 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -661,10 +661,9 @@ def mkdir_p(path):
         else:
             raise AirflowConfigException('Had trouble creating a directory')
 
-"""
-Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
-"~/airflow" and "~/airflow/airflow.cfg" respectively as defaults.
-"""
+
+# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
+# "~/airflow" and "~/airflow/airflow.cfg" respectively as defaults.
 
 if 'AIRFLOW_HOME' not in os.environ:
     AIRFLOW_HOME = expand_env_var('~/airflow')
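
A note on the pattern being fixed here: a triple-quoted string that is not the first statement of a module, class, or function is not a docstring. It never reaches __doc__; the compiler treats it as a constant expression and discards it, and PEP 8 prefers plain # comments for block commentary. A minimal standalone sketch (the file and names are illustrative, not the Airflow source):

    """This IS the module docstring: the first statement in the file."""

    import os

    """This is NOT a docstring: just a constant expression, silently dropped."""

    # Block commentary belongs in comments, as in the hunk above.
    HOME = os.environ.get('EXAMPLE_HOME', os.path.expanduser('~/example'))
    print(HOME)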

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 57d00b5..694b12f 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -84,7 +84,6 @@ class QuboleHook(BaseHook):
         if self.cmd.status != 'done':
             raise AirflowException('Command Id: {0} failed with Status: {1}'.format(self.cmd.id, self.cmd.status))
 
-
     def kill(self, ti):
         """
         Kill (cancel) a Qubole command

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/example_dags/example_python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 6c0b93f..c5d7193 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -34,7 +34,7 @@ dag = DAG(
 
 
 def my_sleeping_function(random_base):
-    '''This is a function that will run within the DAG execution'''
+    """This is a function that will run within the DAG execution"""
     time.sleep(random_base)
 
 
@@ -49,15 +49,12 @@ run_this = PythonOperator(
     python_callable=print_context,
     dag=dag)
 
+# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
 for i in range(10):
-    '''
-    Generating 10 sleeping task, sleeping from 0 to 9 seconds
-    respectively
-    '''
     task = PythonOperator(
-        task_id='sleep_for_'+str(i),
+        task_id='sleep_for_' + str(i),
         python_callable=my_sleeping_function,
-        op_kwargs={'random_base': float(i)/10},
+        op_kwargs={'random_base': float(i) / 10},
         dag=dag)
 
     task.set_upstream(run_this)
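
Two fixes share this hunk: the stray string literal inside the for loop becomes a real comment above it, and the arithmetic gains the spaces around binary operators that PEP 8 expects (the flake8 E225/E226 family). A rough sketch of the corrected pattern, with a plain list standing in for the PythonOperator instances:

    # Generate 10 tasks with staggered delays; the comment sits above the loop.
    tasks = []
    for i in range(10):
        # Spaces around the binary operators '+' and '/'.
        tasks.append(('sleep_for_' + str(i), float(i) / 10))

    assert tasks[3] == ('sleep_for_3', 0.3)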

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 4c1891d..883018d 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -70,7 +70,6 @@ def _integrate_plugins():
         sys.modules[_hook_module.__name__] = _hook_module
         globals()[_hook_module._name] = _hook_module
 
-
         ##########################################################
         # TODO FIXME Remove in Airflow 2.0
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/dbapi_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py
index 55f2d95..04af16e 100644
--- a/airflow/hooks/dbapi_hook.py
+++ b/airflow/hooks/dbapi_hook.py
@@ -56,9 +56,8 @@ class DbApiHook(BaseHook):
             username=db.login,
             schema=db.schema)
 
-
     def get_pandas_df(self, sql, parameters=None):
-        '''
+        """
         Executes the sql and returns a pandas dataframe
 
         :param sql: the sql statement to be executed (str) or a list of
@@ -66,7 +65,7 @@ class DbApiHook(BaseHook):
         :type sql: str or list
         :param parameters: The parameters to render the SQL query with.
         :type parameters: mapping or iterable
-        '''
+        """
         if sys.version_info[0] < 3:
             sql = sql.encode('utf-8')
         import pandas.io.sql as psql
@@ -76,7 +75,7 @@ class DbApiHook(BaseHook):
         return df
 
     def get_records(self, sql, parameters=None):
-        '''
+        """
         Executes the sql and returns a set of records.
 
         :param sql: the sql statement to be executed (str) or a list of
@@ -84,7 +83,7 @@ class DbApiHook(BaseHook):
         :type sql: str or list
         :param parameters: The parameters to render the SQL query with.
         :type parameters: mapping or iterable
-        '''
+        """
         if sys.version_info[0] < 3:
             sql = sql.encode('utf-8')
         conn = self.get_conn()
@@ -99,7 +98,7 @@ class DbApiHook(BaseHook):
         return rows
 
     def get_first(self, sql, parameters=None):
-        '''
+        """
         Executes the sql and returns the first resulting row.
 
         :param sql: the sql statement to be executed (str) or a list of
@@ -107,7 +106,7 @@ class DbApiHook(BaseHook):
         :type sql: str or list
         :param parameters: The parameters to render the SQL query with.
         :type parameters: mapping or iterable
-        '''
+        """
         if sys.version_info[0] < 3:
             sql = sql.encode('utf-8')
         conn = self.get_conn()
@@ -141,7 +140,7 @@ class DbApiHook(BaseHook):
             sql = [sql]
 
         if self.supports_autocommit:
-           self.set_autocommit(conn, autocommit)
+            self.set_autocommit(conn, autocommit)
 
         cur = conn.cursor()
         for s in sql:
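
Most of this hunk swaps '''-quoted docstrings for the """ form PEP 257 recommends; it also corrects a three-space indent on the set_autocommit call to four (flake8 E111) and removes an extra blank line. The :param:/:type: fields are Sphinx markup inside ordinary docstrings; in miniature (a standalone stub, not the real hook method):

    def get_first(sql, parameters=None):
        """
        Executes the sql and returns the first resulting row.

        :param sql: the sql statement to be executed
        :type sql: str
        :param parameters: the parameters to render the SQL query with
        :type parameters: mapping or iterable
        """
        # Body indented a uniform four spaces; three-space indents are E111.
        return None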

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/hdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index e84595c..69dccd0 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -29,9 +29,9 @@ class HDFSHookException(AirflowException):
 
 
 class HDFSHook(BaseHook):
-    '''
+    """
     Interact with HDFS. This class is a wrapper around the snakebite library.
-    '''
+    """
     def __init__(self, hdfs_conn_id='hdfs_default', proxy_user=None):
         if not snakebite_imported:
             raise ImportError(
@@ -43,18 +43,15 @@ class HDFSHook(BaseHook):
         self.proxy_user = proxy_user
 
     def get_conn(self):
-        '''
+        """
         Returns a snakebite HDFSClient object.
-        '''
+        """
         connections = self.get_connections(self.hdfs_conn_id)
-
         use_sasl = False
         if configuration.get('core', 'security') == 'kerberos':
             use_sasl = True
 
-        client = None
-
-        ''' When using HAClient, proxy_user must be the same, so is ok to always take the first '''
+        # When using HAClient, proxy_user must be the same, so it is ok to always take the first.
         effective_user = self.proxy_user or connections[0].login
         if len(connections) == 1:
             autoconfig = connections[0].extra_dejson.get('autoconfig', False)
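
Alongside another string-to-comment conversion, this hunk deletes a dead store: client = None was always rebound before first use, so the placeholder carried no information. A small sketch of why the placeholder is unnecessary when every branch binds the name (generic code, not the snakebite client):

    def pick_client(connections, security='none'):
        use_sasl = security == 'kerberos'
        # No 'client = None' needed: both branches below bind the name
        # before it is ever read.
        if len(connections) == 1:
            client = ('single', connections[0], use_sasl)
        else:
            client = ('ha', connections, use_sasl)
        return client

    print(pick_client(['hdfs://nn1:8020']))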

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/jdbc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/jdbc_hook.py b/airflow/hooks/jdbc_hook.py
index d0091e1..bc1f352 100644
--- a/airflow/hooks/jdbc_hook.py
+++ b/airflow/hooks/jdbc_hook.py
@@ -27,8 +27,6 @@ class JdbcHook(DbApiHook):
     Raises an airflow error if the given connection id doesn't exist.
     Otherwise host, port, schema, username and password can be specified on the fly.
 
-
-
     :param jdbc_url: jdbc connection url
     :type jdbc_url: string
     :param jdbc_driver_name: jdbc driver name
@@ -42,7 +40,6 @@ class JdbcHook(DbApiHook):
         a '.sql' extension.
     """
 
-
     conn_name_attr = 'jdbc_conn_id'
     default_conn_name = 'jdbc_default'
     supports_autocommit = True
@@ -56,13 +53,14 @@ class JdbcHook(DbApiHook):
         jdbc_driver_name = conn.extra_dejson.get('extra__jdbc__drv_clsname')
 
         conn = jaydebeapi.connect(jdbc_driver_name,
-                           [str(host), str(login), str(psw)],
+                                  [str(host), str(login), str(psw)],
                                   jdbc_driver_loc,)
         return conn
 
     def set_autocommit(self, conn, autocommit):
         """
-        Enable or disable autocommit for the given connection
+        Enable or disable autocommit for the given connection.
+
         :param conn: The connection
         :return:
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/hooks/webhdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index f808865..5e2a28d 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -20,11 +20,11 @@ from hdfs import InsecureClient, HdfsError
 
 _kerberos_security_mode = configuration.get("core", "security") == "kerberos"
 if _kerberos_security_mode:
-  try:
-    from hdfs.ext.kerberos import KerberosClient
-  except ImportError:
-    logging.error("Could not load the Kerberos extension for the WebHDFSHook.")
-    raise
+    try:
+        from hdfs.ext.kerberos import KerberosClient
+    except ImportError:
+        logging.error("Could not load the Kerberos extension for the WebHDFSHook.")
+        raise
 from airflow.exceptions import AirflowException
 
 
@@ -50,10 +50,10 @@ class WebHDFSHook(BaseHook):
                 logging.debug('Trying namenode {}'.format(nn.host))
                 connection_str = 'http://{nn.host}:{nn.port}'.format(nn=nn)
                 if _kerberos_security_mode:
-                  client = KerberosClient(connection_str)
+                    client = KerberosClient(connection_str)
                 else:
-                  proxy_user = self.proxy_user or nn.login
-                  client = InsecureClient(connection_str, user=proxy_user)
+                    proxy_user = self.proxy_user or nn.login
+                    client = InsecureClient(connection_str, user=proxy_user)
                 client.status('/')
                 logging.debug('Using namenode {} for hook'.format(nn.host))
                 return client
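
Here the fix is purely re-indenting two-space blocks to the four spaces PEP 8 calls for. The logic being re-indented is a failover probe: try each namenode and use the first one that answers. A hypothetical distillation of that loop (first_healthy and probe are made-up names, not the hook's API):

    def first_healthy(endpoints, probe):
        for ep in endpoints:
            try:
                probe(ep)  # stands in for client.status('/') in the hook
                return ep
            except IOError:
                continue   # this namenode is down; try the next one
        raise RuntimeError('no healthy endpoint found')

    print(first_healthy(['nn1', 'nn2'], lambda ep: None))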

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/macros/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/macros/__init__.py b/airflow/macros/__init__.py
index da4eb3d..0e1ba70 100644
--- a/airflow/macros/__init__.py
+++ b/airflow/macros/__init__.py
@@ -70,7 +70,6 @@ def _integrate_plugins():
         sys.modules[_macro_module.__name__] = _macro_module
         globals()[_macro_module._name] = _macro_module
 
-
         ##########################################################
         # TODO FIXME Remove in Airflow 2.0
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7193765..4e6eb0f 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1160,7 +1160,6 @@ class TaskInstance(Base):
                 delay = min(self.task.max_retry_delay, delay)
         return self.end_date + delay
 
-
     def ready_for_retry(self):
         """
         Checks on whether the task instance is in the right state and timeframe
@@ -2843,11 +2842,11 @@ class DAG(LoggingMixin):
             include_subdags=True,
             reset_dag_runs=True,
             dry_run=False):
-        session = settings.Session()
         """
         Clears a set of task instances associated with the current dag for
         a specified date range.
         """
+        session = settings.Session()
         TI = TaskInstance
         tis = session.query(TI)
         if include_subdags:
@@ -3224,6 +3223,7 @@ class Variable(Base):
     def val(cls):
         return synonym('_val',
                        descriptor=property(cls.get_val, cls.set_val))
+
     @classmethod
     @provide_session
     def get(cls, key, default_var=None, deserialize_json=False, session=None):
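
The DAG.clear change is the one fix in this commit with an observable effect: a string only becomes a function's docstring when it is the first statement in the body. With session = settings.Session() sitting above it, the triple-quoted string was a discarded expression and DAG.clear.__doc__ stayed None. A standalone demonstration:

    def clear_wrong():
        session = object()   # any statement ahead of the string...
        """...makes this string an ignored expression, not a docstring."""
        return session


    def clear_right():
        """Docstring first, then code: __doc__ is populated."""
        session = object()
        return session


    assert clear_wrong.__doc__ is None
    assert clear_right.__doc__.startswith('Docstring')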

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 71acfcd..67c4db9 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -106,7 +106,6 @@ def _integrate_plugins():
         sys.modules[_operator_module.__name__] = _operator_module
         globals()[_operator_module._name] = _operator_module
 
-
         ##########################################################
         # TODO FIXME Remove in Airflow 2.0
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index dc42d67..d514acd 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -46,6 +46,7 @@ class TriggerDagRunOperator(BaseOperator):
     template_fields = tuple()
     template_ext = tuple()
     ui_color = '#ffefeb'
+
     @apply_defaults
     def __init__(
             self,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index b35205a..9f9bb14 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -28,15 +28,19 @@ from airflow import configuration as conf
 
 
 class DummyStatsLogger(object):
+
     @classmethod
     def incr(cls, stat, count=1, rate=1):
         pass
+
     @classmethod
     def decr(cls, stat, count=1, rate=1):
         pass
+
     @classmethod
     def gauge(cls, stat, value, rate=1, delta=False):
         pass
+
     @classmethod
     def timing(cls, stat, dt):
         pass
@@ -54,7 +58,6 @@ else:
     Stats = DummyStatsLogger
 
 
-
 HEADER = """\
   ____________       _____________
  ____    |__( )_________  __/__  /________      __
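
The settings.py additions and the blank-line removals elsewhere in this commit (qubole_hook.py, the three _integrate_plugins modules) are two sides of the same rule: methods within a class body are separated by exactly one blank line — flake8 reports E301 when it is missing and E303 when there are extras. In miniature:

    class DummyStats(object):

        @classmethod
        def incr(cls, stat, count=1, rate=1):
            pass

        @classmethod
        def decr(cls, stat, count=1, rate=1):
            pass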

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/utils/file.py
----------------------------------------------------------------------
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 183c834..d4526e9 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -28,9 +28,9 @@ def TemporaryDirectory(suffix='', prefix=None, dir=None):
     try:
         yield name
     finally:
-            try:
-                shutil.rmtree(name)
-            except OSError as e:
-                # ENOENT - no such file or directory
-                if e.errno != errno.ENOENT:
-                    raise e
+        try:
+            shutil.rmtree(name)
+        except OSError as e:
+            # ENOENT - no such file or directory
+            if e.errno != errno.ENOENT:
+                raise e
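
The cleanup block here was over-indented one level beneath finally; the content is unchanged. For context, the hunk sits inside a generator-based context manager; a compact reconstruction with the imports it needs (the lines outside the hunk are inferred from the visible signature, assuming Python 3, where mkdtemp accepts None defaults):

    import errno
    import shutil
    import tempfile
    from contextlib import contextmanager


    @contextmanager
    def TemporaryDirectory(suffix='', prefix=None, dir=None):
        name = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
        try:
            yield name
        finally:
            try:
                shutil.rmtree(name)
            except OSError as e:
                # ENOENT - no such file or directory
                if e.errno != errno.ENOENT:
                    raise e

    with TemporaryDirectory(prefix='demo_') as tmp:
        print('working in', tmp)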

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index bddf0cb..517d5f1 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -164,8 +164,10 @@ def pprinttable(rows):
     s += separator + '\n'
     s += (hpattern % tuple(headers)) + '\n'
     s += separator + '\n'
+
     def f(t):
         return "{}".format(t) if isinstance(t, basestring) else t
+
     for line in rows:
         s += pattern % tuple(f(t) for t in line) + '\n'
     s += separator + '\n'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/189e6b88/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 122f0e0..aff6b54 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -304,7 +304,6 @@ class Airflow(BaseView):
         session.commit()
         session.close()
 
-
         payload = {}
         payload['state'] = 'ERROR'
         payload['error'] = ''
@@ -424,7 +423,6 @@ class Airflow(BaseView):
                 except Exception as e:
                     payload['error'] = str(e)
 
-
             payload['state'] = 'SUCCESS'
             payload['request_dict'] = request_dict
         return wwwutils.json_response(payload)
@@ -546,7 +544,6 @@ class Airflow(BaseView):
                 payload[dag.safe_dag_id].append(d)
         return wwwutils.json_response(payload)
 
-
     @expose('/code')
     @login_required
     def code(self):
@@ -857,7 +854,7 @@ class Airflow(BaseView):
             task_id=task_id,
             execution_date=execution_date,
             form=form,
-            dag=dag, title=title)\
+            dag=dag, title=title)
 
     @expose('/run')
     @login_required
@@ -1924,19 +1921,18 @@ class KnowEventView(wwwutils.DataProfilingMixin, AirflowModelView):
 class KnowEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView):
     pass
 
-'''
-# For debugging / troubleshooting
-mv = KnowEventTypeView(
-    models.KnownEventType,
-    Session, name="Known Event Types", category="Manage")
-admin.add_view(mv)
-class DagPickleView(SuperUserMixin, ModelView):
-    pass
-mv = DagPickleView(
-    models.DagPickle,
-    Session, name="Pickles", category="Manage")
-admin.add_view(mv)
-'''
+
+# NOTE: For debugging / troubleshooting
+# mv = KnowEventTypeView(
+#     models.KnownEventType,
+#     Session, name="Known Event Types", category="Manage")
+# admin.add_view(mv)
+# class DagPickleView(SuperUserMixin, ModelView):
+#     pass
+# mv = DagPickleView(
+#     models.DagPickle,
+#     Session, name="Pickles", category="Manage")
+# admin.add_view(mv)
 
 
 class VariableView(wwwutils.LoginMixin, AirflowModelView):
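
Two last touches in views.py: the trailing backslash after render(..., title=title) is dropped — the expression is already complete, and a dangling line continuation splices the next physical line into the statement — and the disabled debugging block becomes # comments instead of a bare string literal, so linters and readers alike see it as intentionally inert. In sketch form (hypothetical names, not the Flask-Admin views):

    def render(dag, title):
        # Complete at the closing paren: no trailing '\' needed, and one
        # here would pull the next physical line into this return.
        return {'dag': dag, 'title': title}


    # NOTE: For debugging / troubleshooting
    # view = render('example_dag', 'Example')
    # print(view)

    print(render('example_dag', 'Example'))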