You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/06/29 03:19:00 UTC
[1/3] incubator-airflow git commit: [AIRFLOW-200] Make hook/operator
imports lazy, and print proper exceptions
Repository: incubator-airflow
Updated Branches:
refs/heads/master be6766a6a -> 7b382b4e9
[AIRFLOW-200] Make hook/operator imports lazy, and print proper exceptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dce633ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dce633ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dce633ee
Branch: refs/heads/master
Commit: dce633ee53500f7a293a66b60e2c19307123d1ce
Parents: d15f8ca
Author: Chris Riccomini <ch...@wepay.com>
Authored: Mon Jun 27 13:32:07 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Mon Jun 27 13:32:07 2016 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/__init__.py | 22 +----
airflow/contrib/operators/__init__.py | 22 ++---
airflow/hooks/__init__.py | 31 ++-----
airflow/operators/__init__.py | 70 +++++---------
airflow/utils/helpers.py | 143 +++++++++++++++++++++++------
5 files changed, 155 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 83b505d..237c831 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -11,16 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+#
# Contrib hooks are not imported by default. They should be accessed
# directly: from airflow.contrib.hooks.hook_module import Hook
-
-
-
+import sys
# ------------------------------------------------------------------------
@@ -33,11 +31,6 @@
# for compatibility.
#
# ------------------------------------------------------------------------
-
-# Imports the hooks dynamically while keeping the package API clean,
-# abstracting the underlying modules
-from airflow.utils.helpers import import_module_attrs as _import_module_attrs
-
_hooks = {
'ftp_hook': ['FTPHook'],
'ftps_hook': ['FTPSHook'],
@@ -54,12 +47,5 @@ _hooks = {
import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
- from zope.deprecation import deprecated as _deprecated
- _imported = _import_module_attrs(globals(), _hooks)
- for _i in _imported:
- _deprecated(
- _i,
- "Importing {i} directly from 'contrib.hooks' has been "
- "deprecated. Please import from "
- "'contrib.hooks.[hook_module]' instead. Support for direct imports "
- "will be dropped entirely in Airflow 2.0.".format(i=_i))
+ from airflow.utils.helpers import AirflowImporter
+ airflow_importer = AirflowImporter(sys.modules[__name__], _hooks)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/contrib/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py
index 5ac2d45..ae481ea 100644
--- a/airflow/contrib/operators/__init__.py
+++ b/airflow/contrib/operators/__init__.py
@@ -11,12 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+#
# Contrib operators are not imported by default. They should be accessed
# directly: from airflow.contrib.operators.operator_module import Operator
+import sys
+
+
# ------------------------------------------------------------------------
#
# #TODO #FIXME Airflow 2.0
@@ -27,27 +31,15 @@
# for compatibility.
#
# ------------------------------------------------------------------------
-
-# Imports the operators dynamically while keeping the package API clean,
-# abstracting the underlying modules
-from airflow.utils.helpers import import_module_attrs as _import_module_attrs
-
_operators = {
'ssh_execute_operator': ['SSHExecuteOperator'],
'vertica_operator': ['VerticaOperator'],
'vertica_to_hive': ['VerticaToHiveTransfer'],
'qubole_operator': ['QuboleOperator'],
- 'fs': ['FileSensor']
+ 'fs_operator': ['FileSensor']
}
import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
- from zope.deprecation import deprecated as _deprecated
- _imported = _import_module_attrs(globals(), _operators)
- for _i in _imported:
- _deprecated(
- _i,
- "Importing {i} directly from 'contrib.operators' has been "
- "deprecated. Please import from "
- "'contrib.operators.[operator_module]' instead. Support for direct "
- "imports will be dropped entirely in Airflow 2.0.".format(i=_i))
+ from airflow.utils.helpers import AirflowImporter
+ airflow_importer = AirflowImporter(sys.modules[__name__], _operators)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 5209a51..29d4379 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -11,13 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+#
+
+
+import sys
-# Only import Core Airflow Operators that don't have extra requirements.
-# All other operators must be imported directly.
-from .base_hook import BaseHook
-from .dbapi_hook import DbApiHook
-from .http_hook import HttpHook
-from .sqlite_hook import SqliteHook
# ------------------------------------------------------------------------
#
@@ -34,10 +32,8 @@ from .sqlite_hook import SqliteHook
# abstracting the underlying modules
-from airflow.utils.helpers import import_module_attrs as _import_module_attrs
-from airflow.hooks.base_hook import BaseHook # noqa to expose in package
-
_hooks = {
+ 'base_hook': ['BaseHook'],
'hive_hooks': [
'HiveCliHook',
'HiveMetastoreHook',
@@ -50,9 +46,9 @@ _hooks = {
'postgres_hook': ['PostgresHook'],
'presto_hook': ['PrestoHook'],
'samba_hook': ['SambaHook'],
- # 'sqlite_hook': ['SqliteHook'],
+ 'sqlite_hook': ['SqliteHook'],
'S3_hook': ['S3Hook'],
- # 'http_hook': ['HttpHook'],
+ 'http_hook': ['HttpHook'],
'druid_hook': ['DruidHook'],
'jdbc_hook': ['JdbcHook'],
'dbapi_hook': ['DbApiHook'],
@@ -60,19 +56,10 @@ _hooks = {
'oracle_hook': ['OracleHook'],
}
-
import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
- from zope.deprecation import deprecated as _deprecated
- _imported = _import_module_attrs(globals(), _hooks)
- for _i in _imported:
- _deprecated(
- _i,
- "Importing {i} directly from 'airflow.hooks' has been "
- "deprecated. Please import from "
- "'airflow.hooks.[hook_module]' instead. Support for direct imports "
- "will be dropped entirely in Airflow 2.0.".format(i=_i))
-
+ from airflow.utils.helpers import AirflowImporter
+ airflow_importer = AirflowImporter(sys.modules[__name__], _hooks)
def _integrate_plugins():
"""Integrate plugins to the context"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 0d2c403..5e92b13 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -11,28 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+#
-# Only import Core Airflow Operators that don't have extra requirements.
-# All other operators must be imported directly.
+import sys
from airflow.models import BaseOperator
-from .bash_operator import BashOperator
-from .python_operator import (
- BranchPythonOperator,
- PythonOperator,
- ShortCircuitOperator)
-from .check_operator import (
- CheckOperator,
- ValueCheckOperator,
- IntervalCheckOperator)
-from .dagrun_operator import TriggerDagRunOperator
-from .dummy_operator import DummyOperator
-from .email_operator import EmailOperator
-from .http_operator import SimpleHttpOperator
-import airflow.operators.sensors
-from .subdag_operator import SubDagOperator
-
-
# ------------------------------------------------------------------------
@@ -46,26 +29,23 @@ from .subdag_operator import SubDagOperator
#
# ------------------------------------------------------------------------
+
# Imports operators dynamically while keeping the package API clean,
# abstracting the underlying modules
-from airflow.utils.helpers import import_module_attrs as _import_module_attrs
-# These need to be integrated first as other operators depend on them
-# _import_module_attrs(globals(), {
-# 'check_operator': [
-# 'CheckOperator',
-# 'ValueCheckOperator',
-# 'IntervalCheckOperator',
-# ],
-# })
_operators = {
- # 'bash_operator': ['BashOperator'],
- # 'python_operator': [
- # 'PythonOperator',
- # 'BranchPythonOperator',
- # 'ShortCircuitOperator',
- # ],
+ 'bash_operator': ['BashOperator'],
+ 'check_operator': [
+ 'CheckOperator',
+ 'ValueCheckOperator',
+ 'IntervalCheckOperator',
+ ],
+ 'python_operator': [
+ 'PythonOperator',
+ 'BranchPythonOperator',
+ 'ShortCircuitOperator',
+ ],
'hive_operator': ['HiveOperator'],
'pig_operator': ['PigOperator'],
'presto_check_operator': [
@@ -73,9 +53,9 @@ _operators = {
'PrestoValueCheckOperator',
'PrestoIntervalCheckOperator',
],
- # 'dagrun_operator': ['TriggerDagRunOperator'],
- # 'dummy_operator': ['DummyOperator'],
- # 'email_operator': ['EmailOperator'],
+ 'dagrun_operator': ['TriggerDagRunOperator'],
+ 'dummy_operator': ['DummyOperator'],
+ 'email_operator': ['EmailOperator'],
'hive_to_samba_operator': ['Hive2SambaOperator'],
'mysql_operator': ['MySqlOperator'],
'sqlite_operator': ['SqliteOperator'],
@@ -95,13 +75,13 @@ _operators = {
'TimeSensor',
'WebHdfsSensor',
],
- # 'subdag_operator': ['SubDagOperator'],
+ 'subdag_operator': ['SubDagOperator'],
'hive_stats_operator': ['HiveStatsCollectionOperator'],
's3_to_hive_operator': ['S3ToHiveTransfer'],
'hive_to_mysql': ['HiveToMySqlTransfer'],
'presto_to_mysql': ['PrestoToMySqlTransfer'],
's3_file_transform_operator': ['S3FileTransformOperator'],
- # 'http_operator': ['SimpleHttpOperator'],
+ 'http_operator': ['SimpleHttpOperator'],
'hive_to_druid': ['HiveToDruidTransfer'],
'jdbc_operator': ['JdbcOperator'],
'mssql_operator': ['MsSqlOperator'],
@@ -113,16 +93,8 @@ _operators = {
import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
- from zope.deprecation import deprecated as _deprecated
- _imported = _import_module_attrs(globals(), _operators)
- for _i in _imported:
- _deprecated(
- _i,
- "Importing {i} directly from 'airflow.operators' has been "
- "deprecated. Please import from "
- "'airflow.operators.[operator_module]' instead. Support for direct "
- "imports will be dropped entirely in Airflow 2.0.".format(i=_i))
-
+ from airflow.utils.helpers import AirflowImporter
+ airflow_importer = AirflowImporter(sys.modules[__name__], _operators)
def _integrate_plugins():
"""Integrate plugins to the context"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dce633ee/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 527c9ac..c79ebee 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -24,6 +24,7 @@ import imp
import logging
import os
import re
+import sys
from airflow.exceptions import AirflowException
@@ -72,35 +73,6 @@ def ask_yesno(question):
else:
print("Please respond by yes or no.")
-
-def import_module_attrs(parent_module_globals, module_attrs_dict):
- '''
- Attempts to import a set of modules and specified attributes in the
- form of a dictionary. The attributes are copied in the parent module's
- namespace. The function returns a list of attributes names that can be
- affected to __all__.
-
- This is used in the context of ``operators`` and ``hooks`` and
- silence the import errors for when libraries are missing. It makes
- for a clean package abstracting the underlying modules and only
- brings functional operators to those namespaces.
- '''
- imported_attrs = []
- for mod, attrs in list(module_attrs_dict.items()):
- try:
- path = os.path.realpath(parent_module_globals['__file__'])
- folder = os.path.dirname(path)
- f, filename, description = imp.find_module(mod, [folder])
- module = imp.load_module(mod, f, filename, description)
- for attr in attrs:
- parent_module_globals[attr] = getattr(module, attr)
- imported_attrs += [attr]
- except Exception as err:
- logging.debug("Error importing module {mod}: {err}".format(
- mod=mod, err=err))
- return imported_attrs
-
-
def is_in(obj, l):
"""
Checks whether an object is one of the item in the list.
@@ -186,3 +158,116 @@ def pprinttable(rows):
s += pattern % tuple(f(t) for t in line) + '\n'
s += separator + '\n'
return s
+
+
+class AirflowImporter(object):
+ """
+ Importer that dynamically loads a class and module from its parent. This
+ allows Airflow to support `from airflow.operators import BashOperator` even
+ though BashOperator is actually in airflow.operators.bash_operator.
+
+ The importer also takes over for the parent_module by wrapping it. This is
+ required to support attribute-based usage:
+
+ from airflow import operators
+ operators.BashOperator(...)
+ """
+
+ def __init__(self, parent_module, module_attributes):
+ """
+ :param parent_module: The parent module object to wrap. For example,
+ sys.modules['airflow.operators']
+ :type parent_module: module
+ :param module_attributes: Mapping of submodule (file) name to the list of
+ importable class names that submodule provides.
+ :type module_attributes: dict
+ """
+ self.parent_module = parent_module
+ self.attribute_modules = self._build_attribute_modules(module_attributes)
+ self.loaded_modules = {}
+
+ # Take the module's place in sys.modules so we can take over __getattr__.
+ sys.modules[parent_module.__name__] = self
+
+ @staticmethod
+ def _build_attribute_modules(module_attributes):
+ """
+ Flips and flattens the module_attributes dictionary from:
+
+ module => [Attribute, ...]
+
+ To:
+
+ Attribute => module
+
+ This is useful so that we can find the module to use, given an
+ attribute.
+ """
+ attribute_modules = {}
+
+ for module, attributes in list(module_attributes.items()):
+ for attribute in attributes:
+ attribute_modules[attribute] = module
+
+ return attribute_modules
+
+ def _load_attribute(self, attribute):
+ """
+ Load the class attribute if it hasn't been loaded yet, and return it.
+ """
+ module = self.attribute_modules.get(attribute, False)
+
+ if not module:
+ # This shouldn't happen. The check happens in __getattr__, too.
+ raise ImportError(attribute)
+ elif module not in self.loaded_modules:
+ # Note that it's very important to only load a given module once.
+ # If they are loaded more than once, the memory reference to the
+ # class objects changes, and Python thinks that an object of type
+ # Foo that was declared before Foo's module was reloaded is no
+ # longer the same type as Foo after it's reloaded.
+ path = os.path.realpath(self.parent_module.__file__)
+ folder = os.path.dirname(path)
+ f, filename, description = imp.find_module(module, [folder])
+ self.loaded_modules[module] = imp.load_module(module, f, filename, description)
+
+ # Deprecated; AirflowImporter should be removed in 2.0.
+ # NOTE(review): {m} is filled with the module object, not its name.
+ from zope.deprecation import deprecated as _deprecated
+ _deprecated(
+ attribute,
+ "Importing {i} directly from {m} has been "
+ "deprecated. Please import from "
+ "'{m}.[operator_module]' instead. Support for direct "
+ "imports will be dropped entirely in Airflow 2.0.".format(
+ i=attribute, m=self.parent_module))
+
+ loaded_module = self.loaded_modules[module]
+
+ return getattr(loaded_module, attribute)
+
+ def __getattr__(self, attribute):
+ """
+ Get an attribute from the wrapped module. If the attribute doesn't
+ exist, try and import it as a class from a submodule.
+
+ This is a Python trick that allows the class to pretend it's a module,
+ so that attribute-based usage works:
+
+ from airflow import operators
+ operators.BashOperator(...)
+
+ It also allows normal from imports to work:
+
+ from airflow.operators import BashOperator
+ """
+ if hasattr(self.parent_module, attribute):
+ # Always default to the parent module if the attribute exists.
+ return getattr(self.parent_module, attribute)
+ elif attribute in self.attribute_modules:
+ # Import the attribute, then cache it on the importer itself.
+ loaded_attribute = self._load_attribute(attribute)
+ setattr(self, attribute, loaded_attribute)
+ return loaded_attribute
+
+ raise AttributeError
[2/3] incubator-airflow git commit: [AIRFLOW-285] Use Airflow 2.0
style imports for all remaining hooks/operators
Posted by cr...@apache.org.
[AIRFLOW-285] Use Airflow 2.0 style imports for all remaining hooks/operators
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dc84fdec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dc84fdec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dc84fdec
Branch: refs/heads/master
Commit: dc84fdecdfd9de12392c4e1c92005bd427d3ca37
Parents: dce633e
Author: Chris Riccomini <ch...@wepay.com>
Authored: Mon Jun 27 20:08:48 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Jun 28 13:34:47 2016 -0700
----------------------------------------------------------------------
.../example_dags/example_qubole_operator.py | 5 +-
.../contrib/example_dags/example_twitter_dag.py | 3 +-
.../operators/bigquery_check_operator.py | 2 +-
airflow/contrib/operators/qubole_operator.py | 2 +-
airflow/contrib/operators/vertica_operator.py | 2 +-
airflow/contrib/operators/vertica_to_hive.py | 2 +-
airflow/example_dags/example_bash_operator.py | 3 +-
airflow/example_dags/example_branch_operator.py | 3 +-
.../example_branch_python_dop_operator_3.py | 3 +-
airflow/example_dags/example_http_operator.py | 2 +-
.../example_passing_params_via_test_command.py | 3 +-
airflow/example_dags/example_python_operator.py | 2 +-
.../example_short_circuit_operator.py | 3 +-
airflow/example_dags/example_skip_dag.py | 2 +-
airflow/example_dags/example_subdag_operator.py | 3 +-
.../example_trigger_controller_dag.py | 2 +-
.../example_dags/example_trigger_target_dag.py | 3 +-
airflow/example_dags/example_xcom.py | 6 +-
airflow/example_dags/subdags/subdag.py | 2 +-
airflow/example_dags/test_utils.py | 2 +-
airflow/example_dags/tutorial.py | 2 +-
airflow/operators/check_operator.py | 2 +-
airflow/operators/http_operator.py | 2 +-
airflow/operators/presto_check_operator.py | 2 +-
airflow/operators/sensors.py | 4 +-
airflow/operators/sqlite_operator.py | 2 +-
airflow/utils/helpers.py | 37 +++++-----
airflow/utils/logging.py | 2 +-
airflow/www/views.py | 3 +-
docs/concepts.rst | 4 +-
docs/tutorial.rst | 6 +-
tests/core.py | 71 ++++++++++++--------
tests/dags/test_backfill_pooled_tasks.py | 2 +-
tests/dags/test_issue_1225.py | 4 +-
tests/dags/test_scheduler_dags.py | 2 +-
tests/jobs.py | 2 +-
tests/models.py | 4 +-
tests/operators/subdag_operator.py | 4 +-
38 files changed, 121 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py
index 4f974e2..63cccd3 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -13,8 +13,9 @@
# limitations under the License.
from airflow import DAG
-from airflow.operators import DummyOperator, PythonOperator, BranchPythonOperator
-from airflow.contrib.operators import QuboleOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
+from airflow.contrib.operators.qubole_operator import QuboleOperator
from datetime import datetime, timedelta
import filecmp
import random
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/example_dags/example_twitter_dag.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py
index af1978e..d63b4e8 100644
--- a/airflow/contrib/example_dags/example_twitter_dag.py
+++ b/airflow/contrib/example_dags/example_twitter_dag.py
@@ -23,7 +23,8 @@
# --------------------------------------------------------------------------------
from airflow import DAG
-from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
from airflow.operators.hive_operator import HiveOperator
from datetime import datetime, date, timedelta
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/bigquery_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py
index 87e0ad7..10f0b7c 100644
--- a/airflow/contrib/operators/bigquery_check_operator.py
+++ b/airflow/contrib/operators/bigquery_check_operator.py
@@ -13,7 +13,7 @@
# limitations under the License.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
-from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator
+from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 9923cec..cbf15c4 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -14,7 +14,7 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks import QuboleHook
+from airflow.contrib.hooks.qubole_hook import QuboleHook
class QuboleOperator(BaseOperator):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index 471018f..9266563 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -14,7 +14,7 @@
import logging
-from airflow.contrib.hooks import VerticaHook
+from airflow.contrib.hooks.vertica_hook import VerticaHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 4071111..57e4fa8 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -19,7 +19,7 @@ import logging
from tempfile import NamedTemporaryFile
from airflow.hooks.hive_hooks import HiveCliHook
-from airflow.contrib.hooks import VerticaHook
+from airflow.contrib.hooks.vertica_hook import VerticaHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index c759f4d..0d18bcf 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from builtins import range
-from airflow.operators import BashOperator, DummyOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_branch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index edd177a..cc559d0 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -11,7 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from airflow.operators import BranchPythonOperator, DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import random
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_branch_python_dop_operator_3.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index ff959fc..19bb183 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -13,7 +13,8 @@
# limitations under the License.
#
-from airflow.operators import BranchPythonOperator, DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 41ea385..12b0448 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -15,7 +15,7 @@
### Example HTTP operator and sensor
"""
from airflow import DAG
-from airflow.operators import SimpleHttpOperator
+from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_passing_params_via_test_command.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index af473d9..cd5a251 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -16,7 +16,8 @@
from datetime import datetime, timedelta
from airflow import DAG
-from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
dag = DAG("example_passing_params_via_test_command",
default_args={"owner" : "airflow",
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index a2f8abd..6c0b93f 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -13,7 +13,7 @@
# limitations under the License.
from __future__ import print_function
from builtins import range
-from airflow.operators import PythonOperator
+from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_short_circuit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 907cf51..92efe99 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -11,7 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from airflow.operators import ShortCircuitOperator, DummyOperator
+from airflow.operators.python_operator import ShortCircuitOperator
+from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
import airflow.utils.helpers
from datetime import datetime, timedelta
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_skip_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index b55c3a8..a38b126 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.exceptions import AirflowSkipException
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index 57a62c6..b872f43 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -14,7 +14,8 @@
from datetime import datetime
from airflow.models import DAG
-from airflow.operators import DummyOperator, SubDagOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.subdag_operator import SubDagOperator
from airflow.example_dags.subdags.subdag import subdag
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_trigger_controller_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index b754d64..eb8cee0 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -29,7 +29,7 @@ This example illustrates the following features :
"""
from airflow import DAG
-from airflow.operators import TriggerDagRunOperator
+from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime
import pprint
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_trigger_target_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 41a3e36..a2a85f6 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -11,7 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from airflow.operators import BashOperator, PythonOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_xcom.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 71cd44e..8dd2666 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -56,13 +56,13 @@ def puller(**kwargs):
v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
assert (v1, v2) == (value_1, value_2)
-push1 = airflow.operators.PythonOperator(
+push1 = airflow.operators.python_operator.PythonOperator(
task_id='push', dag=dag, python_callable=push)
-push2 = airflow.operators.PythonOperator(
+push2 = airflow.operators.python_operator.PythonOperator(
task_id='push_by_returning', dag=dag, python_callable=push_by_returning)
-pull = airflow.operators.PythonOperator(
+pull = airflow.operators.python_operator.PythonOperator(
task_id='puller', dag=dag, python_callable=puller)
pull.set_upstream([push1, push2])
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/subdags/subdag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py
index c0e1326..82e1dd1 100644
--- a/airflow/example_dags/subdags/subdag.py
+++ b/airflow/example_dags/subdags/subdag.py
@@ -13,7 +13,7 @@
# limitations under the License.
from airflow.models import DAG
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
def subdag(parent_dag_name, child_dag_name, args):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/test_utils.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
index 38e50d0..70391c3 100644
--- a/airflow/example_dags/test_utils.py
+++ b/airflow/example_dags/test_utils.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Used for unit tests"""
-from airflow.operators import BashOperator
+from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/tutorial.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index 6bb2cd3..7c89666 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -18,7 +18,7 @@ Documentation that goes along with the Airflow tutorial located
[here](http://pythonhosted.org/airflow/tutorial.html)
"""
from airflow import DAG
-from airflow.operators import BashOperator
+from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index e4c8262..83190eb 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -17,7 +17,7 @@ from builtins import str
import logging
from airflow.exceptions import AirflowException
-from airflow.hooks import BaseHook
+from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index ad9bd4f..e5cf339 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -15,7 +15,7 @@
import logging
from airflow.exceptions import AirflowException
-from airflow.hooks import HttpHook
+from airflow.hooks.http_hook import HttpHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/presto_check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/presto_check_operator.py b/airflow/operators/presto_check_operator.py
index c1ac9cf..6207460 100644
--- a/airflow/operators/presto_check_operator.py
+++ b/airflow/operators/presto_check_operator.py
@@ -13,7 +13,7 @@
# limitations under the License.
from airflow.hooks.presto_hook import PrestoHook
-from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator
+from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 4e4cb3b..90a4d14 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -25,7 +25,7 @@ import airflow
from airflow import hooks, settings
from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
from airflow.models import BaseOperator, TaskInstance, Connection as DB
-from airflow.hooks import BaseHook
+from airflow.hooks.base_hook import BaseHook
from airflow.utils.state import State
from airflow.utils.decorators import apply_defaults
@@ -519,7 +519,7 @@ class HttpSensor(BaseSensorOperator):
self.extra_options = extra_options or {}
self.response_check = response_check
- self.hook = hooks.HttpHook(method='GET', http_conn_id=http_conn_id)
+ self.hook = hooks.http_hook.HttpHook(method='GET', http_conn_id=http_conn_id)
def poke(self, context):
logging.info('Poking: ' + self.endpoint)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/sqlite_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 52b3b4b..0ff4d05 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -14,7 +14,7 @@
import logging
-from airflow.hooks import SqliteHook
+from airflow.hooks.sqlite_hook import SqliteHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index c79ebee..7e3426e 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -25,6 +25,7 @@ import logging
import os
import re
import sys
+import warnings
from airflow.exceptions import AirflowException
@@ -163,8 +164,9 @@ def pprinttable(rows):
class AirflowImporter(object):
"""
Importer that dynamically loads a class and module from its parent. This
- allows Airflow to support `from airflow.operators import BashOperator` even
- though BashOperator is actually in airflow.operators.bash_operator.
+ allows Airflow to support `from airflow.operators.bash_operator import
+ BashOperator` even though BashOperator is actually in
+ airflow.operators.bash_operator.
The importer also takes over for the parent_module by wrapping it. This is
required to support attribute-based usage:
@@ -182,9 +184,9 @@ class AirflowImporter(object):
classes.
:type module_attributes: string
"""
- self.parent_module = parent_module
- self.attribute_modules = self._build_attribute_modules(module_attributes)
- self.loaded_modules = {}
+ self._parent_module = parent_module
+ self._attribute_modules = self._build_attribute_modules(module_attributes)
+ self._loaded_modules = {}
# Wrap the module so we can take over __getattr__.
sys.modules[parent_module.__name__] = self
@@ -215,34 +217,33 @@ class AirflowImporter(object):
"""
Load the class attribute if it hasn't been loaded yet, and return it.
"""
- module = self.attribute_modules.get(attribute, False)
+ module = self._attribute_modules.get(attribute, False)
if not module:
# This shouldn't happen. The check happens in find_modules, too.
raise ImportError(attribute)
- elif module not in self.loaded_modules:
+ elif module not in self._loaded_modules:
# Note that it's very important to only load a given modules once.
# If they are loaded more than once, the memory reference to the
# class objects changes, and Python thinks that an object of type
# Foo that was declared before Foo's module was reloaded is no
# longer the same type as Foo after it's reloaded.
- path = os.path.realpath(self.parent_module.__file__)
+ path = os.path.realpath(self._parent_module.__file__)
folder = os.path.dirname(path)
f, filename, description = imp.find_module(module, [folder])
- self.loaded_modules[module] = imp.load_module(module, f, filename, description)
+ self._loaded_modules[module] = imp.load_module(module, f, filename, description)
# This functionality is deprecated, and AirflowImporter should be
# removed in 2.0.
- from zope.deprecation import deprecated as _deprecated
- _deprecated(
- attribute,
+ warnings.warn(
"Importing {i} directly from {m} has been "
"deprecated. Please import from "
"'{m}.[operator_module]' instead. Support for direct "
"imports will be dropped entirely in Airflow 2.0.".format(
- i=attribute, m=self.parent_module))
+ i=attribute, m=self._parent_module),
+ DeprecationWarning)
- loaded_module = self.loaded_modules[module]
+ loaded_module = self._loaded_modules[module]
return getattr(loaded_module, attribute)
@@ -259,12 +260,12 @@ class AirflowImporter(object):
It also allows normal from imports to work:
- from airflow.operators import BashOperator
+ from airflow.operators.bash_operator import BashOperator
"""
- if hasattr(self.parent_module, attribute):
+ if hasattr(self._parent_module, attribute):
# Always default to the parent module if the attribute exists.
- return getattr(self.parent_module, attribute)
- elif attribute in self.attribute_modules:
+ return getattr(self._parent_module, attribute)
+ elif attribute in self._attribute_modules:
# Try and import the attribute if it's got a module defined.
loaded_attribute = self._load_attribute(attribute)
setattr(self, attribute, loaded_attribute)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/utils/logging.py
----------------------------------------------------------------------
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index 8f5fc51..79c6cbf 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -128,7 +128,7 @@ class GCSLog(object):
self.hook = None
try:
- from airflow.contrib.hooks import GoogleCloudStorageHook
+ from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
self.hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=remote_conn_id)
except:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 1fb3f91..0bd5b05 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -60,7 +60,8 @@ from airflow.exceptions import AirflowException
from airflow.settings import Session
from airflow.models import XCom
-from airflow.operators import BaseOperator, SubDagOperator
+from airflow.models import BaseOperator
+from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.logging import LoggingMixin
from airflow.utils.json import json_ser
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/docs/concepts.rst
----------------------------------------------------------------------
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 6e15ff8..31e7d61 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -485,7 +485,7 @@ the main UI. For example:
#dags/subdag.py
from airflow.models import DAG
- from airflow.operators import DummyOperator
+ from airflow.operators.dummy_operator import DummyOperator
# Dag is returned by a factory method
@@ -510,7 +510,7 @@ This SubDAG can then be referenced in your main DAG file:
# main_dag.py
from datetime import datetime, timedelta
from airflow.models import DAG
- from airflow.operators import SubDagOperator
+ from airflow.operators.subdag_operator import SubDagOperator
from dags.subdag import sub_dag
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/docs/tutorial.rst
----------------------------------------------------------------------
diff --git a/docs/tutorial.rst b/docs/tutorial.rst
index e9d382b..a93479c 100644
--- a/docs/tutorial.rst
+++ b/docs/tutorial.rst
@@ -18,7 +18,7 @@ complicated, a line by line explanation follows below.
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
- from airflow.operators import BashOperator
+ from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
@@ -100,7 +100,7 @@ Airflow DAG object. Let's start by importing the libraries we will need.
from airflow import DAG
# Operators; we need this to operate!
- from airflow.operators import BashOperator
+ from airflow.operators.bash_operator import BashOperator
Default Arguments
-----------------
@@ -270,7 +270,7 @@ something like this:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
- from airflow.operators import BashOperator
+ from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 9c688ea..002ad30 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -36,8 +36,17 @@ from airflow.executors import SequentialExecutor, LocalExecutor
from airflow.models import Variable
configuration.test_mode()
-from airflow import jobs, models, DAG, operators, hooks, utils, macros, settings, exceptions
-from airflow.hooks import BaseHook
+from airflow import jobs, models, DAG, utils, macros, settings, exceptions
+from airflow.models import BaseOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.check_operator import CheckOperator, ValueCheckOperator
+from airflow.operators.dagrun_operator import TriggerDagRunOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.http_operator import SimpleHttpOperator
+from airflow.operators import sensors
+from airflow.hooks.base_hook import BaseHook
+from airflow.hooks.sqlite_hook import SqliteHook
from airflow.bin import cli
from airflow.www import app as application
from airflow.settings import Session
@@ -85,7 +94,7 @@ def reset(dag_id=TEST_DAG_ID):
reset()
-class OperatorSubclass(operators.BaseOperator):
+class OperatorSubclass(BaseOperator):
"""
An operator to test template substitution
"""
@@ -305,7 +314,7 @@ class CoreTest(unittest.TestCase):
assert hash(self.dag) != hash(dag_subclass)
def test_time_sensor(self):
- t = operators.sensors.TimeSensor(
+ t = sensors.TimeSensor(
task_id='time_sensor_check',
target_time=time(0),
dag=self.dag)
@@ -319,14 +328,14 @@ class CoreTest(unittest.TestCase):
captainHook.run("CREATE TABLE operator_test_table (a, b)")
captainHook.run("insert into operator_test_table values (1,2)")
- t = operators.CheckOperator(
+ t = CheckOperator(
task_id='check',
sql="select count(*) from operator_test_table",
conn_id=conn_id,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
- t = operators.ValueCheckOperator(
+ t = ValueCheckOperator(
task_id='value_check',
pass_value=95,
tolerance=0.1,
@@ -350,7 +359,7 @@ class CoreTest(unittest.TestCase):
Tests that Operators reject illegal arguments
"""
with warnings.catch_warnings(record=True) as w:
- t = operators.BashOperator(
+ t = BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=self.dag,
@@ -362,14 +371,14 @@ class CoreTest(unittest.TestCase):
w[0].message.args[0])
def test_bash_operator(self):
- t = operators.BashOperator(
+ t = BashOperator(
task_id='time_sensor_check',
bash_command="echo success",
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_bash_operator_multi_byte_output(self):
- t = operators.BashOperator(
+ t = BashOperator(
task_id='test_multi_byte_bash_operator',
bash_command=u"echo \u2600",
dag=self.dag,
@@ -381,7 +390,7 @@ class CoreTest(unittest.TestCase):
if True:
return obj
- t = operators.TriggerDagRunOperator(
+ t = TriggerDagRunOperator(
task_id='test_trigger_dagrun',
trigger_dag_id='example_bash_operator',
python_callable=trigga,
@@ -389,7 +398,7 @@ class CoreTest(unittest.TestCase):
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_dryrun(self):
- t = operators.BashOperator(
+ t = BashOperator(
task_id='time_sensor_check',
bash_command="echo success",
dag=self.dag)
@@ -404,14 +413,14 @@ class CoreTest(unittest.TestCase):
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_timedelta_sensor(self):
- t = operators.sensors.TimeDeltaSensor(
+ t = sensors.TimeDeltaSensor(
task_id='timedelta_sensor_check',
delta=timedelta(seconds=2),
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_external_task_sensor(self):
- t = operators.sensors.ExternalTaskSensor(
+ t = sensors.ExternalTaskSensor(
task_id='test_external_task_sensor_check',
external_dag_id=TEST_DAG_ID,
external_task_id='time_sensor_check',
@@ -419,7 +428,7 @@ class CoreTest(unittest.TestCase):
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_external_task_sensor_delta(self):
- t = operators.sensors.ExternalTaskSensor(
+ t = sensors.ExternalTaskSensor(
task_id='test_external_task_sensor_check_delta',
external_dag_id=TEST_DAG_ID,
external_task_id='time_sensor_check',
@@ -429,7 +438,7 @@ class CoreTest(unittest.TestCase):
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_timeout(self):
- t = operators.PythonOperator(
+ t = PythonOperator(
task_id='test_timeout',
execution_timeout=timedelta(seconds=1),
python_callable=lambda: sleep(5),
@@ -444,7 +453,7 @@ class CoreTest(unittest.TestCase):
if not templates_dict['ds'] == ds:
raise Exception("failure")
- t = operators.PythonOperator(
+ t = PythonOperator(
task_id='test_py_op',
provide_context=True,
python_callable=test_py_op,
@@ -460,7 +469,7 @@ class CoreTest(unittest.TestCase):
task_id='test_complex_template',
some_templated_field={
'foo': '123',
- 'bar': ['baz', ' {{ ds }}']
+ 'bar': ['baz', '{{ ds }}']
},
on_success_callback=verify_templated_field,
dag=self.dag)
@@ -698,7 +707,7 @@ class CoreTest(unittest.TestCase):
def test_bad_trigger_rule(self):
with self.assertRaises(AirflowException):
- operators.DummyOperator(
+ DummyOperator(
task_id='test_bad_trigger',
trigger_rule="non_existant",
dag=self.dag)
@@ -1195,7 +1204,7 @@ class HttpOpSensorTest(unittest.TestCase):
@mock.patch('requests.Session', FakeSession)
def test_get(self):
- t = operators.SimpleHttpOperator(
+ t = SimpleHttpOperator(
task_id='get_op',
method='GET',
endpoint='/search',
@@ -1206,7 +1215,7 @@ class HttpOpSensorTest(unittest.TestCase):
@mock.patch('requests.Session', FakeSession)
def test_get_response_check(self):
- t = operators.SimpleHttpOperator(
+ t = SimpleHttpOperator(
task_id='get_op',
method='GET',
endpoint='/search',
@@ -1218,7 +1227,7 @@ class HttpOpSensorTest(unittest.TestCase):
@mock.patch('requests.Session', FakeSession)
def test_sensor(self):
- sensor = operators.sensors.HttpSensor(
+ sensor = sensors.HttpSensor(
task_id='http_sensor_check',
conn_id='http_default',
endpoint='/search',
@@ -1256,7 +1265,7 @@ class ConnectionTest(unittest.TestCase):
del os.environ[ev]
def test_using_env_var(self):
- c = hooks.SqliteHook.get_connection(conn_id='test_uri')
+ c = SqliteHook.get_connection(conn_id='test_uri')
assert c.host == 'ec2.compute.com'
assert c.schema == 'the_database'
assert c.login == 'username'
@@ -1264,7 +1273,7 @@ class ConnectionTest(unittest.TestCase):
assert c.port == 5432
def test_using_unix_socket_env_var(self):
- c = hooks.SqliteHook.get_connection(conn_id='test_uri_no_creds')
+ c = SqliteHook.get_connection(conn_id='test_uri_no_creds')
assert c.host == 'ec2.compute.com'
assert c.schema == 'the_database'
assert c.login is None
@@ -1282,12 +1291,12 @@ class ConnectionTest(unittest.TestCase):
assert c.port is None
def test_env_var_priority(self):
- c = hooks.SqliteHook.get_connection(conn_id='airflow_db')
+ c = SqliteHook.get_connection(conn_id='airflow_db')
assert c.host != 'ec2.compute.com'
os.environ['AIRFLOW_CONN_AIRFLOW_DB'] = \
'postgres://username:password@ec2.compute.com:5432/the_database'
- c = hooks.SqliteHook.get_connection(conn_id='airflow_db')
+ c = SqliteHook.get_connection(conn_id='airflow_db')
assert c.host == 'ec2.compute.com'
assert c.schema == 'the_database'
assert c.login == 'username'
@@ -1311,15 +1320,21 @@ class WebHDFSHookTest(unittest.TestCase):
assert c.proxy_user == 'someone'
-@unittest.skipUnless("S3Hook" in dir(hooks),
- "Skipping test because S3Hook is not installed")
+try:
+ from airflow.hooks.S3_hook import S3Hook
+except ImportError:
+ S3Hook = None
+
+
+@unittest.skipIf(S3Hook is None,
+ "Skipping test because S3Hook is not installed")
class S3HookTest(unittest.TestCase):
def setUp(self):
configuration.test_mode()
self.s3_test_url = "s3://test/this/is/not/a-real-key.txt"
def test_parse_s3_url(self):
- parsed = hooks.S3Hook.parse_s3_url(self.s3_test_url)
+ parsed = S3Hook.parse_s3_url(self.s3_test_url)
self.assertEqual(parsed,
("test", "this/is/not/a-real-key.txt"),
"Incorrect parsing of the s3 url")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_backfill_pooled_tasks.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_backfill_pooled_tasks.py b/tests/dags/test_backfill_pooled_tasks.py
index 306db7d..4b2ba8f 100644
--- a/tests/dags/test_backfill_pooled_tasks.py
+++ b/tests/dags/test_backfill_pooled_tasks.py
@@ -21,7 +21,7 @@ Addresses issue #1225.
from datetime import datetime
from airflow.models import DAG
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
dag = DAG(dag_id='test_backfill_pooled_task_dag')
task = DummyOperator(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index ecfa646..8f43b08 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -21,7 +21,9 @@ Addresses issue #1225.
from datetime import datetime
from airflow.models import DAG
-from airflow.operators import DummyOperator, PythonOperator, SubDagOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.trigger_rule import TriggerRule
import time
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_scheduler_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py
index ac291e0..224e7c5 100644
--- a/tests/dags/test_scheduler_dags.py
+++ b/tests/dags/test_scheduler_dags.py
@@ -15,7 +15,7 @@
from datetime import datetime
from airflow.models import DAG
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
DEFAULT_DATE = datetime(2100, 1, 1)
# DAG tests backfill with pooled tasks
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 0619f3d..2f53fbc 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -26,7 +26,7 @@ from airflow.bin import cli
from airflow.executors import DEFAULT_EXECUTOR
from airflow.jobs import BackfillJob, SchedulerJob
from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
-from airflow.operators import DummyOperator
+from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 2aae476..e4f5aa8 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -27,7 +27,9 @@ from airflow.exceptions import AirflowSkipException
from airflow.models import DAG, TaskInstance as TI
from airflow.models import State as ST
from airflow.models import DagModel
-from airflow.operators import DummyOperator, BashOperator, PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import PythonOperator
from airflow.utils.state import State
from mock import patch
from nose_parameterized import parameterized
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 0006f60..0a7be23 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -18,7 +18,9 @@ import unittest
import airflow
from airflow.models import DAG, DagBag
-from airflow.operators import BashOperator, DummyOperator, SubDagOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.subdag_operator import SubDagOperator
from airflow.jobs import BackfillJob
from airflow.exceptions import AirflowException
[3/3] incubator-airflow git commit: Merge pull request #1586 from
criccomini/AIRFLOW-200
Posted by cr...@apache.org.
Merge pull request #1586 from criccomini/AIRFLOW-200
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7b382b4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7b382b4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7b382b4e
Branch: refs/heads/master
Commit: 7b382b4e928b96c08890317b2b09d22163882a8a
Parents: be6766a dc84fde
Author: Chris Riccomini <ch...@wepay.com>
Authored: Tue Jun 28 20:18:46 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Jun 28 20:18:46 2016 -0700
----------------------------------------------------------------------
.../example_dags/example_qubole_operator.py | 5 +-
.../contrib/example_dags/example_twitter_dag.py | 3 +-
airflow/contrib/hooks/__init__.py | 22 +--
airflow/contrib/operators/__init__.py | 22 +--
.../operators/bigquery_check_operator.py | 2 +-
airflow/contrib/operators/qubole_operator.py | 2 +-
airflow/contrib/operators/vertica_operator.py | 2 +-
airflow/contrib/operators/vertica_to_hive.py | 2 +-
airflow/example_dags/example_bash_operator.py | 3 +-
airflow/example_dags/example_branch_operator.py | 3 +-
.../example_branch_python_dop_operator_3.py | 3 +-
airflow/example_dags/example_http_operator.py | 2 +-
.../example_passing_params_via_test_command.py | 3 +-
airflow/example_dags/example_python_operator.py | 2 +-
.../example_short_circuit_operator.py | 3 +-
airflow/example_dags/example_skip_dag.py | 2 +-
airflow/example_dags/example_subdag_operator.py | 3 +-
.../example_trigger_controller_dag.py | 2 +-
.../example_dags/example_trigger_target_dag.py | 3 +-
airflow/example_dags/example_xcom.py | 6 +-
airflow/example_dags/subdags/subdag.py | 2 +-
airflow/example_dags/test_utils.py | 2 +-
airflow/example_dags/tutorial.py | 2 +-
airflow/hooks/__init__.py | 31 ++--
airflow/operators/__init__.py | 70 +++------
airflow/operators/check_operator.py | 2 +-
airflow/operators/http_operator.py | 2 +-
airflow/operators/presto_check_operator.py | 2 +-
airflow/operators/sensors.py | 4 +-
airflow/operators/sqlite_operator.py | 2 +-
airflow/utils/helpers.py | 144 +++++++++++++++----
airflow/utils/logging.py | 2 +-
airflow/www/views.py | 3 +-
docs/concepts.rst | 4 +-
docs/tutorial.rst | 6 +-
tests/core.py | 71 +++++----
tests/dags/test_backfill_pooled_tasks.py | 2 +-
tests/dags/test_issue_1225.py | 4 +-
tests/dags/test_scheduler_dags.py | 2 +-
tests/jobs.py | 2 +-
tests/models.py | 4 +-
tests/operators/subdag_operator.py | 4 +-
42 files changed, 258 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7b382b4e/airflow/www/views.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7b382b4e/tests/core.py
----------------------------------------------------------------------