Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/19 17:59:16 UTC
[3/3] incubator-airflow git commit: [AIRFLOW-1889] Split sensors into separate files
[AIRFLOW-1889] Split sensors into separate files
Moving the sensors to separate files increases the readability of the
code. It also reduces the amount of code in the big core.py file.
Closes #2875 from Fokko/AIRFLOW-1889-move-sensors-to-separate-package
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/33c72042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/33c72042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/33c72042
Branch: refs/heads/master
Commit: 33c720421208696c9cae422e1056c8e914d3329b
Parents: e7c118d
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Fri Jan 19 18:59:08 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 19 18:59:08 2018 +0100
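In practice the change boils down to a new canonical import path for the
sensors; a minimal before/after sketch, with module names taken directly
from the diff below:

    # Deprecated location, removed by this commit:
    from airflow.operators.sensors import BaseSensorOperator

    # New dedicated package introduced by this commit:
    from airflow.sensors.base_sensor_operator import BaseSensorOperator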
----------------------------------------------------------------------
.gitignore | 1 +
airflow/contrib/operators/fs_operator.py | 56 --
.../sensors/aws_redshift_cluster_sensor.py | 13 +-
airflow/contrib/sensors/bash_sensor.py | 2 +-
airflow/contrib/sensors/bigquery_sensor.py | 2 +-
airflow/contrib/sensors/datadog_sensor.py | 2 +-
airflow/contrib/sensors/emr_base_sensor.py | 2 +-
airflow/contrib/sensors/emr_job_flow_sensor.py | 8 +-
airflow/contrib/sensors/emr_step_sensor.py | 10 +-
airflow/contrib/sensors/file_sensor.py | 56 ++
airflow/contrib/sensors/ftp_sensor.py | 2 +-
airflow/contrib/sensors/gcs_sensor.py | 2 +-
airflow/contrib/sensors/hdfs_sensors.py | 20 +-
airflow/contrib/sensors/jira_sensor.py | 31 +-
airflow/contrib/sensors/pubsub_sensor.py | 2 +-
airflow/contrib/sensors/qubole_sensor.py | 12 +-
airflow/contrib/sensors/redis_key_sensor.py | 2 +-
airflow/contrib/sensors/wasb_sensor.py | 2 +-
airflow/example_dags/example_http_operator.py | 8 +-
airflow/operators/__init__.py | 15 -
airflow/operators/sensors.py | 697 -------------------
airflow/plugins_manager.py | 4 +
airflow/sensors/__init__.py | 60 ++
airflow/sensors/base_sensor_operator.py | 70 ++
airflow/sensors/external_task_sensor.py | 96 +++
airflow/sensors/hdfs_sensor.py | 112 +++
airflow/sensors/hive_partition_sensor.py | 70 ++
airflow/sensors/http_sensor.py | 87 +++
airflow/sensors/metastore_partition_sensor.py | 78 +++
airflow/sensors/named_hive_partition_sensor.py | 89 +++
airflow/sensors/s3_key_sensor.py | 76 ++
airflow/sensors/s3_prefix_sensor.py | 63 ++
airflow/sensors/sql_sensor.py | 53 ++
airflow/sensors/time_delta_sensor.py | 41 ++
airflow/sensors/time_sensor.py | 35 +
airflow/sensors/web_hdfs_sensor.py | 39 ++
docs/code.rst | 43 +-
tests/contrib/operators/test_fs_operator.py | 67 --
.../sensors/test_aws_redshift_cluster_sensor.py | 1 +
tests/contrib/sensors/test_bash_sensor.py | 3 +-
tests/contrib/sensors/test_datadog_sensor.py | 5 +-
tests/contrib/sensors/test_emr_base_sensor.py | 4 +-
tests/contrib/sensors/test_file_sensor.py | 67 ++
tests/contrib/sensors/test_ftp_sensor.py | 2 +-
tests/contrib/sensors/test_hdfs_sensors.py | 3 +-
tests/contrib/sensors/test_jira_sensor_test.py | 12 +-
tests/contrib/sensors/test_pubsub_sensor.py | 3 +-
tests/contrib/sensors/test_qubole_sensor.py | 39 +-
tests/contrib/sensors/test_wasb_sensor.py | 6 +-
tests/core.py | 401 +++++------
tests/operators/__init__.py | 1 -
tests/operators/operators.py | 16 -
tests/operators/sensors.py | 382 ----------
tests/sensors/__init__.py | 13 +
tests/sensors/test_external_task_sensor.py | 244 +++++++
tests/sensors/test_hdfs_sensor.py | 86 +++
tests/sensors/test_http_sensor.py | 195 ++++++
tests/sensors/test_sql_sensor.py | 52 ++
tests/sensors/test_timedelta_sensor.py | 43 ++
tests/sensors/test_timeout_sensor.py | 88 +++
60 files changed, 2121 insertions(+), 1573 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index e32065d..0a512df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -112,6 +112,7 @@ ENV/
# PyCharm
.idea/
+*.iml
# Visual Studio Code
.vscode/
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
deleted file mode 100644
index e7640c8..0000000
--- a/airflow/contrib/operators/fs_operator.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from os import walk
-
-from airflow.operators.sensors import BaseSensorOperator
-from airflow.contrib.hooks.fs_hook import FSHook
-from airflow.utils.decorators import apply_defaults
-
-
-class FileSensor(BaseSensorOperator):
- """
- Waits for a file or folder to land in a filesystem
-
- :param fs_conn_id: reference to the File (path)
- connection id
- :type fs_conn_id: string
- :param filepath: File or folder name (relative to
- the base path set within the connection)
- :type fs_conn_id: string
- """
- template_fields = ('filepath',)
- ui_color = '#91818a'
-
- @apply_defaults
- def __init__(
- self,
- filepath,
- fs_conn_id='fs_default2',
- *args, **kwargs):
- super(FileSensor, self).__init__(*args, **kwargs)
- self.filepath = filepath
- self.fs_conn_id = fs_conn_id
-
- def poke(self, context):
- hook = FSHook(self.fs_conn_id)
- basepath = hook.get_path()
- full_path = "/".join([basepath, self.filepath])
- self.log.info('Poking for file {full_path}'.format(**locals()))
- try:
- files = [f for f in walk(full_path)]
- except:
- return False
- return True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
index 8db85e6..bf29cbf 100644
--- a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
+++ b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py
@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.hooks.redshift_hook import RedshiftHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -29,11 +29,12 @@ class AwsRedshiftClusterSensor(BaseSensorOperator):
template_fields = ('cluster_identifier', 'target_status')
@apply_defaults
- def __init__(
- self, cluster_identifier,
- target_status='available',
- aws_conn_id='aws_default',
- *args, **kwargs):
+ def __init__(self,
+ cluster_identifier,
+ target_status='available',
+ aws_conn_id='aws_default',
+ *args,
+ **kwargs):
super(AwsRedshiftClusterSensor, self).__init__(*args, **kwargs)
self.cluster_identifier = cluster_identifier
self.target_status = target_status
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/bash_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bash_sensor.py b/airflow/contrib/sensors/bash_sensor.py
index 760ee2c..1781a0f 100644
--- a/airflow/contrib/sensors/bash_sensor.py
+++ b/airflow/contrib/sensors/bash_sensor.py
@@ -17,7 +17,7 @@ import os
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile
from airflow.utils.decorators import apply_defaults
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.file import TemporaryDirectory
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index 90a3264..08d5a42 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
index e1a9169..09efc85 100644
--- a/airflow/contrib/sensors/datadog_sensor.py
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.contrib.hooks.datadog_hook import DatadogHook
from airflow.utils import apply_defaults
from airflow.exceptions import AirflowException
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index c6f96f8..552c8c6 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import apply_defaults
from airflow.exceptions import AirflowException
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index a437fc3..b38d1da 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -31,10 +31,10 @@ class EmrJobFlowSensor(EmrBaseSensor):
template_ext = ()
@apply_defaults
- def __init__(
- self,
- job_flow_id,
- *args, **kwargs):
+ def __init__(self,
+ job_flow_id,
+ *args,
+ **kwargs):
super(EmrJobFlowSensor, self).__init__(*args, **kwargs)
self.job_flow_id = job_flow_id
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index c5a450d..543efe4 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -33,11 +33,11 @@ class EmrStepSensor(EmrBaseSensor):
template_ext = ()
@apply_defaults
- def __init__(
- self,
- job_flow_id,
- step_id,
- *args, **kwargs):
+ def __init__(self,
+ job_flow_id,
+ step_id,
+ *args,
+ **kwargs):
super(EmrStepSensor, self).__init__(*args, **kwargs)
self.job_flow_id = job_flow_id
self.step_id = step_id
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/file_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/file_sensor.py b/airflow/contrib/sensors/file_sensor.py
new file mode 100644
index 0000000..fdc1e67
--- /dev/null
+++ b/airflow/contrib/sensors/file_sensor.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from os import walk
+
+from airflow.contrib.hooks.fs_hook import FSHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class FileSensor(BaseSensorOperator):
+ """
+ Waits for a file or folder to land in a filesystem
+
+ :param fs_conn_id: reference to the File (path)
+ connection id
+ :type fs_conn_id: string
+ :param filepath: File or folder name (relative to
+ the base path set within the connection)
+ :type fs_conn_id: string
+ """
+ template_fields = ('filepath',)
+ ui_color = '#91818a'
+
+ @apply_defaults
+ def __init__(self,
+ filepath,
+ fs_conn_id='fs_default2',
+ *args,
+ **kwargs):
+ super(FileSensor, self).__init__(*args, **kwargs)
+ self.filepath = filepath
+ self.fs_conn_id = fs_conn_id
+
+ def poke(self, context):
+ hook = FSHook(self.fs_conn_id)
+ basepath = hook.get_path()
+ full_path = "/".join([basepath, self.filepath])
+ self.log.info('Poking for file {full_path}'.format(**locals()))
+ try:
+ files = [f for f in walk(full_path)]
+ except OSError:
+ return False
+ return True
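For reference, a hedged sketch of how the relocated FileSensor might be
wired into a DAG; the DAG and task ids here are hypothetical, and the
connection id is the default declared by the class above:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.sensors.file_sensor import FileSensor

    dag = DAG(dag_id='example_wait_for_file',      # hypothetical DAG id
              start_date=datetime(2018, 1, 1),
              schedule_interval='@daily')

    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='data/input.csv',  # relative to the connection's base path
        fs_conn_id='fs_default2',   # default taken from the class above
        poke_interval=60,           # inherited from BaseSensorOperator
        dag=dag)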
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index bd66c32..5fb31c3 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -14,7 +14,7 @@
import ftplib
from airflow.contrib.hooks.ftp_hook import FTPHook, FTPSHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index a45923a..d91e2b1 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
index 1893f01..c96005a 100644
--- a/airflow/contrib/sensors/hdfs_sensors.py
+++ b/airflow/contrib/sensors/hdfs_sensors.py
@@ -11,14 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from airflow.operators.sensors import HdfsSensor
+from airflow.sensors.hdfs_sensor import HdfsSensor
class HdfsSensorRegex(HdfsSensor):
- def __init__(
- self,
- regex,
- *args, **kwargs):
+ def __init__(self,
+ regex,
+ *args,
+ **kwargs):
super(HdfsSensorRegex, self).__init__(*args, **kwargs)
self.regex = regex
@@ -39,10 +39,10 @@ class HdfsSensorRegex(HdfsSensor):
class HdfsSensorFolder(HdfsSensor):
- def __init__(
- self,
- be_empty=False,
- *args, **kwargs):
+ def __init__(self,
+ be_empty=False,
+ *args,
+ **kwargs):
super(HdfsSensorFolder, self).__init__(*args, **kwargs)
self.be_empty = be_empty
@@ -62,5 +62,3 @@ class HdfsSensorFolder(HdfsSensor):
self.log.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
result.pop(0)
return bool(result) and result[0]['file_type'] == 'f'
-
-
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
index 1dc7b50..a8a0df7 100644
--- a/airflow/contrib/sensors/jira_sensor.py
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -15,7 +15,7 @@ from jira.resources import Resource
from airflow.contrib.operators.jira_operator import JIRAError
from airflow.contrib.operators.jira_operator import JiraOperator
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -83,7 +83,8 @@ class JiraTicketSensor(JiraSensor):
field=None,
expected_value=None,
field_checker_func=None,
- *args, **kwargs):
+ *args,
+ **kwargs):
self.jira_conn_id = jira_conn_id
self.ticket_id = ticket_id
@@ -94,7 +95,8 @@ class JiraTicketSensor(JiraSensor):
super(JiraTicketSensor, self).__init__(jira_conn_id=jira_conn_id,
result_processor=field_checker_func,
- *args, **kwargs)
+ *args,
+ **kwargs)
def poke(self, context):
self.log.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
@@ -110,18 +112,17 @@ class JiraTicketSensor(JiraSensor):
result = None
try:
if issue is not None \
- and self.field is not None \
- and self.expected_value is not None:
-
- field_value = getattr(issue.fields, self.field)
- if field_value is not None:
- if isinstance(field_value, list):
- result = self.expected_value in field_value
- elif isinstance(field_value, str):
- result = self.expected_value.lower() == field_value.lower()
- elif isinstance(field_value, Resource) \
- and getattr(field_value, 'name'):
- result = self.expected_value.lower() == field_value.name.lower()
+ and self.field is not None \
+ and self.expected_value is not None:
+
+ field_val = getattr(issue.fields, self.field)
+ if field_val is not None:
+ if isinstance(field_val, list):
+ result = self.expected_value in field_val
+ elif isinstance(field_val, str):
+ result = self.expected_value.lower() == field_val.lower()
+ elif isinstance(field_val, Resource) and getattr(field_val, 'name'):
+ result = self.expected_value.lower() == field_val.name.lower()
else:
self.log.warning(
"Not implemented checker for issue field %s which "
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/pubsub_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/pubsub_sensor.py b/airflow/contrib/sensors/pubsub_sensor.py
index 112f777..c591d55 100644
--- a/airflow/contrib/sensors/pubsub_sensor.py
+++ b/airflow/contrib/sensors/pubsub_sensor.py
@@ -13,7 +13,7 @@
# limitations under the License.
from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/qubole_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/qubole_sensor.py b/airflow/contrib/sensors/qubole_sensor.py
index 6915361..1045040 100644
--- a/airflow/contrib/sensors/qubole_sensor.py
+++ b/airflow/contrib/sensors/qubole_sensor.py
@@ -12,17 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import print_function
-from future import standard_library
-standard_library.install_aliases()
+from qds_sdk.qubole import Qubole
+from qds_sdk.sensors import FileSensor, PartitionSensor
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
-from airflow.exceptions import AirflowException
-
-from qds_sdk.qubole import Qubole
-from qds_sdk.sensors import FileSensor, PartitionSensor
class QuboleSensor(BaseSensorOperator):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
index 6cc314b..9328f21 100644
--- a/airflow/contrib/sensors/redis_key_sensor.py
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.contrib.hooks.redis_hook import RedisHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
index 4295a25..50ce68b 100644
--- a/airflow/contrib/sensors/wasb_sensor.py
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -13,7 +13,7 @@
# limitations under the License.
#
from airflow.contrib.hooks.wasb_hook import WasbHook
-from airflow.operators.sensors import BaseSensorOperator
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/example_dags/example_http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 10fc3b1..2878d50 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -14,13 +14,13 @@
"""
### Example HTTP operator and sensor
"""
+import json
+from datetime import timedelta
+
import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
-from airflow.operators.sensors import HttpSensor
-from datetime import timedelta
-import json
-
+from airflow.sensors.http_sensor import HttpSensor
default_args = {
'owner': 'airflow',
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 50b05ff..c171fb9 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -62,21 +62,6 @@ _operators = {
'sqlite_operator': ['SqliteOperator'],
'mysql_to_hive': ['MySqlToHiveTransfer'],
'postgres_operator': ['PostgresOperator'],
- 'sensors': [
- 'BaseSensorOperator',
- 'ExternalTaskSensor',
- 'HdfsSensor',
- 'HivePartitionSensor',
- 'HttpSensor',
- 'MetastorePartitionSensor',
- 'NamedHivePartitionSensor',
- 'S3KeySensor',
- 'S3PrefixSensor',
- 'SqlSensor',
- 'TimeDeltaSensor',
- 'TimeSensor',
- 'WebHdfsSensor',
- ],
'subdag_operator': ['SubDagOperator'],
'hive_stats_operator': ['HiveStatsCollectionOperator'],
's3_to_hive_operator': ['S3ToHiveTransfer'],
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
deleted file mode 100644
index e9bf7ff..0000000
--- a/airflow/operators/sensors.py
+++ /dev/null
@@ -1,697 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-from future import standard_library
-
-from airflow.utils.log.logging_mixin import LoggingMixin
-
-standard_library.install_aliases()
-from builtins import str
-from past.builtins import basestring
-
-from airflow.utils import timezone
-from urllib.parse import urlparse
-from time import sleep
-import re
-import sys
-
-from airflow import settings
-from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException
-from airflow.models import BaseOperator, TaskInstance
-from airflow.hooks.base_hook import BaseHook
-from airflow.hooks.hdfs_hook import HDFSHook
-from airflow.hooks.http_hook import HttpHook
-from airflow.utils.state import State
-from airflow.utils.db import provide_session
-from airflow.utils.decorators import apply_defaults
-
-
-class BaseSensorOperator(BaseOperator):
- '''
- Sensor operators are derived from this class an inherit these attributes.
-
- Sensor operators keep executing at a time interval and succeed when
- a criteria is met and fail if and when they time out.
-
- :param soft_fail: Set to true to mark the task as SKIPPED on failure
- :type soft_fail: bool
- :param poke_interval: Time in seconds that the job should wait in
- between each tries
- :type poke_interval: int
- :param timeout: Time, in seconds before the task times out and fails.
- :type timeout: int
- '''
- ui_color = '#e6f1f2'
-
- @apply_defaults
- def __init__(
- self,
- poke_interval=60,
- timeout=60*60*24*7,
- soft_fail=False,
- *args, **kwargs):
- super(BaseSensorOperator, self).__init__(*args, **kwargs)
- self.poke_interval = poke_interval
- self.soft_fail = soft_fail
- self.timeout = timeout
-
- def poke(self, context):
- '''
- Function that the sensors defined while deriving this class should
- override.
- '''
- raise AirflowException('Override me.')
-
- def execute(self, context):
- started_at = timezone.utcnow()
- while not self.poke(context):
- if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
- if self.soft_fail:
- raise AirflowSkipException('Snap. Time is OUT.')
- else:
- raise AirflowSensorTimeout('Snap. Time is OUT.')
- sleep(self.poke_interval)
- self.log.info("Success criteria met. Exiting.")
-
-
-class SqlSensor(BaseSensorOperator):
- """
- Runs a sql statement until a criteria is met. It will keep trying while
- sql returns no row, or if the first cell in (0, '0', '').
-
- :param conn_id: The connection to run the sensor against
- :type conn_id: string
- :param sql: The sql to run. To pass, it needs to return at least one cell
- that contains a non-zero / empty string value.
- """
- template_fields = ('sql',)
- template_ext = ('.hql', '.sql',)
- ui_color = '#7c7287'
-
- @apply_defaults
- def __init__(self, conn_id, sql, *args, **kwargs):
- self.sql = sql
- self.conn_id = conn_id
- super(SqlSensor, self).__init__(*args, **kwargs)
-
- def poke(self, context):
- hook = BaseHook.get_connection(self.conn_id).get_hook()
-
- self.log.info('Poking: %s', self.sql)
- records = hook.get_records(self.sql)
- if not records:
- return False
- else:
- if str(records[0][0]) in ('0', '',):
- return False
- else:
- return True
-
-
-class MetastorePartitionSensor(SqlSensor):
- """
- An alternative to the HivePartitionSensor that talk directly to the
- MySQL db. This was created as a result of observing sub optimal
- queries generated by the Metastore thrift service when hitting
- subpartitioned tables. The Thrift service's queries were written in a
- way that wouldn't leverage the indexes.
-
- :param schema: the schema
- :type schema: str
- :param table: the table
- :type table: str
- :param partition_name: the partition name, as defined in the PARTITIONS
- table of the Metastore. Order of the fields does matter.
- Examples: ``ds=2016-01-01`` or
- ``ds=2016-01-01/sub=foo`` for a sub partitioned table
- :type partition_name: str
- :param mysql_conn_id: a reference to the MySQL conn_id for the metastore
- :type mysql_conn_id: str
- """
- template_fields = ('partition_name', 'table', 'schema')
- ui_color = '#8da7be'
-
- @apply_defaults
- def __init__(
- self, table, partition_name, schema="default",
- mysql_conn_id="metastore_mysql",
- *args, **kwargs):
-
- self.partition_name = partition_name
- self.table = table
- self.schema = schema
- self.first_poke = True
- self.conn_id = mysql_conn_id
- # TODO(aoen): We shouldn't be using SqlSensor here but MetastorePartitionSensor.
- # The problem is the way apply_defaults works isn't compatible with inheritance.
- # The inheritance model needs to be reworked in order to support overriding args/
- # kwargs with arguments here, then 'conn_id' and 'sql' can be passed into the
- # constructor below and apply_defaults will no longer throw an exception.
- super(SqlSensor, self).__init__(*args, **kwargs)
-
- def poke(self, context):
- if self.first_poke:
- self.first_poke = False
- if '.' in self.table:
- self.schema, self.table = self.table.split('.')
- self.sql = """
- SELECT 'X'
- FROM PARTITIONS A0
- LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID
- LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID
- WHERE
- B0.TBL_NAME = '{self.table}' AND
- C0.NAME = '{self.schema}' AND
- A0.PART_NAME = '{self.partition_name}';
- """.format(self=self)
- return super(MetastorePartitionSensor, self).poke(context)
-
-
-class ExternalTaskSensor(BaseSensorOperator):
- """
- Waits for a task to complete in a different DAG
-
- :param external_dag_id: The dag_id that contains the task you want to
- wait for
- :type external_dag_id: string
- :param external_task_id: The task_id that contains the task you want to
- wait for
- :type external_task_id: string
- :param allowed_states: list of allowed states, default is ``['success']``
- :type allowed_states: list
- :param execution_delta: time difference with the previous execution to
- look at, the default is the same execution_date as the current task.
- For yesterday, use [positive!] datetime.timedelta(days=1). Either
- execution_delta or execution_date_fn can be passed to
- ExternalTaskSensor, but not both.
- :type execution_delta: datetime.timedelta
- :param execution_date_fn: function that receives the current execution date
- and returns the desired execution dates to query. Either execution_delta
- or execution_date_fn can be passed to ExternalTaskSensor, but not both.
- :type execution_date_fn: callable
- """
- template_fields = ['external_dag_id', 'external_task_id']
- ui_color = '#19647e'
-
- @apply_defaults
- def __init__(
- self,
- external_dag_id,
- external_task_id,
- allowed_states=None,
- execution_delta=None,
- execution_date_fn=None,
- *args, **kwargs):
- super(ExternalTaskSensor, self).__init__(*args, **kwargs)
- self.allowed_states = allowed_states or [State.SUCCESS]
- if execution_delta is not None and execution_date_fn is not None:
- raise ValueError(
- 'Only one of `execution_date` or `execution_date_fn` may'
- 'be provided to ExternalTaskSensor; not both.')
-
- self.execution_delta = execution_delta
- self.execution_date_fn = execution_date_fn
- self.external_dag_id = external_dag_id
- self.external_task_id = external_task_id
-
- @provide_session
- def poke(self, context, session=None):
- if self.execution_delta:
- dttm = context['execution_date'] - self.execution_delta
- elif self.execution_date_fn:
- dttm = self.execution_date_fn(context['execution_date'])
- else:
- dttm = context['execution_date']
-
- dttm_filter = dttm if isinstance(dttm, list) else [dttm]
- serialized_dttm_filter = ','.join(
- [datetime.isoformat() for datetime in dttm_filter])
-
- self.log.info(
- 'Poking for '
- '{self.external_dag_id}.'
- '{self.external_task_id} on '
- '{} ... '.format(serialized_dttm_filter, **locals()))
- TI = TaskInstance
-
- count = session.query(TI).filter(
- TI.dag_id == self.external_dag_id,
- TI.task_id == self.external_task_id,
- TI.state.in_(self.allowed_states),
- TI.execution_date.in_(dttm_filter),
- ).count()
- session.commit()
- return count == len(dttm_filter)
-
-
-class NamedHivePartitionSensor(BaseSensorOperator):
- """
- Waits for a set of partitions to show up in Hive.
-
- :param partition_names: List of fully qualified names of the
- partitions to wait for. A fully qualified name is of the
- form ``schema.table/pk1=pv1/pk2=pv2``, for example,
- default.users/ds=2016-01-01. This is passed as is to the metastore
- Thrift client ``get_partitions_by_name`` method. Note that
- you cannot use logical or comparison operators as in
- HivePartitionSensor.
- :type partition_names: list of strings
- :param metastore_conn_id: reference to the metastore thrift service
- connection id
- :type metastore_conn_id: str
- """
-
- template_fields = ('partition_names', )
- ui_color = '#8d99ae'
-
- @apply_defaults
- def __init__(
- self,
- partition_names,
- metastore_conn_id='metastore_default',
- poke_interval=60 * 3,
- *args,
- **kwargs):
- super(NamedHivePartitionSensor, self).__init__(
- poke_interval=poke_interval, *args, **kwargs)
-
- if isinstance(partition_names, basestring):
- raise TypeError('partition_names must be an array of strings')
-
- self.metastore_conn_id = metastore_conn_id
- self.partition_names = partition_names
- self.next_poke_idx = 0
-
- @classmethod
- def parse_partition_name(self, partition):
- try:
- schema, table_partition = partition.split('.', 1)
- table, partition = table_partition.split('/', 1)
- return schema, table, partition
- except ValueError as e:
- raise ValueError('Could not parse ' + partition)
-
- def poke(self, context):
- if not hasattr(self, 'hook'):
- from airflow.hooks.hive_hooks import HiveMetastoreHook
- self.hook = HiveMetastoreHook(
- metastore_conn_id=self.metastore_conn_id)
-
- def poke_partition(partition):
-
- schema, table, partition = self.parse_partition_name(partition)
-
- self.log.info(
- 'Poking for {schema}.{table}/{partition}'.format(**locals())
- )
- return self.hook.check_for_named_partition(
- schema, table, partition)
-
- while self.next_poke_idx < len(self.partition_names):
- if poke_partition(self.partition_names[self.next_poke_idx]):
- self.next_poke_idx += 1
- else:
- return False
-
- return True
-
-
-class HivePartitionSensor(BaseSensorOperator):
- """
- Waits for a partition to show up in Hive.
-
- Note: Because ``partition`` supports general logical operators, it
- can be inefficient. Consider using NamedHivePartitionSensor instead if
- you don't need the full flexibility of HivePartitionSensor.
-
- :param table: The name of the table to wait for, supports the dot
- notation (my_database.my_table)
- :type table: string
- :param partition: The partition clause to wait for. This is passed as
- is to the metastore Thrift client ``get_partitions_by_filter`` method,
- and apparently supports SQL like notation as in ``ds='2015-01-01'
- AND type='value'`` and comparison operators as in ``"ds>=2015-01-01"``
- :type partition: string
- :param metastore_conn_id: reference to the metastore thrift service
- connection id
- :type metastore_conn_id: str
- """
- template_fields = ('schema', 'table', 'partition',)
- ui_color = '#C5CAE9'
-
- @apply_defaults
- def __init__(
- self,
- table, partition="ds='{{ ds }}'",
- metastore_conn_id='metastore_default',
- schema='default',
- poke_interval=60*3,
- *args, **kwargs):
- super(HivePartitionSensor, self).__init__(
- poke_interval=poke_interval, *args, **kwargs)
- if not partition:
- partition = "ds='{{ ds }}'"
- self.metastore_conn_id = metastore_conn_id
- self.table = table
- self.partition = partition
- self.schema = schema
-
- def poke(self, context):
- if '.' in self.table:
- self.schema, self.table = self.table.split('.')
- self.log.info(
- 'Poking for table {self.schema}.{self.table}, '
- 'partition {self.partition}'.format(**locals()))
- if not hasattr(self, 'hook'):
- from airflow.hooks.hive_hooks import HiveMetastoreHook
- self.hook = HiveMetastoreHook(
- metastore_conn_id=self.metastore_conn_id)
- return self.hook.check_for_partition(
- self.schema, self.table, self.partition)
-
-
-class HdfsSensor(BaseSensorOperator):
- """
- Waits for a file or folder to land in HDFS
- """
- template_fields = ('filepath',)
- ui_color = settings.WEB_COLORS['LIGHTBLUE']
-
- @apply_defaults
- def __init__(
- self,
- filepath,
- hdfs_conn_id='hdfs_default',
- ignored_ext=['_COPYING_'],
- ignore_copying=True,
- file_size=None,
- hook=HDFSHook,
- *args, **kwargs):
- super(HdfsSensor, self).__init__(*args, **kwargs)
- self.filepath = filepath
- self.hdfs_conn_id = hdfs_conn_id
- self.file_size = file_size
- self.ignored_ext = ignored_ext
- self.ignore_copying = ignore_copying
- self.hook = hook
-
- @staticmethod
- def filter_for_filesize(result, size=None):
- """
- Will test the filepath result and test if its size is at least self.filesize
-
- :param result: a list of dicts returned by Snakebite ls
- :param size: the file size in MB a file should be at least to trigger True
- :return: (bool) depending on the matching criteria
- """
- if size:
- log = LoggingMixin().log
- log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
- size *= settings.MEGABYTE
- result = [x for x in result if x['length'] >= size]
- log.debug('HdfsSensor.poke: after size filter result is %s', result)
- return result
-
- @staticmethod
- def filter_for_ignored_ext(result, ignored_ext, ignore_copying):
- """
- Will filter if instructed to do so the result to remove matching criteria
-
- :param result: (list) of dicts returned by Snakebite ls
- :param ignored_ext: (list) of ignored extensions
- :param ignore_copying: (bool) shall we ignore ?
- :return: (list) of dicts which were not removed
- """
- if ignore_copying:
- log = LoggingMixin().log
- regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
- ignored_extentions_regex = re.compile(regex_builder)
- log.debug(
- 'Filtering result for ignored extensions: %s in files %s',
- ignored_extentions_regex.pattern, map(lambda x: x['path'], result)
- )
- result = [x for x in result if not ignored_extentions_regex.match(x['path'])]
- log.debug('HdfsSensor.poke: after ext filter result is %s', result)
- return result
-
- def poke(self, context):
- sb = self.hook(self.hdfs_conn_id).get_conn()
- self.log.info('Poking for file {self.filepath}'.format(**locals()))
- try:
- # IMOO it's not right here, as there no raise of any kind.
- # if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',
- # it's not correct as the directory exists and sb does not raise any error
- # here is a quick fix
- result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
- self.log.debug('HdfsSensor.poke: result is %s', result)
- result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
- result = self.filter_for_filesize(result, self.file_size)
- return bool(result)
- except:
- e = sys.exc_info()
- self.log.debug("Caught an exception !: %s", str(e))
- return False
-
-
-class WebHdfsSensor(BaseSensorOperator):
- """
- Waits for a file or folder to land in HDFS
- """
- template_fields = ('filepath',)
-
- @apply_defaults
- def __init__(
- self,
- filepath,
- webhdfs_conn_id='webhdfs_default',
- *args, **kwargs):
- super(WebHdfsSensor, self).__init__(*args, **kwargs)
- self.filepath = filepath
- self.webhdfs_conn_id = webhdfs_conn_id
-
- def poke(self, context):
- from airflow.hooks.webhdfs_hook import WebHDFSHook
- c = WebHDFSHook(self.webhdfs_conn_id)
- self.log.info('Poking for file {self.filepath}'.format(**locals()))
- return c.check_for_path(hdfs_path=self.filepath)
-
-
-class S3KeySensor(BaseSensorOperator):
- """
- Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
- S3 being a key/value it does not support folders. The path is just a key
- a resource.
-
- :param bucket_key: The key being waited on. Supports full s3:// style url
- or relative path from root level.
- :type bucket_key: str
- :param bucket_name: Name of the S3 bucket
- :type bucket_name: str
- :param wildcard_match: whether the bucket_key should be interpreted as a
- Unix wildcard pattern
- :type wildcard_match: bool
- :param aws_conn_id: a reference to the s3 connection
- :type aws_conn_id: str
- """
- template_fields = ('bucket_key', 'bucket_name')
-
- @apply_defaults
- def __init__(
- self, bucket_key,
- bucket_name=None,
- wildcard_match=False,
- aws_conn_id='aws_default',
- *args, **kwargs):
- super(S3KeySensor, self).__init__(*args, **kwargs)
- # Parse
- if bucket_name is None:
- parsed_url = urlparse(bucket_key)
- if parsed_url.netloc == '':
- raise AirflowException('Please provide a bucket_name')
- else:
- bucket_name = parsed_url.netloc
- if parsed_url.path[0] == '/':
- bucket_key = parsed_url.path[1:]
- else:
- bucket_key = parsed_url.path
- self.bucket_name = bucket_name
- self.bucket_key = bucket_key
- self.wildcard_match = wildcard_match
- self.aws_conn_id = aws_conn_id
-
- def poke(self, context):
- from airflow.hooks.S3_hook import S3Hook
- hook = S3Hook(aws_conn_id=self.aws_conn_id)
- full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
- self.log.info('Poking for key : {full_url}'.format(**locals()))
- if self.wildcard_match:
- return hook.check_for_wildcard_key(self.bucket_key,
- self.bucket_name)
- else:
- return hook.check_for_key(self.bucket_key, self.bucket_name)
-
-
-class S3PrefixSensor(BaseSensorOperator):
- """
- Waits for a prefix to exist. A prefix is the first part of a key,
- thus enabling checking of constructs similar to glob airfl* or
- SQL LIKE 'airfl%'. There is the possibility to precise a delimiter to
- indicate the hierarchy or keys, meaning that the match will stop at that
- delimiter. Current code accepts sane delimiters, i.e. characters that
- are NOT special characters in the Python regex engine.
-
- :param bucket_name: Name of the S3 bucket
- :type bucket_name: str
- :param prefix: The prefix being waited on. Relative path from bucket root level.
- :type prefix: str
- :param delimiter: The delimiter intended to show hierarchy.
- Defaults to '/'.
- :type delimiter: str
- """
- template_fields = ('prefix', 'bucket_name')
-
- @apply_defaults
- def __init__(
- self, bucket_name,
- prefix, delimiter='/',
- aws_conn_id='aws_default',
- *args, **kwargs):
- super(S3PrefixSensor, self).__init__(*args, **kwargs)
- # Parse
- self.bucket_name = bucket_name
- self.prefix = prefix
- self.delimiter = delimiter
- self.full_url = "s3://" + bucket_name + '/' + prefix
- self.aws_conn_id = aws_conn_id
-
- def poke(self, context):
- self.log.info('Poking for prefix : {self.prefix}\n'
- 'in bucket s3://{self.bucket_name}'.format(**locals()))
- from airflow.hooks.S3_hook import S3Hook
- hook = S3Hook(aws_conn_id=self.aws_conn_id)
- return hook.check_for_prefix(
- prefix=self.prefix,
- delimiter=self.delimiter,
- bucket_name=self.bucket_name)
-
-
-class TimeSensor(BaseSensorOperator):
- """
- Waits until the specified time of the day.
-
- :param target_time: time after which the job succeeds
- :type target_time: datetime.time
- """
- template_fields = tuple()
-
- @apply_defaults
- def __init__(self, target_time, *args, **kwargs):
- super(TimeSensor, self).__init__(*args, **kwargs)
- self.target_time = target_time
-
- def poke(self, context):
- self.log.info('Checking if the time (%s) has come', self.target_time)
- return timezone.utcnow().time() > self.target_time
-
-
-class TimeDeltaSensor(BaseSensorOperator):
- """
- Waits for a timedelta after the task's execution_date + schedule_interval.
- In Airflow, the daily task stamped with ``execution_date``
- 2016-01-01 can only start running on 2016-01-02. The timedelta here
- represents the time after the execution period has closed.
-
- :param delta: time length to wait after execution_date before succeeding
- :type delta: datetime.timedelta
- """
- template_fields = tuple()
-
- @apply_defaults
- def __init__(self, delta, *args, **kwargs):
- super(TimeDeltaSensor, self).__init__(*args, **kwargs)
- self.delta = delta
-
- def poke(self, context):
- dag = context['dag']
- target_dttm = dag.following_schedule(context['execution_date'])
- target_dttm += self.delta
- self.log.info('Checking if the time (%s) has come', target_dttm)
- return timezone.utcnow() > target_dttm
-
-
-class HttpSensor(BaseSensorOperator):
- """
- Executes a HTTP get statement and returns False on failure:
- 404 not found or response_check function returned False
-
- :param http_conn_id: The connection to run the sensor against
- :type http_conn_id: string
- :param method: The HTTP request method to use
- :type method: string
- :param endpoint: The relative part of the full url
- :type endpoint: string
- :param request_params: The parameters to be added to the GET url
- :type request_params: a dictionary of string key/value pairs
- :param headers: The HTTP headers to be added to the GET request
- :type headers: a dictionary of string key/value pairs
- :param response_check: A check against the 'requests' response object.
- Returns True for 'pass' and False otherwise.
- :type response_check: A lambda or defined function.
- :param extra_options: Extra options for the 'requests' library, see the
- 'requests' documentation (options to modify timeout, ssl, etc.)
- :type extra_options: A dictionary of options, where key is string and value
- depends on the option that's being modified.
- """
-
- template_fields = ('endpoint', 'request_params')
-
- @apply_defaults
- def __init__(self,
- endpoint,
- http_conn_id='http_default',
- method='GET',
- request_params=None,
- headers=None,
- response_check=None,
- extra_options=None, *args, **kwargs):
- super(HttpSensor, self).__init__(*args, **kwargs)
- self.endpoint = endpoint
- self.http_conn_id = http_conn_id
- self.request_params = request_params or {}
- self.headers = headers or {}
- self.extra_options = extra_options or {}
- self.response_check = response_check
-
- self.hook = HttpHook(
- method=method,
- http_conn_id=http_conn_id)
-
- def poke(self, context):
- self.log.info('Poking: %s', self.endpoint)
- try:
- response = self.hook.run(self.endpoint,
- data=self.request_params,
- headers=self.headers,
- extra_options=self.extra_options)
- if self.response_check:
- # run content check on response
- return self.response_check(response)
- except AirflowException as ae:
- if str(ae).startswith("404"):
- return False
-
- raise ae
-
- return True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index d5c3407..22a873c 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -103,6 +103,7 @@ def make_module(name, objects):
# Plugin components to integrate as modules
operators_modules = []
+sensors_modules = []
hooks_modules = []
executors_modules = []
macros_modules = []
@@ -115,6 +116,9 @@ menu_links = []
for p in plugins:
operators_modules.append(
make_module('airflow.operators.' + p.name, p.operators))
+ sensors_modules.append(
+ make_module('airflow.sensors.' + p.name, p.operators)
+ )
hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks))
executors_modules.append(
make_module('airflow.executors.' + p.name, p.executors))
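With this hook in place, a plugin's operators list is exposed under both
module trees; a hedged sketch (plugin and sensor names are hypothetical,
AirflowPlugin as in Airflow 1.x):

    from airflow.plugins_manager import AirflowPlugin
    from my_company.sensors import MyCustomSensor  # hypothetical import

    class MySensorPlugin(AirflowPlugin):
        name = 'my_sensors'
        operators = [MyCustomSensor]
        # After the change above, MyCustomSensor is importable from both
        # airflow.operators.my_sensors and airflow.sensors.my_sensors,
        # since sensors_modules is built from the same p.operators list.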
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/__init__.py b/airflow/sensors/__init__.py
new file mode 100644
index 0000000..2239467
--- /dev/null
+++ b/airflow/sensors/__init__.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+import os as _os
+
+_sensors = {
+ 'base_sensor_operator': ['BaseSensorOperator'],
+ 'external_task_sensor': ['ExternalTaskSensor'],
+ 'hdfs_sensor': ['HdfsSensor'],
+ 'hive_partition_sensor': ['HivePartitionSensor'],
+ 'http_sensor': ['HttpSensor'],
+ 'metastore_partition_sensor': ['MetastorePartitionSensor'],
+ 'named_hive_partition_sensor': ['NamedHivePartitionSensor'],
+ 's3_key_sensor': ['S3KeySensor'],
+ 's3_prefix_sensor': ['S3PrefixSensor'],
+ 'sql_sensor': ['SqlSensor'],
+ 'time_delta_sensor': ['TimeDeltaSensor'],
+ 'time_sensor': ['TimeSensor'],
+ 'web_hdfs_sensor': ['WebHdfsSensor']
+}
+
+if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
+ from airflow.utils.helpers import AirflowImporter
+ airflow_importer = AirflowImporter(sys.modules[__name__], _sensors)
+
+
+def _integrate_plugins():
+ """Integrate plugins to the context"""
+ from airflow.plugins_manager import sensors_modules
+ for sensors_module in sensors_modules:
+ sys.modules[sensors_module.__name__] = sensors_module
+ globals()[sensors_module._name] = sensors_module
+
+ ##########################################################
+ # TODO FIXME Remove in Airflow 2.0
+
+ if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
+ from zope.deprecation import deprecated as _deprecated
+ for _operator in sensors_module._objects:
+ operator_name = _operator.__name__
+ globals()[operator_name] = _operator
+ _deprecated(
+ operator_name,
+ "Importing plugin operator '{i}' directly from "
+ "'airflow.operators' has been deprecated. Please "
+ "import from 'airflow.operators.[plugin_module]' "
+ "instead. Support for direct imports will be dropped "
+ "entirely in Airflow 2.0.".format(i=operator_name))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/base_sensor_operator.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/base_sensor_operator.py b/airflow/sensors/base_sensor_operator.py
new file mode 100644
index 0000000..2a48034
--- /dev/null
+++ b/airflow/sensors/base_sensor_operator.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from time import sleep
+
+from airflow.exceptions import AirflowException, AirflowSensorTimeout, \
+ AirflowSkipException
+from airflow.models import BaseOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class BaseSensorOperator(BaseOperator):
+ """
+ Sensor operators are derived from this class an inherit these attributes.
+
+ Sensor operators keep executing at a time interval and succeed when
+ a criteria is met and fail if and when they time out.
+
+ :param soft_fail: Set to true to mark the task as SKIPPED on failure
+ :type soft_fail: bool
+ :param poke_interval: Time in seconds that the job should wait in
+ between each tries
+ :type poke_interval: int
+ :param timeout: Time, in seconds before the task times out and fails.
+ :type timeout: int
+ """
+ ui_color = '#e6f1f2'
+
+ @apply_defaults
+ def __init__(self,
+ poke_interval=60,
+ timeout=60 * 60 * 24 * 7,
+ soft_fail=False,
+ *args,
+ **kwargs):
+ super(BaseSensorOperator, self).__init__(*args, **kwargs)
+ self.poke_interval = poke_interval
+ self.soft_fail = soft_fail
+ self.timeout = timeout
+
+ def poke(self, context):
+ """
+ Function that the sensors defined while deriving this class should
+ override.
+ """
+ raise AirflowException('Override me.')
+
+ def execute(self, context):
+ started_at = timezone.utcnow()
+ while not self.poke(context):
+ if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
+ if self.soft_fail:
+ raise AirflowSkipException('Snap. Time is OUT.')
+ else:
+ raise AirflowSensorTimeout('Snap. Time is OUT.')
+ sleep(self.poke_interval)
+ self.log.info("Success criteria met. Exiting.")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/external_task_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
new file mode 100644
index 0000000..da86c9b
--- /dev/null
+++ b/airflow/sensors/external_task_sensor.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.models import TaskInstance
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.db import provide_session
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.state import State
+
+
+class ExternalTaskSensor(BaseSensorOperator):
+ """
+ Waits for a task to complete in a different DAG
+
+ :param external_dag_id: The dag_id that contains the task you want to
+ wait for
+ :type external_dag_id: string
+ :param external_task_id: The task_id that contains the task you want to
+ wait for
+ :type external_task_id: string
+ :param allowed_states: list of allowed states, default is ``['success']``
+ :type allowed_states: list
+ :param execution_delta: time difference with the previous execution to
+ look at, the default is the same execution_date as the current task.
+ For yesterday, use [positive!] datetime.timedelta(days=1). Either
+ execution_delta or execution_date_fn can be passed to
+ ExternalTaskSensor, but not both.
+ :type execution_delta: datetime.timedelta
+ :param execution_date_fn: function that receives the current execution date
+ and returns the desired execution dates to query. Either execution_delta
+ or execution_date_fn can be passed to ExternalTaskSensor, but not both.
+ :type execution_date_fn: callable
+ """
+ template_fields = ['external_dag_id', 'external_task_id']
+ ui_color = '#19647e'
+
+ @apply_defaults
+ def __init__(self,
+ external_dag_id,
+ external_task_id,
+ allowed_states=None,
+ execution_delta=None,
+ execution_date_fn=None,
+ *args,
+ **kwargs):
+ super(ExternalTaskSensor, self).__init__(*args, **kwargs)
+ self.allowed_states = allowed_states or [State.SUCCESS]
+ if execution_delta is not None and execution_date_fn is not None:
+ raise ValueError(
+ 'Only one of `execution_delta` or `execution_date_fn` may '
+ 'be provided to ExternalTaskSensor; not both.')
+
+ self.execution_delta = execution_delta
+ self.execution_date_fn = execution_date_fn
+ self.external_dag_id = external_dag_id
+ self.external_task_id = external_task_id
+
+ @provide_session
+ def poke(self, context, session=None):
+ if self.execution_delta:
+ dttm = context['execution_date'] - self.execution_delta
+ elif self.execution_date_fn:
+ dttm = self.execution_date_fn(context['execution_date'])
+ else:
+ dttm = context['execution_date']
+
+ dttm_filter = dttm if isinstance(dttm, list) else [dttm]
+ serialized_dttm_filter = ','.join(
+ [dt.isoformat() for dt in dttm_filter])
+
+ self.log.info(
+ 'Poking for '
+ '{self.external_dag_id}.'
+ '{self.external_task_id} on '
+ '{} ... '.format(serialized_dttm_filter, **locals()))
+ TI = TaskInstance
+
+ count = session.query(TI).filter(
+ TI.dag_id == self.external_dag_id,
+ TI.task_id == self.external_task_id,
+ TI.state.in_(self.allowed_states),
+ TI.execution_date.in_(dttm_filter),
+ ).count()
+ session.commit()
+ return count == len(dttm_filter)
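
For context, a minimal usage sketch of this sensor (the DAG, dag_id, and
task_id values below are illustrative assumptions, not part of this commit):

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.sensors.external_task_sensor import ExternalTaskSensor

    dag = DAG('downstream_dag', start_date=datetime(2018, 1, 1))

    # Wait for upstream_dag.load_data at the run one hour earlier than
    # this task's execution_date.
    wait_for_load = ExternalTaskSensor(
        task_id='wait_for_load_data',
        external_dag_id='upstream_dag',
        external_task_id='load_data',
        execution_delta=timedelta(hours=1),
        dag=dag)
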
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/hdfs_sensor.py b/airflow/sensors/hdfs_sensor.py
new file mode 100644
index 0000000..8d98898
--- /dev/null
+++ b/airflow/sensors/hdfs_sensor.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import sys
+from builtins import str
+
+from airflow import settings
+from airflow.hooks.hdfs_hook import HDFSHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class HdfsSensor(BaseSensorOperator):
+ """
+ Waits for a file or folder to land in HDFS
+ """
+ template_fields = ('filepath',)
+ ui_color = settings.WEB_COLORS['LIGHTBLUE']
+
+ @apply_defaults
+ def __init__(self,
+ filepath,
+ hdfs_conn_id='hdfs_default',
+ ignored_ext=['_COPYING_'],
+ ignore_copying=True,
+ file_size=None,
+ hook=HDFSHook,
+ *args,
+ **kwargs):
+ super(HdfsSensor, self).__init__(*args, **kwargs)
+ self.filepath = filepath
+ self.hdfs_conn_id = hdfs_conn_id
+ self.file_size = file_size
+ self.ignored_ext = ignored_ext
+ self.ignore_copying = ignore_copying
+ self.hook = hook
+
+ @staticmethod
+ def filter_for_filesize(result, size=None):
+ """
+ Will filter the result, keeping only the files whose size is at least ``size``
+
+ :param result: a list of dicts returned by Snakebite ls
+ :param size: the minimum size in MB a file must have to pass the filter
+ :return: (list) of dicts meeting the size criteria
+ """
+ if size:
+ log = LoggingMixin().log
+ log.debug(
+ 'Filtering for file size >= %s in files: %s',
+ size, map(lambda x: x['path'], result)
+ )
+ size *= settings.MEGABYTE
+ result = [x for x in result if x['length'] >= size]
+ log.debug('HdfsSensor.poke: after size filter result is %s', result)
+ return result
+
+ @staticmethod
+ def filter_for_ignored_ext(result, ignored_ext, ignore_copying):
+ """
+ Will filter the result, if instructed to do so, removing files that match
+ the ignored extensions
+
+ :param result: (list) of dicts returned by Snakebite ls
+ :param ignored_ext: (list) of ignored extensions
+ :param ignore_copying: (bool) whether to filter out in-flight files
+ :return: (list) of dicts which were not removed
+ """
+ if ignore_copying:
+ log = LoggingMixin().log
+ regex_builder = r"^.*\.(%s$)$" % '$|'.join(ignored_ext)
+ ignored_extensions_regex = re.compile(regex_builder)
+ log.debug(
+ 'Filtering result for ignored extensions: %s in files %s',
+ ignored_extensions_regex.pattern, map(lambda x: x['path'], result)
+ )
+ result = [x for x in result if not ignored_extensions_regex.match(x['path'])]
+ log.debug('HdfsSensor.poke: after ext filter result is %s', result)
+ return result
+
+ def poke(self, context):
+ sb = self.hook(self.hdfs_conn_id).get_conn()
+ self.log.info('Poking for file {self.filepath}'.format(**locals()))
+ try:
+ # IMO this is not quite right, as nothing is raised here.
+ # If the filepath is, say, '/data/mydirectory', this is correct,
+ # but if it is '/data/mydirectory/*' it is not, because the
+ # directory exists and sb does not raise any error.
+ # Here is a quick fix:
+ result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
+ self.log.debug('HdfsSensor.poke: result is %s', result)
+ result = self.filter_for_ignored_ext(
+ result, self.ignored_ext, self.ignore_copying
+ )
+ result = self.filter_for_filesize(result, self.file_size)
+ return bool(result)
+ except Exception:
+ e = sys.exc_info()
+ self.log.debug("Caught an exception !: %s", str(e))
+ return False
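
For context, a minimal usage sketch (the DAG and the HDFS path are
illustrative assumptions, not part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.hdfs_sensor import HdfsSensor

    dag = DAG('hdfs_example', start_date=datetime(2018, 1, 1))

    # Succeeds once the file exists, is at least 1 MB, and is not a
    # leftover '_COPYING_' artifact.
    wait_for_file = HdfsSensor(
        task_id='wait_for_file',
        filepath='/data/mydirectory/part-00000',
        hdfs_conn_id='hdfs_default',
        file_size=1,
        dag=dag)
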
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/hive_partition_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/hive_partition_sensor.py b/airflow/sensors/hive_partition_sensor.py
new file mode 100644
index 0000000..31c4175
--- /dev/null
+++ b/airflow/sensors/hive_partition_sensor.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class HivePartitionSensor(BaseSensorOperator):
+ """
+ Waits for a partition to show up in Hive.
+
+ Note: Because ``partition`` supports general logical operators, it
+ can be inefficient. Consider using NamedHivePartitionSensor instead if
+ you don't need the full flexibility of HivePartitionSensor.
+
+ :param table: The name of the table to wait for, supports the dot
+ notation (my_database.my_table)
+ :type table: string
+ :param partition: The partition clause to wait for. This is passed as
+ is to the metastore Thrift client ``get_partitions_by_filter`` method,
+ and apparently supports SQL like notation as in ``ds='2015-01-01'
+ AND type='value'`` and comparison operators as in ``"ds>=2015-01-01"``
+ :type partition: string
+ :param metastore_conn_id: reference to the metastore thrift service
+ connection id
+ :type metastore_conn_id: str
+ """
+ template_fields = ('schema', 'table', 'partition',)
+ ui_color = '#C5CAE9'
+
+ @apply_defaults
+ def __init__(self,
+ table, partition="ds='{{ ds }}'",
+ metastore_conn_id='metastore_default',
+ schema='default',
+ poke_interval=60 * 3,
+ *args,
+ **kwargs):
+ super(HivePartitionSensor, self).__init__(
+ poke_interval=poke_interval, *args, **kwargs)
+ if not partition:
+ partition = "ds='{{ ds }}'"
+ self.metastore_conn_id = metastore_conn_id
+ self.table = table
+ self.partition = partition
+ self.schema = schema
+
+ def poke(self, context):
+ if '.' in self.table:
+ self.schema, self.table = self.table.split('.')
+ self.log.info(
+ 'Poking for table {self.schema}.{self.table}, '
+ 'partition {self.partition}'.format(**locals()))
+ if not hasattr(self, 'hook'):
+ from airflow.hooks.hive_hooks import HiveMetastoreHook
+ self.hook = HiveMetastoreHook(
+ metastore_conn_id=self.metastore_conn_id)
+ return self.hook.check_for_partition(
+ self.schema, self.table, self.partition)
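
For context, a minimal usage sketch (the DAG and the table and partition
names are illustrative assumptions, not part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.hive_partition_sensor import HivePartitionSensor

    dag = DAG('hive_example', start_date=datetime(2018, 1, 1))

    # Dot notation in `table` sets the schema; the partition clause is
    # passed as-is to get_partitions_by_filter, so logical operators work.
    wait_for_partition = HivePartitionSensor(
        task_id='wait_for_partition',
        table='my_database.my_table',
        partition="ds='{{ ds }}' AND type='value'",
        dag=dag)
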
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/http_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/http_sensor.py b/airflow/sensors/http_sensor.py
new file mode 100644
index 0000000..7d64131
--- /dev/null
+++ b/airflow/sensors/http_sensor.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from builtins import str
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.http_hook import HttpHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class HttpSensor(BaseSensorOperator):
+ """
+ Executes an HTTP GET request and returns False on failure:
+ a 404 Not Found, or the response_check function returning False
+
+ :param http_conn_id: The connection to run the sensor against
+ :type http_conn_id: string
+ :param method: The HTTP request method to use
+ :type method: string
+ :param endpoint: The relative part of the full url
+ :type endpoint: string
+ :param request_params: The parameters to be added to the GET url
+ :type request_params: a dictionary of string key/value pairs
+ :param headers: The HTTP headers to be added to the GET request
+ :type headers: a dictionary of string key/value pairs
+ :param response_check: A check against the 'requests' response object.
+ Returns True for 'pass' and False otherwise.
+ :type response_check: A lambda or defined function.
+ :param extra_options: Extra options for the 'requests' library, see the
+ 'requests' documentation (options to modify timeout, ssl, etc.)
+ :type extra_options: A dictionary of options, where key is string and value
+ depends on the option that's being modified.
+ """
+
+ template_fields = ('endpoint', 'request_params')
+
+ @apply_defaults
+ def __init__(self,
+ endpoint,
+ http_conn_id='http_default',
+ method='GET',
+ request_params=None,
+ headers=None,
+ response_check=None,
+ extra_options=None, *args, **kwargs):
+ super(HttpSensor, self).__init__(*args, **kwargs)
+ self.endpoint = endpoint
+ self.http_conn_id = http_conn_id
+ self.request_params = request_params or {}
+ self.headers = headers or {}
+ self.extra_options = extra_options or {}
+ self.response_check = response_check
+
+ self.hook = HttpHook(
+ method=method,
+ http_conn_id=http_conn_id)
+
+ def poke(self, context):
+ self.log.info('Poking: %s', self.endpoint)
+ try:
+ response = self.hook.run(self.endpoint,
+ data=self.request_params,
+ headers=self.headers,
+ extra_options=self.extra_options)
+ if self.response_check:
+ # run content check on response
+ return self.response_check(response)
+ except AirflowException as ae:
+ if str(ae).startswith("404"):
+ return False
+
+ raise ae
+
+ return True
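
For context, a minimal usage sketch (the DAG, endpoint, and check are
illustrative assumptions, not part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.http_sensor import HttpSensor

    dag = DAG('http_example', start_date=datetime(2018, 1, 1))

    # Pokes GET <base_url>/health until the body contains 'ok'; a 404
    # returns False and retries, other HTTP errors propagate.
    wait_for_api = HttpSensor(
        task_id='wait_for_api',
        http_conn_id='http_default',
        endpoint='health',
        response_check=lambda response: 'ok' in response.text,
        dag=dag)
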
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/metastore_partition_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/metastore_partition_sensor.py b/airflow/sensors/metastore_partition_sensor.py
new file mode 100644
index 0000000..6f18109
--- /dev/null
+++ b/airflow/sensors/metastore_partition_sensor.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.sql_sensor import SqlSensor
+from airflow.utils.decorators import apply_defaults
+
+
+class MetastorePartitionSensor(SqlSensor):
+ """
+ An alternative to the HivePartitionSensor that talks directly to the
+ MySQL db. This was created as a result of observing suboptimal
+ queries generated by the Metastore thrift service when hitting
+ subpartitioned tables. The Thrift service's queries were written in a
+ way that wouldn't leverage the indexes.
+
+ :param schema: the schema
+ :type schema: str
+ :param table: the table
+ :type table: str
+ :param partition_name: the partition name, as defined in the PARTITIONS
+ table of the Metastore. Order of the fields does matter.
+ Examples: ``ds=2016-01-01`` or
+ ``ds=2016-01-01/sub=foo`` for a sub partitioned table
+ :type partition_name: str
+ :param mysql_conn_id: a reference to the MySQL conn_id for the metastore
+ :type mysql_conn_id: str
+ """
+ template_fields = ('partition_name', 'table', 'schema')
+ ui_color = '#8da7be'
+
+ @apply_defaults
+ def __init__(self,
+ table,
+ partition_name,
+ schema="default",
+ mysql_conn_id="metastore_mysql",
+ *args,
+ **kwargs):
+
+ self.partition_name = partition_name
+ self.table = table
+ self.schema = schema
+ self.first_poke = True
+ self.conn_id = mysql_conn_id
+ # TODO(aoen): We shouldn't be using SqlSensor here but MetastorePartitionSensor.
+ # The problem is the way apply_defaults works isn't compatible with inheritance.
+ # The inheritance model needs to be reworked in order to support overriding args/
+ # kwargs with arguments here, then 'conn_id' and 'sql' can be passed into the
+ # constructor below and apply_defaults will no longer throw an exception.
+ super(SqlSensor, self).__init__(*args, **kwargs)
+
+ def poke(self, context):
+ if self.first_poke:
+ self.first_poke = False
+ if '.' in self.table:
+ self.schema, self.table = self.table.split('.')
+ self.sql = """
+ SELECT 'X'
+ FROM PARTITIONS A0
+ LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID
+ LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID
+ WHERE
+ B0.TBL_NAME = '{self.table}' AND
+ C0.NAME = '{self.schema}' AND
+ A0.PART_NAME = '{self.partition_name}';
+ """.format(self=self)
+ return super(MetastorePartitionSensor, self).poke(context)
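
For context, a minimal usage sketch (the DAG and the table and partition
names are illustrative assumptions, not part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.metastore_partition_sensor import (
        MetastorePartitionSensor)

    dag = DAG('metastore_example', start_date=datetime(2018, 1, 1))

    # partition_name must list the partition fields in the same order
    # as the PARTITIONS table of the metastore.
    wait_for_partition = MetastorePartitionSensor(
        task_id='wait_for_partition',
        schema='default',
        table='my_table',
        partition_name='ds={{ ds }}/sub=foo',
        mysql_conn_id='metastore_mysql',
        dag=dag)
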
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/named_hive_partition_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/named_hive_partition_sensor.py b/airflow/sensors/named_hive_partition_sensor.py
new file mode 100644
index 0000000..75bdb54
--- /dev/null
+++ b/airflow/sensors/named_hive_partition_sensor.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from past.builtins import basestring
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class NamedHivePartitionSensor(BaseSensorOperator):
+ """
+ Waits for a set of partitions to show up in Hive.
+
+ :param partition_names: List of fully qualified names of the
+ partitions to wait for. A fully qualified name is of the
+ form ``schema.table/pk1=pv1/pk2=pv2``, for example,
+ default.users/ds=2016-01-01. This is passed as is to the metastore
+ Thrift client ``get_partitions_by_name`` method. Note that
+ you cannot use logical or comparison operators as in
+ HivePartitionSensor.
+ :type partition_names: list of strings
+ :param metastore_conn_id: reference to the metastore thrift service
+ connection id
+ :type metastore_conn_id: str
+ """
+
+ template_fields = ('partition_names',)
+ ui_color = '#8d99ae'
+
+ @apply_defaults
+ def __init__(self,
+ partition_names,
+ metastore_conn_id='metastore_default',
+ poke_interval=60 * 3,
+ *args,
+ **kwargs):
+ super(NamedHivePartitionSensor, self).__init__(
+ poke_interval=poke_interval, *args, **kwargs)
+
+ if isinstance(partition_names, basestring):
+ raise TypeError('partition_names must be an array of strings')
+
+ self.metastore_conn_id = metastore_conn_id
+ self.partition_names = partition_names
+ self.next_poke_idx = 0
+
+ @classmethod
+ def parse_partition_name(cls, partition):
+ try:
+ schema, table_partition = partition.split('.', 1)
+ table, partition = table_partition.split('/', 1)
+ return schema, table, partition
+ except ValueError:
+ raise ValueError('Could not parse ' + partition)
+
+ def poke(self, context):
+ if not hasattr(self, 'hook'):
+ from airflow.hooks.hive_hooks import HiveMetastoreHook
+ self.hook = HiveMetastoreHook(
+ metastore_conn_id=self.metastore_conn_id)
+
+ def poke_partition(partition):
+
+ schema, table, partition = self.parse_partition_name(partition)
+
+ self.log.info(
+ 'Poking for {schema}.{table}/{partition}'.format(**locals())
+ )
+ return self.hook.check_for_named_partition(
+ schema, table, partition)
+
+ while self.next_poke_idx < len(self.partition_names):
+ if poke_partition(self.partition_names[self.next_poke_idx]):
+ self.next_poke_idx += 1
+ else:
+ return False
+
+ return True
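
For context, a minimal usage sketch (the DAG and partition names are
illustrative assumptions, not part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.named_hive_partition_sensor import (
        NamedHivePartitionSensor)

    dag = DAG('named_partition_example', start_date=datetime(2018, 1, 1))

    # partition_names must be a list, never a bare string; each entry
    # is a fully qualified schema.table/pk1=pv1 name.
    wait_for_partitions = NamedHivePartitionSensor(
        task_id='wait_for_partitions',
        partition_names=['default.users/ds={{ ds }}',
                         'default.events/ds={{ ds }}'],
        dag=dag)
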
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/s3_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/s3_key_sensor.py b/airflow/sensors/s3_key_sensor.py
new file mode 100644
index 0000000..0d63426
--- /dev/null
+++ b/airflow/sensors/s3_key_sensor.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from urllib.parse import urlparse
+
+from airflow.exceptions import AirflowException
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class S3KeySensor(BaseSensorOperator):
+ """
+ Waits for a key (a file-like instance on S3) to be present in an S3 bucket.
+ S3 being a key/value store, it does not support folders. The path is just
+ a key/resource.
+
+ :param bucket_key: The key being waited on. Supports full s3:// style url
+ or relative path from root level.
+ :type bucket_key: str
+ :param bucket_name: Name of the S3 bucket
+ :type bucket_name: str
+ :param wildcard_match: whether the bucket_key should be interpreted as a
+ Unix wildcard pattern
+ :type wildcard_match: bool
+ :param aws_conn_id: a reference to the s3 connection
+ :type aws_conn_id: str
+ """
+ template_fields = ('bucket_key', 'bucket_name')
+
+ @apply_defaults
+ def __init__(self,
+ bucket_key,
+ bucket_name=None,
+ wildcard_match=False,
+ aws_conn_id='aws_default',
+ *args,
+ **kwargs):
+ super(S3KeySensor, self).__init__(*args, **kwargs)
+ # Parse
+ if bucket_name is None:
+ parsed_url = urlparse(bucket_key)
+ if parsed_url.netloc == '':
+ raise AirflowException('Please provide a bucket_name')
+ else:
+ bucket_name = parsed_url.netloc
+ if parsed_url.path[0] == '/':
+ bucket_key = parsed_url.path[1:]
+ else:
+ bucket_key = parsed_url.path
+ self.bucket_name = bucket_name
+ self.bucket_key = bucket_key
+ self.wildcard_match = wildcard_match
+ self.aws_conn_id = aws_conn_id
+
+ def poke(self, context):
+ from airflow.hooks.S3_hook import S3Hook
+ hook = S3Hook(aws_conn_id=self.aws_conn_id)
+ full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
+ self.log.info('Poking for key: {full_url}'.format(**locals()))
+ if self.wildcard_match:
+ return hook.check_for_wildcard_key(self.bucket_key,
+ self.bucket_name)
+ else:
+ return hook.check_for_key(self.bucket_key, self.bucket_name)
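
For context, a minimal usage sketch (the DAG, bucket, and key pattern are
illustrative assumptions, not part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.s3_key_sensor import S3KeySensor

    dag = DAG('s3_example', start_date=datetime(2018, 1, 1))

    # A full s3:// url supplies both bucket_name and bucket_key; with
    # wildcard_match=True the key is matched as a Unix glob.
    wait_for_key = S3KeySensor(
        task_id='wait_for_key',
        bucket_key='s3://my-bucket/data/{{ ds }}/part-*',
        wildcard_match=True,
        aws_conn_id='aws_default',
        dag=dag)
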