Posted to commits@airflow.apache.org by cr...@apache.org on 2016/06/29 03:19:01 UTC

[2/3] incubator-airflow git commit: [AIRFLOW-285] Use Airflow 2.0 style imports for all remaining hooks/operators

[AIRFLOW-285] Use Airflow 2.0 style imports for all remaining hooks/operators
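
Note: the change replaces the deprecated flat import style, which AirflowImporter
resolves dynamically at runtime, with explicit module paths. A minimal
before/after sketch using operators that appear in the hunks below:

    # Deprecated pre-2.0 style (dynamic lookup via AirflowImporter):
    # from airflow.operators import BashOperator, DummyOperator

    # Airflow 2.0 style, importing from the concrete module:
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator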


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dc84fdec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dc84fdec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dc84fdec

Branch: refs/heads/master
Commit: dc84fdecdfd9de12392c4e1c92005bd427d3ca37
Parents: dce633e
Author: Chris Riccomini <ch...@wepay.com>
Authored: Mon Jun 27 20:08:48 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Jun 28 13:34:47 2016 -0700

----------------------------------------------------------------------
 .../example_dags/example_qubole_operator.py     |  5 +-
 .../contrib/example_dags/example_twitter_dag.py |  3 +-
 .../operators/bigquery_check_operator.py        |  2 +-
 airflow/contrib/operators/qubole_operator.py    |  2 +-
 airflow/contrib/operators/vertica_operator.py   |  2 +-
 airflow/contrib/operators/vertica_to_hive.py    |  2 +-
 airflow/example_dags/example_bash_operator.py   |  3 +-
 airflow/example_dags/example_branch_operator.py |  3 +-
 .../example_branch_python_dop_operator_3.py     |  3 +-
 airflow/example_dags/example_http_operator.py   |  2 +-
 .../example_passing_params_via_test_command.py  |  3 +-
 airflow/example_dags/example_python_operator.py |  2 +-
 .../example_short_circuit_operator.py           |  3 +-
 airflow/example_dags/example_skip_dag.py        |  2 +-
 airflow/example_dags/example_subdag_operator.py |  3 +-
 .../example_trigger_controller_dag.py           |  2 +-
 .../example_dags/example_trigger_target_dag.py  |  3 +-
 airflow/example_dags/example_xcom.py            |  6 +-
 airflow/example_dags/subdags/subdag.py          |  2 +-
 airflow/example_dags/test_utils.py              |  2 +-
 airflow/example_dags/tutorial.py                |  2 +-
 airflow/operators/check_operator.py             |  2 +-
 airflow/operators/http_operator.py              |  2 +-
 airflow/operators/presto_check_operator.py      |  2 +-
 airflow/operators/sensors.py                    |  4 +-
 airflow/operators/sqlite_operator.py            |  2 +-
 airflow/utils/helpers.py                        | 37 +++++-----
 airflow/utils/logging.py                        |  2 +-
 airflow/www/views.py                            |  3 +-
 docs/concepts.rst                               |  4 +-
 docs/tutorial.rst                               |  6 +-
 tests/core.py                                   | 71 ++++++++++++--------
 tests/dags/test_backfill_pooled_tasks.py        |  2 +-
 tests/dags/test_issue_1225.py                   |  4 +-
 tests/dags/test_scheduler_dags.py               |  2 +-
 tests/jobs.py                                   |  2 +-
 tests/models.py                                 |  4 +-
 tests/operators/subdag_operator.py              |  4 +-
 38 files changed, 121 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py
index 4f974e2..63cccd3 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -13,8 +13,9 @@
 # limitations under the License.
 
 from airflow import DAG
-from airflow.operators import DummyOperator, PythonOperator, BranchPythonOperator
-from airflow.contrib.operators import QuboleOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
+from airflow.contrib.operators.qubole_operator import QuboleOperator
 from datetime import datetime, timedelta
 import filecmp
 import random

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/example_dags/example_twitter_dag.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py
index af1978e..d63b4e8 100644
--- a/airflow/contrib/example_dags/example_twitter_dag.py
+++ b/airflow/contrib/example_dags/example_twitter_dag.py
@@ -23,7 +23,8 @@
 # --------------------------------------------------------------------------------
 
 from airflow import DAG
-from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
 from airflow.operators.hive_operator import HiveOperator
 from datetime import datetime, date, timedelta
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/bigquery_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py
index 87e0ad7..10f0b7c 100644
--- a/airflow/contrib/operators/bigquery_check_operator.py
+++ b/airflow/contrib/operators/bigquery_check_operator.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
-from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator
+from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 9923cec..cbf15c4 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -14,7 +14,7 @@
 
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks import QuboleHook
+from airflow.contrib.hooks.qubole_hook import QuboleHook
 
 
 class QuboleOperator(BaseOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index 471018f..9266563 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -14,7 +14,7 @@
 
 import logging
 
-from airflow.contrib.hooks import VerticaHook
+from airflow.contrib.hooks.vertica_hook import VerticaHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 4071111..57e4fa8 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -19,7 +19,7 @@ import logging
 from tempfile import NamedTemporaryFile
 
 from airflow.hooks.hive_hooks import HiveCliHook
-from airflow.contrib.hooks import VerticaHook
+from airflow.contrib.hooks.vertica_hook import VerticaHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index c759f4d..0d18bcf 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -12,7 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from builtins import range
-from airflow.operators import BashOperator, DummyOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_branch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index edd177a..cc559d0 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -11,7 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators import BranchPythonOperator, DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 import random

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_branch_python_dop_operator_3.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index ff959fc..19bb183 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -13,7 +13,8 @@
 # limitations under the License.
 #
 
-from airflow.operators import BranchPythonOperator, DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 41ea385..12b0448 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -15,7 +15,7 @@
 ### Example HTTP operator and sensor
 """
 from airflow import DAG
-from airflow.operators import SimpleHttpOperator
+from airflow.operators.http_operator import SimpleHttpOperator
 from airflow.operators.sensors import HttpSensor
 from datetime import datetime, timedelta
 import json

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_passing_params_via_test_command.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index af473d9..cd5a251 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -16,7 +16,8 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
 
 dag = DAG("example_passing_params_via_test_command",
           default_args={"owner" : "airflow",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index a2f8abd..6c0b93f 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 from __future__ import print_function
 from builtins import range
-from airflow.operators import PythonOperator
+from airflow.operators.python_operator import PythonOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_short_circuit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 907cf51..92efe99 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -11,7 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators import ShortCircuitOperator, DummyOperator
+from airflow.operators.python_operator import ShortCircuitOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 import airflow.utils.helpers
 from datetime import datetime, timedelta

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_skip_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index b55c3a8..a38b126 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
 from airflow.exceptions import AirflowSkipException

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index 57a62c6..b872f43 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -14,7 +14,8 @@
 from datetime import datetime
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator, SubDagOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.subdag_operator import SubDagOperator
 
 from airflow.example_dags.subdags.subdag import subdag
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_trigger_controller_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index b754d64..eb8cee0 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -29,7 +29,7 @@ This example illustrates the following features :
 """
 
 from airflow import DAG
-from airflow.operators import TriggerDagRunOperator
+from airflow.operators.dagrun_operator import TriggerDagRunOperator
 from datetime import datetime
 
 import pprint

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_trigger_target_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 41a3e36..a2a85f6 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -11,7 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
 from airflow.models import DAG
 from datetime import datetime
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_xcom.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 71cd44e..8dd2666 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -56,13 +56,13 @@ def puller(**kwargs):
     v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
     assert (v1, v2) == (value_1, value_2)
 
-push1 = airflow.operators.PythonOperator(
+push1 = airflow.operators.python_operator.PythonOperator(
     task_id='push', dag=dag, python_callable=push)
 
-push2 = airflow.operators.PythonOperator(
+push2 = airflow.operators.python_operator.PythonOperator(
     task_id='push_by_returning', dag=dag, python_callable=push_by_returning)
 
-pull = airflow.operators.PythonOperator(
+pull = airflow.operators.python_operator.PythonOperator(
     task_id='puller', dag=dag, python_callable=puller)
 
 pull.set_upstream([push1, push2])
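
Note: the example_xcom.py hunk above keeps attribute-style access on the
airflow.operators package. The equivalent explicit-import form used elsewhere
in this patch would look roughly like this sketch:

    from airflow.operators.python_operator import PythonOperator

    push1 = PythonOperator(task_id='push', dag=dag, python_callable=push)
    push2 = PythonOperator(
        task_id='push_by_returning', dag=dag, python_callable=push_by_returning)
    pull = PythonOperator(task_id='puller', dag=dag, python_callable=puller)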

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/subdags/subdag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py
index c0e1326..82e1dd1 100644
--- a/airflow/example_dags/subdags/subdag.py
+++ b/airflow/example_dags/subdags/subdag.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 
 
 def subdag(parent_dag_name, child_dag_name, args):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/test_utils.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
index 38e50d0..70391c3 100644
--- a/airflow/example_dags/test_utils.py
+++ b/airflow/example_dags/test_utils.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """Used for unit tests"""
-from airflow.operators import BashOperator
+from airflow.operators.bash_operator import BashOperator
 from airflow.models import DAG
 from datetime import datetime
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/tutorial.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index 6bb2cd3..7c89666 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -18,7 +18,7 @@ Documentation that goes along with the Airflow tutorial located
 [here](http://pythonhosted.org/airflow/tutorial.html)
 """
 from airflow import DAG
-from airflow.operators import BashOperator
+from airflow.operators.bash_operator import BashOperator
 from datetime import datetime, timedelta
 
 seven_days_ago = datetime.combine(datetime.today() - timedelta(7),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index e4c8262..83190eb 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -17,7 +17,7 @@ from builtins import str
 import logging
 
 from airflow.exceptions import AirflowException
-from airflow.hooks import BaseHook
+from airflow.hooks.base_hook import BaseHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index ad9bd4f..e5cf339 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -15,7 +15,7 @@
 import logging
 
 from airflow.exceptions import AirflowException
-from airflow.hooks import HttpHook
+from airflow.hooks.http_hook import HttpHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/presto_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/presto_check_operator.py b/airflow/operators/presto_check_operator.py
index c1ac9cf..6207460 100644
--- a/airflow/operators/presto_check_operator.py
+++ b/airflow/operators/presto_check_operator.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from airflow.hooks.presto_hook import PrestoHook
-from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator
+from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 4e4cb3b..90a4d14 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -25,7 +25,7 @@ import airflow
 from airflow import hooks, settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
 from airflow.models import BaseOperator, TaskInstance, Connection as DB
-from airflow.hooks import BaseHook
+from airflow.hooks.base_hook import BaseHook
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
 
@@ -519,7 +519,7 @@ class HttpSensor(BaseSensorOperator):
         self.extra_options = extra_options or {}
         self.response_check = response_check
 
-        self.hook = hooks.HttpHook(method='GET', http_conn_id=http_conn_id)
+        self.hook = hooks.http_hook.HttpHook(method='GET', http_conn_id=http_conn_id)
 
     def poke(self, context):
         logging.info('Poking: ' + self.endpoint)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/sqlite_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 52b3b4b..0ff4d05 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -14,7 +14,7 @@
 
 import logging
 
-from airflow.hooks import SqliteHook
+from airflow.hooks.sqlite_hook import SqliteHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index c79ebee..7e3426e 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -25,6 +25,7 @@ import logging
 import os
 import re
 import sys
+import warnings
 
 from airflow.exceptions import AirflowException
 
@@ -163,8 +164,9 @@ def pprinttable(rows):
 class AirflowImporter(object):
     """
     Importer that dynamically loads a class and module from its parent. This
-    allows Airflow to support `from airflow.operators import BashOperator` even
-    though BashOperator is actually in airflow.operators.bash_operator.
+    allows Airflow to support `from airflow.operators.bash_operator import
+    BashOperator` even though BashOperator is actually in
+    airflow.operators.bash_operator.
 
     The importer also takes over for the parent_module by wrapping it. This is
     required to support attribute-based usage:
@@ -182,9 +184,9 @@ class AirflowImporter(object):
             classes.
         :type module_attributes: string
         """
-        self.parent_module = parent_module
-        self.attribute_modules = self._build_attribute_modules(module_attributes)
-        self.loaded_modules = {}
+        self._parent_module = parent_module
+        self._attribute_modules = self._build_attribute_modules(module_attributes)
+        self._loaded_modules = {}
 
         # Wrap the module so we can take over __getattr__.
         sys.modules[parent_module.__name__] = self
@@ -215,34 +217,33 @@ class AirflowImporter(object):
         """
         Load the class attribute if it hasn't been loaded yet, and return it.
         """
-        module = self.attribute_modules.get(attribute, False)
+        module = self._attribute_modules.get(attribute, False)
 
         if not module:
             # This shouldn't happen. The check happens in find_modules, too.
             raise ImportError(attribute)
-        elif module not in self.loaded_modules:
+        elif module not in self._loaded_modules:
             # Note that it's very important to only load a given modules once.
             # If they are loaded more than once, the memory reference to the
             # class objects changes, and Python thinks that an object of type
             # Foo that was declared before Foo's module was reloaded is no
             # longer the same type as Foo after it's reloaded.
-            path = os.path.realpath(self.parent_module.__file__)
+            path = os.path.realpath(self._parent_module.__file__)
             folder = os.path.dirname(path)
             f, filename, description = imp.find_module(module, [folder])
-            self.loaded_modules[module] = imp.load_module(module, f, filename, description)
+            self._loaded_modules[module] = imp.load_module(module, f, filename, description)
 
             # This functionality is deprecated, and AirflowImporter should be
             # removed in 2.0.
-            from zope.deprecation import deprecated as _deprecated
-            _deprecated(
-                attribute,
+            warnings.warn(
                 "Importing {i} directly from {m} has been "
                 "deprecated. Please import from "
                 "'{m}.[operator_module]' instead. Support for direct "
                 "imports will be dropped entirely in Airflow 2.0.".format(
-                    i=attribute, m=self.parent_module))
+                    i=attribute, m=self._parent_module),
+                DeprecationWarning)
 
-        loaded_module = self.loaded_modules[module]
+        loaded_module = self._loaded_modules[module]
 
         return getattr(loaded_module, attribute)
 
@@ -259,12 +260,12 @@ class AirflowImporter(object):
 
         It also allows normal from imports to work:
 
-            from airflow.operators import BashOperator
+            from airflow.operators.bash_operator import BashOperator
         """
-        if hasattr(self.parent_module, attribute):
+        if hasattr(self._parent_module, attribute):
             # Always default to the parent module if the attribute exists.
-            return getattr(self.parent_module, attribute)
-        elif attribute in self.attribute_modules:
+            return getattr(self._parent_module, attribute)
+        elif attribute in self._attribute_modules:
             # Try and import the attribute if it's got a module defined.
             loaded_attribute = self._load_attribute(attribute)
             setattr(self, attribute, loaded_attribute)
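
Note: the helpers.py hunks above also swap zope.deprecation for the standard
library's warnings module. A hedged sketch of how the new DeprecationWarning
surfaces at import time (CPython filters DeprecationWarning by default, so it
may need to be enabled explicitly):

    import warnings

    # Show DeprecationWarning messages, which CPython hides by default.
    warnings.simplefilter("default", DeprecationWarning)

    # Deprecated flat import; still resolved by AirflowImporter, but it now
    # emits the "Importing ... directly from ... has been deprecated" warning.
    from airflow.operators import BashOperator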

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/utils/logging.py
----------------------------------------------------------------------
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index 8f5fc51..79c6cbf 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -128,7 +128,7 @@ class GCSLog(object):
         self.hook = None
 
         try:
-            from airflow.contrib.hooks import GoogleCloudStorageHook
+            from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
             self.hook = GoogleCloudStorageHook(
                 google_cloud_storage_conn_id=remote_conn_id)
         except:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 1fb3f91..0bd5b05 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -60,7 +60,8 @@ from airflow.exceptions import AirflowException
 from airflow.settings import Session
 from airflow.models import XCom
 
-from airflow.operators import BaseOperator, SubDagOperator
+from airflow.models import BaseOperator
+from airflow.operators.subdag_operator import SubDagOperator
 
 from airflow.utils.logging import LoggingMixin
 from airflow.utils.json import json_ser

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/docs/concepts.rst
----------------------------------------------------------------------
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 6e15ff8..31e7d61 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -485,7 +485,7 @@ the main UI. For example:
 
   #dags/subdag.py
   from airflow.models import DAG
-  from airflow.operators import DummyOperator
+  from airflow.operators.dummy_operator import DummyOperator
 
 
   # Dag is returned by a factory method
@@ -510,7 +510,7 @@ This SubDAG can then be referenced in your main DAG file:
   # main_dag.py
   from datetime import datetime, timedelta
   from airflow.models import DAG
-  from airflow.operators import SubDagOperator
+  from airflow.operators.subdag_operator import SubDagOperator
   from dags.subdag import sub_dag
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/docs/tutorial.rst
----------------------------------------------------------------------
diff --git a/docs/tutorial.rst b/docs/tutorial.rst
index e9d382b..a93479c 100644
--- a/docs/tutorial.rst
+++ b/docs/tutorial.rst
@@ -18,7 +18,7 @@ complicated, a line by line explanation follows below.
     https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
     """
     from airflow import DAG
-    from airflow.operators import BashOperator
+    from airflow.operators.bash_operator import BashOperator
     from datetime import datetime, timedelta
 
 
@@ -100,7 +100,7 @@ Airflow DAG object. Let's start by importing the libraries we will need.
     from airflow import DAG
 
     # Operators; we need this to operate!
-    from airflow.operators import BashOperator
+    from airflow.operators.bash_operator import BashOperator
 
 Default Arguments
 -----------------
@@ -270,7 +270,7 @@ something like this:
     http://airflow.readthedocs.org/en/latest/tutorial.html
     """
     from airflow import DAG
-    from airflow.operators import BashOperator
+    from airflow.operators.bash_operator import BashOperator
     from datetime import datetime, timedelta
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 9c688ea..002ad30 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -36,8 +36,17 @@ from airflow.executors import SequentialExecutor, LocalExecutor
 from airflow.models import Variable
 
 configuration.test_mode()
-from airflow import jobs, models, DAG, operators, hooks, utils, macros, settings, exceptions
-from airflow.hooks import BaseHook
+from airflow import jobs, models, DAG, utils, macros, settings, exceptions
+from airflow.models import BaseOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.check_operator import CheckOperator, ValueCheckOperator
+from airflow.operators.dagrun_operator import TriggerDagRunOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.http_operator import SimpleHttpOperator
+from airflow.operators import sensors
+from airflow.hooks.base_hook import BaseHook
+from airflow.hooks.sqlite_hook import SqliteHook
 from airflow.bin import cli
 from airflow.www import app as application
 from airflow.settings import Session
@@ -85,7 +94,7 @@ def reset(dag_id=TEST_DAG_ID):
 reset()
 
 
-class OperatorSubclass(operators.BaseOperator):
+class OperatorSubclass(BaseOperator):
     """
     An operator to test template substitution
     """
@@ -305,7 +314,7 @@ class CoreTest(unittest.TestCase):
         assert hash(self.dag) != hash(dag_subclass)
 
     def test_time_sensor(self):
-        t = operators.sensors.TimeSensor(
+        t = sensors.TimeSensor(
             task_id='time_sensor_check',
             target_time=time(0),
             dag=self.dag)
@@ -319,14 +328,14 @@ class CoreTest(unittest.TestCase):
         captainHook.run("CREATE TABLE operator_test_table (a, b)")
         captainHook.run("insert into operator_test_table values (1,2)")
 
-        t = operators.CheckOperator(
+        t = CheckOperator(
             task_id='check',
             sql="select count(*) from operator_test_table",
             conn_id=conn_id,
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
-        t = operators.ValueCheckOperator(
+        t = ValueCheckOperator(
             task_id='value_check',
             pass_value=95,
             tolerance=0.1,
@@ -350,7 +359,7 @@ class CoreTest(unittest.TestCase):
         Tests that Operators reject illegal arguments
         """
         with warnings.catch_warnings(record=True) as w:
-            t = operators.BashOperator(
+            t = BashOperator(
                 task_id='test_illegal_args',
                 bash_command='echo success',
                 dag=self.dag,
@@ -362,14 +371,14 @@ class CoreTest(unittest.TestCase):
                 w[0].message.args[0])
 
     def test_bash_operator(self):
-        t = operators.BashOperator(
+        t = BashOperator(
             task_id='time_sensor_check',
             bash_command="echo success",
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_bash_operator_multi_byte_output(self):
-        t = operators.BashOperator(
+        t = BashOperator(
             task_id='test_multi_byte_bash_operator',
             bash_command=u"echo \u2600",
             dag=self.dag,
@@ -381,7 +390,7 @@ class CoreTest(unittest.TestCase):
             if True:
                 return obj
 
-        t = operators.TriggerDagRunOperator(
+        t = TriggerDagRunOperator(
             task_id='test_trigger_dagrun',
             trigger_dag_id='example_bash_operator',
             python_callable=trigga,
@@ -389,7 +398,7 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_dryrun(self):
-        t = operators.BashOperator(
+        t = BashOperator(
             task_id='time_sensor_check',
             bash_command="echo success",
             dag=self.dag)
@@ -404,14 +413,14 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_timedelta_sensor(self):
-        t = operators.sensors.TimeDeltaSensor(
+        t = sensors.TimeDeltaSensor(
             task_id='timedelta_sensor_check',
             delta=timedelta(seconds=2),
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_external_task_sensor(self):
-        t = operators.sensors.ExternalTaskSensor(
+        t = sensors.ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id=TEST_DAG_ID,
             external_task_id='time_sensor_check',
@@ -419,7 +428,7 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_external_task_sensor_delta(self):
-        t = operators.sensors.ExternalTaskSensor(
+        t = sensors.ExternalTaskSensor(
             task_id='test_external_task_sensor_check_delta',
             external_dag_id=TEST_DAG_ID,
             external_task_id='time_sensor_check',
@@ -429,7 +438,7 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_timeout(self):
-        t = operators.PythonOperator(
+        t = PythonOperator(
             task_id='test_timeout',
             execution_timeout=timedelta(seconds=1),
             python_callable=lambda: sleep(5),
@@ -444,7 +453,7 @@ class CoreTest(unittest.TestCase):
             if not templates_dict['ds'] == ds:
                 raise Exception("failure")
 
-        t = operators.PythonOperator(
+        t = PythonOperator(
             task_id='test_py_op',
             provide_context=True,
             python_callable=test_py_op,
@@ -460,7 +469,7 @@ class CoreTest(unittest.TestCase):
             task_id='test_complex_template',
             some_templated_field={
                 'foo': '123',
-                'bar': ['baz', ' {{ ds }}']
+                'bar': ['baz', '{{ ds }}']
             },
             on_success_callback=verify_templated_field,
             dag=self.dag)
@@ -698,7 +707,7 @@ class CoreTest(unittest.TestCase):
 
     def test_bad_trigger_rule(self):
         with self.assertRaises(AirflowException):
-            operators.DummyOperator(
+            DummyOperator(
                 task_id='test_bad_trigger',
                 trigger_rule="non_existant",
                 dag=self.dag)
@@ -1195,7 +1204,7 @@ class HttpOpSensorTest(unittest.TestCase):
 
     @mock.patch('requests.Session', FakeSession)
     def test_get(self):
-        t = operators.SimpleHttpOperator(
+        t = SimpleHttpOperator(
             task_id='get_op',
             method='GET',
             endpoint='/search',
@@ -1206,7 +1215,7 @@ class HttpOpSensorTest(unittest.TestCase):
 
     @mock.patch('requests.Session', FakeSession)
     def test_get_response_check(self):
-        t = operators.SimpleHttpOperator(
+        t = SimpleHttpOperator(
             task_id='get_op',
             method='GET',
             endpoint='/search',
@@ -1218,7 +1227,7 @@ class HttpOpSensorTest(unittest.TestCase):
 
     @mock.patch('requests.Session', FakeSession)
     def test_sensor(self):
-        sensor = operators.sensors.HttpSensor(
+        sensor = sensors.HttpSensor(
             task_id='http_sensor_check',
             conn_id='http_default',
             endpoint='/search',
@@ -1256,7 +1265,7 @@ class ConnectionTest(unittest.TestCase):
                 del os.environ[ev]
 
     def test_using_env_var(self):
-        c = hooks.SqliteHook.get_connection(conn_id='test_uri')
+        c = SqliteHook.get_connection(conn_id='test_uri')
         assert c.host == 'ec2.compute.com'
         assert c.schema == 'the_database'
         assert c.login == 'username'
@@ -1264,7 +1273,7 @@ class ConnectionTest(unittest.TestCase):
         assert c.port == 5432
 
     def test_using_unix_socket_env_var(self):
-        c = hooks.SqliteHook.get_connection(conn_id='test_uri_no_creds')
+        c = SqliteHook.get_connection(conn_id='test_uri_no_creds')
         assert c.host == 'ec2.compute.com'
         assert c.schema == 'the_database'
         assert c.login is None
@@ -1282,12 +1291,12 @@ class ConnectionTest(unittest.TestCase):
         assert c.port is None
 
     def test_env_var_priority(self):
-        c = hooks.SqliteHook.get_connection(conn_id='airflow_db')
+        c = SqliteHook.get_connection(conn_id='airflow_db')
         assert c.host != 'ec2.compute.com'
 
         os.environ['AIRFLOW_CONN_AIRFLOW_DB'] = \
             'postgres://username:password@ec2.compute.com:5432/the_database'
-        c = hooks.SqliteHook.get_connection(conn_id='airflow_db')
+        c = SqliteHook.get_connection(conn_id='airflow_db')
         assert c.host == 'ec2.compute.com'
         assert c.schema == 'the_database'
         assert c.login == 'username'
@@ -1311,15 +1320,21 @@ class WebHDFSHookTest(unittest.TestCase):
         assert c.proxy_user == 'someone'
 
 
-@unittest.skipUnless("S3Hook" in dir(hooks),
-                     "Skipping test because S3Hook is not installed")
+try:
+    from airflow.hooks.S3_hook import S3Hook
+except ImportError:
+    S3Hook = None
+
+
+@unittest.skipIf(S3Hook is None,
+                 "Skipping test because S3Hook is not installed")
 class S3HookTest(unittest.TestCase):
     def setUp(self):
         configuration.test_mode()
         self.s3_test_url = "s3://test/this/is/not/a-real-key.txt"
 
     def test_parse_s3_url(self):
-        parsed = hooks.S3Hook.parse_s3_url(self.s3_test_url)
+        parsed = S3Hook.parse_s3_url(self.s3_test_url)
         self.assertEqual(parsed,
                          ("test", "this/is/not/a-real-key.txt"),
                          "Incorrect parsing of the s3 url")

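Note: the try/except ImportError guard introduced for S3Hook above replaces the
old check of "S3Hook" in dir(hooks), which no longer applies once the flat
hooks namespace import is dropped from this test module. The same guard
generalizes to any hook with optional dependencies; a hedged sketch gating a
hypothetical Vertica test class:

    import unittest

    # Import the optional hook at module load time, falling back to None.
    try:
        from airflow.contrib.hooks.vertica_hook import VerticaHook
    except ImportError:
        VerticaHook = None


    @unittest.skipIf(VerticaHook is None,
                     "Skipping test because VerticaHook is not installed")
    class VerticaHookTest(unittest.TestCase):
        def test_hook_importable(self):
            # Placeholder assertion; a real test would exercise the hook.
            self.assertIsNotNone(VerticaHook)
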
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_backfill_pooled_tasks.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_backfill_pooled_tasks.py b/tests/dags/test_backfill_pooled_tasks.py
index 306db7d..4b2ba8f 100644
--- a/tests/dags/test_backfill_pooled_tasks.py
+++ b/tests/dags/test_backfill_pooled_tasks.py
@@ -21,7 +21,7 @@ Addresses issue #1225.
 from datetime import datetime
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 
 dag = DAG(dag_id='test_backfill_pooled_task_dag')
 task = DummyOperator(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index ecfa646..8f43b08 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -21,7 +21,9 @@ Addresses issue #1225.
 from datetime import datetime
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator, PythonOperator, SubDagOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.subdag_operator import SubDagOperator
 from airflow.utils.trigger_rule import TriggerRule
 import time
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_scheduler_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py
index ac291e0..224e7c5 100644
--- a/tests/dags/test_scheduler_dags.py
+++ b/tests/dags/test_scheduler_dags.py
@@ -15,7 +15,7 @@
 from datetime import datetime
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 DEFAULT_DATE = datetime(2100, 1, 1)
 
 # DAG tests backfill with pooled tasks

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 0619f3d..2f53fbc 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -26,7 +26,7 @@ from airflow.bin import cli
 from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 2aae476..e4f5aa8 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -27,7 +27,9 @@ from airflow.exceptions import AirflowSkipException
 from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
 from airflow.models import DagModel
-from airflow.operators import DummyOperator, BashOperator, PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
 from airflow.utils.state import State
 from mock import patch
 from nose_parameterized import parameterized

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 0006f60..0a7be23 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -18,7 +18,9 @@ import unittest
 
 import airflow
 from airflow.models import DAG, DagBag
-from airflow.operators import BashOperator, DummyOperator, SubDagOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.subdag_operator import SubDagOperator
 from airflow.jobs import BackfillJob
 from airflow.exceptions import AirflowException