Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/19 17:59:14 UTC

[1/3] incubator-airflow git commit: [AIRFLOW-1889] Split sensors into separate files

Repository: incubator-airflow
Updated Branches:
  refs/heads/master e7c118da2 -> 33c720421


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/test_http_sensor.py
----------------------------------------------------------------------
diff --git a/tests/sensors/test_http_sensor.py b/tests/sensors/test_http_sensor.py
new file mode 100644
index 0000000..31d60e4
--- /dev/null
+++ b/tests/sensors/test_http_sensor.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+import requests
+from mock import patch
+
+from airflow import DAG, configuration
+from airflow.exceptions import AirflowException, AirflowSensorTimeout
+from airflow.operators.http_operator import SimpleHttpOperator
+from airflow.sensors.http_sensor import HttpSensor
+from airflow.utils.timezone import datetime
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+configuration.load_test_config()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+TEST_DAG_ID = 'unit_test_dag'
+
+
+class HttpSensorTests(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        self.dag = DAG(TEST_DAG_ID, default_args=args)
+
+    @patch("airflow.hooks.http_hook.requests.Session.send")
+    def test_poke_exception(self, mock_session_send):
+        """
+        An exception raised in the poke function should not be ignored.
+        """
+        response = requests.Response()
+        response.status_code = 200
+        mock_session_send.return_value = response
+
+        def resp_check(resp):
+            raise AirflowException('AirflowException raised here!')
+
+        task = HttpSensor(
+            task_id='http_sensor_poke_exception',
+            http_conn_id='http_default',
+            endpoint='',
+            request_params={},
+            response_check=resp_check,
+            timeout=5,
+            poke_interval=1)
+        with self.assertRaisesRegexp(AirflowException, 'AirflowException raised here!'):
+            task.execute(None)
+
+    @patch("airflow.hooks.http_hook.requests.Session.send")
+    def test_head_method(self, mock_session_send):
+        def resp_check(resp):
+            return True
+
+        task = HttpSensor(
+            dag=self.dag,
+            task_id='http_sensor_head_method',
+            http_conn_id='http_default',
+            endpoint='',
+            request_params={},
+            method='HEAD',
+            response_check=resp_check,
+            timeout=5,
+            poke_interval=1)
+
+        task.execute(None)
+
+        args, kwargs = mock_session_send.call_args
+        received_request = args[0]
+
+        prep_request = requests.Request(
+            'HEAD',
+            'https://www.google.com',
+            {}).prepare()
+
+        self.assertEqual(prep_request.url, received_request.url)
+        self.assertEqual(prep_request.method, received_request.method)
+
+    @patch("airflow.hooks.http_hook.requests.Session.send")
+    def test_logging_head_error_request(
+        self,
+        mock_session_send
+    ):
+        def resp_check(resp):
+            return True
+
+        response = requests.Response()
+        response.status_code = 404
+        response.reason = 'Not Found'
+        mock_session_send.return_value = response
+
+        task = HttpSensor(
+            dag=self.dag,
+            task_id='http_sensor_head_method',
+            http_conn_id='http_default',
+            endpoint='',
+            request_params={},
+            method='HEAD',
+            response_check=resp_check,
+            timeout=5,
+            poke_interval=1
+        )
+
+        with mock.patch.object(task.hook.log, 'error') as mock_errors:
+            with self.assertRaises(AirflowSensorTimeout):
+                task.execute(None)
+
+            self.assertTrue(mock_errors.called)
+            mock_errors.assert_called_with('HTTP error: %s', 'Not Found')
+
+
+class FakeSession(object):
+    def __init__(self):
+        self.response = requests.Response()
+        self.response.status_code = 200
+        self.response._content = 'airbnb/airflow'.encode('ascii', 'ignore')
+
+    def send(self, request, **kwargs):
+        return self.response
+
+    def prepare_request(self, request):
+        if 'date' in request.params:
+            self.response._content += (
+                '/' + request.params['date']
+            ).encode('ascii', 'ignore')
+        return self.response
+
+
+class HttpOpSensorTest(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    @mock.patch('requests.Session', FakeSession)
+    def test_get(self):
+        t = SimpleHttpOperator(
+            task_id='get_op',
+            method='GET',
+            endpoint='/search',
+            data={"client": "ubuntu", "q": "airflow"},
+            headers={},
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    @mock.patch('requests.Session', FakeSession)
+    def test_get_response_check(self):
+        t = SimpleHttpOperator(
+            task_id='get_op',
+            method='GET',
+            endpoint='/search',
+            data={"client": "ubuntu", "q": "airflow"},
+            response_check=lambda response: ("airbnb/airflow" in response.text),
+            headers={},
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    @mock.patch('requests.Session', FakeSession)
+    def test_sensor(self):
+        sensor = HttpSensor(
+            task_id='http_sensor_check',
+            http_conn_id='http_default',
+            endpoint='/search',
+            request_params={"client": "ubuntu", "q": "airflow", 'date': '{{ds}}'},
+            headers={},
+            response_check=lambda response: (
+                "airbnb/airflow/" + DEFAULT_DATE.strftime('%Y-%m-%d')
+                in response.text),
+            poke_interval=5,
+            timeout=15,
+            dag=self.dag)
+        sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/test_sql_sensor.py
----------------------------------------------------------------------
diff --git a/tests/sensors/test_sql_sensor.py b/tests/sensors/test_sql_sensor.py
new file mode 100644
index 0000000..8bd3022
--- /dev/null
+++ b/tests/sensors/test_sql_sensor.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+from airflow import DAG
+from airflow import configuration
+from airflow.sensors.sql_sensor import SqlSensor
+from airflow.utils.timezone import datetime
+
+configuration.load_test_config()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+TEST_DAG_ID = 'unit_test_sql_dag'
+
+
+class SqlSensorTests(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        self.dag = DAG(TEST_DAG_ID, default_args=args)
+
+    def test_sql_sensor_mysql(self):
+        t = SqlSensor(
+            task_id='sql_sensor_check',
+            conn_id='mysql_default',
+            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
+            dag=self.dag
+        )
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_sql_sensor_postgres(self):
+        t = SqlSensor(
+            task_id='sql_sensor_check',
+            conn_id='postgres_default',
+            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
+            dag=self.dag
+        )
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/test_timedelta_sensor.py
----------------------------------------------------------------------
diff --git a/tests/sensors/test_timedelta_sensor.py b/tests/sensors/test_timedelta_sensor.py
new file mode 100644
index 0000000..9cef69f
--- /dev/null
+++ b/tests/sensors/test_timedelta_sensor.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+from datetime import timedelta
+
+from airflow import configuration
+from airflow import models, DAG
+from airflow.sensors.time_delta_sensor import TimeDeltaSensor
+from airflow.utils.timezone import datetime
+
+configuration.load_test_config()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+DEV_NULL = '/dev/null'
+TEST_DAG_ID = 'unit_tests'
+
+
+class TimedeltaSensorTest(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        self.dagbag = models.DagBag(
+            dag_folder=DEV_NULL, include_examples=True)
+        self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        self.dag = DAG(TEST_DAG_ID, default_args=self.args)
+
+    def test_timedelta_sensor(self):
+        t = TimeDeltaSensor(
+            task_id='timedelta_sensor_check',
+            delta=timedelta(seconds=2),
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/test_timeout_sensor.py
----------------------------------------------------------------------
diff --git a/tests/sensors/test_timeout_sensor.py b/tests/sensors/test_timeout_sensor.py
new file mode 100644
index 0000000..8dde98f
--- /dev/null
+++ b/tests/sensors/test_timeout_sensor.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+import time
+from datetime import timedelta
+
+from airflow import DAG, configuration
+from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.timezone import datetime
+
+configuration.load_test_config()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+TEST_DAG_ID = 'unit_test_dag'
+
+
+class TimeoutTestSensor(BaseSensorOperator):
+    """
+    Sensor whose poke method always returns the provided return_value
+
+    :param return_value: the value returned by every call to poke()
+    :type return_value: any
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 return_value=False,
+                 *args,
+                 **kwargs):
+        self.return_value = return_value
+        super(TimeoutTestSensor, self).__init__(*args, **kwargs)
+
+    def poke(self, context):
+        return self.return_value
+
+    def execute(self, context):
+        started_at = timezone.utcnow()
+        time_jump = self.params.get('time_jump')
+        while not self.poke(context):
+            if time_jump:
+                started_at -= time_jump
+            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
+                if self.soft_fail:
+                    raise AirflowSkipException('Snap. Time is OUT.')
+                else:
+                    raise AirflowSensorTimeout('Snap. Time is OUT.')
+            time.sleep(self.poke_interval)
+        self.log.info("Success criteria met. Exiting.")
+
+
+class SensorTimeoutTest(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        self.dag = DAG(TEST_DAG_ID, default_args=args)
+
+    def test_timeout(self):
+        t = TimeoutTestSensor(
+            task_id='test_timeout',
+            execution_timeout=timedelta(days=2),
+            return_value=False,
+            poke_interval=5,
+            params={'time_jump': timedelta(days=2, seconds=1)},
+            dag=self.dag
+        )
+        self.assertRaises(
+            AirflowSensorTimeout,
+            t.run,
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True
+        )
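
As the execute() loop above shows, exceeding the timeout raises
AirflowSensorTimeout unless soft_fail is set, in which case the task is
skipped. A sketch of a companion test for the soft_fail branch (not part of
this commit) that could be added to SensorTimeoutTest, reusing the imports
already present in this file:

    def test_timeout_soft_fail(self):
        t = TimeoutTestSensor(
            task_id='test_timeout_soft_fail',
            return_value=False,
            soft_fail=True,  # skip instead of failing when the timeout is hit
            timeout=1,
            poke_interval=1,
            params={'time_jump': timedelta(days=2, seconds=1)},
            dag=self.dag
        )
        self.assertRaises(AirflowSkipException, t.execute, None)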


[2/3] incubator-airflow git commit: [AIRFLOW-1889] Split sensors into separate files

Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/s3_prefix_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/s3_prefix_sensor.py b/airflow/sensors/s3_prefix_sensor.py
new file mode 100644
index 0000000..e8002f3
--- /dev/null
+++ b/airflow/sensors/s3_prefix_sensor.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+from airflow.utils.decorators import apply_defaults
+
+
+class S3PrefixSensor(BaseSensorOperator):
+    """
+    Waits for a prefix to exist. A prefix is the first part of a key,
+    thus enabling checking of constructs similar to glob airfl* or
+    SQL LIKE 'airfl%'. A delimiter can be specified to indicate the
+    hierarchy of keys, meaning that the match will stop at that
+    delimiter. Current code accepts sane delimiters, i.e. characters that
+    are NOT special characters in the Python regex engine.
+
+    :param bucket_name: Name of the S3 bucket
+    :type bucket_name: str
+    :param prefix: The prefix being waited on. Relative path from bucket root level.
+    :type prefix: str
+    :param delimiter: The delimiter intended to show hierarchy.
+        Defaults to '/'.
+    :type delimiter: str
+    """
+    template_fields = ('prefix', 'bucket_name')
+
+    @apply_defaults
+    def __init__(self,
+                 bucket_name,
+                 prefix,
+                 delimiter='/',
+                 aws_conn_id='aws_default',
+                 *args,
+                 **kwargs):
+        super(S3PrefixSensor, self).__init__(*args, **kwargs)
+        # Parse
+        self.bucket_name = bucket_name
+        self.prefix = prefix
+        self.delimiter = delimiter
+        self.full_url = "s3://" + bucket_name + '/' + prefix
+        self.aws_conn_id = aws_conn_id
+
+    def poke(self, context):
+        self.log.info('Poking for prefix : {self.prefix}\n'
+                      'in bucket s3://{self.bucket_name}'.format(**locals()))
+        from airflow.hooks.S3_hook import S3Hook
+        hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        return hook.check_for_prefix(
+            prefix=self.prefix,
+            delimiter=self.delimiter,
+            bucket_name=self.bucket_name)
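
For readers tracking the move, a minimal usage sketch of the relocated
S3PrefixSensor; the DAG, bucket and prefix below are illustrative
assumptions, not part of this commit:

    from airflow import DAG
    from airflow.sensors.s3_prefix_sensor import S3PrefixSensor
    from airflow.utils.timezone import datetime

    dag = DAG('example_s3_prefix', start_date=datetime(2018, 1, 1),
              schedule_interval='@daily')

    wait_for_prefix = S3PrefixSensor(
        task_id='wait_for_prefix',
        bucket_name='my-example-bucket',  # assumed bucket name
        prefix='incoming/{{ ds }}/',      # 'prefix' is templated via template_fields
        delimiter='/',
        aws_conn_id='aws_default',
        poke_interval=60,
        timeout=60 * 60,
        dag=dag)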

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/sql_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/sql_sensor.py b/airflow/sensors/sql_sensor.py
new file mode 100644
index 0000000..cb23faa
--- /dev/null
+++ b/airflow/sensors/sql_sensor.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from builtins import str
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SqlSensor(BaseSensorOperator):
+    """
+    Runs a SQL statement until a criterion is met. It will keep trying while
+    the SQL returns no rows, or while the first cell is in (0, '0', '').
+
+    :param conn_id: The connection to run the sensor against
+    :type conn_id: string
+    :param sql: The sql to run. To pass, it needs to return at least one row
+        whose first cell is not in (0, '0', '').
+    """
+    template_fields = ('sql',)
+    template_ext = ('.hql', '.sql',)
+    ui_color = '#7c7287'
+
+    @apply_defaults
+    def __init__(self, conn_id, sql, *args, **kwargs):
+        self.sql = sql
+        self.conn_id = conn_id
+        super(SqlSensor, self).__init__(*args, **kwargs)
+
+    def poke(self, context):
+        hook = BaseHook.get_connection(self.conn_id).get_hook()
+
+        self.log.info('Poking: %s', self.sql)
+        records = hook.get_records(self.sql)
+        if not records:
+            return False
+        else:
+            if str(records[0][0]) in ('0', '',):
+                return False
+            else:
+                return True
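
A hedged usage sketch of SqlSensor from its new location; the table name and
the 'mysql_default' connection id are assumptions:

    from airflow import DAG
    from airflow.sensors.sql_sensor import SqlSensor
    from airflow.utils.timezone import datetime

    dag = DAG('example_sql_sensor', start_date=datetime(2018, 1, 1))

    wait_for_rows = SqlSensor(
        task_id='wait_for_rows',
        conn_id='mysql_default',
        # Succeeds once the first cell of the first row is not 0, '0' or ''.
        sql="SELECT COUNT(*) FROM landing_table WHERE ds = '{{ ds }}'",
        poke_interval=30,
        dag=dag)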

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/time_delta_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/time_delta_sensor.py b/airflow/sensors/time_delta_sensor.py
new file mode 100644
index 0000000..31f75ca
--- /dev/null
+++ b/airflow/sensors/time_delta_sensor.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class TimeDeltaSensor(BaseSensorOperator):
+    """
+    Waits for a timedelta after the task's execution_date + schedule_interval.
+    In Airflow, the daily task stamped with ``execution_date``
+    2016-01-01 can only start running on 2016-01-02. The timedelta here
+    represents the time after the execution period has closed.
+
+    :param delta: time length to wait after execution_date before succeeding
+    :type delta: datetime.timedelta
+    """
+
+    @apply_defaults
+    def __init__(self, delta, *args, **kwargs):
+        super(TimeDeltaSensor, self).__init__(*args, **kwargs)
+        self.delta = delta
+
+    def poke(self, context):
+        dag = context['dag']
+        target_dttm = dag.following_schedule(context['execution_date'])
+        target_dttm += self.delta
+        self.log.info('Checking if the time (%s) has come', target_dttm)
+        return timezone.utcnow() > target_dttm
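
To make the docstring concrete, a small sketch with illustrative dates and
delta: for a daily DAG, dag.following_schedule(execution_date) is
execution_date plus one day, so the sensor only succeeds delta after the
execution period closes.

    from datetime import timedelta

    from airflow import DAG
    from airflow.sensors.time_delta_sensor import TimeDeltaSensor
    from airflow.utils.timezone import datetime

    dag = DAG('example_time_delta', start_date=datetime(2018, 1, 1),
              schedule_interval='@daily')

    # For the run stamped 2018-01-01, following_schedule() returns
    # 2018-01-02 00:00 UTC, so this task succeeds after 2018-01-02 02:00 UTC.
    wait_two_hours = TimeDeltaSensor(
        task_id='wait_two_hours',
        delta=timedelta(hours=2),
        dag=dag)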

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/time_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
new file mode 100644
index 0000000..ff10b05
--- /dev/null
+++ b/airflow/sensors/time_sensor.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class TimeSensor(BaseSensorOperator):
+    """
+    Waits until the specified time of the day.
+
+    :param target_time: time after which the job succeeds
+    :type target_time: datetime.time
+    """
+
+    @apply_defaults
+    def __init__(self, target_time, *args, **kwargs):
+        super(TimeSensor, self).__init__(*args, **kwargs)
+        self.target_time = target_time
+
+    def poke(self, context):
+        self.log.info('Checking if the time (%s) has come', self.target_time)
+        return timezone.utcnow().time() > self.target_time
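
A minimal sketch of TimeSensor usage; the target time is an assumption and,
per poke() above, it is compared against the current UTC time of day:

    from datetime import time

    from airflow import DAG
    from airflow.sensors.time_sensor import TimeSensor
    from airflow.utils.timezone import datetime

    dag = DAG('example_time_sensor', start_date=datetime(2018, 1, 1))

    wait_until_morning = TimeSensor(
        task_id='wait_until_morning',
        target_time=time(8, 30),  # succeeds once timezone.utcnow().time() passes 08:30
        dag=dag)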

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/web_hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/web_hdfs_sensor.py b/airflow/sensors/web_hdfs_sensor.py
new file mode 100644
index 0000000..df023b7
--- /dev/null
+++ b/airflow/sensors/web_hdfs_sensor.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class WebHdfsSensor(BaseSensorOperator):
+    """
+    Waits for a file or folder to land in HDFS
+    """
+    template_fields = ('filepath',)
+
+    @apply_defaults
+    def __init__(self,
+                 filepath,
+                 webhdfs_conn_id='webhdfs_default',
+                 *args,
+                 **kwargs):
+        super(WebHdfsSensor, self).__init__(*args, **kwargs)
+        self.filepath = filepath
+        self.webhdfs_conn_id = webhdfs_conn_id
+
+    def poke(self, context):
+        from airflow.hooks.webhdfs_hook import WebHDFSHook
+        c = WebHDFSHook(self.webhdfs_conn_id)
+        self.log.info('Poking for file {self.filepath}'.format(**locals()))
+        return c.check_for_path(hdfs_path=self.filepath)
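
A minimal sketch of WebHdfsSensor usage; the HDFS path and the
'webhdfs_default' connection id are assumptions:

    from airflow import DAG
    from airflow.sensors.web_hdfs_sensor import WebHdfsSensor
    from airflow.utils.timezone import datetime

    dag = DAG('example_webhdfs', start_date=datetime(2018, 1, 1))

    wait_for_file = WebHdfsSensor(
        task_id='wait_for_file',
        filepath='/data/incoming/{{ ds }}/_SUCCESS',  # 'filepath' is templated
        webhdfs_conn_id='webhdfs_default',
        dag=dag)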

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 045e5a4..d74d00e 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -37,7 +37,7 @@ All sensors are derived from ``BaseSensorOperator``. All sensors inherit
 the ``timeout`` and ``poke_interval`` on top of the ``BaseOperator``
 attributes.
 
-.. autoclass:: airflow.operators.sensors.BaseSensorOperator
+.. autoclass:: airflow.sensors.base_sensor_operator.BaseSensorOperator
 
 
 Operator API
@@ -54,34 +54,37 @@ Operator API
 .. autoclass:: airflow.operators.docker_operator.DockerOperator
 .. autoclass:: airflow.operators.dummy_operator.DummyOperator
 .. autoclass:: airflow.operators.email_operator.EmailOperator
-.. autoclass:: airflow.operators.sensors.ExternalTaskSensor
 .. autoclass:: airflow.operators.generic_transfer.GenericTransfer
-.. autoclass:: airflow.operators.sensors.HdfsSensor
 .. autoclass:: airflow.operators.hive_to_samba_operator.Hive2SambaOperator
 .. autoclass:: airflow.operators.hive_operator.HiveOperator
-.. autoclass:: airflow.operators.sensors.HivePartitionSensor
 .. autoclass:: airflow.operators.hive_to_druid.HiveToDruidTransfer
 .. autoclass:: airflow.operators.hive_to_mysql.HiveToMySqlTransfer
 .. autoclass:: airflow.operators.http_operator.SimpleHttpOperator
-.. autoclass:: airflow.operators.sensors.HttpSensor
-.. autoclass:: airflow.operators.sensors.MetastorePartitionSensor
 .. autoclass:: airflow.operators.mssql_operator.MsSqlOperator
 .. autoclass:: airflow.operators.mssql_to_hive.MsSqlToHiveTransfer
-.. autoclass:: airflow.operators.sensors.NamedHivePartitionSensor
 .. autoclass:: airflow.operators.postgres_operator.PostgresOperator
 .. autoclass:: airflow.operators.presto_check_operator.PrestoCheckOperator
 .. autoclass:: airflow.operators.presto_check_operator.PrestoIntervalCheckOperator
 .. autoclass:: airflow.operators.presto_check_operator.PrestoValueCheckOperator
 .. autoclass:: airflow.operators.python_operator.PythonOperator
 .. autoclass:: airflow.operators.python_operator.PythonVirtualenvOperator
-.. autoclass:: airflow.operators.sensors.S3KeySensor
 .. autoclass:: airflow.operators.s3_to_hive_operator.S3ToHiveTransfer
 .. autoclass:: airflow.operators.ShortCircuitOperator
 .. autoclass:: airflow.operators.slack_operator.SlackAPIOperator
-.. autoclass:: airflow.operators.sensors.SqlSensor
 .. autoclass:: airflow.operators.subdag_operator.SubDagOperator
-.. autoclass:: airflow.operators.sensors.TimeSensor
-.. autoclass:: airflow.operators.sensors.HdfsSensor
+
+.. autoclass:: airflow.sensors.external_task_sensor.ExternalTaskSensor
+.. autoclass:: airflow.sensors.hdfs_sensor.HdfsSensor
+.. autoclass:: airflow.sensors.hive_partition_sensor.HivePartitionSensor
+.. autoclass:: airflow.sensors.http_sensor.HttpSensor
+.. autoclass:: airflow.sensors.metastore_partition_sensor.MetastorePartitionSensor
+.. autoclass:: airflow.sensors.named_hive_partition_sensor.NamedHivePartitionSensor
+.. autoclass:: airflow.sensors.s3_key_sensor.S3KeySensor
+.. autoclass:: airflow.sensors.s3_prefix_sensor.S3PrefixSensor
+.. autoclass:: airflow.sensors.sql_sensor.SqlSensor
+.. autoclass:: airflow.sensors.time_sensor.TimeSensor
+.. autoclass:: airflow.sensors.time_delta_sensor.TimeDeltaSensor
+.. autoclass:: airflow.sensors.web_hdfs_sensor.WebHdfsSensor
 
 Community-contributed Operators
 '''''''''''''''''''''''''''''''
@@ -91,7 +94,6 @@ Community-contributed Operators
 .. deprecated:: 1.8
  Use :code:`from airflow.operators.bash_operator import BashOperator` instead.
 
-.. autoclass:: airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor
 .. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
 .. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
@@ -106,14 +108,29 @@ Community-contributed Operators
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubPublishOperator
-.. autoclass:: airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator
 .. autoclass:: airflow.contrib.operators.qubole_operator.QuboleOperator
 .. autoclass:: airflow.contrib.operators.ssh_operator.SSHOperator
 .. autoclass:: airflow.contrib.operators.vertica_operator.VerticaOperator
 .. autoclass:: airflow.contrib.operators.vertica_to_hive.VerticaToHiveTransfer
+
+.. autoclass:: airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor
 .. autoclass:: airflow.contrib.sensors.bash_sensor.BashSensor
+.. autoclass:: airflow.contrib.sensors.bigquery_sensor.BigQueryTableSensor
+.. autoclass:: airflow.contrib.sensors.datadog_sensor.DatadogSensor
+.. autoclass:: airflow.contrib.sensors.emr_base_sensor.EmrBaseSensor
+.. autoclass:: airflow.contrib.sensors.emr_job_flow_sensor.EmrJobFlowSensor
+.. autoclass:: airflow.contrib.sensors.emr_step_sensor.EmrStepSensor
+.. autoclass:: airflow.contrib.sensors.file_sensor.FileSensor
+.. autoclass:: airflow.contrib.sensors.ftp_sensor.FtpSensor
+.. autoclass:: airflow.contrib.sensors.gcs_sensor.GcsSensor
+.. autoclass:: airflow.contrib.sensors.hdfs_sensor.HdfsSensor
+.. autoclass:: airflow.contrib.sensors.jira_sensor.JiraSensor
+.. autoclass:: airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor
+.. autoclass:: airflow.contrib.sensors.qubole_sensor.QuboleSensor
+.. autoclass:: airflow.contrib.sensors.redis_key_sensor.RedisKeySensor
+.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor
 
 .. _macros:
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/operators/test_fs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_fs_operator.py b/tests/contrib/operators/test_fs_operator.py
deleted file mode 100644
index 2ef4286..0000000
--- a/tests/contrib/operators/test_fs_operator.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-
-from airflow import configuration
-from airflow.settings import Session
-from airflow import models, DAG
-from airflow.contrib.operators.fs_operator import FileSensor
-from airflow.utils.timezone import datetime
-
-TEST_DAG_ID = 'unit_tests'
-DEFAULT_DATE = datetime(2015, 1, 1)
-configuration.load_test_config()
-
-
-def reset(dag_id=TEST_DAG_ID):
-    session = Session()
-    tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
-    tis.delete()
-    session.commit()
-    session.close()
-
-
-reset()
-
-
-class FileSensorTest(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-        from airflow.contrib.hooks.fs_hook import FSHook
-        hook = FSHook()
-        args = {
-            'owner': 'airflow',
-            'start_date': DEFAULT_DATE,
-            'provide_context': True
-        }
-        dag = DAG(TEST_DAG_ID+'test_schedule_dag_once', default_args=args)
-        dag.schedule_interval = '@once'
-        self.hook = hook
-        self.dag = dag
-
-    def test_simple(self):
-        task = FileSensor(
-            task_id="test",
-            filepath="etc/hosts",
-            fs_conn_id='fs_default',
-            _hook=self.hook,
-            dag=self.dag,
-        )
-        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-
-if __name__ == '__main__':
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py b/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py
index a5c9e66..cec24f2 100644
--- a/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py
+++ b/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py
@@ -14,6 +14,7 @@
 #
 
 import unittest
+
 import boto3
 
 from airflow import configuration

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_bash_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_bash_sensor.py b/tests/contrib/sensors/test_bash_sensor.py
index 66b18e4..5beab54 100644
--- a/tests/contrib/sensors/test_bash_sensor.py
+++ b/tests/contrib/sensors/test_bash_sensor.py
@@ -13,9 +13,10 @@
 # limitations under the License.
 #
 
-import datetime
 import unittest
 
+import datetime
+
 from airflow import DAG, configuration
 from airflow.contrib.sensors.bash_sensor import BashSensor
 from airflow.exceptions import AirflowSensorTimeout

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_datadog_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_datadog_sensor.py b/tests/contrib/sensors/test_datadog_sensor.py
index d845c54..8ceb7ca 100644
--- a/tests/contrib/sensors/test_datadog_sensor.py
+++ b/tests/contrib/sensors/test_datadog_sensor.py
@@ -14,13 +14,13 @@
 
 import json
 import unittest
+
 from mock import patch
 
 from airflow import configuration
-from airflow.utils import db
 from airflow import models
 from airflow.contrib.sensors.datadog_sensor import DatadogSensor
-
+from airflow.utils import db
 
 at_least_one_event = [{'alert_type': 'info',
                        'comments': [],
@@ -102,5 +102,6 @@ class TestDatadogSensor(unittest.TestCase):
 
         self.assertFalse(sensor.poke({}))
 
+
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_emr_base_sensor.py b/tests/contrib/sensors/test_emr_base_sensor.py
index 970d189..587383b 100644
--- a/tests/contrib/sensors/test_emr_base_sensor.py
+++ b/tests/contrib/sensors/test_emr_base_sensor.py
@@ -15,8 +15,8 @@
 import unittest
 
 from airflow import configuration
-from airflow.exceptions import AirflowException
 from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
+from airflow.exceptions import AirflowException
 
 
 class TestEmrBaseSensor(unittest.TestCase):
@@ -92,7 +92,6 @@ class TestEmrBaseSensor(unittest.TestCase):
 
         self.assertEqual(operator.poke(None), False)
 
-
     def test_poke_raises_error_when_job_has_failed(self):
         class EmrBaseSensorSubclass(EmrBaseSensor):
             NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
@@ -115,7 +114,6 @@ class TestEmrBaseSensor(unittest.TestCase):
         )
 
         with self.assertRaises(AirflowException) as context:
-
             operator.poke(None)
 
         self.assertIn('EMR job failed', str(context.exception))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_file_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_file_sensor.py b/tests/contrib/sensors/test_file_sensor.py
new file mode 100644
index 0000000..7bc559d
--- /dev/null
+++ b/tests/contrib/sensors/test_file_sensor.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from airflow import configuration
+from airflow import models, DAG
+from airflow.contrib.sensors.file_sensor import FileSensor
+from airflow.settings import Session
+from airflow.utils.timezone import datetime
+
+TEST_DAG_ID = 'unit_tests'
+DEFAULT_DATE = datetime(2015, 1, 1)
+configuration.load_test_config()
+
+
+def reset(dag_id=TEST_DAG_ID):
+    session = Session()
+    tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+    tis.delete()
+    session.commit()
+    session.close()
+
+
+reset()
+
+
+class FileSensorTest(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        from airflow.contrib.hooks.fs_hook import FSHook
+        hook = FSHook()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE,
+            'provide_context': True
+        }
+        dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args)
+        dag.schedule_interval = '@once'
+        self.hook = hook
+        self.dag = dag
+
+    def test_simple(self):
+        task = FileSensor(
+            task_id="test",
+            filepath="etc/hosts",
+            fs_conn_id='fs_default',
+            _hook=self.hook,
+            dag=self.dag,
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_ftp_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_ftp_sensor.py b/tests/contrib/sensors/test_ftp_sensor.py
index 50f8b8b..f3aff9b 100644
--- a/tests/contrib/sensors/test_ftp_sensor.py
+++ b/tests/contrib/sensors/test_ftp_sensor.py
@@ -13,8 +13,8 @@
 # limitations under the License.
 
 import unittest
-from ftplib import error_perm
 
+from ftplib import error_perm
 from mock import MagicMock
 
 from airflow.contrib.hooks.ftp_hook import FTPHook

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_hdfs_sensors.py b/tests/contrib/sensors/test_hdfs_sensors.py
index a76a6c4..cd191f8 100644
--- a/tests/contrib/sensors/test_hdfs_sensors.py
+++ b/tests/contrib/sensors/test_hdfs_sensors.py
@@ -12,10 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import sys
 import unittest
+
 import re
 from datetime import timedelta
+
 from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex
 from airflow.exceptions import AirflowSensorTimeout
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_jira_sensor_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_jira_sensor_test.py b/tests/contrib/sensors/test_jira_sensor_test.py
index 7c16188..4390f34 100644
--- a/tests/contrib/sensors/test_jira_sensor_test.py
+++ b/tests/contrib/sensors/test_jira_sensor_test.py
@@ -19,13 +19,13 @@ from mock import Mock
 from mock import patch
 
 from airflow import DAG, configuration
-from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
 from airflow import models
+from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
 from airflow.utils import db, timezone
 
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 jira_client_mock = Mock(
-        name="jira_client_for_test"
+    name="jira_client_for_test"
 )
 
 minimal_test_ticket = {
@@ -52,10 +52,10 @@ class TestJiraSensor(unittest.TestCase):
         dag = DAG('test_dag_id', default_args=args)
         self.dag = dag
         db.merge_conn(
-                models.Connection(
-                        conn_id='jira_default', conn_type='jira',
-                        host='https://localhost/jira/', port=443,
-                        extra='{"verify": "False", "project": "AIRFLOW"}'))
+            models.Connection(
+                conn_id='jira_default', conn_type='jira',
+                host='https://localhost/jira/', port=443,
+                extra='{"verify": "False", "project": "AIRFLOW"}'))
 
     @patch("airflow.contrib.hooks.jira_hook.JIRA",
            autospec=True, return_value=jira_client_mock)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_pubsub_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_pubsub_sensor.py b/tests/contrib/sensors/test_pubsub_sensor.py
index ae59bb7..dcbbf6e 100644
--- a/tests/contrib/sensors/test_pubsub_sensor.py
+++ b/tests/contrib/sensors/test_pubsub_sensor.py
@@ -14,9 +14,10 @@
 
 from __future__ import unicode_literals
 
-from base64 import b64encode as b64e
 import unittest
 
+from base64 import b64encode as b64e
+
 from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor
 from airflow.exceptions import AirflowSensorTimeout
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_qubole_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_qubole_sensor.py b/tests/contrib/sensors/test_qubole_sensor.py
index 035b231..88c5be6 100644
--- a/tests/contrib/sensors/test_qubole_sensor.py
+++ b/tests/contrib/sensors/test_qubole_sensor.py
@@ -14,15 +14,14 @@
 #
 
 import unittest
-from mock import patch
 
 from datetime import datetime
+from mock import patch
 
+from airflow.contrib.sensors.qubole_sensor import QuboleFileSensor, QubolePartitionSensor
+from airflow.exceptions import AirflowException
 from airflow.models import DAG, Connection
 from airflow.utils import db
-from airflow.exceptions import AirflowException
-
-from airflow.contrib.sensors.qubole_sensor import QuboleFileSensor, QubolePartitionSensor
 
 DAG_ID = "qubole_test_dag"
 TASK_ID = "test_task"
@@ -39,14 +38,10 @@ class QuboleSensorTest(unittest.TestCase):
     @patch('airflow.contrib.sensors.qubole_sensor.QuboleFileSensor.poke')
     def test_file_sensore(self, patched_poke):
         patched_poke.return_value = True
-
         sensor = QuboleFileSensor(
             task_id='test_qubole_file_sensor',
-            data={"files":
-                    ["s3://some_bucket/some_file"]
-            }
+            data={"files": ["s3://some_bucket/some_file"]}
         )
-
         self.assertTrue(sensor.poke({}))
 
     @patch('airflow.contrib.sensors.qubole_sensor.QubolePartitionSensor.poke')
@@ -55,12 +50,11 @@ class QuboleSensorTest(unittest.TestCase):
 
         sensor = QubolePartitionSensor(
             task_id='test_qubole_partition_sensor',
-            data={"schema":"default",
-                  "table":"my_partitioned_table",
-                  "columns":[
-                      {"column" : "month", "values" : ["1", "2"]},
-                  ]
-            },
+            data={
+                "schema": "default",
+                "table": "my_partitioned_table",
+                "columns": [{"column": "month", "values": ["1", "2"]}]
+            }
         )
 
         self.assertTrue(sensor.poke({}))
@@ -72,14 +66,13 @@ class QuboleSensorTest(unittest.TestCase):
         dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
 
         with self.assertRaises(AirflowException):
-            sensor = QubolePartitionSensor(
+            QubolePartitionSensor(
                 task_id='test_qubole_partition_sensor',
                 poke_interval=1,
-                data={"schema":"default",
-                      "table":"my_partitioned_table",
-                      "columns":[
-                          {"column" : "month", "values" : ["1", "2"]},
-                      ]
-                      },
+                data={
+                    "schema": "default",
+                    "table": "my_partitioned_table",
+                    "columns": [{"column": "month", "values": ["1", "2"]}]
+                },
                 dag=dag
-            )
\ No newline at end of file
+            )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_wasb_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_wasb_sensor.py b/tests/contrib/sensors/test_wasb_sensor.py
index a26ba2d..a11d740 100644
--- a/tests/contrib/sensors/test_wasb_sensor.py
+++ b/tests/contrib/sensors/test_wasb_sensor.py
@@ -13,9 +13,10 @@
 # limitations under the License.
 #
 
-import datetime
 import unittest
 
+import datetime
+
 from airflow import DAG, configuration
 from airflow.contrib.sensors.wasb_sensor import WasbBlobSensor
 from airflow.contrib.sensors.wasb_sensor import WasbPrefixSensor
@@ -30,7 +31,6 @@ except ImportError:
 
 
 class TestWasbBlobSensor(unittest.TestCase):
-
     _config = {
         'container_name': 'container',
         'blob_name': 'blob',
@@ -83,7 +83,6 @@ class TestWasbBlobSensor(unittest.TestCase):
 
 
 class TestWasbPrefixSensor(unittest.TestCase):
-
     _config = {
         'container_name': 'container',
         'prefix': 'prefix',
@@ -134,5 +133,6 @@ class TestWasbPrefixSensor(unittest.TestCase):
             'container', 'prefix', timeout=2
         )
 
+
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 0778628..f25d0e7 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -14,28 +14,27 @@
 
 from __future__ import print_function
 
+import json
+import unittest
+
 import bleach
 import doctest
-import json
-import logging
+import mock
+import multiprocessing
 import os
 import re
-import unittest
-import multiprocessing
-import mock
-from numpy.testing import assert_array_almost_equal
+import signal
+import sqlalchemy
 import tempfile
-from datetime import time, timedelta
-from email.mime.multipart import MIMEMultipart
+import warnings
+from datetime import timedelta
+from dateutil.relativedelta import relativedelta
 from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
 from freezegun import freeze_time
-import signal
+from numpy.testing import assert_array_almost_equal
 from six.moves.urllib.parse import urlencode
 from time import sleep
-import warnings
-
-from dateutil.relativedelta import relativedelta
-import sqlalchemy
 
 from airflow import configuration
 from airflow.executors import SequentialExecutor
@@ -49,8 +48,7 @@ from airflow.operators.check_operator import CheckOperator, ValueCheckOperator
 from airflow.operators.dagrun_operator import TriggerDagRunOperator
 from airflow.operators.python_operator import PythonOperator
 from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.http_operator import SimpleHttpOperator
-from airflow.operators import sensors
+
 from airflow.hooks.base_hook import BaseHook
 from airflow.hooks.sqlite_hook import SqliteHook
 from airflow.bin import cli
@@ -77,7 +75,6 @@ DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
 TEST_DAG_ID = 'unit_tests'
 
-
 try:
     import cPickle as pickle
 except ImportError:
@@ -111,7 +108,6 @@ class OperatorSubclass(BaseOperator):
 
 
 class CoreTest(unittest.TestCase):
-
     # These defaults make the test faster to run
     default_scheduler_args = {"file_process_interval": 0,
                               "processor_poll_interval": 0.5,
@@ -122,8 +118,7 @@ class CoreTest(unittest.TestCase):
         self.dagbag = models.DagBag(
             dag_folder=DEV_NULL, include_examples=True)
         self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
-        dag = DAG(TEST_DAG_ID, default_args=self.args)
-        self.dag = dag
+        self.dag = DAG(TEST_DAG_ID, default_args=self.args)
         self.dag_bash = self.dagbag.dags['example_bash_operator']
         self.runme_0 = self.dag_bash.get_task('runme_0')
         self.run_after_loop = self.dag_bash.get_task('run_after_loop')
@@ -144,9 +139,12 @@ class CoreTest(unittest.TestCase):
         self.assertEqual(dag.dag_id, dag_run.dag_id)
         self.assertIsNotNone(dag_run.run_id)
         self.assertNotEqual('', dag_run.run_id)
-        self.assertEqual(datetime(2015, 1, 2, 0, 0), dag_run.execution_date, msg=
-            'dag_run.execution_date did not match expectation: {0}'
-                .format(dag_run.execution_date))
+        self.assertEqual(
+            datetime(2015, 1, 2, 0, 0),
+            dag_run.execution_date,
+            msg='dag_run.execution_date did not match expectation: {0}'
+            .format(dag_run.execution_date)
+        )
         self.assertEqual(State.RUNNING, dag_run.state)
         self.assertFalse(dag_run.external_trigger)
         dag.clear()
@@ -175,9 +173,12 @@ class CoreTest(unittest.TestCase):
         self.assertEqual(dag.dag_id, dag_run.dag_id)
         self.assertIsNotNone(dag_run.run_id)
         self.assertNotEqual('', dag_run.run_id)
-        self.assertEqual(DEFAULT_DATE + delta, dag_run.execution_date, msg=
-            'dag_run.execution_date did not match expectation: {0}'
-                .format(dag_run.execution_date))
+        self.assertEqual(
+            DEFAULT_DATE + delta,
+            dag_run.execution_date,
+            msg='dag_run.execution_date did not match expectation: {0}'
+            .format(dag_run.execution_date)
+        )
         self.assertEqual(State.RUNNING, dag_run.state)
         self.assertFalse(dag_run.external_trigger)
 
@@ -348,13 +349,6 @@ class CoreTest(unittest.TestCase):
         self.assertNotEqual(hash(dag_diff_name), hash(self.dag))
         self.assertNotEqual(hash(dag_subclass), hash(self.dag))
 
-    def test_time_sensor(self):
-        t = sensors.TimeSensor(
-            task_id='time_sensor_check',
-            target_time=time(0),
-            dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
     def test_check_operators(self):
 
         conn_id = "sqlite_default"
@@ -407,7 +401,7 @@ class CoreTest(unittest.TestCase):
 
     def test_bash_operator(self):
         t = BashOperator(
-            task_id='time_sensor_check',
+            task_id='test_bash_operator',
             bash_command="echo success",
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@@ -421,7 +415,6 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_bash_operator_kill(self):
-        import subprocess
         import psutil
         sleep_time = "100%d" % os.getpid()
         t = BashOperator(
@@ -456,7 +449,7 @@ class CoreTest(unittest.TestCase):
 
     def test_dryrun(self):
         t = BashOperator(
-            task_id='time_sensor_check',
+            task_id='test_dryrun',
             bash_command="echo success",
             dag=self.dag)
         t.dry_run()
@@ -469,70 +462,6 @@ class CoreTest(unittest.TestCase):
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
-    def test_timedelta_sensor(self):
-        t = sensors.TimeDeltaSensor(
-            task_id='timedelta_sensor_check',
-            delta=timedelta(seconds=2),
-            dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-    def test_external_task_sensor(self):
-        t = sensors.ExternalTaskSensor(
-            task_id='test_external_task_sensor_check',
-            external_dag_id=TEST_DAG_ID,
-            external_task_id='time_sensor_check',
-            dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-    def test_external_task_sensor_delta(self):
-        t = sensors.ExternalTaskSensor(
-            task_id='test_external_task_sensor_check_delta',
-            external_dag_id=TEST_DAG_ID,
-            external_task_id='time_sensor_check',
-            execution_delta=timedelta(0),
-            allowed_states=['success'],
-            dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-    def test_external_task_sensor_fn(self):
-        self.test_time_sensor()
-        # check that the execution_fn works
-        t = sensors.ExternalTaskSensor(
-            task_id='test_external_task_sensor_check_delta',
-            external_dag_id=TEST_DAG_ID,
-            external_task_id='time_sensor_check',
-            execution_date_fn=lambda dt: dt + timedelta(0),
-            allowed_states=['success'],
-            dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-        # double check that the execution is being called by failing the test
-        t2 = sensors.ExternalTaskSensor(
-            task_id='test_external_task_sensor_check_delta',
-            external_dag_id=TEST_DAG_ID,
-            external_task_id='time_sensor_check',
-            execution_date_fn=lambda dt: dt + timedelta(days=1),
-            allowed_states=['success'],
-            timeout=1,
-            poke_interval=1,
-            dag=self.dag)
-        with self.assertRaises(exceptions.AirflowSensorTimeout):
-            t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-    def test_external_task_sensor_error_delta_and_fn(self):
-        """
-        Test that providing execution_delta and a function raises an error
-        """
-        with self.assertRaises(ValueError):
-            t = sensors.ExternalTaskSensor(
-                task_id='test_external_task_sensor_check_delta',
-                external_dag_id=TEST_DAG_ID,
-                external_task_id='time_sensor_check',
-                execution_delta=timedelta(0),
-                execution_date_fn=lambda dt: dt,
-                allowed_states=['success'],
-                dag=self.dag)
-
     def test_timeout(self):
         t = PythonOperator(
             task_id='test_timeout',
@@ -576,7 +505,7 @@ class CoreTest(unittest.TestCase):
         Test the availability of variables in templates
         """
         val = {
-            'success':False,
+            'success': False,
             'test_value': 'a test value'
         }
         Variable.set("a_variable", val['test_value'])
@@ -645,9 +574,11 @@ class CoreTest(unittest.TestCase):
         """
         Test templates can handle objects with no sense of truthiness
         """
+
         class NonBoolObject(object):
             def __len__(self):
                 return NotImplemented
+
             def __bool__(self):
                 return NotImplemented
 
@@ -693,13 +624,13 @@ class CoreTest(unittest.TestCase):
     def test_get_non_existing_var_should_return_default(self):
         default_value = "some default val"
         self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
-                                             default_var=default_value))
+                                                     default_var=default_value))
 
     def test_get_non_existing_var_should_not_deserialize_json_default(self):
         default_value = "}{ this is a non JSON default }{"
         self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
-                                             default_var=default_value,
-                                             deserialize_json=True))
+                                                     default_var=default_value,
+                                                     deserialize_json=True))
 
     def test_variable_setdefault_round_trip(self):
         key = "tested_var_setdefault_1_id"
@@ -900,14 +831,12 @@ class CoreTest(unittest.TestCase):
         ti.refresh_from_db(session=session)
         # making sure it's actually running
         self.assertEqual(State.RUNNING, ti.state)
-        ti = (
-            session.query(TI)
-            .filter_by(
-                dag_id=task.dag_id,
-                task_id=task.task_id,
-                execution_date=DEFAULT_DATE)
-            .one()
-        )
+        ti = session.query(TI).filter_by(
+            dag_id=task.dag_id,
+            task_id=task.task_id,
+            execution_date=DEFAULT_DATE
+        ).one()
+
         # deleting the instance should result in a failure
         session.delete(ti)
         session.commit()
@@ -986,7 +915,7 @@ class CoreTest(unittest.TestCase):
 
         run2 = self.dag_bash.create_dagrun(
             run_id="run2",
-            execution_date=DEFAULT_DATE+timedelta(days=1),
+            execution_date=DEFAULT_DATE + timedelta(days=1),
             state=State.RUNNING)
 
         models.DagStat.update([self.dag_bash.dag_id], session=session)
@@ -1277,8 +1206,7 @@ class CliTests(unittest.TestCase):
         # Check deletions
         for index in range(1, 7):
             conn_id = 'new%s' % index
-            result = (session
-                      .query(models.Connection)
+            result = (session.query(models.Connection)
                       .filter(models.Connection.conn_id == conn_id)
                       .first())
 
@@ -1625,20 +1553,27 @@ class SecurityTests(unittest.TestCase):
         session.add(chart2)
         session.add(chart3)
         session.commit()
-        chart1_id = session.query(Chart).filter(Chart.label=='insecure_chart').first().id
+        chart1 = session.query(Chart).filter(Chart.label == 'insecure_chart').first()
         with self.assertRaises(SecurityError):
-            response = self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart1_id))
-        chart2_id = session.query(Chart).filter(Chart.label=="{{ ''.__class__.__mro__[1].__subclasses__() }}").first().id
+            self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart1.id))
+
+        chart2 = session.query(Chart).filter(
+            Chart.label == "{{ ''.__class__.__mro__[1].__subclasses__() }}"
+        ).first()
         with self.assertRaises(SecurityError):
-            response = self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart2_id))
-        chart3_id = session.query(Chart).filter(Chart.label=="{{ subprocess.check_output('ls') }}").first().id
+            self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart2.id))
+
+        chart3 = session.query(Chart).filter(
+            Chart.label == "{{ subprocess.check_output('ls') }}"
+        ).first()
         with self.assertRaises(UndefinedError):
-            response = self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart3_id))
+            self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart3.id))
 
     def tearDown(self):
         configuration.conf.set("webserver", "expose_config", "False")
         self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow())
 
+
 class WebUiTests(unittest.TestCase):
     def setUp(self):
         configuration.load_test_config()
@@ -1688,7 +1623,7 @@ class WebUiTests(unittest.TestCase):
         url = "/admin/airflow/graph?" + urlencode({
             "dag_id": self.dag_bash2.dag_id,
             "execution_date": self.dagrun_bash2.execution_date,
-            }).replace("&", "&amp;")
+        }).replace("&", "&amp;")
         self.assertIn(url, resp_html)
         self.assertIn(self.dagrun_bash2.execution_date.strftime("%Y-%m-%d %H:%M"), resp_html)
 
@@ -1697,8 +1632,8 @@ class WebUiTests(unittest.TestCase):
         self.assertIn("Ad Hoc Query", response.data.decode('utf-8'))
         response = self.app.post(
             "/admin/queryview/", data=dict(
-            conn_id="airflow_db",
-            sql="SELECT+COUNT%281%29+as+TEST+FROM+task_instance"))
+                conn_id="airflow_db",
+                sql="SELECT+COUNT%281%29+as+TEST+FROM+task_instance"))
         self.assertIn("TEST", response.data.decode('utf-8'))
 
     def test_health(self):
@@ -2061,69 +1996,6 @@ class LdapGroupTest(unittest.TestCase):
         configuration.conf.set("webserver", "authenticate", "False")
 
 
-class FakeSession(object):
-    def __init__(self):
-        from requests import Response
-        self.response = Response()
-        self.response.status_code = 200
-        self.response._content = 'airbnb/airflow'.encode('ascii', 'ignore')
-
-    def send(self, request, **kwargs):
-        return self.response
-
-    def prepare_request(self, request):
-        if 'date' in request.params:
-            self.response._content += (
-                '/' + request.params['date']).encode('ascii', 'ignore')
-        return self.response
-
-class HttpOpSensorTest(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
-        dag = DAG(TEST_DAG_ID, default_args=args)
-        self.dag = dag
-
-    @mock.patch('requests.Session', FakeSession)
-    def test_get(self):
-        t = SimpleHttpOperator(
-            task_id='get_op',
-            method='GET',
-            endpoint='/search',
-            data={"client": "ubuntu", "q": "airflow"},
-            headers={},
-            dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-    @mock.patch('requests.Session', FakeSession)
-    def test_get_response_check(self):
-        t = SimpleHttpOperator(
-            task_id='get_op',
-            method='GET',
-            endpoint='/search',
-            data={"client": "ubuntu", "q": "airflow"},
-            response_check=lambda response: ("airbnb/airflow" in response.text),
-            headers={},
-            dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-    @mock.patch('requests.Session', FakeSession)
-    def test_sensor(self):
-
-        sensor = sensors.HttpSensor(
-            task_id='http_sensor_check',
-            http_conn_id='http_default',
-            endpoint='/search',
-            request_params={"client": "ubuntu", "q": "airflow", 'date': '{{ds}}'},
-            headers={},
-            response_check=lambda response: (
-                "airbnb/airflow/" + DEFAULT_DATE.strftime('%Y-%m-%d')
-                in response.text),
-            poke_interval=5,
-            timeout=15,
-            dag=self.dag)
-        sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
 class FakeWebHDFSHook(object):
     def __init__(self, conn_id):
         self.conn_id = conn_id
@@ -2154,45 +2026,126 @@ class FakeSnakeBiteClient(object):
         if path[0] == '/datadirectory/empty_directory' and not include_toplevel:
             return []
         elif path[0] == '/datadirectory/datafile':
-            return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
-                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
-                     'owner': u'hdfs', 'path': '/datadirectory/datafile'}]
+            return [{
+                'group': u'supergroup',
+                'permission': 420,
+                'file_type': 'f',
+                'access_time': 1481122343796,
+                'block_replication': 3,
+                'modification_time': 1481122343862,
+                'length': 0,
+                'blocksize': 134217728,
+                'owner': u'hdfs',
+                'path': '/datadirectory/datafile'
+            }]
         elif path[0] == '/datadirectory/empty_directory' and include_toplevel:
-            return [
-                {'group': u'supergroup', 'permission': 493, 'file_type': 'd', 'access_time': 0, 'block_replication': 0,
-                 'modification_time': 1481132141540, 'length': 0, 'blocksize': 0, 'owner': u'hdfs',
-                 'path': '/datadirectory/empty_directory'}]
+            return [{
+                'group': u'supergroup',
+                'permission': 493,
+                'file_type': 'd',
+                'access_time': 0,
+                'block_replication': 0,
+                'modification_time': 1481132141540,
+                'length': 0,
+                'blocksize': 0,
+                'owner': u'hdfs',
+                'path': '/datadirectory/empty_directory'
+            }]
         elif path[0] == '/datadirectory/not_empty_directory' and include_toplevel:
-            return [
-                {'group': u'supergroup', 'permission': 493, 'file_type': 'd', 'access_time': 0, 'block_replication': 0,
-                 'modification_time': 1481132141540, 'length': 0, 'blocksize': 0, 'owner': u'hdfs',
-                 'path': '/datadirectory/empty_directory'},
-                {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
-                 'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
-                 'owner': u'hdfs', 'path': '/datadirectory/not_empty_directory/test_file'}]
+            return [{
+                'group': u'supergroup',
+                'permission': 493,
+                'file_type': 'd',
+                'access_time': 0,
+                'block_replication': 0,
+                'modification_time': 1481132141540,
+                'length': 0,
+                'blocksize': 0,
+                'owner': u'hdfs',
+                'path': '/datadirectory/empty_directory'
+            }, {
+                'group': u'supergroup',
+                'permission': 420,
+                'file_type': 'f',
+                'access_time': 1481122343796,
+                'block_replication': 3,
+                'modification_time': 1481122343862,
+                'length': 0,
+                'blocksize': 134217728,
+                'owner': u'hdfs',
+                'path': '/datadirectory/not_empty_directory/test_file'
+            }]
         elif path[0] == '/datadirectory/not_empty_directory':
-            return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
-                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
-                     'owner': u'hdfs', 'path': '/datadirectory/not_empty_directory/test_file'}]
+            return [{
+                'group': u'supergroup',
+                'permission': 420,
+                'file_type': 'f',
+                'access_time': 1481122343796,
+                'block_replication': 3,
+                'modification_time': 1481122343862,
+                'length': 0,
+                'blocksize': 134217728,
+                'owner': u'hdfs',
+                'path': '/datadirectory/not_empty_directory/test_file'
+            }]
         elif path[0] == '/datadirectory/not_existing_file_or_directory':
             raise FakeSnakeBiteClientException
         elif path[0] == '/datadirectory/regex_dir':
-            return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
-                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
-                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test1file'},
-                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
-                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
-                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test2file'},
-                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
-                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
-                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test3file'},
-                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
-                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
-                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/copying_file_1.txt._COPYING_'},
-                    {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
-                     'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
-                     'owner': u'hdfs', 'path': '/datadirectory/regex_dir/copying_file_3.txt.sftp'}
-                    ]
+            return [{
+                'group': u'supergroup',
+                'permission': 420,
+                'file_type': 'f',
+                'access_time': 1481122343796,
+                'block_replication': 3,
+                'modification_time': 1481122343862,
+                'length': 12582912,
+                'blocksize': 134217728,
+                'owner': u'hdfs',
+                'path': '/datadirectory/regex_dir/test1file'
+            }, {
+                'group': u'supergroup',
+                'permission': 420,
+                'file_type': 'f',
+                'access_time': 1481122343796,
+                'block_replication': 3,
+                'modification_time': 1481122343862,
+                'length': 12582912,
+                'blocksize': 134217728,
+                'owner': u'hdfs',
+                'path': '/datadirectory/regex_dir/test2file'
+            }, {
+                'group': u'supergroup',
+                'permission': 420,
+                'file_type': 'f',
+                'access_time': 1481122343796,
+                'block_replication': 3,
+                'modification_time': 1481122343862,
+                'length': 12582912,
+                'blocksize': 134217728,
+                'owner': u'hdfs',
+                'path': '/datadirectory/regex_dir/test3file'
+            }, {
+                'group': u'supergroup',
+                'permission': 420,
+                'file_type': 'f',
+                'access_time': 1481122343796,
+                'block_replication': 3,
+                'modification_time': 1481122343862,
+                'length': 12582912,
+                'blocksize': 134217728,
+                'owner': u'hdfs',
+                'path': '/datadirectory/regex_dir/copying_file_1.txt._COPYING_'
+            }, {
+                'group': u'supergroup',
+                'permission': 420,
+                'file_type': 'f',
+                'access_time': 1481122343796,
+                'block_replication': 3,
+                'modification_time': 1481122343862,
+                'length': 12582912,
+                'blocksize': 134217728,
+                'owner': u'hdfs',
+                'path': '/datadirectory/regex_dir/copying_file_3.txt.sftp'
+            }]
         else:
             raise FakeSnakeBiteClientException
 
@@ -2424,7 +2377,10 @@ class EmailTest(unittest.TestCase):
     def test_custom_backend(self, mock_send_email):
         configuration.set('email', 'EMAIL_BACKEND', 'tests.core.send_email_test')
         utils.email.send_email('to', 'subject', 'content')
-        send_email_test.assert_called_with('to', 'subject', 'content', files=None, dryrun=False, cc=None, bcc=None, mime_subtype='mixed')
+        send_email_test.assert_called_with(
+            'to', 'subject', 'content', files=None, dryrun=False,
+            cc=None, bcc=None, mime_subtype='mixed'
+        )
         self.assertFalse(mock_send_email.called)
 
 
@@ -2447,7 +2403,7 @@ class EmailSmtpTest(unittest.TestCase):
         self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
         self.assertEqual(2, len(msg.get_payload()))
         self.assertEqual(u'attachment; filename="' + os.path.basename(attachment.name) + '"',
-            msg.get_payload()[-1].get(u'Content-Disposition'))
+                         msg.get_payload()[-1].get(u'Content-Disposition'))
         mimeapp = MIMEApplication('attachment')
         self.assertEqual(mimeapp.get_payload(), msg.get_payload()[-1].get_payload())
 
@@ -2466,11 +2422,10 @@ class EmailSmtpTest(unittest.TestCase):
         self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
         self.assertEqual(2, len(msg.get_payload()))
         self.assertEqual(u'attachment; filename="' + os.path.basename(attachment.name) + '"',
-            msg.get_payload()[-1].get(u'Content-Disposition'))
+                         msg.get_payload()[-1].get(u'Content-Disposition'))
         mimeapp = MIMEApplication('attachment')
         self.assertEqual(mimeapp.get_payload(), msg.get_payload()[-1].get_payload())
 
-
     @mock.patch('smtplib.SMTP_SSL')
     @mock.patch('smtplib.SMTP')
     def test_send_mime(self, mock_smtp, mock_smtp_ssl):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/operators/__init__.py
----------------------------------------------------------------------
diff --git a/tests/operators/__init__.py b/tests/operators/__init__.py
index e6f6830..77d415c 100644
--- a/tests/operators/__init__.py
+++ b/tests/operators/__init__.py
@@ -15,7 +15,6 @@
 from .docker_operator import *
 from .subdag_operator import *
 from .operators import *
-from .sensors import *
 from .hive_operator import *
 from .s3_to_hive_operator import *
 from .python_operator import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 40f0ffd..ae0bec8 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -105,14 +105,6 @@ class MySqlTest(unittest.TestCase):
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
-    def test_sql_sensor(self):
-        t = operators.sensors.SqlSensor(
-            task_id='sql_sensor_check',
-            conn_id='mysql_default',
-            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
-            dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
     def test_overwrite_schema(self):
         """
         Verifies option to overwrite connection schema
@@ -191,14 +183,6 @@ class PostgresTest(unittest.TestCase):
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
-    def test_sql_sensor(self):
-        t = operators.sensors.SqlSensor(
-            task_id='sql_sensor_check',
-            conn_id='postgres_default',
-            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
-            dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
     def test_vacuum(self):
         """
         Verifies the VACUUM operation runs well with the PostgresOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
deleted file mode 100644
index d60f15c..0000000
--- a/tests/operators/sensors.py
+++ /dev/null
@@ -1,382 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-import sys
-import time
-import unittest
-from datetime import timedelta
-from mock import patch
-
-from airflow import DAG, configuration, settings
-from airflow.exceptions import (AirflowException,
-                                AirflowSensorTimeout,
-                                AirflowSkipException)
-from airflow.models import TaskInstance
-from airflow.operators.bash_operator import BashOperator
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.sensors import HttpSensor, BaseSensorOperator, HdfsSensor, ExternalTaskSensor
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.state import State
-from airflow.utils import timezone
-from airflow.utils.timezone import datetime
-
-try:
-    from unittest import mock
-except ImportError:
-    try:
-        import mock
-    except ImportError:
-        mock = None
-
-configuration.load_test_config()
-
-DEFAULT_DATE = datetime(2015, 1, 1)
-TEST_DAG_ID = 'unit_test_dag'
-
-
-class TimeoutTestSensor(BaseSensorOperator):
-    """
-    Sensor that always returns the return_value provided
-
-    :param return_value: Set to true to mark the task as SKIPPED on failure
-    :type return_value: any
-    """
-
-    @apply_defaults
-    def __init__(
-            self,
-            return_value=False,
-            *args,
-            **kwargs):
-        self.return_value = return_value
-        super(TimeoutTestSensor, self).__init__(*args, **kwargs)
-
-    def poke(self, context):
-        return self.return_value
-
-    def execute(self, context):
-        started_at = timezone.utcnow()
-        time_jump = self.params.get('time_jump')
-        while not self.poke(context):
-            if time_jump:
-                started_at -= time_jump
-            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
-                if self.soft_fail:
-                    raise AirflowSkipException('Snap. Time is OUT.')
-                else:
-                    raise AirflowSensorTimeout('Snap. Time is OUT.')
-            time.sleep(self.poke_interval)
-        self.log.info("Success criteria met. Exiting.")
-
-
-class SensorTimeoutTest(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-        args = {
-            'owner': 'airflow',
-            'start_date': DEFAULT_DATE
-        }
-        dag = DAG(TEST_DAG_ID, default_args=args)
-        self.dag = dag
-
-    def test_timeout(self):
-        t = TimeoutTestSensor(
-            task_id='test_timeout',
-            execution_timeout=timedelta(days=2),
-            return_value=False,
-            poke_interval=5,
-            params={'time_jump': timedelta(days=2, seconds=1)},
-            dag=self.dag
-            )
-        self.assertRaises(
-            AirflowSensorTimeout,
-            t.run,
-            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-
-class HttpSensorTests(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-        args = {
-            'owner': 'airflow',
-            'start_date': DEFAULT_DATE
-        }
-        dag = DAG(TEST_DAG_ID, default_args=args)
-        self.dag = dag
-
-    def test_poke_exception(self):
-        """
-        Exception occurs in poke function should not be ignored.
-        """
-        def resp_check(resp):
-            raise AirflowException('AirflowException raised here!')
-
-        task = HttpSensor(
-            task_id='http_sensor_poke_exception',
-            http_conn_id='http_default',
-            endpoint='',
-            request_params={},
-            response_check=resp_check,
-            poke_interval=5)
-        with self.assertRaisesRegexp(AirflowException, 'AirflowException raised here!'):
-            task.execute(None)
-
-    @patch("airflow.hooks.http_hook.requests.Session.send")
-    def test_head_method(self, mock_session_send):
-        def resp_check(resp):
-            return True
-
-        task = HttpSensor(
-            dag=self.dag,
-            task_id='http_sensor_head_method',
-            http_conn_id='http_default',
-            endpoint='',
-            request_params={},
-            method='HEAD',
-            response_check=resp_check,
-            timeout=5,
-            poke_interval=1)
-
-        import requests
-        task.execute(None)
-
-        args, kwargs = mock_session_send.call_args
-        received_request = args[0]
-
-        prep_request = requests.Request(
-            'HEAD',
-            'https://www.google.com',
-            {}).prepare()
-
-        self.assertEqual(prep_request.url, received_request.url)
-        self.assertTrue(prep_request.method, received_request.method)
-
-    @patch("airflow.hooks.http_hook.requests.Session.send")
-    def test_logging_head_error_request(
-        self,
-        mock_session_send
-    ):
-
-        def resp_check(resp):
-            return True
-
-        import requests
-        response = requests.Response()
-        response.status_code = 404
-        response.reason = 'Not Found'
-        mock_session_send.return_value = response
-
-        task = HttpSensor(
-            dag=self.dag,
-            task_id='http_sensor_head_method',
-            http_conn_id='http_default',
-            endpoint='',
-            request_params={},
-            method='HEAD',
-            response_check=resp_check,
-            timeout=5,
-            poke_interval=1
-        )
-
-        with mock.patch.object(task.hook.log, 'error') as mock_errors:
-            with self.assertRaises(AirflowSensorTimeout):
-                task.execute(None)
-
-            self.assertTrue(mock_errors.called)
-            mock_errors.assert_called_with('HTTP error: %s', 'Not Found')
-
-
-class HdfsSensorTests(unittest.TestCase):
-
-    def setUp(self):
-        from tests.core import FakeHDFSHook
-        self.hook = FakeHDFSHook
-
-    def test_legacy_file_exist(self):
-        """
-        Test the legacy behaviour
-        :return:
-        """
-        # Given
-        logging.info("Test for existing file with the legacy behaviour")
-        # When
-        task = HdfsSensor(task_id='Should_be_file_legacy',
-                          filepath='/datadirectory/datafile',
-                          timeout=1,
-                          retry_delay=timedelta(seconds=1),
-                          poke_interval=1,
-                          hook=self.hook)
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_legacy_file_exist_but_filesize(self):
-        """
-        Test the legacy behaviour with the filesize
-        :return:
-        """
-        # Given
-        logging.info("Test for existing file with the legacy behaviour")
-        # When
-        task = HdfsSensor(task_id='Should_be_file_legacy',
-                          filepath='/datadirectory/datafile',
-                          timeout=1,
-                          file_size=20,
-                          retry_delay=timedelta(seconds=1),
-                          poke_interval=1,
-                          hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_legacy_file_does_not_exists(self):
-        """
-        Test the legacy behaviour
-        :return:
-        """
-        # Given
-        logging.info("Test for non existing file with the legacy behaviour")
-        task = HdfsSensor(task_id='Should_not_be_file_legacy',
-                          filepath='/datadirectory/not_existing_file_or_directory',
-                          timeout=1,
-                          retry_delay=timedelta(seconds=1),
-                          poke_interval=1,
-                          hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-
-class ExternalTaskSensorTests(unittest.TestCase):
-
-    def setUp(self):
-        configuration.load_test_config()
-        self.args = {
-            'owner': 'airflow',
-            'start_date': DEFAULT_DATE,
-            'depends_on_past': False}
-
-    def test_templated_sensor(self):
-        dag = DAG(TEST_DAG_ID, self.args)
-
-        with dag:
-            sensor = ExternalTaskSensor(
-                task_id='templated_task',
-                external_dag_id='dag_{{ ds }}',
-                external_task_id='task_{{ ds }}',
-                start_date=DEFAULT_DATE
-            )
-
-        instance = TaskInstance(sensor, DEFAULT_DATE)
-        instance.render_templates()
-
-        self.assertEqual(sensor.external_dag_id,
-                         "dag_{}".format(DEFAULT_DATE.date()))
-        self.assertEqual(sensor.external_task_id,
-                         "task_{}".format(DEFAULT_DATE.date()))
-
-    def test_external_task_sensor_fn_multiple_execution_dates(self):
-        bash_command_code = """
-{% set s=execution_date.time().second %}
-echo "second is {{ s }}"
-if [[ $(( {{ s }} % 60 )) == 1 ]]
-    then
-        exit 1
-fi
-exit 0
-"""
-        dag_external_id = TEST_DAG_ID + '_external'
-        dag_external = DAG(
-            dag_external_id,
-            default_args=self.args,
-            schedule_interval=timedelta(seconds=1))
-        task_external_with_failure = BashOperator(
-            task_id="task_external_with_failure",
-            bash_command=bash_command_code,
-            retries=0,
-            dag=dag_external)
-        task_external_without_failure = DummyOperator(
-            task_id="task_external_without_failure",
-            retries=0,
-            dag=dag_external)
-
-        task_external_without_failure.run(
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + timedelta(seconds=1),
-            ignore_ti_state=True)
-
-        session = settings.Session()
-        TI = TaskInstance
-        try:
-            task_external_with_failure.run(
-                start_date=DEFAULT_DATE,
-                end_date=DEFAULT_DATE + timedelta(seconds=1),
-                ignore_ti_state=True)
-            # The test_with_failure task is excepted to fail
-            # once per minute (the run on the first second of
-            # each minute).
-        except Exception as e:
-            failed_tis = session.query(TI).filter(
-                TI.dag_id == dag_external_id,
-                TI.state == State.FAILED,
-                TI.execution_date == DEFAULT_DATE + timedelta(seconds=1)).all()
-            if (len(failed_tis) == 1 and
-                    failed_tis[0].task_id == 'task_external_with_failure'):
-                pass
-            else:
-                raise e
-
-        dag_id = TEST_DAG_ID
-        dag = DAG(
-            dag_id,
-            default_args=self.args,
-            schedule_interval=timedelta(minutes=1))
-        task_without_failure = ExternalTaskSensor(
-            task_id='task_without_failure',
-            external_dag_id=dag_external_id,
-            external_task_id='task_external_without_failure',
-            execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
-                                          for i in range(2)],
-            allowed_states=['success'],
-            retries=0,
-            timeout=1,
-            poke_interval=1,
-            dag=dag)
-        task_with_failure = ExternalTaskSensor(
-            task_id='task_with_failure',
-            external_dag_id=dag_external_id,
-            external_task_id='task_external_with_failure',
-            execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
-                                          for i in range(2)],
-            allowed_states=['success'],
-            retries=0,
-            timeout=1,
-            poke_interval=1,
-            dag=dag)
-
-        task_without_failure.run(
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE,
-            ignore_ti_state=True)
-
-        with self.assertRaises(AirflowSensorTimeout):
-            task_with_failure.run(
-                start_date=DEFAULT_DATE,
-                end_date=DEFAULT_DATE,
-                ignore_ti_state=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/__init__.py
----------------------------------------------------------------------
diff --git a/tests/sensors/__init__.py b/tests/sensors/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/sensors/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/test_external_task_sensor.py
----------------------------------------------------------------------
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
new file mode 100644
index 0000000..32e073d
--- /dev/null
+++ b/tests/sensors/test_external_task_sensor.py
@@ -0,0 +1,244 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+from datetime import timedelta, time
+
+from airflow import DAG, configuration, settings
+from airflow import exceptions
+from airflow.exceptions import AirflowSensorTimeout
+from airflow.models import TaskInstance, DagBag
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.sensors.external_task_sensor import ExternalTaskSensor
+from airflow.sensors.time_sensor import TimeSensor
+from airflow.utils.state import State
+from airflow.utils.timezone import datetime
+
+configuration.load_test_config()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+TEST_DAG_ID = 'unit_test_dag'
+TEST_TASK_ID = 'time_sensor_check'
+DEV_NULL = '/dev/null'
+
+
+class ExternalTaskSensorTests(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        self.dagbag = DagBag(
+            dag_folder=DEV_NULL,
+            include_examples=True
+        )
+        self.args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        self.dag = DAG(TEST_DAG_ID, default_args=self.args)
+
+    def test_time_sensor(self):
+        t = TimeSensor(
+            task_id=TEST_TASK_ID,
+            target_time=time(0),
+            dag=self.dag
+        )
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_external_task_sensor(self):
+        self.test_time_sensor()
+        t = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            dag=self.dag
+        )
+        t.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            ignore_ti_state=True
+        )
+
+    def test_templated_sensor(self):
+        dag = DAG(TEST_DAG_ID, self.args)
+
+        with dag:
+            sensor = ExternalTaskSensor(
+                task_id='templated_task',
+                external_dag_id='dag_{{ ds }}',
+                external_task_id='task_{{ ds }}',
+                start_date=DEFAULT_DATE
+            )
+
+        instance = TaskInstance(sensor, DEFAULT_DATE)
+        instance.render_templates()
+
+        self.assertEqual(sensor.external_dag_id,
+                         "dag_{}".format(DEFAULT_DATE.date()))
+        self.assertEqual(sensor.external_task_id,
+                         "task_{}".format(DEFAULT_DATE.date()))
+
+    def test_external_task_sensor_fn_multiple_execution_dates(self):
+        bash_command_code = """
+{% set s=execution_date.time().second %}
+echo "second is {{ s }}"
+if [[ $(( {{ s }} % 60 )) == 1 ]]
+    then
+        exit 1
+fi
+exit 0
+"""
+        dag_external_id = TEST_DAG_ID + '_external'
+        dag_external = DAG(
+            dag_external_id,
+            default_args=self.args,
+            schedule_interval=timedelta(seconds=1))
+        task_external_with_failure = BashOperator(
+            task_id="task_external_with_failure",
+            bash_command=bash_command_code,
+            retries=0,
+            dag=dag_external)
+        task_external_without_failure = DummyOperator(
+            task_id="task_external_without_failure",
+            retries=0,
+            dag=dag_external)
+
+        task_external_without_failure.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + timedelta(seconds=1),
+            ignore_ti_state=True)
+
+        session = settings.Session()
+        TI = TaskInstance
+        try:
+            task_external_with_failure.run(
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE + timedelta(seconds=1),
+                ignore_ti_state=True)
+            # The task_external_with_failure task is expected to fail
+            # once per minute (the run on the first second of
+            # each minute).
+        except Exception as e:
+            failed_tis = session.query(TI).filter(
+                TI.dag_id == dag_external_id,
+                TI.state == State.FAILED,
+                TI.execution_date == DEFAULT_DATE + timedelta(seconds=1)).all()
+            if len(failed_tis) == 1 and \
+               failed_tis[0].task_id == 'task_external_with_failure':
+                pass
+            else:
+                raise e
+
+        dag_id = TEST_DAG_ID
+        dag = DAG(
+            dag_id,
+            default_args=self.args,
+            schedule_interval=timedelta(minutes=1))
+        task_without_failure = ExternalTaskSensor(
+            task_id='task_without_failure',
+            external_dag_id=dag_external_id,
+            external_task_id='task_external_without_failure',
+            execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
+                                          for i in range(2)],
+            allowed_states=['success'],
+            retries=0,
+            timeout=1,
+            poke_interval=1,
+            dag=dag)
+        task_with_failure = ExternalTaskSensor(
+            task_id='task_with_failure',
+            external_dag_id=dag_external_id,
+            external_task_id='task_external_with_failure',
+            execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
+                                          for i in range(2)],
+            allowed_states=['success'],
+            retries=0,
+            timeout=1,
+            poke_interval=1,
+            dag=dag)
+
+        task_without_failure.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            ignore_ti_state=True)
+
+        with self.assertRaises(AirflowSensorTimeout):
+            task_with_failure.run(
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                ignore_ti_state=True)
+
+    def test_external_task_sensor_delta(self):
+        self.test_time_sensor()
+        t = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check_delta',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            execution_delta=timedelta(0),
+            allowed_states=['success'],
+            dag=self.dag
+        )
+        t.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            ignore_ti_state=True
+        )
+
+    def test_external_task_sensor_fn(self):
+        self.test_time_sensor()
+        # check that the execution_fn works
+        t = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check_delta',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            execution_date_fn=lambda dt: dt + timedelta(0),
+            allowed_states=['success'],
+            dag=self.dag
+        )
+        t.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            ignore_ti_state=True
+        )
+        # double check that the execution is being called by failing the test
+        t2 = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check_delta',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            execution_date_fn=lambda dt: dt + timedelta(days=1),
+            allowed_states=['success'],
+            timeout=1,
+            poke_interval=1,
+            dag=self.dag
+        )
+        with self.assertRaises(exceptions.AirflowSensorTimeout):
+            t2.run(
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                ignore_ti_state=True
+            )
+
+    def test_external_task_sensor_error_delta_and_fn(self):
+        self.test_time_sensor()
+        # Test that providing execution_delta and a function raises an error
+        with self.assertRaises(ValueError):
+            ExternalTaskSensor(
+                task_id='test_external_task_sensor_check_delta',
+                external_dag_id=TEST_DAG_ID,
+                external_task_id=TEST_TASK_ID,
+                execution_delta=timedelta(0),
+                execution_date_fn=lambda dt: dt,
+                allowed_states=['success'],
+                dag=self.dag
+            )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/test_hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/tests/sensors/test_hdfs_sensor.py b/tests/sensors/test_hdfs_sensor.py
new file mode 100644
index 0000000..4ff9c52
--- /dev/null
+++ b/tests/sensors/test_hdfs_sensor.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+from datetime import timedelta
+
+from airflow import configuration
+from airflow.exceptions import AirflowSensorTimeout
+from airflow.sensors.hdfs_sensor import HdfsSensor
+from airflow.utils.timezone import datetime
+from tests.core import FakeHDFSHook
+
+configuration.load_test_config()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+TEST_DAG_ID = 'unit_test_dag'
+
+
+class HdfsSensorTests(unittest.TestCase):
+
+    def setUp(self):
+        self.hook = FakeHDFSHook
+
+    def test_legacy_file_exist(self):
+        """
+        Test the legacy behaviour
+        :return:
+        """
+        # When
+        task = HdfsSensor(task_id='Should_be_file_legacy',
+                          filepath='/datadirectory/datafile',
+                          timeout=1,
+                          retry_delay=timedelta(seconds=1),
+                          poke_interval=1,
+                          hook=self.hook)
+        task.execute(None)
+
+        # Then
+        # Nothing happens and nothing is raised; execution is OK
+
+    def test_legacy_file_exist_but_filesize(self):
+        """
+        Test the legacy behaviour with the filesize
+        :return:
+        """
+        # When
+        task = HdfsSensor(task_id='Should_be_file_legacy',
+                          filepath='/datadirectory/datafile',
+                          timeout=1,
+                          file_size=20,
+                          retry_delay=timedelta(seconds=1),
+                          poke_interval=1,
+                          hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_legacy_file_does_not_exists(self):
+        """
+        Test the legacy behaviour
+        :return:
+        """
+        task = HdfsSensor(task_id='Should_not_be_file_legacy',
+                          filepath='/datadirectory/not_existing_file_or_directory',
+                          timeout=1,
+                          retry_delay=timedelta(seconds=1),
+                          poke_interval=1,
+                          hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)



[3/3] incubator-airflow git commit: [AIRFLOW-1889] Split sensors into separate files

Posted by bo...@apache.org.
[AIRFLOW-1889] Split sensors into separate files

Moving the sensors to separate files increases the
readability of the code. It also reduces the amount
of code in the big core.py file.

Closes #2875 from
Fokko/AIRFLOW-1889-move-sensors-to-separate-package
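
For readers updating their own DAGs, a minimal sketch of what the new layout
means for imports. The module paths are taken from the diffs and diffstat in
this commit; the small example sensor class at the end is hypothetical and is
not part of the commit:

    # Old single-module import, removed together with airflow/operators/sensors.py:
    # from airflow.operators.sensors import BaseSensorOperator, HttpSensor

    # New per-sensor modules under the airflow.sensors package:
    from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.sensors.http_sensor import HttpSensor
    from airflow.sensors.external_task_sensor import ExternalTaskSensor
    from airflow.sensors.time_sensor import TimeSensor


    class AlwaysTrueSensor(BaseSensorOperator):
        """Toy sensor (not in the commit): succeeds on the first poke."""

        def poke(self, context):
            return True

Custom sensors only need their base-class import updated as above; the poke()
contract itself is unchanged by this split.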


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/33c72042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/33c72042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/33c72042

Branch: refs/heads/master
Commit: 33c720421208696c9cae422e1056c8e914d3329b
Parents: e7c118d
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Fri Jan 19 18:59:08 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 19 18:59:08 2018 +0100

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 airflow/contrib/operators/fs_operator.py        |  56 --
 .../sensors/aws_redshift_cluster_sensor.py      |  13 +-
 airflow/contrib/sensors/bash_sensor.py          |   2 +-
 airflow/contrib/sensors/bigquery_sensor.py      |   2 +-
 airflow/contrib/sensors/datadog_sensor.py       |   2 +-
 airflow/contrib/sensors/emr_base_sensor.py      |   2 +-
 airflow/contrib/sensors/emr_job_flow_sensor.py  |   8 +-
 airflow/contrib/sensors/emr_step_sensor.py      |  10 +-
 airflow/contrib/sensors/file_sensor.py          |  56 ++
 airflow/contrib/sensors/ftp_sensor.py           |   2 +-
 airflow/contrib/sensors/gcs_sensor.py           |   2 +-
 airflow/contrib/sensors/hdfs_sensors.py         |  20 +-
 airflow/contrib/sensors/jira_sensor.py          |  31 +-
 airflow/contrib/sensors/pubsub_sensor.py        |   2 +-
 airflow/contrib/sensors/qubole_sensor.py        |  12 +-
 airflow/contrib/sensors/redis_key_sensor.py     |   2 +-
 airflow/contrib/sensors/wasb_sensor.py          |   2 +-
 airflow/example_dags/example_http_operator.py   |   8 +-
 airflow/operators/__init__.py                   |  15 -
 airflow/operators/sensors.py                    | 697 -------------------
 airflow/plugins_manager.py                      |   4 +
 airflow/sensors/__init__.py                     |  60 ++
 airflow/sensors/base_sensor_operator.py         |  70 ++
 airflow/sensors/external_task_sensor.py         |  96 +++
 airflow/sensors/hdfs_sensor.py                  | 112 +++
 airflow/sensors/hive_partition_sensor.py        |  70 ++
 airflow/sensors/http_sensor.py                  |  87 +++
 airflow/sensors/metastore_partition_sensor.py   |  78 +++
 airflow/sensors/named_hive_partition_sensor.py  |  89 +++
 airflow/sensors/s3_key_sensor.py                |  76 ++
 airflow/sensors/s3_prefix_sensor.py             |  63 ++
 airflow/sensors/sql_sensor.py                   |  53 ++
 airflow/sensors/time_delta_sensor.py            |  41 ++
 airflow/sensors/time_sensor.py                  |  35 +
 airflow/sensors/web_hdfs_sensor.py              |  39 ++
 docs/code.rst                                   |  43 +-
 tests/contrib/operators/test_fs_operator.py     |  67 --
 .../sensors/test_aws_redshift_cluster_sensor.py |   1 +
 tests/contrib/sensors/test_bash_sensor.py       |   3 +-
 tests/contrib/sensors/test_datadog_sensor.py    |   5 +-
 tests/contrib/sensors/test_emr_base_sensor.py   |   4 +-
 tests/contrib/sensors/test_file_sensor.py       |  67 ++
 tests/contrib/sensors/test_ftp_sensor.py        |   2 +-
 tests/contrib/sensors/test_hdfs_sensors.py      |   3 +-
 tests/contrib/sensors/test_jira_sensor_test.py  |  12 +-
 tests/contrib/sensors/test_pubsub_sensor.py     |   3 +-
 tests/contrib/sensors/test_qubole_sensor.py     |  39 +-
 tests/contrib/sensors/test_wasb_sensor.py       |   6 +-
 tests/core.py                                   | 401 +++++------
 tests/operators/__init__.py                     |   1 -
 tests/operators/operators.py                    |  16 -
 tests/operators/sensors.py                      | 382 ----------
 tests/sensors/__init__.py                       |  13 +
 tests/sensors/test_external_task_sensor.py      | 244 +++++++
 tests/sensors/test_hdfs_sensor.py               |  86 +++
 tests/sensors/test_http_sensor.py               | 195 ++++++
 tests/sensors/test_sql_sensor.py                |  52 ++
 tests/sensors/test_timedelta_sensor.py          |  43 ++
 tests/sensors/test_timeout_sensor.py            |  88 +++
 60 files changed, 2121 insertions(+), 1573 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index e32065d..0a512df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -112,6 +112,7 @@ ENV/
 
 # PyCharm
 .idea/
+*.iml
 
 # Visual Studio Code
 .vscode/

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
deleted file mode 100644
index e7640c8..0000000
--- a/airflow/contrib/operators/fs_operator.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from os import walk
-
-from airflow.operators.sensors import BaseSensorOperator
-from airflow.contrib.hooks.fs_hook import FSHook
-from airflow.utils.decorators import apply_defaults
-
-
-class FileSensor(BaseSensorOperator):
-    """
-    Waits for a file or folder to land in a filesystem
-
-    :param fs_conn_id: reference to the File (path)
-        connection id
-    :type fs_conn_id: string
-    :param filepath: File or folder name (relative to
-        the base path set within the connection)
-    :type fs_conn_id: string
-    """
-    template_fields = ('filepath',)
-    ui_color = '#91818a'
-
-    @apply_defaults
-    def __init__(
-            self,
-            filepath,
-            fs_conn_id='fs_default2',
-            *args, **kwargs):
-        super(FileSensor, self).__init__(*args, **kwargs)
-        self.filepath = filepath
-        self.fs_conn_id = fs_conn_id
-
-    def poke(self, context):
-        hook = FSHook(self.fs_conn_id)
-        basepath = hook.get_path()
-        full_path = "/".join([basepath, self.filepath])
-        self.log.info('Poking for file {full_path}'.format(**locals()))
-        try:
-            files = [f for f in walk(full_path)]
-        except:
-            return False
-        return True
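
As the diffstat above shows, FileSensor is not removed for good: it reappears
as airflow/contrib/sensors/file_sensor.py. A minimal sketch of a DAG using the
new location, assuming the moved class keeps the filepath/fs_conn_id arguments
of the removed copy above; the dag id, task id, and file path below are made
up for illustration:

    from airflow import DAG
    from airflow.contrib.sensors.file_sensor import FileSensor
    from airflow.utils.timezone import datetime

    dag = DAG('file_sensor_example',        # hypothetical dag id
              start_date=datetime(2018, 1, 1),
              schedule_interval=None)

    wait_for_file = FileSensor(
        task_id='wait_for_file',            # hypothetical task id
        fs_conn_id='fs_default2',           # default conn id in the removed code above
        filepath='data/input.csv',          # hypothetical path relative to the connection
        poke_interval=60,
        dag=dag)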

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
index 8db85e6..bf29cbf 100644
--- a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
+++ b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
@@ -12,8 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from airflow.operators.sensors import BaseSensorOperator
 from airflow.contrib.hooks.redshift_hook import RedshiftHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 
 
@@ -29,11 +29,12 @@ class AwsRedshiftClusterSensor(BaseSensorOperator):
     template_fields = ('cluster_identifier', 'target_status')
 
     @apply_defaults
-    def __init__(
-            self, cluster_identifier,
-            target_status='available',
-            aws_conn_id='aws_default',
-            *args, **kwargs):
+    def __init__(self,
+                 cluster_identifier,
+                 target_status='available',
+                 aws_conn_id='aws_default',
+                 *args,
+                 **kwargs):
         super(AwsRedshiftClusterSensor, self).__init__(*args, **kwargs)
         self.cluster_identifier = cluster_identifier
         self.target_status = target_status

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/bash_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bash_sensor.py b/airflow/contrib/sensors/bash_sensor.py
index 760ee2c..1781a0f 100644
--- a/airflow/contrib/sensors/bash_sensor.py
+++ b/airflow/contrib/sensors/bash_sensor.py
@@ -17,7 +17,7 @@ import os
 from subprocess import Popen, STDOUT, PIPE
 from tempfile import gettempdir, NamedTemporaryFile
 from airflow.utils.decorators import apply_defaults
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.file import TemporaryDirectory
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index 90a3264..08d5a42 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 from airflow.utils.decorators import apply_defaults
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
index e1a9169..09efc85 100644
--- a/airflow/contrib/sensors/datadog_sensor.py
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.contrib.hooks.datadog_hook import DatadogHook
 from airflow.utils import apply_defaults
 from airflow.exceptions import AirflowException

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index c6f96f8..552c8c6 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils import apply_defaults
 from airflow.exceptions import AirflowException
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index a437fc3..b38d1da 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -31,10 +31,10 @@ class EmrJobFlowSensor(EmrBaseSensor):
     template_ext = ()
 
     @apply_defaults
-    def __init__(
-            self,
-            job_flow_id,
-            *args, **kwargs):
+    def __init__(self,
+                 job_flow_id,
+                 *args,
+                 **kwargs):
         super(EmrJobFlowSensor, self).__init__(*args, **kwargs)
         self.job_flow_id = job_flow_id
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index c5a450d..543efe4 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -33,11 +33,11 @@ class EmrStepSensor(EmrBaseSensor):
     template_ext = ()
 
     @apply_defaults
-    def __init__(
-            self,
-            job_flow_id,
-            step_id,
-            *args, **kwargs):
+    def __init__(self,
+                 job_flow_id,
+                 step_id,
+                 *args,
+                 **kwargs):
         super(EmrStepSensor, self).__init__(*args, **kwargs)
         self.job_flow_id = job_flow_id
         self.step_id = step_id

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/file_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/file_sensor.py b/airflow/contrib/sensors/file_sensor.py
new file mode 100644
index 0000000..fdc1e67
--- /dev/null
+++ b/airflow/contrib/sensors/file_sensor.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from os import walk
+
+from airflow.contrib.hooks.fs_hook import FSHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class FileSensor(BaseSensorOperator):
+    """
+    Waits for a file or folder to land in a filesystem
+
+    :param fs_conn_id: reference to the File (path)
+        connection id
+    :type fs_conn_id: string
+    :param filepath: File or folder name (relative to
+        the base path set within the connection)
+    :type filepath: string
+    """
+    template_fields = ('filepath',)
+    ui_color = '#91818a'
+
+    @apply_defaults
+    def __init__(self,
+                 filepath,
+                 fs_conn_id='fs_default2',
+                 *args,
+                 **kwargs):
+        super(FileSensor, self).__init__(*args, **kwargs)
+        self.filepath = filepath
+        self.fs_conn_id = fs_conn_id
+
+    def poke(self, context):
+        hook = FSHook(self.fs_conn_id)
+        basepath = hook.get_path()
+        full_path = "/".join([basepath, self.filepath])
+        self.log.info('Poking for file {full_path}'.format(**locals()))
+        try:
+            files = [f for f in walk(full_path)]
+        except OSError:
+            return False
+        return True
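
For reference, a minimal usage sketch of the new FileSensor (not part of this patch; the DAG id, the 'fs_my_data' connection and the file path are made up for illustration):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.sensors.file_sensor import FileSensor

    dag = DAG('example_file_sensor',
              start_date=datetime(2018, 1, 1),
              schedule_interval='@daily')

    # Succeeds once <base path of fs_my_data>/incoming/data.csv can be walked.
    wait_for_file = FileSensor(
        task_id='wait_for_data_file',
        fs_conn_id='fs_my_data',       # hypothetical File (path) connection
        filepath='incoming/data.csv',  # relative to the connection's base path
        poke_interval=60,
        timeout=60 * 60,
        dag=dag)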

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index bd66c32..5fb31c3 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -14,7 +14,7 @@
 import ftplib
 
 from airflow.contrib.hooks.ftp_hook import FTPHook, FTPSHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index a45923a..d91e2b1 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
index 1893f01..c96005a 100644
--- a/airflow/contrib/sensors/hdfs_sensors.py
+++ b/airflow/contrib/sensors/hdfs_sensors.py
@@ -11,14 +11,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from airflow.operators.sensors import HdfsSensor
+from airflow.sensors.hdfs_sensor import HdfsSensor
 
 
 class HdfsSensorRegex(HdfsSensor):
-    def __init__(
-            self,
-            regex,
-            *args, **kwargs):
+    def __init__(self,
+                 regex,
+                 *args,
+                 **kwargs):
         super(HdfsSensorRegex, self).__init__(*args, **kwargs)
         self.regex = regex
 
@@ -39,10 +39,10 @@ class HdfsSensorRegex(HdfsSensor):
 
 
 class HdfsSensorFolder(HdfsSensor):
-    def __init__(
-            self,
-            be_empty=False,
-            *args, **kwargs):
+    def __init__(self,
+                 be_empty=False,
+                 *args,
+                 **kwargs):
         super(HdfsSensorFolder, self).__init__(*args, **kwargs)
         self.be_empty = be_empty
 
@@ -62,5 +62,3 @@ class HdfsSensorFolder(HdfsSensor):
             self.log.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
             result.pop(0)
             return bool(result) and result[0]['file_type'] == 'f'
-
-

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
index 1dc7b50..a8a0df7 100644
--- a/airflow/contrib/sensors/jira_sensor.py
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -15,7 +15,7 @@ from jira.resources import Resource
 
 from airflow.contrib.operators.jira_operator import JIRAError
 from airflow.contrib.operators.jira_operator import JiraOperator
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 
 
@@ -83,7 +83,8 @@ class JiraTicketSensor(JiraSensor):
                  field=None,
                  expected_value=None,
                  field_checker_func=None,
-                 *args, **kwargs):
+                 *args,
+                 **kwargs):
 
         self.jira_conn_id = jira_conn_id
         self.ticket_id = ticket_id
@@ -94,7 +95,8 @@ class JiraTicketSensor(JiraSensor):
 
         super(JiraTicketSensor, self).__init__(jira_conn_id=jira_conn_id,
                                                result_processor=field_checker_func,
-                                               *args, **kwargs)
+                                               *args,
+                                               **kwargs)
 
     def poke(self, context):
         self.log.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
@@ -110,18 +112,17 @@ class JiraTicketSensor(JiraSensor):
         result = None
         try:
             if issue is not None \
-                    and self.field is not None \
-                    and self.expected_value is not None:
-
-                field_value = getattr(issue.fields, self.field)
-                if field_value is not None:
-                    if isinstance(field_value, list):
-                        result = self.expected_value in field_value
-                    elif isinstance(field_value, str):
-                        result = self.expected_value.lower() == field_value.lower()
-                    elif isinstance(field_value, Resource) \
-                            and getattr(field_value, 'name'):
-                        result = self.expected_value.lower() == field_value.name.lower()
+               and self.field is not None \
+               and self.expected_value is not None:
+
+                field_val = getattr(issue.fields, self.field)
+                if field_val is not None:
+                    if isinstance(field_val, list):
+                        result = self.expected_value in field_val
+                    elif isinstance(field_val, str):
+                        result = self.expected_value.lower() == field_val.lower()
+                    elif isinstance(field_val, Resource) and getattr(field_val, 'name'):
+                        result = self.expected_value.lower() == field_val.name.lower()
                     else:
                         self.log.warning(
                             "Not implemented checker for issue field %s which "

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/pubsub_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/pubsub_sensor.py b/airflow/contrib/sensors/pubsub_sensor.py
index 112f777..c591d55 100644
--- a/airflow/contrib/sensors/pubsub_sensor.py
+++ b/airflow/contrib/sensors/pubsub_sensor.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/qubole_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/qubole_sensor.py b/airflow/contrib/sensors/qubole_sensor.py
index 6915361..1045040 100644
--- a/airflow/contrib/sensors/qubole_sensor.py
+++ b/airflow/contrib/sensors/qubole_sensor.py
@@ -12,17 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from __future__ import print_function
-from future import standard_library
-standard_library.install_aliases()
+from qds_sdk.qubole import Qubole
+from qds_sdk.sensors import FileSensor, PartitionSensor
 
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
-from airflow.exceptions import AirflowException
-
-from qds_sdk.qubole import Qubole
-from qds_sdk.sensors import FileSensor, PartitionSensor
 
 
 class QuboleSensor(BaseSensorOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
index 6cc314b..9328f21 100644
--- a/airflow/contrib/sensors/redis_key_sensor.py
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from airflow.contrib.hooks.redis_hook import RedisHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
index 4295a25..50ce68b 100644
--- a/airflow/contrib/sensors/wasb_sensor.py
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 #
 from airflow.contrib.hooks.wasb_hook import WasbHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/example_dags/example_http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 10fc3b1..2878d50 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -14,13 +14,13 @@
 """
 ### Example HTTP operator and sensor
 """
+import json
+from datetime import timedelta
+
 import airflow
 from airflow import DAG
 from airflow.operators.http_operator import SimpleHttpOperator
-from airflow.operators.sensors import HttpSensor
-from datetime import timedelta
-import json
-
+from airflow.sensors.http_sensor import HttpSensor
 
 default_args = {
     'owner': 'airflow',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 50b05ff..c171fb9 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -62,21 +62,6 @@ _operators = {
     'sqlite_operator': ['SqliteOperator'],
     'mysql_to_hive': ['MySqlToHiveTransfer'],
     'postgres_operator': ['PostgresOperator'],
-    'sensors': [
-        'BaseSensorOperator',
-        'ExternalTaskSensor',
-        'HdfsSensor',
-        'HivePartitionSensor',
-        'HttpSensor',
-        'MetastorePartitionSensor',
-        'NamedHivePartitionSensor',
-        'S3KeySensor',
-        'S3PrefixSensor',
-        'SqlSensor',
-        'TimeDeltaSensor',
-        'TimeSensor',
-        'WebHdfsSensor',
-    ],
     'subdag_operator': ['SubDagOperator'],
     'hive_stats_operator': ['HiveStatsCollectionOperator'],
     's3_to_hive_operator': ['S3ToHiveTransfer'],

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
deleted file mode 100644
index e9bf7ff..0000000
--- a/airflow/operators/sensors.py
+++ /dev/null
@@ -1,697 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-from future import standard_library
-
-from airflow.utils.log.logging_mixin import LoggingMixin
-
-standard_library.install_aliases()
-from builtins import str
-from past.builtins import basestring
-
-from airflow.utils import timezone
-from urllib.parse import urlparse
-from time import sleep
-import re
-import sys
-
-from airflow import settings
-from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
-from airflow.models import BaseOperator, TaskInstance
-from airflow.hooks.base_hook import BaseHook
-from airflow.hooks.hdfs_hook import HDFSHook
-from airflow.hooks.http_hook import HttpHook
-from airflow.utils.state import State
-from airflow.utils.db import provide_session
-from airflow.utils.decorators import apply_defaults
-
-
-class BaseSensorOperator(BaseOperator):
-    '''
-    Sensor operators are derived from this class an inherit these attributes.
-
-    Sensor operators keep executing at a time interval and succeed when
-        a criteria is met and fail if and when they time out.
-
-    :param soft_fail: Set to true to mark the task as SKIPPED on failure
-    :type soft_fail: bool
-    :param poke_interval: Time in seconds that the job should wait in
-        between each tries
-    :type poke_interval: int
-    :param timeout: Time, in seconds before the task times out and fails.
-    :type timeout: int
-    '''
-    ui_color = '#e6f1f2'
-
-    @apply_defaults
-    def __init__(
-            self,
-            poke_interval=60,
-            timeout=60*60*24*7,
-            soft_fail=False,
-            *args, **kwargs):
-        super(BaseSensorOperator, self).__init__(*args, **kwargs)
-        self.poke_interval = poke_interval
-        self.soft_fail = soft_fail
-        self.timeout = timeout
-
-    def poke(self, context):
-        '''
-        Function that the sensors defined while deriving this class should
-        override.
-        '''
-        raise AirflowException('Override me.')
-
-    def execute(self, context):
-        started_at = timezone.utcnow()
-        while not self.poke(context):
-            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
-                if self.soft_fail:
-                    raise AirflowSkipException('Snap. Time is OUT.')
-                else:
-                    raise AirflowSensorTimeout('Snap. Time is OUT.')
-            sleep(self.poke_interval)
-        self.log.info("Success criteria met. Exiting.")
-
-
-class SqlSensor(BaseSensorOperator):
-    """
-    Runs a sql statement until a criteria is met. It will keep trying while
-    sql returns no row, or if the first cell in (0, '0', '').
-
-    :param conn_id: The connection to run the sensor against
-    :type conn_id: string
-    :param sql: The sql to run. To pass, it needs to return at least one cell
-        that contains a non-zero / empty string value.
-    """
-    template_fields = ('sql',)
-    template_ext = ('.hql', '.sql',)
-    ui_color = '#7c7287'
-
-    @apply_defaults
-    def __init__(self, conn_id, sql, *args, **kwargs):
-        self.sql = sql
-        self.conn_id = conn_id
-        super(SqlSensor, self).__init__(*args, **kwargs)
-
-    def poke(self, context):
-        hook = BaseHook.get_connection(self.conn_id).get_hook()
-
-        self.log.info('Poking: %s', self.sql)
-        records = hook.get_records(self.sql)
-        if not records:
-            return False
-        else:
-            if str(records[0][0]) in ('0', '',):
-                return False
-            else:
-                return True
-
-
-class MetastorePartitionSensor(SqlSensor):
-    """
-    An alternative to the HivePartitionSensor that talk directly to the
-    MySQL db. This was created as a result of observing sub optimal
-    queries generated by the Metastore thrift service when hitting
-    subpartitioned tables. The Thrift service's queries were written in a
-    way that wouldn't leverage the indexes.
-
-    :param schema: the schema
-    :type schema: str
-    :param table: the table
-    :type table: str
-    :param partition_name: the partition name, as defined in the PARTITIONS
-        table of the Metastore. Order of the fields does matter.
-        Examples: ``ds=2016-01-01`` or
-        ``ds=2016-01-01/sub=foo`` for a sub partitioned table
-    :type partition_name: str
-    :param mysql_conn_id: a reference to the MySQL conn_id for the metastore
-    :type mysql_conn_id: str
-    """
-    template_fields = ('partition_name', 'table', 'schema')
-    ui_color = '#8da7be'
-
-    @apply_defaults
-    def __init__(
-            self, table, partition_name, schema="default",
-            mysql_conn_id="metastore_mysql",
-            *args, **kwargs):
-
-        self.partition_name = partition_name
-        self.table = table
-        self.schema = schema
-        self.first_poke = True
-        self.conn_id = mysql_conn_id
-        # TODO(aoen): We shouldn't be using SqlSensor here but MetastorePartitionSensor.
-        # The problem is the way apply_defaults works isn't compatible with inheritance.
-        # The inheritance model needs to be reworked in order to support overriding args/
-        # kwargs with arguments here, then 'conn_id' and 'sql' can be passed into the
-        # constructor below and apply_defaults will no longer throw an exception.
-        super(SqlSensor, self).__init__(*args, **kwargs)
-
-    def poke(self, context):
-        if self.first_poke:
-            self.first_poke = False
-            if '.' in self.table:
-                self.schema, self.table = self.table.split('.')
-            self.sql = """
-            SELECT 'X'
-            FROM PARTITIONS A0
-            LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID
-            LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID
-            WHERE
-                B0.TBL_NAME = '{self.table}' AND
-                C0.NAME = '{self.schema}' AND
-                A0.PART_NAME = '{self.partition_name}';
-            """.format(self=self)
-        return super(MetastorePartitionSensor, self).poke(context)
-
-
-class ExternalTaskSensor(BaseSensorOperator):
-    """
-    Waits for a task to complete in a different DAG
-
-    :param external_dag_id: The dag_id that contains the task you want to
-        wait for
-    :type external_dag_id: string
-    :param external_task_id: The task_id that contains the task you want to
-        wait for
-    :type external_task_id: string
-    :param allowed_states: list of allowed states, default is ``['success']``
-    :type allowed_states: list
-    :param execution_delta: time difference with the previous execution to
-        look at, the default is the same execution_date as the current task.
-        For yesterday, use [positive!] datetime.timedelta(days=1). Either
-        execution_delta or execution_date_fn can be passed to
-        ExternalTaskSensor, but not both.
-    :type execution_delta: datetime.timedelta
-    :param execution_date_fn: function that receives the current execution date
-        and returns the desired execution dates to query. Either execution_delta
-        or execution_date_fn can be passed to ExternalTaskSensor, but not both.
-    :type execution_date_fn: callable
-    """
-    template_fields = ['external_dag_id', 'external_task_id']
-    ui_color = '#19647e'
-
-    @apply_defaults
-    def __init__(
-            self,
-            external_dag_id,
-            external_task_id,
-            allowed_states=None,
-            execution_delta=None,
-            execution_date_fn=None,
-            *args, **kwargs):
-        super(ExternalTaskSensor, self).__init__(*args, **kwargs)
-        self.allowed_states = allowed_states or [State.SUCCESS]
-        if execution_delta is not None and execution_date_fn is not None:
-            raise ValueError(
-                'Only one of `execution_date` or `execution_date_fn` may'
-                'be provided to ExternalTaskSensor; not both.')
-
-        self.execution_delta = execution_delta
-        self.execution_date_fn = execution_date_fn
-        self.external_dag_id = external_dag_id
-        self.external_task_id = external_task_id
-
-    @provide_session
-    def poke(self, context, session=None):
-        if self.execution_delta:
-            dttm = context['execution_date'] - self.execution_delta
-        elif self.execution_date_fn:
-            dttm = self.execution_date_fn(context['execution_date'])
-        else:
-            dttm = context['execution_date']
-
-        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
-        serialized_dttm_filter = ','.join(
-            [datetime.isoformat() for datetime in dttm_filter])
-
-        self.log.info(
-            'Poking for '
-            '{self.external_dag_id}.'
-            '{self.external_task_id} on '
-            '{} ... '.format(serialized_dttm_filter, **locals()))
-        TI = TaskInstance
-
-        count = session.query(TI).filter(
-            TI.dag_id == self.external_dag_id,
-            TI.task_id == self.external_task_id,
-            TI.state.in_(self.allowed_states),
-            TI.execution_date.in_(dttm_filter),
-        ).count()
-        session.commit()
-        return count == len(dttm_filter)
-
-
-class NamedHivePartitionSensor(BaseSensorOperator):
-    """
-    Waits for a set of partitions to show up in Hive.
-
-    :param partition_names: List of fully qualified names of the
-        partitions to wait for. A fully qualified name is of the
-        form ``schema.table/pk1=pv1/pk2=pv2``, for example,
-        default.users/ds=2016-01-01. This is passed as is to the metastore
-        Thrift client ``get_partitions_by_name`` method. Note that
-        you cannot use logical or comparison operators as in
-        HivePartitionSensor.
-    :type partition_names: list of strings
-    :param metastore_conn_id: reference to the metastore thrift service
-        connection id
-    :type metastore_conn_id: str
-    """
-
-    template_fields = ('partition_names', )
-    ui_color = '#8d99ae'
-
-    @apply_defaults
-    def __init__(
-            self,
-            partition_names,
-            metastore_conn_id='metastore_default',
-            poke_interval=60 * 3,
-            *args,
-            **kwargs):
-        super(NamedHivePartitionSensor, self).__init__(
-            poke_interval=poke_interval, *args, **kwargs)
-
-        if isinstance(partition_names, basestring):
-            raise TypeError('partition_names must be an array of strings')
-
-        self.metastore_conn_id = metastore_conn_id
-        self.partition_names = partition_names
-        self.next_poke_idx = 0
-
-    @classmethod
-    def parse_partition_name(self, partition):
-        try:
-            schema, table_partition = partition.split('.', 1)
-            table, partition = table_partition.split('/', 1)
-            return schema, table, partition
-        except ValueError as e:
-            raise ValueError('Could not parse ' + partition)
-
-    def poke(self, context):
-        if not hasattr(self, 'hook'):
-            from airflow.hooks.hive_hooks import HiveMetastoreHook
-            self.hook = HiveMetastoreHook(
-                metastore_conn_id=self.metastore_conn_id)
-
-        def poke_partition(partition):
-
-            schema, table, partition = self.parse_partition_name(partition)
-
-            self.log.info(
-                'Poking for {schema}.{table}/{partition}'.format(**locals())
-            )
-            return self.hook.check_for_named_partition(
-                schema, table, partition)
-
-        while self.next_poke_idx < len(self.partition_names):
-            if poke_partition(self.partition_names[self.next_poke_idx]):
-                self.next_poke_idx += 1
-            else:
-                return False
-
-        return True
-
-
-class HivePartitionSensor(BaseSensorOperator):
-    """
-    Waits for a partition to show up in Hive.
-
-    Note: Because ``partition`` supports general logical operators, it
-    can be inefficient. Consider using NamedHivePartitionSensor instead if
-    you don't need the full flexibility of HivePartitionSensor.
-
-    :param table: The name of the table to wait for, supports the dot
-        notation (my_database.my_table)
-    :type table: string
-    :param partition: The partition clause to wait for. This is passed as
-        is to the metastore Thrift client ``get_partitions_by_filter`` method,
-        and apparently supports SQL like notation as in ``ds='2015-01-01'
-        AND type='value'`` and comparison operators as in ``"ds>=2015-01-01"``
-    :type partition: string
-    :param metastore_conn_id: reference to the metastore thrift service
-        connection id
-    :type metastore_conn_id: str
-    """
-    template_fields = ('schema', 'table', 'partition',)
-    ui_color = '#C5CAE9'
-
-    @apply_defaults
-    def __init__(
-            self,
-            table, partition="ds='{{ ds }}'",
-            metastore_conn_id='metastore_default',
-            schema='default',
-            poke_interval=60*3,
-            *args, **kwargs):
-        super(HivePartitionSensor, self).__init__(
-            poke_interval=poke_interval, *args, **kwargs)
-        if not partition:
-            partition = "ds='{{ ds }}'"
-        self.metastore_conn_id = metastore_conn_id
-        self.table = table
-        self.partition = partition
-        self.schema = schema
-
-    def poke(self, context):
-        if '.' in self.table:
-            self.schema, self.table = self.table.split('.')
-        self.log.info(
-            'Poking for table {self.schema}.{self.table}, '
-            'partition {self.partition}'.format(**locals()))
-        if not hasattr(self, 'hook'):
-            from airflow.hooks.hive_hooks import HiveMetastoreHook
-            self.hook = HiveMetastoreHook(
-                metastore_conn_id=self.metastore_conn_id)
-        return self.hook.check_for_partition(
-            self.schema, self.table, self.partition)
-
-
-class HdfsSensor(BaseSensorOperator):
-    """
-    Waits for a file or folder to land in HDFS
-    """
-    template_fields = ('filepath',)
-    ui_color = settings.WEB_COLORS['LIGHTBLUE']
-
-    @apply_defaults
-    def __init__(
-            self,
-            filepath,
-            hdfs_conn_id='hdfs_default',
-            ignored_ext=['_COPYING_'],
-            ignore_copying=True,
-            file_size=None,
-            hook=HDFSHook,
-            *args, **kwargs):
-        super(HdfsSensor, self).__init__(*args, **kwargs)
-        self.filepath = filepath
-        self.hdfs_conn_id = hdfs_conn_id
-        self.file_size = file_size
-        self.ignored_ext = ignored_ext
-        self.ignore_copying = ignore_copying
-        self.hook = hook
-
-    @staticmethod
-    def filter_for_filesize(result, size=None):
-        """
-        Will test the filepath result and test if its size is at least self.filesize
-
-        :param result: a list of dicts returned by Snakebite ls
-        :param size: the file size in MB a file should be at least to trigger True
-        :return: (bool) depending on the matching criteria
-        """
-        if size:
-            log = LoggingMixin().log
-            log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
-            size *= settings.MEGABYTE
-            result = [x for x in result if x['length'] >= size]
-            log.debug('HdfsSensor.poke: after size filter result is %s', result)
-        return result
-
-    @staticmethod
-    def filter_for_ignored_ext(result, ignored_ext, ignore_copying):
-        """
-        Will filter if instructed to do so the result to remove matching criteria
-
-        :param result: (list) of dicts returned by Snakebite ls
-        :param ignored_ext: (list) of ignored extensions
-        :param ignore_copying: (bool) shall we ignore ?
-        :return: (list) of dicts which were not removed
-        """
-        if ignore_copying:
-            log = LoggingMixin().log
-            regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
-            ignored_extentions_regex = re.compile(regex_builder)
-            log.debug(
-                'Filtering result for ignored extensions: %s in files %s',
-                ignored_extentions_regex.pattern, map(lambda x: x['path'], result)
-            )
-            result = [x for x in result if not ignored_extentions_regex.match(x['path'])]
-            log.debug('HdfsSensor.poke: after ext filter result is %s', result)
-        return result
-
-    def poke(self, context):
-        sb = self.hook(self.hdfs_conn_id).get_conn()
-        self.log.info('Poking for file {self.filepath}'.format(**locals()))
-        try:
-            # IMOO it's not right here, as there no raise of any kind.
-            # if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',
-            # it's not correct as the directory exists and sb does not raise any error
-            # here is a quick fix
-            result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
-            self.log.debug('HdfsSensor.poke: result is %s', result)
-            result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
-            result = self.filter_for_filesize(result, self.file_size)
-            return bool(result)
-        except:
-            e = sys.exc_info()
-            self.log.debug("Caught an exception !: %s", str(e))
-            return False
-
-
-class WebHdfsSensor(BaseSensorOperator):
-    """
-    Waits for a file or folder to land in HDFS
-    """
-    template_fields = ('filepath',)
-
-    @apply_defaults
-    def __init__(
-            self,
-            filepath,
-            webhdfs_conn_id='webhdfs_default',
-            *args, **kwargs):
-        super(WebHdfsSensor, self).__init__(*args, **kwargs)
-        self.filepath = filepath
-        self.webhdfs_conn_id = webhdfs_conn_id
-
-    def poke(self, context):
-        from airflow.hooks.webhdfs_hook import WebHDFSHook
-        c = WebHDFSHook(self.webhdfs_conn_id)
-        self.log.info('Poking for file {self.filepath}'.format(**locals()))
-        return c.check_for_path(hdfs_path=self.filepath)
-
-
-class S3KeySensor(BaseSensorOperator):
-    """
-    Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
-    S3 being a key/value it does not support folders. The path is just a key
-    a resource.
-
-    :param bucket_key: The key being waited on. Supports full s3:// style url
-        or relative path from root level.
-    :type bucket_key: str
-    :param bucket_name: Name of the S3 bucket
-    :type bucket_name: str
-    :param wildcard_match: whether the bucket_key should be interpreted as a
-        Unix wildcard pattern
-    :type wildcard_match: bool
-    :param aws_conn_id: a reference to the s3 connection
-    :type aws_conn_id: str
-    """
-    template_fields = ('bucket_key', 'bucket_name')
-
-    @apply_defaults
-    def __init__(
-            self, bucket_key,
-            bucket_name=None,
-            wildcard_match=False,
-            aws_conn_id='aws_default',
-            *args, **kwargs):
-        super(S3KeySensor, self).__init__(*args, **kwargs)
-        # Parse
-        if bucket_name is None:
-            parsed_url = urlparse(bucket_key)
-            if parsed_url.netloc == '':
-                raise AirflowException('Please provide a bucket_name')
-            else:
-                bucket_name = parsed_url.netloc
-                if parsed_url.path[0] == '/':
-                    bucket_key = parsed_url.path[1:]
-                else:
-                    bucket_key = parsed_url.path
-        self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
-        self.wildcard_match = wildcard_match
-        self.aws_conn_id = aws_conn_id
-
-    def poke(self, context):
-        from airflow.hooks.S3_hook import S3Hook
-        hook = S3Hook(aws_conn_id=self.aws_conn_id)
-        full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
-        self.log.info('Poking for key : {full_url}'.format(**locals()))
-        if self.wildcard_match:
-            return hook.check_for_wildcard_key(self.bucket_key,
-                                               self.bucket_name)
-        else:
-            return hook.check_for_key(self.bucket_key, self.bucket_name)
-
-
-class S3PrefixSensor(BaseSensorOperator):
-    """
-    Waits for a prefix to exist. A prefix is the first part of a key,
-    thus enabling checking of constructs similar to glob airfl* or
-    SQL LIKE 'airfl%'. There is the possibility to precise a delimiter to
-    indicate the hierarchy or keys, meaning that the match will stop at that
-    delimiter. Current code accepts sane delimiters, i.e. characters that
-    are NOT special characters in the Python regex engine.
-
-    :param bucket_name: Name of the S3 bucket
-    :type bucket_name: str
-    :param prefix: The prefix being waited on. Relative path from bucket root level.
-    :type prefix: str
-    :param delimiter: The delimiter intended to show hierarchy.
-        Defaults to '/'.
-    :type delimiter: str
-    """
-    template_fields = ('prefix', 'bucket_name')
-
-    @apply_defaults
-    def __init__(
-            self, bucket_name,
-            prefix, delimiter='/',
-            aws_conn_id='aws_default',
-            *args, **kwargs):
-        super(S3PrefixSensor, self).__init__(*args, **kwargs)
-        # Parse
-        self.bucket_name = bucket_name
-        self.prefix = prefix
-        self.delimiter = delimiter
-        self.full_url = "s3://" + bucket_name + '/' + prefix
-        self.aws_conn_id = aws_conn_id
-
-    def poke(self, context):
-        self.log.info('Poking for prefix : {self.prefix}\n'
-                      'in bucket s3://{self.bucket_name}'.format(**locals()))
-        from airflow.hooks.S3_hook import S3Hook
-        hook = S3Hook(aws_conn_id=self.aws_conn_id)
-        return hook.check_for_prefix(
-            prefix=self.prefix,
-            delimiter=self.delimiter,
-            bucket_name=self.bucket_name)
-
-
-class TimeSensor(BaseSensorOperator):
-    """
-    Waits until the specified time of the day.
-
-    :param target_time: time after which the job succeeds
-    :type target_time: datetime.time
-    """
-    template_fields = tuple()
-
-    @apply_defaults
-    def __init__(self, target_time, *args, **kwargs):
-        super(TimeSensor, self).__init__(*args, **kwargs)
-        self.target_time = target_time
-
-    def poke(self, context):
-        self.log.info('Checking if the time (%s) has come', self.target_time)
-        return timezone.utcnow().time() > self.target_time
-
-
-class TimeDeltaSensor(BaseSensorOperator):
-    """
-    Waits for a timedelta after the task's execution_date + schedule_interval.
-    In Airflow, the daily task stamped with ``execution_date``
-    2016-01-01 can only start running on 2016-01-02. The timedelta here
-    represents the time after the execution period has closed.
-
-    :param delta: time length to wait after execution_date before succeeding
-    :type delta: datetime.timedelta
-    """
-    template_fields = tuple()
-
-    @apply_defaults
-    def __init__(self, delta, *args, **kwargs):
-        super(TimeDeltaSensor, self).__init__(*args, **kwargs)
-        self.delta = delta
-
-    def poke(self, context):
-        dag = context['dag']
-        target_dttm = dag.following_schedule(context['execution_date'])
-        target_dttm += self.delta
-        self.log.info('Checking if the time (%s) has come', target_dttm)
-        return timezone.utcnow() > target_dttm
-
-
-class HttpSensor(BaseSensorOperator):
-    """
-    Executes a HTTP get statement and returns False on failure:
-        404 not found or response_check function returned False
-
-    :param http_conn_id: The connection to run the sensor against
-    :type http_conn_id: string
-    :param method: The HTTP request method to use
-    :type method: string
-    :param endpoint: The relative part of the full url
-    :type endpoint: string
-    :param request_params: The parameters to be added to the GET url
-    :type request_params: a dictionary of string key/value pairs
-    :param headers: The HTTP headers to be added to the GET request
-    :type headers: a dictionary of string key/value pairs
-    :param response_check: A check against the 'requests' response object.
-        Returns True for 'pass' and False otherwise.
-    :type response_check: A lambda or defined function.
-    :param extra_options: Extra options for the 'requests' library, see the
-        'requests' documentation (options to modify timeout, ssl, etc.)
-    :type extra_options: A dictionary of options, where key is string and value
-        depends on the option that's being modified.
-    """
-
-    template_fields = ('endpoint', 'request_params')
-
-    @apply_defaults
-    def __init__(self,
-                 endpoint,
-                 http_conn_id='http_default',
-                 method='GET',
-                 request_params=None,
-                 headers=None,
-                 response_check=None,
-                 extra_options=None, *args, **kwargs):
-        super(HttpSensor, self).__init__(*args, **kwargs)
-        self.endpoint = endpoint
-        self.http_conn_id = http_conn_id
-        self.request_params = request_params or {}
-        self.headers = headers or {}
-        self.extra_options = extra_options or {}
-        self.response_check = response_check
-
-        self.hook = HttpHook(
-            method=method,
-            http_conn_id=http_conn_id)
-
-    def poke(self, context):
-        self.log.info('Poking: %s', self.endpoint)
-        try:
-            response = self.hook.run(self.endpoint,
-                                     data=self.request_params,
-                                     headers=self.headers,
-                                     extra_options=self.extra_options)
-            if self.response_check:
-                # run content check on response
-                return self.response_check(response)
-        except AirflowException as ae:
-            if str(ae).startswith("404"):
-                return False
-
-            raise ae
-
-        return True
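
With airflow/operators/sensors.py deleted, DAG code that imported sensors from the old module moves to the per-sensor modules in the new airflow.sensors package introduced below, mirroring the example_http_operator change above. A migration sketch:

    # Old location, removed by this patch:
    # from airflow.operators.sensors import HttpSensor, SqlSensor, TimeDeltaSensor

    # New per-sensor modules:
    from airflow.sensors.http_sensor import HttpSensor
    from airflow.sensors.sql_sensor import SqlSensor
    from airflow.sensors.time_delta_sensor import TimeDeltaSensor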

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index d5c3407..22a873c 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -103,6 +103,7 @@ def make_module(name, objects):
 
 # Plugin components to integrate as modules
 operators_modules = []
+sensors_modules = []
 hooks_modules = []
 executors_modules = []
 macros_modules = []
@@ -115,6 +116,9 @@ menu_links = []
 for p in plugins:
     operators_modules.append(
         make_module('airflow.operators.' + p.name, p.operators))
+    sensors_modules.append(
+        make_module('airflow.sensors.' + p.name, p.operators)
+    )
     hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks))
     executors_modules.append(
         make_module('airflow.executors.' + p.name, p.executors))
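
With the hunk above, sensor classes shipped by a plugin are also exposed as a module under airflow.sensors.<plugin name> (note it is built from the plugin's operators list). A hedged sketch of such a plugin; all names are made up:

    from airflow.plugins_manager import AirflowPlugin
    from airflow.sensors.base_sensor_operator import BaseSensorOperator


    class QueueEmptySensor(BaseSensorOperator):
        def poke(self, context):
            # Hypothetical check; replace with a real queue lookup.
            return True


    class MyCompanyPlugin(AirflowPlugin):
        name = 'my_company'
        operators = [QueueEmptySensor]

    # After plugin integration the sensor should be importable as either:
    #   from airflow.operators.my_company import QueueEmptySensor
    #   from airflow.sensors.my_company import QueueEmptySensor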

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/__init__.py b/airflow/sensors/__init__.py
new file mode 100644
index 0000000..2239467
--- /dev/null
+++ b/airflow/sensors/__init__.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+import os as _os
+
+_sensors = {
+    'base_sensor_operator': ['BaseSensorOperator'],
+    'external_task_sensor': ['ExternalTaskSensor'],
+    'hdfs_sensor': ['HdfsSensor'],
+    'hive_partition_sensor': ['HivePartitionSensor'],
+    'http_sensor': ['HttpSensor'],
+    'metastore_partition_sensor': ['MetastorePartitionSensor'],
+    'named_hive_partition_sensor': ['NamedHivePartitionSensor'],
+    's3_key_sensor': ['S3KeySensor'],
+    's3_prefix_sensor': ['S3PrefixSensor'],
+    'sql_sensor': ['SqlSensor'],
+    'time_delta_sensor': ['TimeDeltaSensor'],
+    'time_sensor': ['TimeSensor'],
+    'web_hdfs_sensor': ['WebHdfsSensor']
+}
+
+if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
+    from airflow.utils.helpers import AirflowImporter
+    airflow_importer = AirflowImporter(sys.modules[__name__], _sensors)
+
+
+def _integrate_plugins():
+    """Integrate plugins to the context"""
+    from airflow.plugins_manager import sensors_modules
+    for sensors_module in sensors_modules:
+        sys.modules[sensors_module.__name__] = sensors_module
+        globals()[sensors_module._name] = sensors_module
+
+        ##########################################################
+        # TODO FIXME Remove in Airflow 2.0
+
+        if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
+            from zope.deprecation import deprecated as _deprecated
+            for _operator in sensors_module._objects:
+                operator_name = _operator.__name__
+                globals()[operator_name] = _operator
+                _deprecated(
+                    operator_name,
+                    "Importing plugin operator '{i}' directly from "
+                    "'airflow.operators' has been deprecated. Please "
+                    "import from 'airflow.operators.[plugin_module]' "
+                    "instead. Support for direct imports will be dropped "
+                    "entirely in Airflow 2.0.".format(i=operator_name))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/base_sensor_operator.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/base_sensor_operator.py b/airflow/sensors/base_sensor_operator.py
new file mode 100644
index 0000000..2a48034
--- /dev/null
+++ b/airflow/sensors/base_sensor_operator.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from time import sleep
+
+from airflow.exceptions import AirflowException, AirflowSensorTimeout, \
+    AirflowSkipException
+from airflow.models import BaseOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class BaseSensorOperator(BaseOperator):
+    """
+    Sensor operators are derived from this class and inherit these attributes.
+
+    Sensor operators keep executing at a time interval and succeed when
+    a criterion is met, and fail if and when they time out.
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait in
+        between each try
+    :type poke_interval: int
+    :param timeout: Time, in seconds, before the task times out and fails.
+    :type timeout: int
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=60,
+                 timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 *args,
+                 **kwargs):
+        super(BaseSensorOperator, self).__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = timeout
+
+    def poke(self, context):
+        """
+        Function that the sensors defined while deriving this class should
+        override.
+        """
+        raise AirflowException('Override me.')
+
+    def execute(self, context):
+        started_at = timezone.utcnow()
+        while not self.poke(context):
+            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
+                if self.soft_fail:
+                    raise AirflowSkipException('Snap. Time is OUT.')
+                else:
+                    raise AirflowSensorTimeout('Snap. Time is OUT.')
+            sleep(self.poke_interval)
+        self.log.info("Success criteria met. Exiting.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/external_task_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
new file mode 100644
index 0000000..da86c9b
--- /dev/null
+++ b/airflow/sensors/external_task_sensor.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.models import TaskInstance
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.db import provide_session
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.state import State
+
+
+class ExternalTaskSensor(BaseSensorOperator):
+    """
+    Waits for a task to complete in a different DAG
+
+    :param external_dag_id: The dag_id that contains the task you want to
+        wait for
+    :type external_dag_id: string
+    :param external_task_id: The task_id that contains the task you want to
+        wait for
+    :type external_task_id: string
+    :param allowed_states: list of allowed states, default is ``['success']``
+    :type allowed_states: list
+    :param execution_delta: time difference with the previous execution to
+        look at, the default is the same execution_date as the current task.
+        For yesterday, use [positive!] datetime.timedelta(days=1). Either
+        execution_delta or execution_date_fn can be passed to
+        ExternalTaskSensor, but not both.
+    :type execution_delta: datetime.timedelta
+    :param execution_date_fn: function that receives the current execution date
+        and returns the desired execution dates to query. Either execution_delta
+        or execution_date_fn can be passed to ExternalTaskSensor, but not both.
+    :type execution_date_fn: callable
+    """
+    template_fields = ['external_dag_id', 'external_task_id']
+    ui_color = '#19647e'
+
+    @apply_defaults
+    def __init__(self,
+                 external_dag_id,
+                 external_task_id,
+                 allowed_states=None,
+                 execution_delta=None,
+                 execution_date_fn=None,
+                 *args,
+                 **kwargs):
+        super(ExternalTaskSensor, self).__init__(*args, **kwargs)
+        self.allowed_states = allowed_states or [State.SUCCESS]
+        if execution_delta is not None and execution_date_fn is not None:
+            raise ValueError(
+                'Only one of `execution_delta` or `execution_date_fn` may '
+                'be provided to ExternalTaskSensor; not both.')
+
+        self.execution_delta = execution_delta
+        self.execution_date_fn = execution_date_fn
+        self.external_dag_id = external_dag_id
+        self.external_task_id = external_task_id
+
+    @provide_session
+    def poke(self, context, session=None):
+        if self.execution_delta:
+            dttm = context['execution_date'] - self.execution_delta
+        elif self.execution_date_fn:
+            dttm = self.execution_date_fn(context['execution_date'])
+        else:
+            dttm = context['execution_date']
+
+        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
+        serialized_dttm_filter = ','.join(
+            [datetime.isoformat() for datetime in dttm_filter])
+
+        self.log.info(
+            'Poking for '
+            '{self.external_dag_id}.'
+            '{self.external_task_id} on '
+            '{} ... '.format(serialized_dttm_filter, **locals()))
+        TI = TaskInstance
+
+        count = session.query(TI).filter(
+            TI.dag_id == self.external_dag_id,
+            TI.task_id == self.external_task_id,
+            TI.state.in_(self.allowed_states),
+            TI.execution_date.in_(dttm_filter),
+        ).count()
+        session.commit()
+        return count == len(dttm_filter)
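
For reference, a usage sketch of ExternalTaskSensor waiting on a task in another DAG (DAG and task ids are made up; both DAGs are assumed to run on the same schedule, so no execution_delta is set):

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.external_task_sensor import ExternalTaskSensor

    dag = DAG('reporting',
              start_date=datetime(2018, 1, 1),
              schedule_interval='@daily')

    # Blocks until upstream_etl.load_warehouse succeeds for the same
    # execution_date; pass execution_delta for offset schedules.
    wait_for_etl = ExternalTaskSensor(
        task_id='wait_for_load_warehouse',
        external_dag_id='upstream_etl',
        external_task_id='load_warehouse',
        allowed_states=['success'],
        poke_interval=120,
        dag=dag)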

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/hdfs_sensor.py b/airflow/sensors/hdfs_sensor.py
new file mode 100644
index 0000000..8d98898
--- /dev/null
+++ b/airflow/sensors/hdfs_sensor.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+from builtins import str
+
+from airflow import settings
+from airflow.hooks.hdfs_hook import HDFSHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class HdfsSensor(BaseSensorOperator):
+    """
+    Waits for a file or folder to land in HDFS
+    """
+    template_fields = ('filepath',)
+    ui_color = settings.WEB_COLORS['LIGHTBLUE']
+
+    @apply_defaults
+    def __init__(self,
+                 filepath,
+                 hdfs_conn_id='hdfs_default',
+                 ignored_ext=['_COPYING_'],
+                 ignore_copying=True,
+                 file_size=None,
+                 hook=HDFSHook,
+                 *args,
+                 **kwargs):
+        super(HdfsSensor, self).__init__(*args, **kwargs)
+        self.filepath = filepath
+        self.hdfs_conn_id = hdfs_conn_id
+        self.file_size = file_size
+        self.ignored_ext = ignored_ext
+        self.ignore_copying = ignore_copying
+        self.hook = hook
+
+    @staticmethod
+    def filter_for_filesize(result, size=None):
+        """
+        Filter the result of a Snakebite ls call, keeping only files whose
+        size is at least ``size`` megabytes.
+
+        :param result: a list of dicts returned by Snakebite ls
+        :param size: the minimum file size in MB a file must have to be kept
+        :return: (list) of dicts that passed the size filter
+        """
+        if size:
+            log = LoggingMixin().log
+            log.debug(
+                'Filtering for file size >= %s in files: %s',
+                size, [x['path'] for x in result]
+            )
+            size *= settings.MEGABYTE
+            result = [x for x in result if x['length'] >= size]
+            log.debug('HdfsSensor.poke: after size filter result is %s', result)
+        return result
+
+    @staticmethod
+    def filter_for_ignored_ext(result, ignored_ext, ignore_copying):
+        """
+        Filter out entries whose path matches one of the ignored extensions,
+        if instructed to do so.
+
+        :param result: (list) of dicts returned by Snakebite ls
+        :param ignored_ext: (list) of file extensions to ignore
+        :param ignore_copying: (bool) whether the filter should be applied
+        :return: (list) of dicts that were not filtered out
+        """
+        if ignore_copying:
+            log = LoggingMixin().log
+            regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
+            ignored_extentions_regex = re.compile(regex_builder)
+            log.debug(
+                'Filtering result for ignored extensions: %s in files %s',
+                ignored_extentions_regex.pattern, map(lambda x: x['path'], result)
+            )
+            result = [x for x in result if not ignored_extentions_regex.match(x['path'])]
+            log.debug('HdfsSensor.poke: after ext filter result is %s', result)
+        return result
+
+    def poke(self, context):
+        sb = self.hook(self.hdfs_conn_id).get_conn()
+        self.log.info('Poking for file {self.filepath}'.format(**locals()))
+        try:
+            # Note: snakebite's ls does not raise for every non-matching path.
+            # For a filepath such as '/data/mydirectory' the behaviour is
+            # correct, but for '/data/mydirectory/*' no error is raised
+            # because the directory itself exists. Collecting and filtering
+            # the result below is a quick fix for that.
+            result = list(sb.ls([self.filepath], include_toplevel=False))
+            self.log.debug('HdfsSensor.poke: result is %s', result)
+            result = self.filter_for_ignored_ext(
+                result, self.ignored_ext, self.ignore_copying
+            )
+            result = self.filter_for_filesize(result, self.file_size)
+            return bool(result)
+        except Exception as e:
+            self.log.debug('Caught an exception: %s', str(e))
+            return False
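
A minimal usage sketch for HdfsSensor as defined above; the DAG id and filepath are hypothetical, and an 'hdfs_default' connection backed by snakebite is assumed:

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.hdfs_sensor import HdfsSensor

    dag = DAG('hdfs_example', start_date=datetime(2018, 1, 1))

    wait_for_file = HdfsSensor(
        task_id='wait_for_data_file',
        filepath='/data/mydirectory/',  # hypothetical path
        hdfs_conn_id='hdfs_default',
        ignore_copying=True,            # skip files still being written (_COPYING_)
        file_size=1,                    # require at least 1 MB
        poke_interval=60,
        timeout=60 * 60,
        dag=dag)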

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/hive_partition_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/hive_partition_sensor.py b/airflow/sensors/hive_partition_sensor.py
new file mode 100644
index 0000000..31c4175
--- /dev/null
+++ b/airflow/sensors/hive_partition_sensor.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class HivePartitionSensor(BaseSensorOperator):
+    """
+    Waits for a partition to show up in Hive.
+
+    Note: Because ``partition`` supports general logical operators, it
+    can be inefficient. Consider using NamedHivePartitionSensor instead if
+    you don't need the full flexibility of HivePartitionSensor.
+
+    :param table: The name of the table to wait for, supports the dot
+        notation (my_database.my_table)
+    :type table: string
+    :param partition: The partition clause to wait for. This is passed as-is
+        to the metastore Thrift client ``get_partitions_by_filter`` method,
+        and supports SQL-like notation as in ``ds='2015-01-01'
+        AND type='value'`` and comparison operators as in ``"ds>=2015-01-01"``
+    :type partition: string
+    :param metastore_conn_id: reference to the metastore thrift service
+        connection id
+    :type metastore_conn_id: str
+    """
+    template_fields = ('schema', 'table', 'partition',)
+    ui_color = '#C5CAE9'
+
+    @apply_defaults
+    def __init__(self,
+                 table, partition="ds='{{ ds }}'",
+                 metastore_conn_id='metastore_default',
+                 schema='default',
+                 poke_interval=60 * 3,
+                 *args,
+                 **kwargs):
+        super(HivePartitionSensor, self).__init__(
+            poke_interval=poke_interval, *args, **kwargs)
+        if not partition:
+            partition = "ds='{{ ds }}'"
+        self.metastore_conn_id = metastore_conn_id
+        self.table = table
+        self.partition = partition
+        self.schema = schema
+
+    def poke(self, context):
+        if '.' in self.table:
+            self.schema, self.table = self.table.split('.')
+        self.log.info(
+            'Poking for table {self.schema}.{self.table}, '
+            'partition {self.partition}'.format(**locals()))
+        if not hasattr(self, 'hook'):
+            from airflow.hooks.hive_hooks import HiveMetastoreHook
+            self.hook = HiveMetastoreHook(
+                metastore_conn_id=self.metastore_conn_id)
+        return self.hook.check_for_partition(
+            self.schema, self.table, self.partition)
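
A minimal usage sketch for HivePartitionSensor; the DAG id and table name are hypothetical, and a working 'metastore_default' connection is assumed:

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.hive_partition_sensor import HivePartitionSensor

    dag = DAG('hive_partition_example', start_date=datetime(2018, 1, 1))

    wait_for_partition = HivePartitionSensor(
        task_id='wait_for_partition',
        table='my_database.my_table',                # hypothetical table
        partition="ds='{{ ds }}' AND type='value'",  # general filter expression
        metastore_conn_id='metastore_default',
        dag=dag)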

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/http_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/http_sensor.py b/airflow/sensors/http_sensor.py
new file mode 100644
index 0000000..7d64131
--- /dev/null
+++ b/airflow/sensors/http_sensor.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from builtins import str
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.http_hook import HttpHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class HttpSensor(BaseSensorOperator):
+    """
+    Executes an HTTP GET request and returns False on failure:
+        404 Not Found or the ``response_check`` callable returning False
+
+    :param http_conn_id: The connection to run the sensor against
+    :type http_conn_id: string
+    :param method: The HTTP request method to use
+    :type method: string
+    :param endpoint: The relative part of the full url
+    :type endpoint: string
+    :param request_params: The parameters to be added to the GET url
+    :type request_params: a dictionary of string key/value pairs
+    :param headers: The HTTP headers to be added to the GET request
+    :type headers: a dictionary of string key/value pairs
+    :param response_check: A check against the 'requests' response object.
+        Returns True for 'pass' and False otherwise.
+    :type response_check: A lambda or defined function.
+    :param extra_options: Extra options for the 'requests' library, see the
+        'requests' documentation (options to modify timeout, ssl, etc.)
+    :type extra_options: A dictionary of options, where key is string and value
+        depends on the option that's being modified.
+    """
+
+    template_fields = ('endpoint', 'request_params')
+
+    @apply_defaults
+    def __init__(self,
+                 endpoint,
+                 http_conn_id='http_default',
+                 method='GET',
+                 request_params=None,
+                 headers=None,
+                 response_check=None,
+                 extra_options=None, *args, **kwargs):
+        super(HttpSensor, self).__init__(*args, **kwargs)
+        self.endpoint = endpoint
+        self.http_conn_id = http_conn_id
+        self.request_params = request_params or {}
+        self.headers = headers or {}
+        self.extra_options = extra_options or {}
+        self.response_check = response_check
+
+        self.hook = HttpHook(
+            method=method,
+            http_conn_id=http_conn_id)
+
+    def poke(self, context):
+        self.log.info('Poking: %s', self.endpoint)
+        try:
+            response = self.hook.run(self.endpoint,
+                                     data=self.request_params,
+                                     headers=self.headers,
+                                     extra_options=self.extra_options)
+            if self.response_check:
+                # run content check on response
+                return self.response_check(response)
+        except AirflowException as ae:
+            if str(ae).startswith("404"):
+                return False
+
+            raise ae
+
+        return True
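
A minimal usage sketch for HttpSensor; the DAG id, endpoint, request parameters, and response check are hypothetical, and an 'http_default' connection is assumed:

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.http_sensor import HttpSensor

    dag = DAG('http_example', start_date=datetime(2018, 1, 1))

    def check_response(response):
        # any truthy return value marks the poke as successful
        return 'ok' in response.text.lower()

    wait_for_endpoint = HttpSensor(
        task_id='wait_for_http',
        http_conn_id='http_default',
        endpoint='health',                 # hypothetical relative endpoint
        request_params={'check': 'full'},  # hypothetical query parameters
        response_check=check_response,
        poke_interval=30,
        timeout=300,
        dag=dag)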

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/metastore_partition_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/metastore_partition_sensor.py b/airflow/sensors/metastore_partition_sensor.py
new file mode 100644
index 0000000..6f18109
--- /dev/null
+++ b/airflow/sensors/metastore_partition_sensor.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.sql_sensor import SqlSensor
+from airflow.utils.decorators import apply_defaults
+
+
+class MetastorePartitionSensor(SqlSensor):
+    """
+    An alternative to the HivePartitionSensor that talks directly to the
+    MySQL db. This was created as a result of observing suboptimal
+    queries generated by the Metastore thrift service when hitting
+    subpartitioned tables. The Thrift service's queries were written in a
+    way that would not leverage the indexes.
+
+    :param schema: the schema
+    :type schema: str
+    :param table: the table
+    :type table: str
+    :param partition_name: the partition name, as defined in the PARTITIONS
+        table of the Metastore. Order of the fields does matter.
+        Examples: ``ds=2016-01-01`` or
+        ``ds=2016-01-01/sub=foo`` for a sub partitioned table
+    :type partition_name: str
+    :param mysql_conn_id: a reference to the MySQL conn_id for the metastore
+    :type mysql_conn_id: str
+    """
+    template_fields = ('partition_name', 'table', 'schema')
+    ui_color = '#8da7be'
+
+    @apply_defaults
+    def __init__(self,
+                 table,
+                 partition_name,
+                 schema="default",
+                 mysql_conn_id="metastore_mysql",
+                 *args,
+                 **kwargs):
+
+        self.partition_name = partition_name
+        self.table = table
+        self.schema = schema
+        self.first_poke = True
+        self.conn_id = mysql_conn_id
+        # TODO(aoen): We shouldn't be using SqlSensor here but MetastorePartitionSensor.
+        # The problem is the way apply_defaults works isn't compatible with inheritance.
+        # The inheritance model needs to be reworked in order to support overriding args/
+        # kwargs with arguments here, then 'conn_id' and 'sql' can be passed into the
+        # constructor below and apply_defaults will no longer throw an exception.
+        super(SqlSensor, self).__init__(*args, **kwargs)
+
+    def poke(self, context):
+        if self.first_poke:
+            self.first_poke = False
+            if '.' in self.table:
+                self.schema, self.table = self.table.split('.')
+            self.sql = """
+            SELECT 'X'
+            FROM PARTITIONS A0
+            LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID
+            LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID
+            WHERE
+                B0.TBL_NAME = '{self.table}' AND
+                C0.NAME = '{self.schema}' AND
+                A0.PART_NAME = '{self.partition_name}';
+            """.format(self=self)
+        return super(MetastorePartitionSensor, self).poke(context)
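
A minimal usage sketch for MetastorePartitionSensor; the DAG id, table, and partition name are hypothetical, and a 'metastore_mysql' connection pointing at the metastore database is assumed:

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.metastore_partition_sensor import MetastorePartitionSensor

    dag = DAG('metastore_partition_example', start_date=datetime(2018, 1, 1))

    wait_for_partition = MetastorePartitionSensor(
        task_id='wait_for_partition',
        table='my_database.my_table',          # hypothetical table
        partition_name='ds={{ ds }}/sub=foo',  # field order matters, see docstring
        mysql_conn_id='metastore_mysql',
        dag=dag)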

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/named_hive_partition_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/named_hive_partition_sensor.py b/airflow/sensors/named_hive_partition_sensor.py
new file mode 100644
index 0000000..75bdb54
--- /dev/null
+++ b/airflow/sensors/named_hive_partition_sensor.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from past.builtins import basestring
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class NamedHivePartitionSensor(BaseSensorOperator):
+    """
+    Waits for a set of partitions to show up in Hive.
+
+    :param partition_names: List of fully qualified names of the
+        partitions to wait for. A fully qualified name is of the
+        form ``schema.table/pk1=pv1/pk2=pv2``, for example,
+        default.users/ds=2016-01-01. This is passed as is to the metastore
+        Thrift client ``get_partitions_by_name`` method. Note that
+        you cannot use logical or comparison operators as in
+        HivePartitionSensor.
+    :type partition_names: list of strings
+    :param metastore_conn_id: reference to the metastore thrift service
+        connection id
+    :type metastore_conn_id: str
+    """
+
+    template_fields = ('partition_names',)
+    ui_color = '#8d99ae'
+
+    @apply_defaults
+    def __init__(self,
+                 partition_names,
+                 metastore_conn_id='metastore_default',
+                 poke_interval=60 * 3,
+                 *args,
+                 **kwargs):
+        super(NamedHivePartitionSensor, self).__init__(
+            poke_interval=poke_interval, *args, **kwargs)
+
+        if isinstance(partition_names, basestring):
+            raise TypeError('partition_names must be an array of strings')
+
+        self.metastore_conn_id = metastore_conn_id
+        self.partition_names = partition_names
+        self.next_poke_idx = 0
+
+    @classmethod
+    def parse_partition_name(cls, partition):
+        try:
+            schema, table_partition = partition.split('.', 1)
+            table, partition = table_partition.split('/', 1)
+            return schema, table, partition
+        except ValueError:
+            raise ValueError('Could not parse ' + partition)
+
+    def poke(self, context):
+        if not hasattr(self, 'hook'):
+            from airflow.hooks.hive_hooks import HiveMetastoreHook
+            self.hook = HiveMetastoreHook(
+                metastore_conn_id=self.metastore_conn_id)
+
+        def poke_partition(partition):
+
+            schema, table, partition = self.parse_partition_name(partition)
+
+            self.log.info(
+                'Poking for {schema}.{table}/{partition}'.format(**locals())
+            )
+            return self.hook.check_for_named_partition(
+                schema, table, partition)
+
+        while self.next_poke_idx < len(self.partition_names):
+            if poke_partition(self.partition_names[self.next_poke_idx]):
+                self.next_poke_idx += 1
+            else:
+                return False
+
+        return True
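
A minimal usage sketch for NamedHivePartitionSensor; the partition names follow the schema.table/pk1=pv1 form from the docstring, with the DAG id and the second entry purely hypothetical:

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.named_hive_partition_sensor import NamedHivePartitionSensor

    dag = DAG('named_hive_partition_example', start_date=datetime(2018, 1, 1))

    wait_for_partitions = NamedHivePartitionSensor(
        task_id='wait_for_partitions',
        partition_names=[
            'default.users/ds={{ ds }}',           # form from the docstring above
            'default.events/ds={{ ds }}/sub=foo',  # hypothetical second partition
        ],
        metastore_conn_id='metastore_default',
        dag=dag)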

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/s3_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/s3_key_sensor.py b/airflow/sensors/s3_key_sensor.py
new file mode 100644
index 0000000..0d63426
--- /dev/null
+++ b/airflow/sensors/s3_key_sensor.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from urllib.parse import urlparse
+
+from airflow.exceptions import AirflowException
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class S3KeySensor(BaseSensorOperator):
+    """
+    Waits for a key (a file-like instance on S3) to be present in an S3 bucket.
+    S3 being a key/value store, it does not support folders. The path is just
+    a key to a resource.
+
+    :param bucket_key: The key being waited on. Supports full s3:// style url
+        or relative path from root level.
+    :type bucket_key: str
+    :param bucket_name: Name of the S3 bucket
+    :type bucket_name: str
+    :param wildcard_match: whether the bucket_key should be interpreted as a
+        Unix wildcard pattern
+    :type wildcard_match: bool
+    :param aws_conn_id: a reference to the s3 connection
+    :type aws_conn_id: str
+    """
+    template_fields = ('bucket_key', 'bucket_name')
+
+    @apply_defaults
+    def __init__(self,
+                 bucket_key,
+                 bucket_name=None,
+                 wildcard_match=False,
+                 aws_conn_id='aws_default',
+                 *args,
+                 **kwargs):
+        super(S3KeySensor, self).__init__(*args, **kwargs)
+        # Parse the bucket name and key from a full s3:// URL if no
+        # bucket_name was supplied
+        if bucket_name is None:
+            parsed_url = urlparse(bucket_key)
+            if parsed_url.netloc == '':
+                raise AirflowException('Please provide a bucket_name')
+            else:
+                bucket_name = parsed_url.netloc
+                if parsed_url.path[0] == '/':
+                    bucket_key = parsed_url.path[1:]
+                else:
+                    bucket_key = parsed_url.path
+        self.bucket_name = bucket_name
+        self.bucket_key = bucket_key
+        self.wildcard_match = wildcard_match
+        self.aws_conn_id = aws_conn_id
+
+    def poke(self, context):
+        from airflow.hooks.S3_hook import S3Hook
+        hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
+        self.log.info('Poking for key: {full_url}'.format(**locals()))
+        if self.wildcard_match:
+            return hook.check_for_wildcard_key(self.bucket_key,
+                                               self.bucket_name)
+        else:
+            return hook.check_for_key(self.bucket_key, self.bucket_name)
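
A minimal usage sketch for S3KeySensor; the DAG id, bucket, and key are hypothetical, and an 'aws_default' connection is assumed:

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.s3_key_sensor import S3KeySensor

    dag = DAG('s3_key_example', start_date=datetime(2018, 1, 1))

    wait_for_key = S3KeySensor(
        task_id='wait_for_s3_key',
        bucket_key='s3://my-bucket/data/{{ ds }}/*.csv',  # hypothetical full URL
        wildcard_match=True,
        aws_conn_id='aws_default',
        dag=dag)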