You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/19 17:59:15 UTC
[2/3] incubator-airflow git commit: [AIRFLOW-1889] Split sensors into
separate files
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/s3_prefix_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/s3_prefix_sensor.py b/airflow/sensors/s3_prefix_sensor.py
new file mode 100644
index 0000000..e8002f3
--- /dev/null
+++ b/airflow/sensors/s3_prefix_sensor.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+from airflow.utils.decorators import apply_defaults
+
+
+class S3PrefixSensor(BaseSensorOperator):
+ """
+ Waits for a prefix to exist. A prefix is the first part of a key,
+ thus enabling checking of constructs similar to glob airfl* or
+ SQL LIKE 'airfl%'. It is possible to specify a delimiter to
+ indicate the hierarchy of keys, meaning that the match will stop at that
+ delimiter. Current code accepts sane delimiters, i.e. characters that
+ are NOT special characters in the Python regex engine.
+
+ :param bucket_name: Name of the S3 bucket
+ :type bucket_name: str
+ :param prefix: The prefix being waited on. Relative path from bucket root level.
+ :type prefix: str
+ :param delimiter: The delimiter intended to show hierarchy.
+ Defaults to '/'.
+ :type delimiter: str
+ """
+ template_fields = ('prefix', 'bucket_name')
+
+ @apply_defaults
+ def __init__(self,
+ bucket_name,
+ prefix,
+ delimiter='/',
+ aws_conn_id='aws_default',
+ *args,
+ **kwargs):
+ super(S3PrefixSensor, self).__init__(*args, **kwargs)
+ # Parse
+ self.bucket_name = bucket_name
+ self.prefix = prefix
+ self.delimiter = delimiter
+ self.full_url = "s3://" + bucket_name + '/' + prefix
+ self.aws_conn_id = aws_conn_id
+
+ def poke(self, context):
+ self.log.info('Poking for prefix : {self.prefix}\n'
+ 'in bucket s3://{self.bucket_name}'.format(**locals()))
+ from airflow.hooks.S3_hook import S3Hook
+ hook = S3Hook(aws_conn_id=self.aws_conn_id)
+ return hook.check_for_prefix(
+ prefix=self.prefix,
+ delimiter=self.delimiter,
+ bucket_name=self.bucket_name)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/sql_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/sql_sensor.py b/airflow/sensors/sql_sensor.py
new file mode 100644
index 0000000..cb23faa
--- /dev/null
+++ b/airflow/sensors/sql_sensor.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from builtins import str
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SqlSensor(BaseSensorOperator):
+ """
+ Runs a sql statement until a criterion is met. It will keep trying while
+ sql returns no row, or while the first cell is in (0, '0', '').
+
+ :param conn_id: The connection to run the sensor against
+ :type conn_id: string
+ :param sql: The sql to run. To pass, it needs to return at least one row
+ whose first cell is neither zero nor an empty string.
+ """
+ template_fields = ('sql',)
+ template_ext = ('.hql', '.sql',)
+ ui_color = '#7c7287'
+
+ @apply_defaults
+ def __init__(self, conn_id, sql, *args, **kwargs):
+ self.sql = sql
+ self.conn_id = conn_id
+ super(SqlSensor, self).__init__(*args, **kwargs)
+
+ def poke(self, context):
+ hook = BaseHook.get_connection(self.conn_id).get_hook()
+
+ self.log.info('Poking: %s', self.sql)
+ records = hook.get_records(self.sql)
+ if not records:
+ return False
+ else:
+ if str(records[0][0]) in ('0', '',):
+ return False
+ else:
+ return True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/time_delta_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/time_delta_sensor.py b/airflow/sensors/time_delta_sensor.py
new file mode 100644
index 0000000..31f75ca
--- /dev/null
+++ b/airflow/sensors/time_delta_sensor.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class TimeDeltaSensor(BaseSensorOperator):
+ """
+ Waits for a timedelta after the task's execution_date + schedule_interval.
+ In Airflow, the daily task stamped with ``execution_date``
+ 2016-01-01 can only start running on 2016-01-02. The timedelta here
+ represents the time after the execution period has closed.
+
+ :param delta: time length to wait after the end of the execution period (execution_date + schedule_interval) before succeeding
+ :type delta: datetime.timedelta
+ """
+
+ @apply_defaults
+ def __init__(self, delta, *args, **kwargs):
+ super(TimeDeltaSensor, self).__init__(*args, **kwargs)
+ self.delta = delta
+
+ def poke(self, context):
+ dag = context['dag']
+ target_dttm = dag.following_schedule(context['execution_date'])
+ target_dttm += self.delta
+ self.log.info('Checking if the time (%s) has come', target_dttm)
+ return timezone.utcnow() > target_dttm
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/time_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
new file mode 100644
index 0000000..ff10b05
--- /dev/null
+++ b/airflow/sensors/time_sensor.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class TimeSensor(BaseSensorOperator):
+ """
+ Waits until the specified time of the day.
+
+ :param target_time: time of day (compared against the current UTC time) after which the job succeeds
+ :type target_time: datetime.time
+ """
+
+ @apply_defaults
+ def __init__(self, target_time, *args, **kwargs):
+ super(TimeSensor, self).__init__(*args, **kwargs)
+ self.target_time = target_time
+
+ def poke(self, context):
+ self.log.info('Checking if the time (%s) has come', self.target_time)
+ return timezone.utcnow().time() > self.target_time
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/airflow/sensors/web_hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/web_hdfs_sensor.py b/airflow/sensors/web_hdfs_sensor.py
new file mode 100644
index 0000000..df023b7
--- /dev/null
+++ b/airflow/sensors/web_hdfs_sensor.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class WebHdfsSensor(BaseSensorOperator):
+ """
+ Waits for a file or folder to land in HDFS
+ """
+ template_fields = ('filepath',)
+
+ @apply_defaults
+ def __init__(self,
+ filepath,
+ webhdfs_conn_id='webhdfs_default',
+ *args,
+ **kwargs):
+ super(WebHdfsSensor, self).__init__(*args, **kwargs)
+ self.filepath = filepath
+ self.webhdfs_conn_id = webhdfs_conn_id
+
+ def poke(self, context):
+ from airflow.hooks.webhdfs_hook import WebHDFSHook
+ c = WebHDFSHook(self.webhdfs_conn_id)
+ self.log.info('Poking for file {self.filepath}'.format(**locals()))
+ return c.check_for_path(hdfs_path=self.filepath)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 045e5a4..d74d00e 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -37,7 +37,7 @@ All sensors are derived from ``BaseSensorOperator``. All sensors inherit
the ``timeout`` and ``poke_interval`` on top of the ``BaseOperator``
attributes.
-.. autoclass:: airflow.operators.sensors.BaseSensorOperator
+.. autoclass:: airflow.sensors.base_sensor_operator.BaseSensorOperator
Operator API
@@ -54,34 +54,37 @@ Operator API
.. autoclass:: airflow.operators.docker_operator.DockerOperator
.. autoclass:: airflow.operators.dummy_operator.DummyOperator
.. autoclass:: airflow.operators.email_operator.EmailOperator
-.. autoclass:: airflow.operators.sensors.ExternalTaskSensor
.. autoclass:: airflow.operators.generic_transfer.GenericTransfer
-.. autoclass:: airflow.operators.sensors.HdfsSensor
.. autoclass:: airflow.operators.hive_to_samba_operator.Hive2SambaOperator
.. autoclass:: airflow.operators.hive_operator.HiveOperator
-.. autoclass:: airflow.operators.sensors.HivePartitionSensor
.. autoclass:: airflow.operators.hive_to_druid.HiveToDruidTransfer
.. autoclass:: airflow.operators.hive_to_mysql.HiveToMySqlTransfer
.. autoclass:: airflow.operators.http_operator.SimpleHttpOperator
-.. autoclass:: airflow.operators.sensors.HttpSensor
-.. autoclass:: airflow.operators.sensors.MetastorePartitionSensor
.. autoclass:: airflow.operators.mssql_operator.MsSqlOperator
.. autoclass:: airflow.operators.mssql_to_hive.MsSqlToHiveTransfer
-.. autoclass:: airflow.operators.sensors.NamedHivePartitionSensor
.. autoclass:: airflow.operators.postgres_operator.PostgresOperator
.. autoclass:: airflow.operators.presto_check_operator.PrestoCheckOperator
.. autoclass:: airflow.operators.presto_check_operator.PrestoIntervalCheckOperator
.. autoclass:: airflow.operators.presto_check_operator.PrestoValueCheckOperator
.. autoclass:: airflow.operators.python_operator.PythonOperator
.. autoclass:: airflow.operators.python_operator.PythonVirtualenvOperator
-.. autoclass:: airflow.operators.sensors.S3KeySensor
.. autoclass:: airflow.operators.s3_to_hive_operator.S3ToHiveTransfer
.. autoclass:: airflow.operators.ShortCircuitOperator
.. autoclass:: airflow.operators.slack_operator.SlackAPIOperator
-.. autoclass:: airflow.operators.sensors.SqlSensor
.. autoclass:: airflow.operators.subdag_operator.SubDagOperator
-.. autoclass:: airflow.operators.sensors.TimeSensor
-.. autoclass:: airflow.operators.sensors.HdfsSensor
+
+.. autoclass:: airflow.sensors.external_task_sensor.ExternalTaskSensor
+.. autoclass:: airflow.sensors.hdfs_sensor.HdfsSensor
+.. autoclass:: airflow.sensors.hive_partition_sensor.HivePartitionSensor
+.. autoclass:: airflow.sensors.http_sensor.HttpSensor
+.. autoclass:: airflow.sensors.metastore_partition_sensor.MetastorePartitionSensor
+.. autoclass:: airflow.sensors.named_hive_partition_sensor.NamedHivePartitionSensor
+.. autoclass:: airflow.sensors.s3_key_sensor.S3KeySensor
+.. autoclass:: airflow.sensors.s3_prefix_sensor.S3PrefixSensor
+.. autoclass:: airflow.sensors.sql_sensor.SqlSensor
+.. autoclass:: airflow.sensors.time_sensor.TimeSensor
+.. autoclass:: airflow.sensors.time_delta_sensor.TimeDeltaSensor
+.. autoclass:: airflow.sensors.web_hdfs_sensor.WebHdfsSensor
Community-contributed Operators
'''''''''''''''''''''''''''''''
@@ -91,7 +94,6 @@ Community-contributed Operators
.. deprecated:: 1.8
Use :code:`from airflow.operators.bash_operator import BashOperator` instead.
-.. autoclass:: airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor
.. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
.. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
@@ -106,14 +108,29 @@ Community-contributed Operators
.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator
.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator
.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubPublishOperator
-.. autoclass:: airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator
.. autoclass:: airflow.contrib.operators.qubole_operator.QuboleOperator
.. autoclass:: airflow.contrib.operators.ssh_operator.SSHOperator
.. autoclass:: airflow.contrib.operators.vertica_operator.VerticaOperator
.. autoclass:: airflow.contrib.operators.vertica_to_hive.VerticaToHiveTransfer
+
+.. autoclass:: airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor
.. autoclass:: airflow.contrib.sensors.bash_sensor.BashSensor
+.. autoclass:: airflow.contrib.sensors.bigquery_sensor.BigQueryTableSensor
+.. autoclass:: airflow.contrib.sensors.datadog_sensor.DatadogSensor
+.. autoclass:: airflow.contrib.sensors.emr_base_sensor.EmrBaseSensor
+.. autoclass:: airflow.contrib.sensors.emr_job_flow_sensor.EmrJobFlowSensor
+.. autoclass:: airflow.contrib.sensors.emr_step_sensor.EmrStepSensor
+.. autoclass:: airflow.contrib.sensors.file_sensor.FileSensor
+.. autoclass:: airflow.contrib.sensors.ftp_sensor.FtpSensor
+.. autoclass:: airflow.contrib.sensors.gcs_sensor.GcsSensor
+.. autoclass:: airflow.contrib.sensors.hdfs_sensor.HdfsSensor
+.. autoclass:: airflow.contrib.sensors.jira_sensor.JiraSensor
+.. autoclass:: airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor
+.. autoclass:: airflow.contrib.sensors.qubole_sensor.QuboleSensor
+.. autoclass:: airflow.contrib.sensors.redis_key_sensor.RedisKeySensor
+.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor
.. _macros:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/operators/test_fs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_fs_operator.py b/tests/contrib/operators/test_fs_operator.py
deleted file mode 100644
index 2ef4286..0000000
--- a/tests/contrib/operators/test_fs_operator.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-
-from airflow import configuration
-from airflow.settings import Session
-from airflow import models, DAG
-from airflow.contrib.operators.fs_operator import FileSensor
-from airflow.utils.timezone import datetime
-
-TEST_DAG_ID = 'unit_tests'
-DEFAULT_DATE = datetime(2015, 1, 1)
-configuration.load_test_config()
-
-
-def reset(dag_id=TEST_DAG_ID):
- session = Session()
- tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
- tis.delete()
- session.commit()
- session.close()
-
-
-reset()
-
-
-class FileSensorTest(unittest.TestCase):
- def setUp(self):
- configuration.load_test_config()
- from airflow.contrib.hooks.fs_hook import FSHook
- hook = FSHook()
- args = {
- 'owner': 'airflow',
- 'start_date': DEFAULT_DATE,
- 'provide_context': True
- }
- dag = DAG(TEST_DAG_ID+'test_schedule_dag_once', default_args=args)
- dag.schedule_interval = '@once'
- self.hook = hook
- self.dag = dag
-
- def test_simple(self):
- task = FileSensor(
- task_id="test",
- filepath="etc/hosts",
- fs_conn_id='fs_default',
- _hook=self.hook,
- dag=self.dag,
- )
- task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-
-if __name__ == '__main__':
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py b/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py
index a5c9e66..cec24f2 100644
--- a/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py
+++ b/tests/contrib/sensors/test_aws_redshift_cluster_sensor.py
@@ -14,6 +14,7 @@
#
import unittest
+
import boto3
from airflow import configuration
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_bash_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_bash_sensor.py b/tests/contrib/sensors/test_bash_sensor.py
index 66b18e4..5beab54 100644
--- a/tests/contrib/sensors/test_bash_sensor.py
+++ b/tests/contrib/sensors/test_bash_sensor.py
@@ -13,9 +13,10 @@
# limitations under the License.
#
-import datetime
import unittest
+import datetime
+
from airflow import DAG, configuration
from airflow.contrib.sensors.bash_sensor import BashSensor
from airflow.exceptions import AirflowSensorTimeout
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_datadog_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_datadog_sensor.py b/tests/contrib/sensors/test_datadog_sensor.py
index d845c54..8ceb7ca 100644
--- a/tests/contrib/sensors/test_datadog_sensor.py
+++ b/tests/contrib/sensors/test_datadog_sensor.py
@@ -14,13 +14,13 @@
import json
import unittest
+
from mock import patch
from airflow import configuration
-from airflow.utils import db
from airflow import models
from airflow.contrib.sensors.datadog_sensor import DatadogSensor
-
+from airflow.utils import db
at_least_one_event = [{'alert_type': 'info',
'comments': [],
@@ -102,5 +102,6 @@ class TestDatadogSensor(unittest.TestCase):
self.assertFalse(sensor.poke({}))
+
if __name__ == '__main__':
unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_emr_base_sensor.py b/tests/contrib/sensors/test_emr_base_sensor.py
index 970d189..587383b 100644
--- a/tests/contrib/sensors/test_emr_base_sensor.py
+++ b/tests/contrib/sensors/test_emr_base_sensor.py
@@ -15,8 +15,8 @@
import unittest
from airflow import configuration
-from airflow.exceptions import AirflowException
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
+from airflow.exceptions import AirflowException
class TestEmrBaseSensor(unittest.TestCase):
@@ -92,7 +92,6 @@ class TestEmrBaseSensor(unittest.TestCase):
self.assertEqual(operator.poke(None), False)
-
def test_poke_raises_error_when_job_has_failed(self):
class EmrBaseSensorSubclass(EmrBaseSensor):
NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
@@ -115,7 +114,6 @@ class TestEmrBaseSensor(unittest.TestCase):
)
with self.assertRaises(AirflowException) as context:
-
operator.poke(None)
self.assertIn('EMR job failed', str(context.exception))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_file_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_file_sensor.py b/tests/contrib/sensors/test_file_sensor.py
new file mode 100644
index 0000000..7bc559d
--- /dev/null
+++ b/tests/contrib/sensors/test_file_sensor.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from airflow import configuration
+from airflow import models, DAG
+from airflow.contrib.sensors.file_sensor import FileSensor
+from airflow.settings import Session
+from airflow.utils.timezone import datetime
+
+TEST_DAG_ID = 'unit_tests'
+DEFAULT_DATE = datetime(2015, 1, 1)
+configuration.load_test_config()
+
+
+def reset(dag_id=TEST_DAG_ID):
+ session = Session()
+ tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+ tis.delete()
+ session.commit()
+ session.close()
+
+
+reset()
+
+
+class FileSensorTest(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ from airflow.contrib.hooks.fs_hook import FSHook
+ hook = FSHook()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE,
+ 'provide_context': True
+ }
+ dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args)
+ dag.schedule_interval = '@once'
+ self.hook = hook
+ self.dag = dag
+
+ def test_simple(self):
+ task = FileSensor(
+ task_id="test",
+ filepath="etc/hosts",
+ fs_conn_id='fs_default',
+ _hook=self.hook,
+ dag=self.dag,
+ )
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_ftp_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_ftp_sensor.py b/tests/contrib/sensors/test_ftp_sensor.py
index 50f8b8b..f3aff9b 100644
--- a/tests/contrib/sensors/test_ftp_sensor.py
+++ b/tests/contrib/sensors/test_ftp_sensor.py
@@ -13,8 +13,8 @@
# limitations under the License.
import unittest
-from ftplib import error_perm
+from ftplib import error_perm
from mock import MagicMock
from airflow.contrib.hooks.ftp_hook import FTPHook
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_hdfs_sensors.py b/tests/contrib/sensors/test_hdfs_sensors.py
index a76a6c4..cd191f8 100644
--- a/tests/contrib/sensors/test_hdfs_sensors.py
+++ b/tests/contrib/sensors/test_hdfs_sensors.py
@@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import sys
import unittest
+
import re
from datetime import timedelta
+
from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex
from airflow.exceptions import AirflowSensorTimeout
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_jira_sensor_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_jira_sensor_test.py b/tests/contrib/sensors/test_jira_sensor_test.py
index 7c16188..4390f34 100644
--- a/tests/contrib/sensors/test_jira_sensor_test.py
+++ b/tests/contrib/sensors/test_jira_sensor_test.py
@@ -19,13 +19,13 @@ from mock import Mock
from mock import patch
from airflow import DAG, configuration
-from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
from airflow import models
+from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
from airflow.utils import db, timezone
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
jira_client_mock = Mock(
- name="jira_client_for_test"
+ name="jira_client_for_test"
)
minimal_test_ticket = {
@@ -52,10 +52,10 @@ class TestJiraSensor(unittest.TestCase):
dag = DAG('test_dag_id', default_args=args)
self.dag = dag
db.merge_conn(
- models.Connection(
- conn_id='jira_default', conn_type='jira',
- host='https://localhost/jira/', port=443,
- extra='{"verify": "False", "project": "AIRFLOW"}'))
+ models.Connection(
+ conn_id='jira_default', conn_type='jira',
+ host='https://localhost/jira/', port=443,
+ extra='{"verify": "False", "project": "AIRFLOW"}'))
@patch("airflow.contrib.hooks.jira_hook.JIRA",
autospec=True, return_value=jira_client_mock)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_pubsub_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_pubsub_sensor.py b/tests/contrib/sensors/test_pubsub_sensor.py
index ae59bb7..dcbbf6e 100644
--- a/tests/contrib/sensors/test_pubsub_sensor.py
+++ b/tests/contrib/sensors/test_pubsub_sensor.py
@@ -14,9 +14,10 @@
from __future__ import unicode_literals
-from base64 import b64encode as b64e
import unittest
+from base64 import b64encode as b64e
+
from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor
from airflow.exceptions import AirflowSensorTimeout
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_qubole_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_qubole_sensor.py b/tests/contrib/sensors/test_qubole_sensor.py
index 035b231..88c5be6 100644
--- a/tests/contrib/sensors/test_qubole_sensor.py
+++ b/tests/contrib/sensors/test_qubole_sensor.py
@@ -14,15 +14,14 @@
#
import unittest
-from mock import patch
from datetime import datetime
+from mock import patch
+from airflow.contrib.sensors.qubole_sensor import QuboleFileSensor, QubolePartitionSensor
+from airflow.exceptions import AirflowException
from airflow.models import DAG, Connection
from airflow.utils import db
-from airflow.exceptions import AirflowException
-
-from airflow.contrib.sensors.qubole_sensor import QuboleFileSensor, QubolePartitionSensor
DAG_ID = "qubole_test_dag"
TASK_ID = "test_task"
@@ -39,14 +38,10 @@ class QuboleSensorTest(unittest.TestCase):
@patch('airflow.contrib.sensors.qubole_sensor.QuboleFileSensor.poke')
def test_file_sensore(self, patched_poke):
patched_poke.return_value = True
-
sensor = QuboleFileSensor(
task_id='test_qubole_file_sensor',
- data={"files":
- ["s3://some_bucket/some_file"]
- }
+ data={"files": ["s3://some_bucket/some_file"]}
)
-
self.assertTrue(sensor.poke({}))
@patch('airflow.contrib.sensors.qubole_sensor.QubolePartitionSensor.poke')
@@ -55,12 +50,11 @@ class QuboleSensorTest(unittest.TestCase):
sensor = QubolePartitionSensor(
task_id='test_qubole_partition_sensor',
- data={"schema":"default",
- "table":"my_partitioned_table",
- "columns":[
- {"column" : "month", "values" : ["1", "2"]},
- ]
- },
+ data={
+ "schema": "default",
+ "table": "my_partitioned_table",
+ "columns": [{"column": "month", "values": ["1", "2"]}]
+ }
)
self.assertTrue(sensor.poke({}))
@@ -72,14 +66,13 @@ class QuboleSensorTest(unittest.TestCase):
dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
with self.assertRaises(AirflowException):
- sensor = QubolePartitionSensor(
+ QubolePartitionSensor(
task_id='test_qubole_partition_sensor',
poke_interval=1,
- data={"schema":"default",
- "table":"my_partitioned_table",
- "columns":[
- {"column" : "month", "values" : ["1", "2"]},
- ]
- },
+ data={
+ "schema": "default",
+ "table": "my_partitioned_table",
+ "columns": [{"column": "month", "values": ["1", "2"]}]
+ },
dag=dag
- )
\ No newline at end of file
+ )
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/contrib/sensors/test_wasb_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_wasb_sensor.py b/tests/contrib/sensors/test_wasb_sensor.py
index a26ba2d..a11d740 100644
--- a/tests/contrib/sensors/test_wasb_sensor.py
+++ b/tests/contrib/sensors/test_wasb_sensor.py
@@ -13,9 +13,10 @@
# limitations under the License.
#
-import datetime
import unittest
+import datetime
+
from airflow import DAG, configuration
from airflow.contrib.sensors.wasb_sensor import WasbBlobSensor
from airflow.contrib.sensors.wasb_sensor import WasbPrefixSensor
@@ -30,7 +31,6 @@ except ImportError:
class TestWasbBlobSensor(unittest.TestCase):
-
_config = {
'container_name': 'container',
'blob_name': 'blob',
@@ -83,7 +83,6 @@ class TestWasbBlobSensor(unittest.TestCase):
class TestWasbPrefixSensor(unittest.TestCase):
-
_config = {
'container_name': 'container',
'prefix': 'prefix',
@@ -134,5 +133,6 @@ class TestWasbPrefixSensor(unittest.TestCase):
'container', 'prefix', timeout=2
)
+
if __name__ == '__main__':
unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 0778628..f25d0e7 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -14,28 +14,27 @@
from __future__ import print_function
+import json
+import unittest
+
import bleach
import doctest
-import json
-import logging
+import mock
+import multiprocessing
import os
import re
-import unittest
-import multiprocessing
-import mock
-from numpy.testing import assert_array_almost_equal
+import signal
+import sqlalchemy
import tempfile
-from datetime import time, timedelta
-from email.mime.multipart import MIMEMultipart
+import warnings
+from datetime import timedelta
+from dateutil.relativedelta import relativedelta
from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
from freezegun import freeze_time
-import signal
+from numpy.testing import assert_array_almost_equal
from six.moves.urllib.parse import urlencode
from time import sleep
-import warnings
-
-from dateutil.relativedelta import relativedelta
-import sqlalchemy
from airflow import configuration
from airflow.executors import SequentialExecutor
@@ -49,8 +48,7 @@ from airflow.operators.check_operator import CheckOperator, ValueCheckOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.http_operator import SimpleHttpOperator
-from airflow.operators import sensors
+
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.sqlite_hook import SqliteHook
from airflow.bin import cli
@@ -77,7 +75,6 @@ DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
TEST_DAG_ID = 'unit_tests'
-
try:
import cPickle as pickle
except ImportError:
@@ -111,7 +108,6 @@ class OperatorSubclass(BaseOperator):
class CoreTest(unittest.TestCase):
-
# These defaults make the test faster to run
default_scheduler_args = {"file_process_interval": 0,
"processor_poll_interval": 0.5,
@@ -122,8 +118,7 @@ class CoreTest(unittest.TestCase):
self.dagbag = models.DagBag(
dag_folder=DEV_NULL, include_examples=True)
self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
- dag = DAG(TEST_DAG_ID, default_args=self.args)
- self.dag = dag
+ self.dag = DAG(TEST_DAG_ID, default_args=self.args)
self.dag_bash = self.dagbag.dags['example_bash_operator']
self.runme_0 = self.dag_bash.get_task('runme_0')
self.run_after_loop = self.dag_bash.get_task('run_after_loop')
@@ -144,9 +139,12 @@ class CoreTest(unittest.TestCase):
self.assertEqual(dag.dag_id, dag_run.dag_id)
self.assertIsNotNone(dag_run.run_id)
self.assertNotEqual('', dag_run.run_id)
- self.assertEqual(datetime(2015, 1, 2, 0, 0), dag_run.execution_date, msg=
- 'dag_run.execution_date did not match expectation: {0}'
- .format(dag_run.execution_date))
+ self.assertEqual(
+ datetime(2015, 1, 2, 0, 0),
+ dag_run.execution_date,
+ msg='dag_run.execution_date did not match expectation: {0}'
+ .format(dag_run.execution_date)
+ )
self.assertEqual(State.RUNNING, dag_run.state)
self.assertFalse(dag_run.external_trigger)
dag.clear()
@@ -175,9 +173,12 @@ class CoreTest(unittest.TestCase):
self.assertEqual(dag.dag_id, dag_run.dag_id)
self.assertIsNotNone(dag_run.run_id)
self.assertNotEqual('', dag_run.run_id)
- self.assertEqual(DEFAULT_DATE + delta, dag_run.execution_date, msg=
- 'dag_run.execution_date did not match expectation: {0}'
- .format(dag_run.execution_date))
+ self.assertEqual(
+ DEFAULT_DATE + delta,
+ dag_run.execution_date,
+ msg='dag_run.execution_date did not match expectation: {0}'
+ .format(dag_run.execution_date)
+ )
self.assertEqual(State.RUNNING, dag_run.state)
self.assertFalse(dag_run.external_trigger)
@@ -348,13 +349,6 @@ class CoreTest(unittest.TestCase):
self.assertNotEqual(hash(dag_diff_name), hash(self.dag))
self.assertNotEqual(hash(dag_subclass), hash(self.dag))
- def test_time_sensor(self):
- t = sensors.TimeSensor(
- task_id='time_sensor_check',
- target_time=time(0),
- dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
def test_check_operators(self):
conn_id = "sqlite_default"
@@ -407,7 +401,7 @@ class CoreTest(unittest.TestCase):
def test_bash_operator(self):
t = BashOperator(
- task_id='time_sensor_check',
+ task_id='test_bash_operator',
bash_command="echo success",
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@@ -421,7 +415,6 @@ class CoreTest(unittest.TestCase):
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
def test_bash_operator_kill(self):
- import subprocess
import psutil
sleep_time = "100%d" % os.getpid()
t = BashOperator(
@@ -456,7 +449,7 @@ class CoreTest(unittest.TestCase):
def test_dryrun(self):
t = BashOperator(
- task_id='time_sensor_check',
+ task_id='test_dryrun',
bash_command="echo success",
dag=self.dag)
t.dry_run()
@@ -469,70 +462,6 @@ class CoreTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
- def test_timedelta_sensor(self):
- t = sensors.TimeDeltaSensor(
- task_id='timedelta_sensor_check',
- delta=timedelta(seconds=2),
- dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- def test_external_task_sensor(self):
- t = sensors.ExternalTaskSensor(
- task_id='test_external_task_sensor_check',
- external_dag_id=TEST_DAG_ID,
- external_task_id='time_sensor_check',
- dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- def test_external_task_sensor_delta(self):
- t = sensors.ExternalTaskSensor(
- task_id='test_external_task_sensor_check_delta',
- external_dag_id=TEST_DAG_ID,
- external_task_id='time_sensor_check',
- execution_delta=timedelta(0),
- allowed_states=['success'],
- dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- def test_external_task_sensor_fn(self):
- self.test_time_sensor()
- # check that the execution_fn works
- t = sensors.ExternalTaskSensor(
- task_id='test_external_task_sensor_check_delta',
- external_dag_id=TEST_DAG_ID,
- external_task_id='time_sensor_check',
- execution_date_fn=lambda dt: dt + timedelta(0),
- allowed_states=['success'],
- dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- # double check that the execution is being called by failing the test
- t2 = sensors.ExternalTaskSensor(
- task_id='test_external_task_sensor_check_delta',
- external_dag_id=TEST_DAG_ID,
- external_task_id='time_sensor_check',
- execution_date_fn=lambda dt: dt + timedelta(days=1),
- allowed_states=['success'],
- timeout=1,
- poke_interval=1,
- dag=self.dag)
- with self.assertRaises(exceptions.AirflowSensorTimeout):
- t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- def test_external_task_sensor_error_delta_and_fn(self):
- """
- Test that providing execution_delta and a function raises an error
- """
- with self.assertRaises(ValueError):
- t = sensors.ExternalTaskSensor(
- task_id='test_external_task_sensor_check_delta',
- external_dag_id=TEST_DAG_ID,
- external_task_id='time_sensor_check',
- execution_delta=timedelta(0),
- execution_date_fn=lambda dt: dt,
- allowed_states=['success'],
- dag=self.dag)
-
def test_timeout(self):
t = PythonOperator(
task_id='test_timeout',
@@ -576,7 +505,7 @@ class CoreTest(unittest.TestCase):
Test the availability of variables in templates
"""
val = {
- 'success':False,
+ 'success': False,
'test_value': 'a test value'
}
Variable.set("a_variable", val['test_value'])
@@ -645,9 +574,11 @@ class CoreTest(unittest.TestCase):
"""
Test templates can handle objects with no sense of truthiness
"""
+
class NonBoolObject(object):
def __len__(self):
return NotImplemented
+
def __bool__(self):
return NotImplemented
@@ -693,13 +624,13 @@ class CoreTest(unittest.TestCase):
def test_get_non_existing_var_should_return_default(self):
default_value = "some default val"
self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
- default_var=default_value))
+ default_var=default_value))
def test_get_non_existing_var_should_not_deserialize_json_default(self):
default_value = "}{ this is a non JSON default }{"
self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
- default_var=default_value,
- deserialize_json=True))
+ default_var=default_value,
+ deserialize_json=True))
def test_variable_setdefault_round_trip(self):
key = "tested_var_setdefault_1_id"
@@ -900,14 +831,12 @@ class CoreTest(unittest.TestCase):
ti.refresh_from_db(session=session)
# making sure it's actually running
self.assertEqual(State.RUNNING, ti.state)
- ti = (
- session.query(TI)
- .filter_by(
- dag_id=task.dag_id,
- task_id=task.task_id,
- execution_date=DEFAULT_DATE)
- .one()
- )
+ ti = session.query(TI).filter_by(
+ dag_id=task.dag_id,
+ task_id=task.task_id,
+ execution_date=DEFAULT_DATE
+ ).one()
+
# deleting the instance should result in a failure
session.delete(ti)
session.commit()
@@ -986,7 +915,7 @@ class CoreTest(unittest.TestCase):
run2 = self.dag_bash.create_dagrun(
run_id="run2",
- execution_date=DEFAULT_DATE+timedelta(days=1),
+ execution_date=DEFAULT_DATE + timedelta(days=1),
state=State.RUNNING)
models.DagStat.update([self.dag_bash.dag_id], session=session)
@@ -1277,8 +1206,7 @@ class CliTests(unittest.TestCase):
# Check deletions
for index in range(1, 7):
conn_id = 'new%s' % index
- result = (session
- .query(models.Connection)
+ result = (session.query(models.Connection)
.filter(models.Connection.conn_id == conn_id)
.first())
@@ -1625,20 +1553,27 @@ class SecurityTests(unittest.TestCase):
session.add(chart2)
session.add(chart3)
session.commit()
- chart1_id = session.query(Chart).filter(Chart.label=='insecure_chart').first().id
+ chart1 = session.query(Chart).filter(Chart.label == 'insecure_chart').first()
with self.assertRaises(SecurityError):
- response = self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart1_id))
- chart2_id = session.query(Chart).filter(Chart.label=="{{ ''.__class__.__mro__[1].__subclasses__() }}").first().id
+ self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart1.id))
+
+ chart2 = session.query(Chart).filter(
+ Chart.label == "{{ ''.__class__.__mro__[1].__subclasses__() }}"
+ ).first()
with self.assertRaises(SecurityError):
- response = self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart2_id))
- chart3_id = session.query(Chart).filter(Chart.label=="{{ subprocess.check_output('ls') }}").first().id
+ self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart2.id))
+
+ chart3 = session.query(Chart).filter(
+ Chart.label == "{{ subprocess.check_output('ls') }}"
+ ).first()
with self.assertRaises(UndefinedError):
- response = self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart3_id))
+ self.app.get("/admin/airflow/chart_data?chart_id={}".format(chart3.id))
def tearDown(self):
configuration.conf.set("webserver", "expose_config", "False")
self.dag_bash.clear(start_date=DEFAULT_DATE, end_date=timezone.utcnow())
+
class WebUiTests(unittest.TestCase):
def setUp(self):
configuration.load_test_config()
@@ -1688,7 +1623,7 @@ class WebUiTests(unittest.TestCase):
url = "/admin/airflow/graph?" + urlencode({
"dag_id": self.dag_bash2.dag_id,
"execution_date": self.dagrun_bash2.execution_date,
- }).replace("&", "&amp;")
+ }).replace("&", "&amp;")
self.assertIn(url, resp_html)
self.assertIn(self.dagrun_bash2.execution_date.strftime("%Y-%m-%d %H:%M"), resp_html)
@@ -1697,8 +1632,8 @@ class WebUiTests(unittest.TestCase):
self.assertIn("Ad Hoc Query", response.data.decode('utf-8'))
response = self.app.post(
"/admin/queryview/", data=dict(
- conn_id="airflow_db",
- sql="SELECT+COUNT%281%29+as+TEST+FROM+task_instance"))
+ conn_id="airflow_db",
+ sql="SELECT+COUNT%281%29+as+TEST+FROM+task_instance"))
self.assertIn("TEST", response.data.decode('utf-8'))
def test_health(self):
@@ -2061,69 +1996,6 @@ class LdapGroupTest(unittest.TestCase):
configuration.conf.set("webserver", "authenticate", "False")
-class FakeSession(object):
- def __init__(self):
- from requests import Response
- self.response = Response()
- self.response.status_code = 200
- self.response._content = 'airbnb/airflow'.encode('ascii', 'ignore')
-
- def send(self, request, **kwargs):
- return self.response
-
- def prepare_request(self, request):
- if 'date' in request.params:
- self.response._content += (
- '/' + request.params['date']).encode('ascii', 'ignore')
- return self.response
-
-class HttpOpSensorTest(unittest.TestCase):
- def setUp(self):
- configuration.load_test_config()
- args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
- dag = DAG(TEST_DAG_ID, default_args=args)
- self.dag = dag
-
- @mock.patch('requests.Session', FakeSession)
- def test_get(self):
- t = SimpleHttpOperator(
- task_id='get_op',
- method='GET',
- endpoint='/search',
- data={"client": "ubuntu", "q": "airflow"},
- headers={},
- dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- @mock.patch('requests.Session', FakeSession)
- def test_get_response_check(self):
- t = SimpleHttpOperator(
- task_id='get_op',
- method='GET',
- endpoint='/search',
- data={"client": "ubuntu", "q": "airflow"},
- response_check=lambda response: ("airbnb/airflow" in response.text),
- headers={},
- dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- @mock.patch('requests.Session', FakeSession)
- def test_sensor(self):
-
- sensor = sensors.HttpSensor(
- task_id='http_sensor_check',
- http_conn_id='http_default',
- endpoint='/search',
- request_params={"client": "ubuntu", "q": "airflow", 'date': '{{ds}}'},
- headers={},
- response_check=lambda response: (
- "airbnb/airflow/" + DEFAULT_DATE.strftime('%Y-%m-%d')
- in response.text),
- poke_interval=5,
- timeout=15,
- dag=self.dag)
- sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
class FakeWebHDFSHook(object):
def __init__(self, conn_id):
self.conn_id = conn_id
@@ -2154,45 +2026,126 @@ class FakeSnakeBiteClient(object):
if path[0] == '/datadirectory/empty_directory' and not include_toplevel:
return []
elif path[0] == '/datadirectory/datafile':
- return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
- 'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
- 'owner': u'hdfs', 'path': '/datadirectory/datafile'}]
+ return [{
+ 'group': u'supergroup',
+ 'permission': 420,
+ 'file_type': 'f',
+ 'access_time': 1481122343796,
+ 'block_replication': 3,
+ 'modification_time': 1481122343862,
+ 'length': 0,
+ 'blocksize': 134217728,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/datafile'
+ }]
elif path[0] == '/datadirectory/empty_directory' and include_toplevel:
- return [
- {'group': u'supergroup', 'permission': 493, 'file_type': 'd', 'access_time': 0, 'block_replication': 0,
- 'modification_time': 1481132141540, 'length': 0, 'blocksize': 0, 'owner': u'hdfs',
- 'path': '/datadirectory/empty_directory'}]
+ return [{
+ 'group': u'supergroup',
+ 'permission': 493,
+ 'file_type': 'd',
+ 'access_time': 0,
+ 'block_replication': 0,
+ 'modification_time': 1481132141540,
+ 'length': 0,
+ 'blocksize': 0,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/empty_directory'
+ }]
elif path[0] == '/datadirectory/not_empty_directory' and include_toplevel:
- return [
- {'group': u'supergroup', 'permission': 493, 'file_type': 'd', 'access_time': 0, 'block_replication': 0,
- 'modification_time': 1481132141540, 'length': 0, 'blocksize': 0, 'owner': u'hdfs',
- 'path': '/datadirectory/empty_directory'},
- {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
- 'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
- 'owner': u'hdfs', 'path': '/datadirectory/not_empty_directory/test_file'}]
+ return [{
+ 'group': u'supergroup',
+ 'permission': 493,
+ 'file_type': 'd',
+ 'access_time': 0,
+ 'block_replication': 0,
+ 'modification_time': 1481132141540,
+ 'length': 0,
+ 'blocksize': 0,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/empty_directory'
+ }, {
+ 'group': u'supergroup',
+ 'permission': 420,
+ 'file_type': 'f',
+ 'access_time': 1481122343796,
+ 'block_replication': 3,
+ 'modification_time': 1481122343862,
+ 'length': 0,
+ 'blocksize': 134217728,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/not_empty_directory/test_file'
+ }]
elif path[0] == '/datadirectory/not_empty_directory':
- return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
- 'block_replication': 3, 'modification_time': 1481122343862, 'length': 0, 'blocksize': 134217728,
- 'owner': u'hdfs', 'path': '/datadirectory/not_empty_directory/test_file'}]
+ return [{
+ 'group': u'supergroup',
+ 'permission': 420,
+ 'file_type': 'f',
+ 'access_time': 1481122343796,
+ 'block_replication': 3,
+ 'modification_time': 1481122343862,
+ 'length': 0,
+ 'blocksize': 134217728,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/not_empty_directory/test_file'
+ }]
elif path[0] == '/datadirectory/not_existing_file_or_directory':
raise FakeSnakeBiteClientException
elif path[0] == '/datadirectory/regex_dir':
- return [{'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
- 'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
- 'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test1file'},
- {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
- 'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
- 'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test2file'},
- {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
- 'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
- 'owner': u'hdfs', 'path': '/datadirectory/regex_dir/test3file'},
- {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
- 'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
- 'owner': u'hdfs', 'path': '/datadirectory/regex_dir/copying_file_1.txt._COPYING_'},
- {'group': u'supergroup', 'permission': 420, 'file_type': 'f', 'access_time': 1481122343796,
- 'block_replication': 3, 'modification_time': 1481122343862, 'length': 12582912, 'blocksize': 134217728,
- 'owner': u'hdfs', 'path': '/datadirectory/regex_dir/copying_file_3.txt.sftp'}
- ]
+ return [{
+ 'group': u'supergroup',
+ 'permission': 420,
+ 'file_type': 'f',
+ 'access_time': 1481122343796,
+ 'block_replication': 3,
+ 'modification_time': 1481122343862, 'length': 12582912,
+ 'blocksize': 134217728,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/regex_dir/test1file'
+ }, {
+ 'group': u'supergroup',
+ 'permission': 420,
+ 'file_type': 'f',
+ 'access_time': 1481122343796,
+ 'block_replication': 3,
+ 'modification_time': 1481122343862,
+ 'length': 12582912,
+ 'blocksize': 134217728,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/regex_dir/test2file'
+ }, {
+ 'group': u'supergroup',
+ 'permission': 420,
+ 'file_type': 'f',
+ 'access_time': 1481122343796,
+ 'block_replication': 3,
+ 'modification_time': 1481122343862,
+ 'length': 12582912,
+ 'blocksize': 134217728,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/regex_dir/test3file'
+ }, {
+ 'group': u'supergroup',
+ 'permission': 420,
+ 'file_type': 'f',
+ 'access_time': 1481122343796,
+ 'block_replication': 3,
+ 'modification_time': 1481122343862,
+ 'length': 12582912,
+ 'blocksize': 134217728,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/regex_dir/copying_file_1.txt._COPYING_'
+ }, {
+ 'group': u'supergroup',
+ 'permission': 420,
+ 'file_type': 'f',
+ 'access_time': 1481122343796,
+ 'block_replication': 3,
+ 'modification_time': 1481122343862,
+ 'length': 12582912,
+ 'blocksize': 134217728,
+ 'owner': u'hdfs',
+ 'path': '/datadirectory/regex_dir/copying_file_3.txt.sftp'
+ }]
else:
raise FakeSnakeBiteClientException
@@ -2424,7 +2377,10 @@ class EmailTest(unittest.TestCase):
def test_custom_backend(self, mock_send_email):
configuration.set('email', 'EMAIL_BACKEND', 'tests.core.send_email_test')
utils.email.send_email('to', 'subject', 'content')
- send_email_test.assert_called_with('to', 'subject', 'content', files=None, dryrun=False, cc=None, bcc=None, mime_subtype='mixed')
+ send_email_test.assert_called_with(
+ 'to', 'subject', 'content', files=None, dryrun=False,
+ cc=None, bcc=None, mime_subtype='mixed'
+ )
self.assertFalse(mock_send_email.called)
@@ -2447,7 +2403,7 @@ class EmailSmtpTest(unittest.TestCase):
self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
self.assertEqual(2, len(msg.get_payload()))
self.assertEqual(u'attachment; filename="' + os.path.basename(attachment.name) + '"',
- msg.get_payload()[-1].get(u'Content-Disposition'))
+ msg.get_payload()[-1].get(u'Content-Disposition'))
mimeapp = MIMEApplication('attachment')
self.assertEqual(mimeapp.get_payload(), msg.get_payload()[-1].get_payload())
@@ -2466,11 +2422,10 @@ class EmailSmtpTest(unittest.TestCase):
self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
self.assertEqual(2, len(msg.get_payload()))
self.assertEqual(u'attachment; filename="' + os.path.basename(attachment.name) + '"',
- msg.get_payload()[-1].get(u'Content-Disposition'))
+ msg.get_payload()[-1].get(u'Content-Disposition'))
mimeapp = MIMEApplication('attachment')
self.assertEqual(mimeapp.get_payload(), msg.get_payload()[-1].get_payload())
-
@mock.patch('smtplib.SMTP_SSL')
@mock.patch('smtplib.SMTP')
def test_send_mime(self, mock_smtp, mock_smtp_ssl):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/operators/__init__.py
----------------------------------------------------------------------
diff --git a/tests/operators/__init__.py b/tests/operators/__init__.py
index e6f6830..77d415c 100644
--- a/tests/operators/__init__.py
+++ b/tests/operators/__init__.py
@@ -15,7 +15,6 @@
from .docker_operator import *
from .subdag_operator import *
from .operators import *
-from .sensors import *
from .hive_operator import *
from .s3_to_hive_operator import *
from .python_operator import *
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 40f0ffd..ae0bec8 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -105,14 +105,6 @@ class MySqlTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
- def test_sql_sensor(self):
- t = operators.sensors.SqlSensor(
- task_id='sql_sensor_check',
- conn_id='mysql_default',
- sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
- dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
def test_overwrite_schema(self):
"""
Verifies option to overwrite connection schema
@@ -191,14 +183,6 @@ class PostgresTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
- def test_sql_sensor(self):
- t = operators.sensors.SqlSensor(
- task_id='sql_sensor_check',
- conn_id='postgres_default',
- sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
- dag=self.dag)
- t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
def test_vacuum(self):
"""
Verifies the VACUUM operation runs well with the PostgresOperator
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
deleted file mode 100644
index d60f15c..0000000
--- a/tests/operators/sensors.py
+++ /dev/null
@@ -1,382 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-import sys
-import time
-import unittest
-from datetime import timedelta
-from mock import patch
-
-from airflow import DAG, configuration, settings
-from airflow.exceptions import (AirflowException,
- AirflowSensorTimeout,
- AirflowSkipException)
-from airflow.models import TaskInstance
-from airflow.operators.bash_operator import BashOperator
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.sensors import HttpSensor, BaseSensorOperator, HdfsSensor, ExternalTaskSensor
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.state import State
-from airflow.utils import timezone
-from airflow.utils.timezone import datetime
-
-try:
- from unittest import mock
-except ImportError:
- try:
- import mock
- except ImportError:
- mock = None
-
-configuration.load_test_config()
-
-DEFAULT_DATE = datetime(2015, 1, 1)
-TEST_DAG_ID = 'unit_test_dag'
-
-
-class TimeoutTestSensor(BaseSensorOperator):
- """
- Sensor that always returns the return_value provided
-
- :param return_value: Set to true to mark the task as SKIPPED on failure
- :type return_value: any
- """
-
- @apply_defaults
- def __init__(
- self,
- return_value=False,
- *args,
- **kwargs):
- self.return_value = return_value
- super(TimeoutTestSensor, self).__init__(*args, **kwargs)
-
- def poke(self, context):
- return self.return_value
-
- def execute(self, context):
- started_at = timezone.utcnow()
- time_jump = self.params.get('time_jump')
- while not self.poke(context):
- if time_jump:
- started_at -= time_jump
- if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
- if self.soft_fail:
- raise AirflowSkipException('Snap. Time is OUT.')
- else:
- raise AirflowSensorTimeout('Snap. Time is OUT.')
- time.sleep(self.poke_interval)
- self.log.info("Success criteria met. Exiting.")
-
-
-class SensorTimeoutTest(unittest.TestCase):
- def setUp(self):
- configuration.load_test_config()
- args = {
- 'owner': 'airflow',
- 'start_date': DEFAULT_DATE
- }
- dag = DAG(TEST_DAG_ID, default_args=args)
- self.dag = dag
-
- def test_timeout(self):
- t = TimeoutTestSensor(
- task_id='test_timeout',
- execution_timeout=timedelta(days=2),
- return_value=False,
- poke_interval=5,
- params={'time_jump': timedelta(days=2, seconds=1)},
- dag=self.dag
- )
- self.assertRaises(
- AirflowSensorTimeout,
- t.run,
- start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-
-class HttpSensorTests(unittest.TestCase):
- def setUp(self):
- configuration.load_test_config()
- args = {
- 'owner': 'airflow',
- 'start_date': DEFAULT_DATE
- }
- dag = DAG(TEST_DAG_ID, default_args=args)
- self.dag = dag
-
- def test_poke_exception(self):
- """
- Exception occurs in poke function should not be ignored.
- """
- def resp_check(resp):
- raise AirflowException('AirflowException raised here!')
-
- task = HttpSensor(
- task_id='http_sensor_poke_exception',
- http_conn_id='http_default',
- endpoint='',
- request_params={},
- response_check=resp_check,
- poke_interval=5)
- with self.assertRaisesRegexp(AirflowException, 'AirflowException raised here!'):
- task.execute(None)
-
- @patch("airflow.hooks.http_hook.requests.Session.send")
- def test_head_method(self, mock_session_send):
- def resp_check(resp):
- return True
-
- task = HttpSensor(
- dag=self.dag,
- task_id='http_sensor_head_method',
- http_conn_id='http_default',
- endpoint='',
- request_params={},
- method='HEAD',
- response_check=resp_check,
- timeout=5,
- poke_interval=1)
-
- import requests
- task.execute(None)
-
- args, kwargs = mock_session_send.call_args
- received_request = args[0]
-
- prep_request = requests.Request(
- 'HEAD',
- 'https://www.google.com',
- {}).prepare()
-
- self.assertEqual(prep_request.url, received_request.url)
- self.assertTrue(prep_request.method, received_request.method)
-
- @patch("airflow.hooks.http_hook.requests.Session.send")
- def test_logging_head_error_request(
- self,
- mock_session_send
- ):
-
- def resp_check(resp):
- return True
-
- import requests
- response = requests.Response()
- response.status_code = 404
- response.reason = 'Not Found'
- mock_session_send.return_value = response
-
- task = HttpSensor(
- dag=self.dag,
- task_id='http_sensor_head_method',
- http_conn_id='http_default',
- endpoint='',
- request_params={},
- method='HEAD',
- response_check=resp_check,
- timeout=5,
- poke_interval=1
- )
-
- with mock.patch.object(task.hook.log, 'error') as mock_errors:
- with self.assertRaises(AirflowSensorTimeout):
- task.execute(None)
-
- self.assertTrue(mock_errors.called)
- mock_errors.assert_called_with('HTTP error: %s', 'Not Found')
-
-
-class HdfsSensorTests(unittest.TestCase):
-
- def setUp(self):
- from tests.core import FakeHDFSHook
- self.hook = FakeHDFSHook
-
- def test_legacy_file_exist(self):
- """
- Test the legacy behaviour
- :return:
- """
- # Given
- logging.info("Test for existing file with the legacy behaviour")
- # When
- task = HdfsSensor(task_id='Should_be_file_legacy',
- filepath='/datadirectory/datafile',
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
- task.execute(None)
-
- # Then
- # Nothing happens, nothing is raised exec is ok
-
- def test_legacy_file_exist_but_filesize(self):
- """
- Test the legacy behaviour with the filesize
- :return:
- """
- # Given
- logging.info("Test for existing file with the legacy behaviour")
- # When
- task = HdfsSensor(task_id='Should_be_file_legacy',
- filepath='/datadirectory/datafile',
- timeout=1,
- file_size=20,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- # Then
- with self.assertRaises(AirflowSensorTimeout):
- task.execute(None)
-
- def test_legacy_file_does_not_exists(self):
- """
- Test the legacy behaviour
- :return:
- """
- # Given
- logging.info("Test for non existing file with the legacy behaviour")
- task = HdfsSensor(task_id='Should_not_be_file_legacy',
- filepath='/datadirectory/not_existing_file_or_directory',
- timeout=1,
- retry_delay=timedelta(seconds=1),
- poke_interval=1,
- hook=self.hook)
-
- # When
- # Then
- with self.assertRaises(AirflowSensorTimeout):
- task.execute(None)
-
-
-class ExternalTaskSensorTests(unittest.TestCase):
-
- def setUp(self):
- configuration.load_test_config()
- self.args = {
- 'owner': 'airflow',
- 'start_date': DEFAULT_DATE,
- 'depends_on_past': False}
-
- def test_templated_sensor(self):
- dag = DAG(TEST_DAG_ID, self.args)
-
- with dag:
- sensor = ExternalTaskSensor(
- task_id='templated_task',
- external_dag_id='dag_{{ ds }}',
- external_task_id='task_{{ ds }}',
- start_date=DEFAULT_DATE
- )
-
- instance = TaskInstance(sensor, DEFAULT_DATE)
- instance.render_templates()
-
- self.assertEqual(sensor.external_dag_id,
- "dag_{}".format(DEFAULT_DATE.date()))
- self.assertEqual(sensor.external_task_id,
- "task_{}".format(DEFAULT_DATE.date()))
-
- def test_external_task_sensor_fn_multiple_execution_dates(self):
- bash_command_code = """
-{% set s=execution_date.time().second %}
-echo "second is {{ s }}"
-if [[ $(( {{ s }} % 60 )) == 1 ]]
- then
- exit 1
-fi
-exit 0
-"""
- dag_external_id = TEST_DAG_ID + '_external'
- dag_external = DAG(
- dag_external_id,
- default_args=self.args,
- schedule_interval=timedelta(seconds=1))
- task_external_with_failure = BashOperator(
- task_id="task_external_with_failure",
- bash_command=bash_command_code,
- retries=0,
- dag=dag_external)
- task_external_without_failure = DummyOperator(
- task_id="task_external_without_failure",
- retries=0,
- dag=dag_external)
-
- task_external_without_failure.run(
- start_date=DEFAULT_DATE,
- end_date=DEFAULT_DATE + timedelta(seconds=1),
- ignore_ti_state=True)
-
- session = settings.Session()
- TI = TaskInstance
- try:
- task_external_with_failure.run(
- start_date=DEFAULT_DATE,
- end_date=DEFAULT_DATE + timedelta(seconds=1),
- ignore_ti_state=True)
- # The test_with_failure task is excepted to fail
- # once per minute (the run on the first second of
- # each minute).
- except Exception as e:
- failed_tis = session.query(TI).filter(
- TI.dag_id == dag_external_id,
- TI.state == State.FAILED,
- TI.execution_date == DEFAULT_DATE + timedelta(seconds=1)).all()
- if (len(failed_tis) == 1 and
- failed_tis[0].task_id == 'task_external_with_failure'):
- pass
- else:
- raise e
-
- dag_id = TEST_DAG_ID
- dag = DAG(
- dag_id,
- default_args=self.args,
- schedule_interval=timedelta(minutes=1))
- task_without_failure = ExternalTaskSensor(
- task_id='task_without_failure',
- external_dag_id=dag_external_id,
- external_task_id='task_external_without_failure',
- execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
- for i in range(2)],
- allowed_states=['success'],
- retries=0,
- timeout=1,
- poke_interval=1,
- dag=dag)
- task_with_failure = ExternalTaskSensor(
- task_id='task_with_failure',
- external_dag_id=dag_external_id,
- external_task_id='task_external_with_failure',
- execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
- for i in range(2)],
- allowed_states=['success'],
- retries=0,
- timeout=1,
- poke_interval=1,
- dag=dag)
-
- task_without_failure.run(
- start_date=DEFAULT_DATE,
- end_date=DEFAULT_DATE,
- ignore_ti_state=True)
-
- with self.assertRaises(AirflowSensorTimeout):
- task_with_failure.run(
- start_date=DEFAULT_DATE,
- end_date=DEFAULT_DATE,
- ignore_ti_state=True)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/__init__.py
----------------------------------------------------------------------
diff --git a/tests/sensors/__init__.py b/tests/sensors/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/sensors/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/test_external_task_sensor.py
----------------------------------------------------------------------
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
new file mode 100644
index 0000000..32e073d
--- /dev/null
+++ b/tests/sensors/test_external_task_sensor.py
@@ -0,0 +1,244 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+from datetime import timedelta, time
+
+from airflow import DAG, configuration, settings
+from airflow import exceptions
+from airflow.exceptions import AirflowSensorTimeout
+from airflow.models import TaskInstance, DagBag
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.sensors.external_task_sensor import ExternalTaskSensor
+from airflow.sensors.time_sensor import TimeSensor
+from airflow.utils.state import State
+from airflow.utils.timezone import datetime
+
+configuration.load_test_config()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+TEST_DAG_ID = 'unit_test_dag'
+TEST_TASK_ID = 'time_sensor_check'
+DEV_NULL = '/dev/null'
+
+
+class ExternalTaskSensorTests(unittest.TestCase):
+
+ def setUp(self):
+ configuration.load_test_config()
+ self.dagbag = DagBag(
+ dag_folder=DEV_NULL,
+ include_examples=True
+ )
+ self.args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE
+ }
+ self.dag = DAG(TEST_DAG_ID, default_args=self.args)
+
+ def test_time_sensor(self):
+ t = TimeSensor(
+ task_id=TEST_TASK_ID,
+ target_time=time(0),
+ dag=self.dag
+ )
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ def test_external_task_sensor(self):
+ self.test_time_sensor()
+ t = ExternalTaskSensor(
+ task_id='test_external_task_sensor_check',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ dag=self.dag
+ )
+ t.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True
+ )
+
+ def test_templated_sensor(self):
+ dag = DAG(TEST_DAG_ID, self.args)
+
+ with dag:
+ sensor = ExternalTaskSensor(
+ task_id='templated_task',
+ external_dag_id='dag_{{ ds }}',
+ external_task_id='task_{{ ds }}',
+ start_date=DEFAULT_DATE
+ )
+
+ instance = TaskInstance(sensor, DEFAULT_DATE)
+ instance.render_templates()
+
+ self.assertEqual(sensor.external_dag_id,
+ "dag_{}".format(DEFAULT_DATE.date()))
+ self.assertEqual(sensor.external_task_id,
+ "task_{}".format(DEFAULT_DATE.date()))
+
+ def test_external_task_sensor_fn_multiple_execution_dates(self):
+ bash_command_code = """
+{% set s=execution_date.time().second %}
+echo "second is {{ s }}"
+if [[ $(( {{ s }} % 60 )) == 1 ]]
+ then
+ exit 1
+fi
+exit 0
+"""
+ dag_external_id = TEST_DAG_ID + '_external'
+ dag_external = DAG(
+ dag_external_id,
+ default_args=self.args,
+ schedule_interval=timedelta(seconds=1))
+ task_external_with_failure = BashOperator(
+ task_id="task_external_with_failure",
+ bash_command=bash_command_code,
+ retries=0,
+ dag=dag_external)
+ task_external_without_failure = DummyOperator(
+ task_id="task_external_without_failure",
+ retries=0,
+ dag=dag_external)
+
+ task_external_without_failure.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + timedelta(seconds=1),
+ ignore_ti_state=True)
+
+ session = settings.Session()
+ TI = TaskInstance
+ try:
+ task_external_with_failure.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + timedelta(seconds=1),
+ ignore_ti_state=True)
+ # The task_external_with_failure task is expected to fail
+ # once per minute (the run on the first second of
+ # each minute).
+ except Exception as e:
+ failed_tis = session.query(TI).filter(
+ TI.dag_id == dag_external_id,
+ TI.state == State.FAILED,
+ TI.execution_date == DEFAULT_DATE + timedelta(seconds=1)).all()
+ if len(failed_tis) == 1 and \
+ failed_tis[0].task_id == 'task_external_with_failure':
+ pass
+ else:
+ raise e
+
+ dag_id = TEST_DAG_ID
+ dag = DAG(
+ dag_id,
+ default_args=self.args,
+ schedule_interval=timedelta(minutes=1))
+ task_without_failure = ExternalTaskSensor(
+ task_id='task_without_failure',
+ external_dag_id=dag_external_id,
+ external_task_id='task_external_without_failure',
+ execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
+ for i in range(2)],
+ allowed_states=['success'],
+ retries=0,
+ timeout=1,
+ poke_interval=1,
+ dag=dag)
+ task_with_failure = ExternalTaskSensor(
+ task_id='task_with_failure',
+ external_dag_id=dag_external_id,
+ external_task_id='task_external_with_failure',
+ execution_date_fn=lambda dt: [dt + timedelta(seconds=i)
+ for i in range(2)],
+ allowed_states=['success'],
+ retries=0,
+ timeout=1,
+ poke_interval=1,
+ dag=dag)
+
+ task_without_failure.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
+
+ with self.assertRaises(AirflowSensorTimeout):
+ task_with_failure.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True)
+
+ def test_external_task_sensor_delta(self):
+ self.test_time_sensor()
+ t = ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_delta',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ execution_delta=timedelta(0),
+ allowed_states=['success'],
+ dag=self.dag
+ )
+ t.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True
+ )
+
+ def test_external_task_sensor_fn(self):
+ self.test_time_sensor()
+ # check that execution_date_fn works
+ t = ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_delta',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ execution_date_fn=lambda dt: dt + timedelta(0),
+ allowed_states=['success'],
+ dag=self.dag
+ )
+ t.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True
+ )
+ # double check that execution_date_fn is called: a shifted date must time out
+ t2 = ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_delta',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ execution_date_fn=lambda dt: dt + timedelta(days=1),
+ allowed_states=['success'],
+ timeout=1,
+ poke_interval=1,
+ dag=self.dag
+ )
+ with self.assertRaises(exceptions.AirflowSensorTimeout):
+ t2.run(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ ignore_ti_state=True
+ )
+
+ def test_external_task_sensor_error_delta_and_fn(self):
+ self.test_time_sensor()
+ # Test that providing execution_delta and a function raises an error
+ with self.assertRaises(ValueError):
+ ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_delta',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id=TEST_TASK_ID,
+ execution_delta=timedelta(0),
+ execution_date_fn=lambda dt: dt,
+ allowed_states=['success'],
+ dag=self.dag
+ )
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/33c72042/tests/sensors/test_hdfs_sensor.py
----------------------------------------------------------------------
diff --git a/tests/sensors/test_hdfs_sensor.py b/tests/sensors/test_hdfs_sensor.py
new file mode 100644
index 0000000..4ff9c52
--- /dev/null
+++ b/tests/sensors/test_hdfs_sensor.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+from datetime import timedelta
+
+from airflow import configuration
+from airflow.exceptions import AirflowSensorTimeout
+from airflow.sensors.hdfs_sensor import HdfsSensor
+from airflow.utils.timezone import datetime
+from tests.core import FakeHDFSHook
+
+configuration.load_test_config()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+TEST_DAG_ID = 'unit_test_dag'
+
+
+class HdfsSensorTests(unittest.TestCase):
+
+ def setUp(self):
+ self.hook = FakeHDFSHook
+
+ def test_legacy_file_exist(self):
+ """
+ Test the legacy behaviour
+ :return:
+ """
+ # When
+ task = HdfsSensor(task_id='Should_be_file_legacy',
+ filepath='/datadirectory/datafile',
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+ task.execute(None)
+
+ # Then
+ # Nothing happens and nothing is raised: execution is OK
+
+ def test_legacy_file_exist_but_filesize(self):
+ """
+ Test the legacy behaviour with the filesize
+ :return:
+ """
+ # When
+ task = HdfsSensor(task_id='Should_be_file_legacy',
+ filepath='/datadirectory/datafile',
+ timeout=1,
+ file_size=20,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ # Then
+ with self.assertRaises(AirflowSensorTimeout):
+ task.execute(None)
+
+ def test_legacy_file_does_not_exists(self):
+ """
+ Test the legacy behaviour
+ :return:
+ """
+ task = HdfsSensor(task_id='Should_not_be_file_legacy',
+ filepath='/datadirectory/not_existing_file_or_directory',
+ timeout=1,
+ retry_delay=timedelta(seconds=1),
+ poke_interval=1,
+ hook=self.hook)
+
+ # When
+ # Then
+ with self.assertRaises(AirflowSensorTimeout):
+ task.execute(None)