You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/06/29 03:19:00 UTC

[1/3] incubator-airflow git commit: [AIRFLOW-200] Make hook/operator imports lazy, and print proper exceptions

Repository: incubator-airflow
Updated Branches:
  refs/heads/master be6766a6a -> 7b382b4e9


[AIRFLOW-200] Make hook/operator imports lazy, and print proper exceptions


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dce633ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dce633ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dce633ee

Branch: refs/heads/master
Commit: dce633ee53500f7a293a66b60e2c19307123d1ce
Parents: d15f8ca
Author: Chris Riccomini <ch...@wepay.com>
Authored: Mon Jun 27 13:32:07 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Mon Jun 27 13:32:07 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py     |  22 +----
 airflow/contrib/operators/__init__.py |  22 ++---
 airflow/hooks/__init__.py             |  31 ++-----
 airflow/operators/__init__.py         |  70 +++++---------
 airflow/utils/helpers.py              | 143 +++++++++++++++++++++++------
 5 files changed, 155 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 83b505d..237c831 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -11,16 +11,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+#
 
 
 # Contrib hooks are not imported by default. They should be accessed
 # directly: from airflow.contrib.hooks.hook_module import Hook
 
 
-
-
-
+import sys
 
 
 # ------------------------------------------------------------------------
@@ -33,11 +31,6 @@
 # for compatibility.
 #
 # ------------------------------------------------------------------------
-
-# Imports the hooks dynamically while keeping the package API clean,
-# abstracting the underlying modules
-from airflow.utils.helpers import import_module_attrs as _import_module_attrs
-
 _hooks = {
     'ftp_hook': ['FTPHook'],
     'ftps_hook': ['FTPSHook'],
@@ -54,12 +47,5 @@ _hooks = {
 
 import os as _os
 if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
-    from zope.deprecation import deprecated as _deprecated
-    _imported = _import_module_attrs(globals(), _hooks)
-    for _i in _imported:
-        _deprecated(
-            _i,
-            "Importing {i} directly from 'contrib.hooks' has been "
-            "deprecated. Please import from "
-            "'contrib.hooks.[hook_module]' instead. Support for direct imports "
-            "will be dropped entirely in Airflow 2.0.".format(i=_i))
+    from airflow.utils.helpers import AirflowImporter
+    airflow_importer = AirflowImporter(sys.modules[__name__], _hooks)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/contrib/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py
index 5ac2d45..ae481ea 100644
--- a/airflow/contrib/operators/__init__.py
+++ b/airflow/contrib/operators/__init__.py
@@ -11,12 +11,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+#
 
 
 # Contrib operators are not imported by default. They should be accessed
 # directly: from airflow.contrib.operators.operator_module import Operator
 
 
+import sys
+
+
 # ------------------------------------------------------------------------
 #
 # #TODO #FIXME Airflow 2.0
@@ -27,27 +31,15 @@
 # for compatibility.
 #
 # ------------------------------------------------------------------------
-
-# Imports the operators dynamically while keeping the package API clean,
-# abstracting the underlying modules
-from airflow.utils.helpers import import_module_attrs as _import_module_attrs
-
 _operators = {
     'ssh_execute_operator': ['SSHExecuteOperator'],
     'vertica_operator': ['VerticaOperator'],
     'vertica_to_hive': ['VerticaToHiveTransfer'],
     'qubole_operator': ['QuboleOperator'],
-    'fs': ['FileSensor']
+    'fs_operator': ['FileSensor']
 }
 
 import os as _os
 if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
-    from zope.deprecation import deprecated as _deprecated
-    _imported = _import_module_attrs(globals(), _operators)
-    for _i in _imported:
-        _deprecated(
-            _i,
-            "Importing {i} directly from 'contrib.operators' has been "
-            "deprecated. Please import from "
-            "'contrib.operators.[operator_module]' instead. Support for direct "
-            "imports will be dropped entirely in Airflow 2.0.".format(i=_i))
+    from airflow.utils.helpers import AirflowImporter
+    airflow_importer = AirflowImporter(sys.modules[__name__], _operators)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 5209a51..29d4379 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -11,13 +11,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+#
+
+
+import sys
 
-# Only import Core Airflow Operators that don't have extra requirements.
-# All other operators must be imported directly.
-from .base_hook import BaseHook
-from .dbapi_hook import DbApiHook
-from .http_hook import HttpHook
-from .sqlite_hook import SqliteHook
 
 # ------------------------------------------------------------------------
 #
@@ -34,10 +32,8 @@ from .sqlite_hook import SqliteHook
 # abstracting the underlying modules
 
 
-from airflow.utils.helpers import import_module_attrs as _import_module_attrs
-from airflow.hooks.base_hook import BaseHook  # noqa to expose in package
-
 _hooks = {
+    'base_hook': ['BaseHook'],
     'hive_hooks': [
         'HiveCliHook',
         'HiveMetastoreHook',
@@ -50,9 +46,9 @@ _hooks = {
     'postgres_hook': ['PostgresHook'],
     'presto_hook': ['PrestoHook'],
     'samba_hook': ['SambaHook'],
-    # 'sqlite_hook': ['SqliteHook'],
+    'sqlite_hook': ['SqliteHook'],
     'S3_hook': ['S3Hook'],
-    # 'http_hook': ['HttpHook'],
+    'http_hook': ['HttpHook'],
     'druid_hook': ['DruidHook'],
     'jdbc_hook': ['JdbcHook'],
     'dbapi_hook': ['DbApiHook'],
@@ -60,19 +56,10 @@ _hooks = {
     'oracle_hook': ['OracleHook'],
 }
 
-
 import os as _os
 if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
-    from zope.deprecation import deprecated as _deprecated
-    _imported = _import_module_attrs(globals(), _hooks)
-    for _i in _imported:
-        _deprecated(
-            _i,
-            "Importing {i} directly from 'airflow.hooks' has been "
-            "deprecated. Please import from "
-            "'airflow.hooks.[hook_module]' instead. Support for direct imports "
-            "will be dropped entirely in Airflow 2.0.".format(i=_i))
-
+    from airflow.utils.helpers import AirflowImporter
+    airflow_importer = AirflowImporter(sys.modules[__name__], _hooks)
 
 def _integrate_plugins():
     """Integrate plugins to the context"""

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 0d2c403..5e92b13 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -11,28 +11,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+#
 
 
-# Only import Core Airflow Operators that don't have extra requirements.
-# All other operators must be imported directly.
+import sys
 from airflow.models import BaseOperator
-from .bash_operator import BashOperator
-from .python_operator import (
-    BranchPythonOperator,
-    PythonOperator,
-    ShortCircuitOperator)
-from .check_operator import (
-    CheckOperator,
-    ValueCheckOperator,
-    IntervalCheckOperator)
-from .dagrun_operator import TriggerDagRunOperator
-from .dummy_operator import DummyOperator
-from .email_operator import EmailOperator
-from .http_operator import SimpleHttpOperator
-import airflow.operators.sensors
-from .subdag_operator import SubDagOperator
-
-
 
 
 # ------------------------------------------------------------------------
@@ -46,26 +29,23 @@ from .subdag_operator import SubDagOperator
 #
 # ------------------------------------------------------------------------
 
+
 # Imports operators dynamically while keeping the package API clean,
 # abstracting the underlying modules
-from airflow.utils.helpers import import_module_attrs as _import_module_attrs
 
-# These need to be integrated first as other operators depend on them
-# _import_module_attrs(globals(), {
-#     'check_operator': [
-#         'CheckOperator',
-#         'ValueCheckOperator',
-#         'IntervalCheckOperator',
-#     ],
-# })
 
 _operators = {
-    # 'bash_operator': ['BashOperator'],
-    # 'python_operator': [
-    #     'PythonOperator',
-    #     'BranchPythonOperator',
-    #     'ShortCircuitOperator',
-    # ],
+    'bash_operator': ['BashOperator'],
+    'check_operator': [
+        'CheckOperator',
+        'ValueCheckOperator',
+        'IntervalCheckOperator',
+    ],
+    'python_operator': [
+        'PythonOperator',
+        'BranchPythonOperator',
+        'ShortCircuitOperator',
+    ],
     'hive_operator': ['HiveOperator'],
     'pig_operator': ['PigOperator'],
     'presto_check_operator': [
@@ -73,9 +53,9 @@ _operators = {
         'PrestoValueCheckOperator',
         'PrestoIntervalCheckOperator',
     ],
-    # 'dagrun_operator': ['TriggerDagRunOperator'],
-    # 'dummy_operator': ['DummyOperator'],
-    # 'email_operator': ['EmailOperator'],
+    'dagrun_operator': ['TriggerDagRunOperator'],
+    'dummy_operator': ['DummyOperator'],
+    'email_operator': ['EmailOperator'],
     'hive_to_samba_operator': ['Hive2SambaOperator'],
     'mysql_operator': ['MySqlOperator'],
     'sqlite_operator': ['SqliteOperator'],
@@ -95,13 +75,13 @@ _operators = {
         'TimeSensor',
         'WebHdfsSensor',
     ],
-    # 'subdag_operator': ['SubDagOperator'],
+    'subdag_operator': ['SubDagOperator'],
     'hive_stats_operator': ['HiveStatsCollectionOperator'],
     's3_to_hive_operator': ['S3ToHiveTransfer'],
     'hive_to_mysql': ['HiveToMySqlTransfer'],
     'presto_to_mysql': ['PrestoToMySqlTransfer'],
     's3_file_transform_operator': ['S3FileTransformOperator'],
-    # 'http_operator': ['SimpleHttpOperator'],
+    'http_operator': ['SimpleHttpOperator'],
     'hive_to_druid': ['HiveToDruidTransfer'],
     'jdbc_operator': ['JdbcOperator'],
     'mssql_operator': ['MsSqlOperator'],
@@ -113,16 +93,8 @@ _operators = {
 
 import os as _os
 if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
-    from zope.deprecation import deprecated as _deprecated
-    _imported = _import_module_attrs(globals(), _operators)
-    for _i in _imported:
-        _deprecated(
-            _i,
-            "Importing {i} directly from 'airflow.operators' has been "
-            "deprecated. Please import from "
-            "'airflow.operators.[operator_module]' instead. Support for direct "
-            "imports will be dropped entirely in Airflow 2.0.".format(i=_i))
-
+    from airflow.utils.helpers import AirflowImporter
+    airflow_importer = AirflowImporter(sys.modules[__name__], _operators)
 
 def _integrate_plugins():
     """Integrate plugins to the context"""

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 527c9ac..c79ebee 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -24,6 +24,7 @@ import imp
 import logging
 import os
 import re
+import sys
 
 from airflow.exceptions import AirflowException
 
@@ -72,35 +73,6 @@ def ask_yesno(question):
         else:
             print("Please respond by yes or no.")
 
-
-def import_module_attrs(parent_module_globals, module_attrs_dict):
-    '''
-    Attempts to import a set of modules and specified attributes in the
-    form of a dictionary. The attributes are copied in the parent module's
-    namespace. The function returns a list of attributes names that can be
-    affected to __all__.
-
-    This is used in the context of ``operators`` and ``hooks`` and
-    silence the import errors for when libraries are missing. It makes
-    for a clean package abstracting the underlying modules and only
-    brings functional operators to those namespaces.
-    '''
-    imported_attrs = []
-    for mod, attrs in list(module_attrs_dict.items()):
-        try:
-            path = os.path.realpath(parent_module_globals['__file__'])
-            folder = os.path.dirname(path)
-            f, filename, description = imp.find_module(mod, [folder])
-            module = imp.load_module(mod, f, filename, description)
-            for attr in attrs:
-                parent_module_globals[attr] = getattr(module, attr)
-                imported_attrs += [attr]
-        except Exception as err:
-            logging.debug("Error importing module {mod}: {err}".format(
-                mod=mod, err=err))
-    return imported_attrs
-
-
 def is_in(obj, l):
     """
     Checks whether an object is one of the item in the list.
@@ -186,3 +158,116 @@ def pprinttable(rows):
         s += pattern % tuple(f(t) for t in line) + '\n'
     s += separator + '\n'
     return s
+
+
+class AirflowImporter(object):
+    """
+    Importer that dynamically loads a class and module from its parent. This
+    allows Airflow to support `from airflow.operators import BashOperator` even
+    though BashOperator is actually in airflow.operators.bash_operator.
+
+    The importer also takes over for the parent_module by wrapping it. This is
+    required to support attribute-based usage:
+
+        from airflow import operators
+        operators.BashOperator(...)
+    """
+
+    def __init__(self, parent_module, module_attributes):
+        """
+        :param parent_module: The module object of the parent package. For
+            example, sys.modules['airflow.operators']
+        :type parent_module: module
+        :param module_attributes: The file to class mappings for all importable
+            classes.
+        :type module_attributes: dict
+        """
+        self.parent_module = parent_module
+        self.attribute_modules = self._build_attribute_modules(module_attributes)
+        self.loaded_modules = {}
+
+        # Wrap the module so we can take over __getattr__.
+        sys.modules[parent_module.__name__] = self
+
+    @staticmethod
+    def _build_attribute_modules(module_attributes):
+        """
+        Flips and flattens the module_attributes dictionary from:
+
+            module => [Attribute, ...]
+
+        To:
+
+            Attribute => module
+
+        This is useful so that we can find the module to use, given an
+        attribute.
+        """
+        attribute_modules = {}
+
+        for module, attributes in list(module_attributes.items()):
+            for attribute in attributes:
+                attribute_modules[attribute] = module
+
+        return attribute_modules
+
+    def _load_attribute(self, attribute):
+        """
+        Load the class attribute if it hasn't been loaded yet, and return it.
+        """
+        module = self.attribute_modules.get(attribute, False)
+
+        if not module:
+            # This shouldn't happen. The check happens in __getattr__, too.
+            raise ImportError(attribute)
+        elif module not in self.loaded_modules:
+            # Note that it's very important to only load a given module once.
+            # If they are loaded more than once, the memory reference to the
+            # class objects changes, and Python thinks that an object of type
+            # Foo that was declared before Foo's module was reloaded is no
+            # longer the same type as Foo after it's reloaded.
+            path = os.path.realpath(self.parent_module.__file__)
+            folder = os.path.dirname(path)
+            f, filename, description = imp.find_module(module, [folder])
+            self.loaded_modules[module] = imp.load_module(module, f, filename, description)
+
+            # This functionality is deprecated, and AirflowImporter should be
+            # removed in 2.0.
+            from zope.deprecation import deprecated as _deprecated
+            _deprecated(
+                attribute,
+                "Importing {i} directly from {m} has been "
+                "deprecated. Please import from "
+                "'{m}.[operator_module]' instead. Support for direct "
+                "imports will be dropped entirely in Airflow 2.0.".format(
+                    i=attribute, m=self.parent_module))
+
+        loaded_module = self.loaded_modules[module]
+
+        return getattr(loaded_module, attribute)
+
+    def __getattr__(self, attribute):
+        """
+        Get an attribute from the wrapped module. If the attribute doesn't
+        exist, try and import it as a class from a submodule.
+
+        This is a Python trick that allows the class to pretend it's a module,
+        so that attribute-based usage works:
+
+            from airflow import operators
+            operators.BashOperator(...)
+
+        It also allows normal from imports to work:
+
+            from airflow.operators import BashOperator
+        """
+        if hasattr(self.parent_module, attribute):
+            # Always default to the parent module if the attribute exists.
+            return getattr(self.parent_module, attribute)
+        elif attribute in self.attribute_modules:
+            # Try and import the attribute if it's got a module defined.
+            loaded_attribute = self._load_attribute(attribute)
+            setattr(self, attribute, loaded_attribute)
+            return loaded_attribute
+
+        raise AttributeError


[2/3] incubator-airflow git commit: [AIRFLOW-285] Use Airflow 2.0 style imports for all remaining hooks/operators

Posted by cr...@apache.org.
[AIRFLOW-285] Use Airflow 2.0 style imports for all remaining hooks/operators


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dc84fdec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dc84fdec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dc84fdec

Branch: refs/heads/master
Commit: dc84fdecdfd9de12392c4e1c92005bd427d3ca37
Parents: dce633e
Author: Chris Riccomini <ch...@wepay.com>
Authored: Mon Jun 27 20:08:48 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Jun 28 13:34:47 2016 -0700

----------------------------------------------------------------------
 .../example_dags/example_qubole_operator.py     |  5 +-
 .../contrib/example_dags/example_twitter_dag.py |  3 +-
 .../operators/bigquery_check_operator.py        |  2 +-
 airflow/contrib/operators/qubole_operator.py    |  2 +-
 airflow/contrib/operators/vertica_operator.py   |  2 +-
 airflow/contrib/operators/vertica_to_hive.py    |  2 +-
 airflow/example_dags/example_bash_operator.py   |  3 +-
 airflow/example_dags/example_branch_operator.py |  3 +-
 .../example_branch_python_dop_operator_3.py     |  3 +-
 airflow/example_dags/example_http_operator.py   |  2 +-
 .../example_passing_params_via_test_command.py  |  3 +-
 airflow/example_dags/example_python_operator.py |  2 +-
 .../example_short_circuit_operator.py           |  3 +-
 airflow/example_dags/example_skip_dag.py        |  2 +-
 airflow/example_dags/example_subdag_operator.py |  3 +-
 .../example_trigger_controller_dag.py           |  2 +-
 .../example_dags/example_trigger_target_dag.py  |  3 +-
 airflow/example_dags/example_xcom.py            |  6 +-
 airflow/example_dags/subdags/subdag.py          |  2 +-
 airflow/example_dags/test_utils.py              |  2 +-
 airflow/example_dags/tutorial.py                |  2 +-
 airflow/operators/check_operator.py             |  2 +-
 airflow/operators/http_operator.py              |  2 +-
 airflow/operators/presto_check_operator.py      |  2 +-
 airflow/operators/sensors.py                    |  4 +-
 airflow/operators/sqlite_operator.py            |  2 +-
 airflow/utils/helpers.py                        | 37 +++++-----
 airflow/utils/logging.py                        |  2 +-
 airflow/www/views.py                            |  3 +-
 docs/concepts.rst                               |  4 +-
 docs/tutorial.rst                               |  6 +-
 tests/core.py                                   | 71 ++++++++++++--------
 tests/dags/test_backfill_pooled_tasks.py        |  2 +-
 tests/dags/test_issue_1225.py                   |  4 +-
 tests/dags/test_scheduler_dags.py               |  2 +-
 tests/jobs.py                                   |  2 +-
 tests/models.py                                 |  4 +-
 tests/operators/subdag_operator.py              |  4 +-
 38 files changed, 121 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py
index 4f974e2..63cccd3 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -13,8 +13,9 @@
 # limitations under the License.
 
 from airflow import DAG
-from airflow.operators import DummyOperator, PythonOperator, BranchPythonOperator
-from airflow.contrib.operators import QuboleOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
+from airflow.contrib.operators.qubole_operator import QuboleOperator
 from datetime import datetime, timedelta
 import filecmp
 import random

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/example_dags/example_twitter_dag.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py
index af1978e..d63b4e8 100644
--- a/airflow/contrib/example_dags/example_twitter_dag.py
+++ b/airflow/contrib/example_dags/example_twitter_dag.py
@@ -23,7 +23,8 @@
 # --------------------------------------------------------------------------------
 
 from airflow import DAG
-from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
 from airflow.operators.hive_operator import HiveOperator
 from datetime import datetime, date, timedelta
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/bigquery_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py
index 87e0ad7..10f0b7c 100644
--- a/airflow/contrib/operators/bigquery_check_operator.py
+++ b/airflow/contrib/operators/bigquery_check_operator.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
-from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator
+from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 9923cec..cbf15c4 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -14,7 +14,7 @@
 
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks import QuboleHook
+from airflow.contrib.hooks.qubole_hook import QuboleHook
 
 
 class QuboleOperator(BaseOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index 471018f..9266563 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -14,7 +14,7 @@
 
 import logging
 
-from airflow.contrib.hooks import VerticaHook
+from airflow.contrib.hooks.vertica_hook import VerticaHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 4071111..57e4fa8 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -19,7 +19,7 @@ import logging
 from tempfile import NamedTemporaryFile
 
 from airflow.hooks.hive_hooks import HiveCliHook
-from airflow.contrib.hooks import VerticaHook
+from airflow.contrib.hooks.vertica_hook import VerticaHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index c759f4d..0d18bcf 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -12,7 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from builtins import range
-from airflow.operators import BashOperator, DummyOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_branch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index edd177a..cc559d0 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -11,7 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators import BranchPythonOperator, DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 import random

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_branch_python_dop_operator_3.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index ff959fc..19bb183 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -13,7 +13,8 @@
 # limitations under the License.
 #
 
-from airflow.operators import BranchPythonOperator, DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 41ea385..12b0448 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -15,7 +15,7 @@
 ### Example HTTP operator and sensor
 """
 from airflow import DAG
-from airflow.operators import SimpleHttpOperator
+from airflow.operators.http_operator import SimpleHttpOperator
 from airflow.operators.sensors import HttpSensor
 from datetime import datetime, timedelta
 import json

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_passing_params_via_test_command.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index af473d9..cd5a251 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -16,7 +16,8 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
 
 dag = DAG("example_passing_params_via_test_command",
           default_args={"owner" : "airflow",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index a2f8abd..6c0b93f 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 from __future__ import print_function
 from builtins import range
-from airflow.operators import PythonOperator
+from airflow.operators.python_operator import PythonOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_short_circuit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 907cf51..92efe99 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -11,7 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators import ShortCircuitOperator, DummyOperator
+from airflow.operators.python_operator import ShortCircuitOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 import airflow.utils.helpers
 from datetime import datetime, timedelta

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_skip_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index b55c3a8..a38b126 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 from airflow.exceptions import AirflowSkipException

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index 57a62c6..b872f43 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -14,7 +14,8 @@
 from datetime import datetime
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator, SubDagOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.subdag_operator import SubDagOperator
 
 from airflow.example_dags.subdags.subdag import subdag
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_trigger_controller_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index b754d64..eb8cee0 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -29,7 +29,7 @@ This example illustrates the following features :
 """
 
 from airflow import DAG
-from airflow.operators import TriggerDagRunOperator
+from airflow.operators.dagrun_operator import TriggerDagRunOperator
 from datetime import datetime
 
 import pprint

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_trigger_target_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 41a3e36..a2a85f6 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -11,7 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
 from airflow.models import DAG
 from datetime import datetime
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_xcom.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 71cd44e..8dd2666 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -56,13 +56,13 @@ def puller(**kwargs):
     v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
     assert (v1, v2) == (value_1, value_2)
 
-push1 = airflow.operators.PythonOperator(
+push1 = airflow.operators.python_operator.PythonOperator(
     task_id='push', dag=dag, python_callable=push)
 
-push2 = airflow.operators.PythonOperator(
+push2 = airflow.operators.python_operator.PythonOperator(
     task_id='push_by_returning', dag=dag, python_callable=push_by_returning)
 
-pull = airflow.operators.PythonOperator(
+pull = airflow.operators.python_operator.PythonOperator(
     task_id='puller', dag=dag, python_callable=puller)
 
 pull.set_upstream([push1, push2])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/subdags/subdag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py
index c0e1326..82e1dd1 100644
--- a/airflow/example_dags/subdags/subdag.py
+++ b/airflow/example_dags/subdags/subdag.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 
 
 def subdag(parent_dag_name, child_dag_name, args):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/test_utils.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
index 38e50d0..70391c3 100644
--- a/airflow/example_dags/test_utils.py
+++ b/airflow/example_dags/test_utils.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """Used for unit tests"""
-from airflow.operators import BashOperator
+from airflow.operators.bash_operator import BashOperator
 from airflow.models import DAG
 from datetime import datetime
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/tutorial.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index 6bb2cd3..7c89666 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -18,7 +18,7 @@ Documentation that goes along with the Airflow tutorial located
 [here](http://pythonhosted.org/airflow/tutorial.html)
 """
 from airflow import DAG
-from airflow.operators import BashOperator
+from airflow.operators.bash_operator import BashOperator
 from datetime import datetime, timedelta
 
 seven_days_ago = datetime.combine(datetime.today() - timedelta(7),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index e4c8262..83190eb 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -17,7 +17,7 @@ from builtins import str
 import logging
 
 from airflow.exceptions import AirflowException
-from airflow.hooks import BaseHook
+from airflow.hooks.base_hook import BaseHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index ad9bd4f..e5cf339 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -15,7 +15,7 @@
 import logging
 
 from airflow.exceptions import AirflowException
-from airflow.hooks import HttpHook
+from airflow.hooks.http_hook import HttpHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/presto_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/presto_check_operator.py b/airflow/operators/presto_check_operator.py
index c1ac9cf..6207460 100644
--- a/airflow/operators/presto_check_operator.py
+++ b/airflow/operators/presto_check_operator.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from airflow.hooks.presto_hook import PrestoHook
-from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator
+from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 4e4cb3b..90a4d14 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -25,7 +25,7 @@ import airflow
 from airflow import hooks, settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
 from airflow.models import BaseOperator, TaskInstance, Connection as DB
-from airflow.hooks import BaseHook
+from airflow.hooks.base_hook import BaseHook
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
 
@@ -519,7 +519,7 @@ class HttpSensor(BaseSensorOperator):
         self.extra_options = extra_options or {}
         self.response_check = response_check
 
-        self.hook = hooks.HttpHook(method='GET', http_conn_id=http_conn_id)
+        self.hook = hooks.http_hook.HttpHook(method='GET', http_conn_id=http_conn_id)
 
     def poke(self, context):
         logging.info('Poking: ' + self.endpoint)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/sqlite_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 52b3b4b..0ff4d05 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -14,7 +14,7 @@
 
 import logging
 
-from airflow.hooks import SqliteHook
+from airflow.hooks.sqlite_hook import SqliteHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index c79ebee..7e3426e 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -25,6 +25,7 @@ import logging
 import os
 import re
 import sys
+import warnings
 
 from airflow.exceptions import AirflowException
 
@@ -163,8 +164,9 @@ def pprinttable(rows):
 class AirflowImporter(object):
     """
     Importer that dynamically loads a class and module from its parent. This
-    allows Airflow to support `from airflow.operators import BashOperator` even
-    though BashOperator is actually in airflow.operators.bash_operator.
+    allows Airflow to keep supporting the deprecated `from airflow.operators
+    import BashOperator` style even though BashOperator is actually in
+    airflow.operators.bash_operator.
 
     The importer also takes over for the parent_module by wrapping it. This is
     required to support attribute-based usage:
@@ -182,9 +184,9 @@ class AirflowImporter(object):
             classes.
         :type module_attributes: string
         """
-        self.parent_module = parent_module
-        self.attribute_modules = self._build_attribute_modules(module_attributes)
-        self.loaded_modules = {}
+        self._parent_module = parent_module
+        self._attribute_modules = self._build_attribute_modules(module_attributes)
+        self._loaded_modules = {}
 
         # Wrap the module so we can take over __getattr__.
         sys.modules[parent_module.__name__] = self
@@ -215,34 +217,33 @@ class AirflowImporter(object):
         """
         Load the class attribute if it hasn't been loaded yet, and return it.
         """
-        module = self.attribute_modules.get(attribute, False)
+        module = self._attribute_modules.get(attribute, False)
 
         if not module:
             # This shouldn't happen. The check happens in find_modules, too.
             raise ImportError(attribute)
-        elif module not in self.loaded_modules:
+        elif module not in self._loaded_modules:
             # Note that it's very important to only load a given modules once.
             # If they are loaded more than once, the memory reference to the
             # class objects changes, and Python thinks that an object of type
             # Foo that was declared before Foo's module was reloaded is no
             # longer the same type as Foo after it's reloaded.
-            path = os.path.realpath(self.parent_module.__file__)
+            path = os.path.realpath(self._parent_module.__file__)
             folder = os.path.dirname(path)
             f, filename, description = imp.find_module(module, [folder])
-            self.loaded_modules[module] = imp.load_module(module, f, filename, description)
+            self._loaded_modules[module] = imp.load_module(module, f, filename, description)
 
             # This functionality is deprecated, and AirflowImporter should be
             # removed in 2.0.
-            from zope.deprecation import deprecated as _deprecated
-            _deprecated(
-                attribute,
+            warnings.warn(
                 "Importing {i} directly from {m} has been "
                 "deprecated. Please import from "
                 "'{m}.[operator_module]' instead. Support for direct "
                 "imports will be dropped entirely in Airflow 2.0.".format(
-                    i=attribute, m=self.parent_module))
+                    i=attribute, m=self._parent_module),
+                DeprecationWarning)
 
-        loaded_module = self.loaded_modules[module]
+        loaded_module = self._loaded_modules[module]
 
         return getattr(loaded_module, attribute)
 
@@ -259,12 +260,12 @@ class AirflowImporter(object):
 
         It also allows normal from imports to work:
 
-            from airflow.operators import BashOperator
+            from airflow.operators import BashOperator
         """
-        if hasattr(self.parent_module, attribute):
+        if hasattr(self._parent_module, attribute):
             # Always default to the parent module if the attribute exists.
-            return getattr(self.parent_module, attribute)
-        elif attribute in self.attribute_modules:
+            return getattr(self._parent_module, attribute)
+        elif attribute in self._attribute_modules:
             # Try and import the attribute if it's got a module defined.
             loaded_attribute = self._load_attribute(attribute)
             setattr(self, attribute, loaded_attribute)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/utils/logging.py
----------------------------------------------------------------------
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index 8f5fc51..79c6cbf 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -128,7 +128,7 @@ class GCSLog(object):
         self.hook = None
 
         try:
-            from airflow.contrib.hooks import GoogleCloudStorageHook
+            from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
             self.hook = GoogleCloudStorageHook(
                 google_cloud_storage_conn_id=remote_conn_id)
         except:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 1fb3f91..0bd5b05 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -60,7 +60,8 @@ from airflow.exceptions import AirflowException
 from airflow.settings import Session
 from airflow.models import XCom
 
-from airflow.operators import BaseOperator, SubDagOperator
+from airflow.models import BaseOperator
+from airflow.operators.subdag_operator import SubDagOperator
 
 from airflow.utils.logging import LoggingMixin
 from airflow.utils.json import json_ser

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/docs/concepts.rst
----------------------------------------------------------------------
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 6e15ff8..31e7d61 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -485,7 +485,7 @@ the main UI. For example:
 
   #dags/subdag.py
   from airflow.models import DAG
-  from airflow.operators import DummyOperator
+  from airflow.operators.dummy_operator import DummyOperator
 
 
   # Dag is returned by a factory method
@@ -510,7 +510,7 @@ This SubDAG can then be referenced in your main DAG file:
   # main_dag.py
   from datetime import datetime, timedelta
   from airflow.models import DAG
-  from airflow.operators import SubDagOperator
+  from airflow.operators.subdag_operator import SubDagOperator
   from dags.subdag import sub_dag
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/docs/tutorial.rst
----------------------------------------------------------------------
diff --git a/docs/tutorial.rst b/docs/tutorial.rst
index e9d382b..a93479c 100644
--- a/docs/tutorial.rst
+++ b/docs/tutorial.rst
@@ -18,7 +18,7 @@ complicated, a line by line explanation follows below.
     https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
     """
     from airflow import DAG
-    from airflow.operators import BashOperator
+    from airflow.operators.bash_operator import BashOperator
     from datetime import datetime, timedelta
 
 
@@ -100,7 +100,7 @@ Airflow DAG object. Let's start by importing the libraries we will need.
     from airflow import DAG
 
     # Operators; we need this to operate!
-    from airflow.operators import BashOperator
+    from airflow.operators.bash_operator import BashOperator
 
 Default Arguments
 -----------------
@@ -270,7 +270,7 @@ something like this:
     http://airflow.readthedocs.org/en/latest/tutorial.html
     """
     from airflow import DAG
-    from airflow.operators import BashOperator
+    from airflow.operators.bash_operator import BashOperator
     from datetime import datetime, timedelta
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 9c688ea..002ad30 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -36,8 +36,17 @@ from airflow.executors import SequentialExecutor, LocalExecutor
 from airflow.models import Variable
 
 configuration.test_mode()
-from airflow import jobs, models, DAG, operators, hooks, utils, macros, settings, exceptions
-from airflow.hooks import BaseHook
+from airflow import jobs, models, DAG, utils, macros, settings, exceptions
+from airflow.models import BaseOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.check_operator import CheckOperator, ValueCheckOperator
+from airflow.operators.dagrun_operator import TriggerDagRunOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.http_operator import SimpleHttpOperator
+from airflow.operators import sensors
+from airflow.hooks.base_hook import BaseHook
+from airflow.hooks.sqlite_hook import SqliteHook
 from airflow.bin import cli
 from airflow.www import app as application
 from airflow.settings import Session
@@ -85,7 +94,7 @@ def reset(dag_id=TEST_DAG_ID):
 reset()
 
 
-class OperatorSubclass(operators.BaseOperator):
+class OperatorSubclass(BaseOperator):
     """
     An operator to test template substitution
     """
@@ -305,7 +314,7 @@ class CoreTest(unittest.TestCase):
         assert hash(self.dag) != hash(dag_subclass)
 
     def test_time_sensor(self):
-        t = operators.sensors.TimeSensor(
+        t = sensors.TimeSensor(
             task_id='time_sensor_check',
             target_time=time(0),
             dag=self.dag)
@@ -319,14 +328,14 @@ class CoreTest(unittest.TestCase):
         captainHook.run("CREATE TABLE operator_test_table (a, b)")
         captainHook.run("insert into operator_test_table values (1,2)")
 
-        t = operators.CheckOperator(
+        t = CheckOperator(
             task_id='check',
             sql="select count(*) from operator_test_table",
             conn_id=conn_id,
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
-        t = operators.ValueCheckOperator(
+        t = ValueCheckOperator(
             task_id='value_check',
             pass_value=95,
             tolerance=0.1,
@@ -350,7 +359,7 @@ class CoreTest(unittest.TestCase):
         Tests that Operators reject illegal arguments
         """
         with warnings.catch_warnings(record=True) as w:
-            t = operators.BashOperator(
+            t = BashOperator(
                 task_id='test_illegal_args',
                 bash_command='echo success',
                 dag=self.dag,
@@ -362,14 +371,14 @@ class CoreTest(unittest.TestCase):
                 w[0].message.args[0])
 
     def test_bash_operator(self):
-        t = operators.BashOperator(
+        t = BashOperator(
             task_id='time_sensor_check',
             bash_command="echo success",
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_bash_operator_multi_byte_output(self):
-        t = operators.BashOperator(
+        t = BashOperator(
             task_id='test_multi_byte_bash_operator',
             bash_command=u"echo \u2600",
             dag=self.dag,
@@ -381,7 +390,7 @@ class CoreTest(unittest.TestCase):
             if True:
                 return obj
 
-        t = operators.TriggerDagRunOperator(
+        t = TriggerDagRunOperator(
             task_id='test_trigger_dagrun',
             trigger_dag_id='example_bash_operator',
             python_callable=trigga,
@@ -389,7 +398,7 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_dryrun(self):
-        t = operators.BashOperator(
+        t = BashOperator(
             task_id='time_sensor_check',
             bash_command="echo success",
             dag=self.dag)
@@ -404,14 +413,14 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_timedelta_sensor(self):
-        t = operators.sensors.TimeDeltaSensor(
+        t = sensors.TimeDeltaSensor(
             task_id='timedelta_sensor_check',
             delta=timedelta(seconds=2),
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_external_task_sensor(self):
-        t = operators.sensors.ExternalTaskSensor(
+        t = sensors.ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id=TEST_DAG_ID,
             external_task_id='time_sensor_check',
@@ -419,7 +428,7 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_external_task_sensor_delta(self):
-        t = operators.sensors.ExternalTaskSensor(
+        t = sensors.ExternalTaskSensor(
             task_id='test_external_task_sensor_check_delta',
             external_dag_id=TEST_DAG_ID,
             external_task_id='time_sensor_check',
@@ -429,7 +438,7 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_timeout(self):
-        t = operators.PythonOperator(
+        t = PythonOperator(
             task_id='test_timeout',
             execution_timeout=timedelta(seconds=1),
             python_callable=lambda: sleep(5),
@@ -444,7 +453,7 @@ class CoreTest(unittest.TestCase):
             if not templates_dict['ds'] == ds:
                 raise Exception("failure")
 
-        t = operators.PythonOperator(
+        t = PythonOperator(
             task_id='test_py_op',
             provide_context=True,
             python_callable=test_py_op,
@@ -460,7 +469,7 @@ class CoreTest(unittest.TestCase):
             task_id='test_complex_template',
             some_templated_field={
                 'foo': '123',
-                'bar': ['baz', ' {{ ds }}']
+                'bar': ['baz', '{{ ds }}']
             },
             on_success_callback=verify_templated_field,
             dag=self.dag)
@@ -698,7 +707,7 @@ class CoreTest(unittest.TestCase):
 
     def test_bad_trigger_rule(self):
         with self.assertRaises(AirflowException):
-            operators.DummyOperator(
+            DummyOperator(
                 task_id='test_bad_trigger',
                 trigger_rule="non_existant",
                 dag=self.dag)
@@ -1195,7 +1204,7 @@ class HttpOpSensorTest(unittest.TestCase):
 
     @mock.patch('requests.Session', FakeSession)
     def test_get(self):
-        t = operators.SimpleHttpOperator(
+        t = SimpleHttpOperator(
             task_id='get_op',
             method='GET',
             endpoint='/search',
@@ -1206,7 +1215,7 @@ class HttpOpSensorTest(unittest.TestCase):
 
     @mock.patch('requests.Session', FakeSession)
     def test_get_response_check(self):
-        t = operators.SimpleHttpOperator(
+        t = SimpleHttpOperator(
             task_id='get_op',
             method='GET',
             endpoint='/search',
@@ -1218,7 +1227,7 @@ class HttpOpSensorTest(unittest.TestCase):
 
     @mock.patch('requests.Session', FakeSession)
     def test_sensor(self):
-        sensor = operators.sensors.HttpSensor(
+        sensor = sensors.HttpSensor(
             task_id='http_sensor_check',
             conn_id='http_default',
             endpoint='/search',
@@ -1256,7 +1265,7 @@ class ConnectionTest(unittest.TestCase):
                 del os.environ[ev]
 
     def test_using_env_var(self):
-        c = hooks.SqliteHook.get_connection(conn_id='test_uri')
+        c = SqliteHook.get_connection(conn_id='test_uri')
         assert c.host == 'ec2.compute.com'
         assert c.schema == 'the_database'
         assert c.login == 'username'
@@ -1264,7 +1273,7 @@ class ConnectionTest(unittest.TestCase):
         assert c.port == 5432
 
     def test_using_unix_socket_env_var(self):
-        c = hooks.SqliteHook.get_connection(conn_id='test_uri_no_creds')
+        c = SqliteHook.get_connection(conn_id='test_uri_no_creds')
         assert c.host == 'ec2.compute.com'
         assert c.schema == 'the_database'
         assert c.login is None
@@ -1282,12 +1291,12 @@ class ConnectionTest(unittest.TestCase):
         assert c.port is None
 
     def test_env_var_priority(self):
-        c = hooks.SqliteHook.get_connection(conn_id='airflow_db')
+        c = SqliteHook.get_connection(conn_id='airflow_db')
         assert c.host != 'ec2.compute.com'
 
         os.environ['AIRFLOW_CONN_AIRFLOW_DB'] = \
             'postgres://username:password@ec2.compute.com:5432/the_database'
-        c = hooks.SqliteHook.get_connection(conn_id='airflow_db')
+        c = SqliteHook.get_connection(conn_id='airflow_db')
         assert c.host == 'ec2.compute.com'
         assert c.schema == 'the_database'
         assert c.login == 'username'
@@ -1311,15 +1320,21 @@ class WebHDFSHookTest(unittest.TestCase):
         assert c.proxy_user == 'someone'
 
 
-@unittest.skipUnless("S3Hook" in dir(hooks),
-                     "Skipping test because S3Hook is not installed")
+try:
+    from airflow.hooks.S3_hook import S3Hook
+except ImportError:
+    S3Hook = None
+
+
+@unittest.skipIf(S3Hook is None,
+                 "Skipping test because S3Hook is not installed")
 class S3HookTest(unittest.TestCase):
     def setUp(self):
         configuration.test_mode()
         self.s3_test_url = "s3://test/this/is/not/a-real-key.txt"
 
     def test_parse_s3_url(self):
-        parsed = hooks.S3Hook.parse_s3_url(self.s3_test_url)
+        parsed = S3Hook.parse_s3_url(self.s3_test_url)
         self.assertEqual(parsed,
                          ("test", "this/is/not/a-real-key.txt"),
                          "Incorrect parsing of the s3 url")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_backfill_pooled_tasks.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_backfill_pooled_tasks.py b/tests/dags/test_backfill_pooled_tasks.py
index 306db7d..4b2ba8f 100644
--- a/tests/dags/test_backfill_pooled_tasks.py
+++ b/tests/dags/test_backfill_pooled_tasks.py
@@ -21,7 +21,7 @@ Addresses issue #1225.
 from datetime import datetime
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 
 dag = DAG(dag_id='test_backfill_pooled_task_dag')
 task = DummyOperator(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index ecfa646..8f43b08 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -21,7 +21,9 @@ Addresses issue #1225.
 from datetime import datetime
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator, PythonOperator, SubDagOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.subdag_operator import SubDagOperator
 from airflow.utils.trigger_rule import TriggerRule
 import time
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_scheduler_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py
index ac291e0..224e7c5 100644
--- a/tests/dags/test_scheduler_dags.py
+++ b/tests/dags/test_scheduler_dags.py
@@ -15,7 +15,7 @@
 from datetime import datetime
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 DEFAULT_DATE = datetime(2100, 1, 1)
 
 # DAG tests backfill with pooled tasks

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 0619f3d..2f53fbc 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -26,7 +26,7 @@ from airflow.bin import cli
 from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 2aae476..e4f5aa8 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -27,7 +27,9 @@ from airflow.exceptions import AirflowSkipException
 from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
 from airflow.models import DagModel
-from airflow.operators import DummyOperator, BashOperator, PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
 from airflow.utils.state import State
 from mock import patch
 from nose_parameterized import parameterized

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 0006f60..0a7be23 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -18,7 +18,9 @@ import unittest
 
 import airflow
 from airflow.models import DAG, DagBag
-from airflow.operators import BashOperator, DummyOperator, SubDagOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.subdag_operator import SubDagOperator
 from airflow.jobs import BackfillJob
 from airflow.exceptions import AirflowException
 



[3/3] incubator-airflow git commit: Merge pull request #1586 from criccomini/AIRFLOW-200

Posted by cr...@apache.org.
Merge pull request #1586 from criccomini/AIRFLOW-200


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7b382b4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7b382b4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7b382b4e

Branch: refs/heads/master
Commit: 7b382b4e928b96c08890317b2b09d22163882a8a
Parents: be6766a dc84fde
Author: Chris Riccomini <ch...@wepay.com>
Authored: Tue Jun 28 20:18:46 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Jun 28 20:18:46 2016 -0700

----------------------------------------------------------------------
 .../example_dags/example_qubole_operator.py     |   5 +-
 .../contrib/example_dags/example_twitter_dag.py |   3 +-
 airflow/contrib/hooks/__init__.py               |  22 +--
 airflow/contrib/operators/__init__.py           |  22 +--
 .../operators/bigquery_check_operator.py        |   2 +-
 airflow/contrib/operators/qubole_operator.py    |   2 +-
 airflow/contrib/operators/vertica_operator.py   |   2 +-
 airflow/contrib/operators/vertica_to_hive.py    |   2 +-
 airflow/example_dags/example_bash_operator.py   |   3 +-
 airflow/example_dags/example_branch_operator.py |   3 +-
 .../example_branch_python_dop_operator_3.py     |   3 +-
 airflow/example_dags/example_http_operator.py   |   2 +-
 .../example_passing_params_via_test_command.py  |   3 +-
 airflow/example_dags/example_python_operator.py |   2 +-
 .../example_short_circuit_operator.py           |   3 +-
 airflow/example_dags/example_skip_dag.py        |   2 +-
 airflow/example_dags/example_subdag_operator.py |   3 +-
 .../example_trigger_controller_dag.py           |   2 +-
 .../example_dags/example_trigger_target_dag.py  |   3 +-
 airflow/example_dags/example_xcom.py            |   6 +-
 airflow/example_dags/subdags/subdag.py          |   2 +-
 airflow/example_dags/test_utils.py              |   2 +-
 airflow/example_dags/tutorial.py                |   2 +-
 airflow/hooks/__init__.py                       |  31 ++--
 airflow/operators/__init__.py                   |  70 +++------
 airflow/operators/check_operator.py             |   2 +-
 airflow/operators/http_operator.py              |   2 +-
 airflow/operators/presto_check_operator.py      |   2 +-
 airflow/operators/sensors.py                    |   4 +-
 airflow/operators/sqlite_operator.py            |   2 +-
 airflow/utils/helpers.py                        | 144 +++++++++++++++----
 airflow/utils/logging.py                        |   2 +-
 airflow/www/views.py                            |   3 +-
 docs/concepts.rst                               |   4 +-
 docs/tutorial.rst                               |   6 +-
 tests/core.py                                   |  71 +++++----
 tests/dags/test_backfill_pooled_tasks.py        |   2 +-
 tests/dags/test_issue_1225.py                   |   4 +-
 tests/dags/test_scheduler_dags.py               |   2 +-
 tests/jobs.py                                   |   2 +-
 tests/models.py                                 |   4 +-
 tests/operators/subdag_operator.py              |   4 +-
 42 files changed, 258 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7b382b4e/airflow/www/views.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7b382b4e/tests/core.py
----------------------------------------------------------------------