You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/05 07:56:45 UTC
incubator-airflow git commit: [AIRFLOW-1065] Add functionality for
Azure Blob Storage over wasb://
Repository: incubator-airflow
Updated Branches:
refs/heads/master f5462c78f -> f1bc5f38a
[AIRFLOW-1065] Add functionality for Azure Blob Storage over wasb://
This PR implements a hook to interface with Azure
storage over wasb://
via azure-storage; adds sensors to check for blobs
or prefixes; and
adds an operator to transfer a local file to the
Blob Storage.
Design is similar to that of the S3Hook in
airflow.hooks.S3_hook.
Closes #2216 from hgrif/AIRFLOW-1065
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f1bc5f38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f1bc5f38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f1bc5f38
Branch: refs/heads/master
Commit: f1bc5f38ac88054270e8b1863d48e953b58a7c74
Parents: f5462c7
Author: Henk Griffioen <hg...@users.noreply.github.com>
Authored: Wed Apr 5 09:56:23 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Apr 5 09:56:23 2017 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/__init__.py | 3 +-
airflow/contrib/hooks/wasb_hook.py | 94 +++++++++++++++
airflow/contrib/operators/__init__.py | 1 +
airflow/contrib/operators/file_to_wasb.py | 61 ++++++++++
airflow/contrib/sensors/wasb_sensor.py | 97 +++++++++++++++
airflow/models.py | 4 +
airflow/utils/db.py | 4 +
docs/code.rst | 1 +
docs/integration.rst | 50 ++++++++
scripts/ci/requirements.txt | 1 +
setup.py | 2 +
tests/contrib/hooks/test_wasb_hook.py | 113 ++++++++++++++++++
tests/contrib/operators/test_file_to_wasb.py | 88 ++++++++++++++
tests/contrib/sensors/test_wasb_sensor.py | 138 ++++++++++++++++++++++
tests/core.py | 1 +
15 files changed, 657 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 19fc2b4..182a49f 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -44,7 +44,8 @@ _hooks = {
'gcp_dataflow_hook': ['DataFlowHook'],
'spark_submit_operator': ['SparkSubmitOperator'],
'cloudant_hook': ['CloudantHook'],
- 'fs_hook': ['FSHook']
+ 'fs_hook': ['FSHook'],
+ 'wasb_hook': ['WasbHook']
}
import os as _os
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/hooks/wasb_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/wasb_hook.py b/airflow/contrib/hooks/wasb_hook.py
new file mode 100644
index 0000000..89eaa5b
--- /dev/null
+++ b/airflow/contrib/hooks/wasb_hook.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from airflow.hooks.base_hook import BaseHook
+
+from azure.storage.blob import BlockBlobService
+
+
+class WasbHook(BaseHook):
+ """
+ Interacts with Azure Blob Storage through the wasb:// protocol.
+
+ Additional options passed in the 'extra' field of the connection will be
+ passed to the `BlockBlockService()` constructor. For example, authenticate
+ using a SAS token by adding {"sas_token": "YOUR_TOKEN"}.
+
+ :param wasb_conn_id: Reference to the wasb connection.
+ :type wasb_conn_id: str
+ """
+
+ def __init__(self, wasb_conn_id='wasb_default'):
+ self.conn_id = wasb_conn_id
+ self.connection = self.get_conn()
+
+ def get_conn(self):
+ """Return the BlockBlobService object."""
+ conn = self.get_connection(self.conn_id)
+ service_options = conn.extra_dejson
+ return BlockBlobService(account_name=conn.login,
+ account_key=conn.password, **service_options)
+
+ def check_for_blob(self, container_name, blob_name, **kwargs):
+ """
+ Check if a blob exists on Azure Blob Storage.
+
+ :param container_name: Name of the container.
+ :type container_name: str
+ :param blob_name: Name of the blob.
+ :type blob_name: str
+ :param kwargs: Optional keyword arguments that
+ `BlockBlobService.exists()` takes.
+ :type kwargs: object
+ :return: True if the blob exists, False otherwise.
+ :rtype bool
+ """
+ return self.connection.exists(container_name, blob_name, **kwargs)
+
+ def check_for_prefix(self, container_name, prefix, **kwargs):
+ """
+ Check if a prefix exists on Azure Blob storage.
+
+ :param container_name: Name of the container.
+ :type container_name: str
+ :param prefix: Prefix of the blob.
+ :type prefix: str
+ :param kwargs: Optional keyword arguments that
+ `BlockBlobService.list_blobs()` takes.
+ :type kwargs: object
+ :return: True if blobs matching the prefix exist, False otherwise.
+ :rtype bool
+ """
+ matches = self.connection.list_blobs(container_name, prefix,
+ num_results=1, **kwargs)
+ return len(list(matches)) > 0
+
+ def load_file(self, file_path, container_name, blob_name, **kwargs):
+ """
+ Upload a file to Azure Blob Storage.
+
+ :param file_path: Path to the file to load.
+ :type file_path: str
+ :param container_name: Name of the container.
+ :type container_name: str
+ :param blob_name: Name of the blob.
+ :type blob_name: str
+ :param kwargs: Optional keyword arguments that
+ `BlockBlobService.create_blob_from_path()` takes.
+ :type kwargs: object
+ """
+ # Reorder the argument order from airflow.hooks.S3_hook.load_file.
+ self.connection.create_blob_from_path(container_name, blob_name,
+ file_path, **kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py
index bef3433..4ea6c17 100644
--- a/airflow/contrib/operators/__init__.py
+++ b/airflow/contrib/operators/__init__.py
@@ -37,6 +37,7 @@ _operators = {
'vertica_to_hive': ['VerticaToHiveTransfer'],
'qubole_operator': ['QuboleOperator'],
'spark_submit_operator': ['SparkSubmitOperator'],
+ 'file_to_wasb': ['FileToWasbOperator'],
'fs_operator': ['FileSensor']
}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
new file mode 100644
index 0000000..32e6b29
--- /dev/null
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+
+from airflow.contrib.hooks.wasb_hook import WasbHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class FileToWasbOperator(BaseOperator):
+ """
+ Uploads a file to Azure Blob Storage.
+
+ :param file_path: Path to the file to load.
+ :type file_path: str
+ :param container_name: Name of the container.
+ :type container_name: str
+ :param blob_name: Name of the blob.
+ :type blob_name: str
+ :param wasb_conn_id: Reference to the wasb connection.
+ :type wasb_conn_id: str
+ :param load_options: Optional keyword arguments that
+ `WasbHook.load_file()` takes.
+ :type load_options: dict
+ """
+ template_fields = ('file_path', 'container_name', 'blob_name')
+
+ @apply_defaults
+ def __init__(self, file_path, container_name, blob_name,
+ wasb_conn_id='wasb_default', load_options=None, *args,
+ **kwargs):
+ super(FileToWasbOperator, self).__init__(*args, **kwargs)
+ if load_options is None:
+ load_options = {}
+ self.file_path = file_path
+ self.container_name = container_name
+ self.blob_name = blob_name
+ self.wasb_conn_id = wasb_conn_id
+ self.load_options = load_options
+
+ def execute(self, context):
+ """Upload a file to Azure Blob Storage."""
+ hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+ logging.info(
+ 'Uploading {self.file_path} to wasb://{self.container_name} as '
+ '{self.blob_name}'.format(**locals()))
+ hook.load_file(self.file_path, self.container_name, self.blob_name,
+ **self.load_options)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
new file mode 100644
index 0000000..3f3d56c
--- /dev/null
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+
+from airflow.contrib.hooks.wasb_hook import WasbHook
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class WasbBlobSensor(BaseSensorOperator):
+ """
+ Waits for a blob to arrive on Azure Blob Storage.
+
+ :param container_name: Name of the container.
+ :type container_name: str
+ :param blob_name: Name of the blob.
+ :type blob_name: str
+ :param wasb_conn_id: Reference to the wasb connection.
+ :type wasb_conn_id: str
+ :param check_options: Optional keyword arguments that
+ `WasbHook.check_for_blob()` takes.
+ :type check_options: dict
+ """
+
+ template_fields = ('container_name', 'blob_name')
+
+ @apply_defaults
+ def __init__(self, container_name, blob_name,
+ wasb_conn_id='wasb_default', check_options=None, *args,
+ **kwargs):
+ super(WasbBlobSensor, self).__init__(*args, **kwargs)
+ if check_options is None:
+ check_options = {}
+ self.wasb_conn_id = wasb_conn_id
+ self.container_name = container_name
+ self.blob_name = blob_name
+ self.check_options = check_options
+
+ def poke(self, context):
+ logging.info(
+ 'Poking for blob: {self.blob_name}\n'
+ 'in wasb://{self.container_name}'.format(**locals())
+ )
+ hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+ return hook.check_for_blob(self.container_name, self.blob_name,
+ **self.check_options)
+
+
+class WasbPrefixSensor(BaseSensorOperator):
+ """
+ Waits for blobs matching a prefix to arrive on Azure Blob Storage.
+
+ :param container_name: Name of the container.
+ :type container_name: str
+ :param prefix: Prefix of the blob.
+ :type prefix: str
+ :param wasb_conn_id: Reference to the wasb connection.
+ :type wasb_conn_id: str
+ :param check_options: Optional keyword arguments that
+ `WasbHook.check_for_prefix()` takes.
+ :type check_options: dict
+ """
+
+ template_fields = ('container_name', 'prefix')
+
+ @apply_defaults
+ def __init__(self, container_name, prefix, wasb_conn_id='wasb_default',
+ check_options=None, *args, **kwargs):
+ super(WasbPrefixSensor, self).__init__(*args, **kwargs)
+ if check_options is None:
+ check_options = {}
+ self.wasb_conn_id = wasb_conn_id
+ self.container_name = container_name
+ self.prefix = prefix
+ self.check_options = check_options
+
+ def poke(self, context):
+ logging.info(
+ 'Poking for prefix: {self.prefix}\n'
+ 'in wasb://{self.container_name}'.format(**locals())
+ )
+ hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+ return hook.check_for_prefix(self.container_name, self.prefix,
+ **self.check_options)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7171c05..8628100 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -542,6 +542,7 @@ class Connection(Base):
('mesos_framework-id', 'Mesos Framework ID'),
('jira', 'JIRA',),
('redis', 'Redis',),
+ ('wasb', 'Azure Blob Storage'),
]
def __init__(
@@ -674,6 +675,9 @@ class Connection(Base):
elif self.conn_type == 'redis':
from airflow.contrib.hooks.redis_hook import RedisHook
return RedisHook(redis_conn_id=self.conn_id)
+ elif self.conn_type == 'wasb':
+ from airflow.contrib.hooks.wasb_hook import WasbHook
+ return WasbHook(wasb_conn_id=self.conn_id)
except:
pass
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index a619a41..7da9217 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -171,6 +171,10 @@ def initdb():
host='localhost', port=5433))
merge_conn(
models.Connection(
+ conn_id='wasb_default', conn_type='wasb',
+ extra='{"sas_token": null}'))
+ merge_conn(
+ models.Connection(
conn_id='webhdfs_default', conn_type='hdfs',
host='localhost', port=50070))
merge_conn(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index fabe6db..683e85f 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -98,6 +98,7 @@ Community-contributed Operators
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
.. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
.. autoclass:: airflow.contrib.operators.ecs_operator.ECSOperator
+.. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
.. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
.. autoclass:: airflow.contrib.operators.QuboleOperator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 10bc038..4a6b676 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -1,9 +1,59 @@
Integration
===========
+- :ref:`Azure`
- :ref:`AWS`
- :ref:`GCP`
+.. _Azure:
+
+Azure: Microsoft Azure
+----------------------
+
+Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob
+Storage. Note that the hook, sensors and operator are in the contrib section.
+
+Azure Blob Storage
+''''''''''''''''''
+
+All classes communicate via the Windows Azure Storage Blob protocol. Make sure that an
+Airflow connection of type `wasb` exists. Authorization can be done by supplying a
+login (= storage account name) and password (= storage account key), or a login and
+a SAS token in the extra field (see connection `wasb_default` for an example).
+
+- :ref:`WasbBlobSensor`: Checks if a blob is present on Azure Blob storage.
+- :ref:`WasbPrefixSensor`: Checks if blobs matching a prefix are present on Azure Blob storage.
+- :ref:`FileToWasbOperator`: Uploads a local file to a container as a blob.
+- :ref:`WasbHook`: Interface with Azure Blob Storage.
+
+.. _WasbBlobSensor:
+
+WasbBlobSensor
+"""""""""""""""
+
+.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor
+
+.. _WasbPrefixSensor:
+
+WasbPrefixSensor
+"""""""""""""""""
+
+.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor
+
+.. _FileToWasbOperator:
+
+FileToWasbOperator
+"""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
+
+.. _WasbHook:
+
+WasbHook
+"""""""""
+
+.. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook
+
.. _AWS:
AWS: Amazon Webservices
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index d206f16..1905398 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -1,4 +1,5 @@
alembic
+azure-storage>=0.34.0
bcrypt
bleach
boto
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index a582499..ea60dca 100644
--- a/setup.py
+++ b/setup.py
@@ -104,6 +104,7 @@ async = [
'eventlet>= 0.9.7',
'gevent>=0.13'
]
+azure = ['azure-storage>=0.34.0']
celery = [
'celery>=3.1.17',
'flower>=0.7.3'
@@ -237,6 +238,7 @@ def do_setup():
'all': devel_all,
'all_dbs': all_dbs,
'async': async,
+ 'azure': azure,
'celery': celery,
'cgroups': cgroups,
'cloudant': cloudant,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/contrib/hooks/test_wasb_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_wasb_hook.py b/tests/contrib/hooks/test_wasb_hook.py
new file mode 100644
index 0000000..aa92937
--- /dev/null
+++ b/tests/contrib/hooks/test_wasb_hook.py
@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import unittest
+
+from airflow import configuration
+from airflow import models
+from airflow.contrib.hooks.wasb_hook import WasbHook
+from airflow.utils import db
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+
+class TestWasbHook(unittest.TestCase):
+
+ def setUp(self):
+ configuration.load_test_config()
+ db.merge_conn(
+ models.Connection(
+ conn_id='wasb_test_key', conn_type='wasb',
+ login='login', password='key'
+ )
+ )
+ db.merge_conn(
+ models.Connection(
+ conn_id='wasb_test_sas_token', conn_type='wasb',
+ login='login', extra=json.dumps({'sas_token': 'token'})
+ )
+ )
+
+ def test_key(self):
+ from azure.storage.blob import BlockBlobService
+ hook = WasbHook(wasb_conn_id='wasb_test_key')
+ self.assertEqual(hook.conn_id, 'wasb_test_key')
+ self.assertIsInstance(hook.connection, BlockBlobService)
+
+ def test_sas_token(self):
+ from azure.storage.blob import BlockBlobService
+ hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+ self.assertEqual(hook.conn_id, 'wasb_test_sas_token')
+ self.assertIsInstance(hook.connection, BlockBlobService)
+
+ @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+ autospec=True)
+ def test_check_for_blob(self, mock_service):
+ mock_instance = mock_service.return_value
+ mock_instance.exists.return_value = True
+ hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+ self.assertTrue(hook.check_for_blob('container', 'blob', timeout=3))
+ mock_instance.exists.assert_called_once_with(
+ 'container', 'blob', timeout=3
+ )
+
+ @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+ autospec=True)
+ def test_check_for_blob_empty(self, mock_service):
+ mock_service.return_value.exists.return_value = False
+ hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+ self.assertFalse(hook.check_for_blob('container', 'blob'))
+
+ @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+ autospec=True)
+ def test_check_for_prefix(self, mock_service):
+ mock_instance = mock_service.return_value
+ mock_instance.list_blobs.return_value = iter(['blob_1'])
+ hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+ self.assertTrue(hook.check_for_prefix('container', 'prefix',
+ timeout=3))
+ mock_instance.list_blobs.assert_called_once_with(
+ 'container', 'prefix', timeout=3
+ )
+
+ @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+ autospec=True)
+ def test_check_for_prefix_empty(self, mock_service):
+ mock_instance = mock_service.return_value
+ mock_instance.list_blobs.return_value = iter([])
+ hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+ self.assertFalse(hook.check_for_prefix('container', 'prefix'))
+
+ @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+ autospec=True)
+ def test_check_for_prefix(self, mock_service):
+ mock_instance = mock_service.return_value
+ hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+ hook.load_file('path', 'container', 'blob', max_connections=1)
+ mock_instance.create_blob_from_path.assert_called_once_with(
+ 'container', 'blob', 'path', max_connections=1
+ )
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/contrib/operators/test_file_to_wasb.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_file_to_wasb.py b/tests/contrib/operators/test_file_to_wasb.py
new file mode 100644
index 0000000..bdaeb79
--- /dev/null
+++ b/tests/contrib/operators/test_file_to_wasb.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.file_to_wasb import FileToWasbOperator
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+
+class TestFileToWasbOperator(unittest.TestCase):
+
+ _config = {
+ 'file_path': 'file',
+ 'container_name': 'container',
+ 'blob_name': 'blob',
+ 'wasb_conn_id': 'wasb_default',
+ 'retries': 3,
+ }
+
+ def setUp(self):
+ configuration.load_test_config()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': datetime.datetime(2017, 1, 1)
+ }
+ self.dag = DAG('test_dag_id', default_args=args)
+
+ def test_init(self):
+ operator = FileToWasbOperator(
+ task_id='wasb_operator',
+ dag=self.dag,
+ **self._config
+ )
+ self.assertEqual(operator.file_path, self._config['file_path'])
+ self.assertEqual(operator.container_name,
+ self._config['container_name'])
+ self.assertEqual(operator.blob_name, self._config['blob_name'])
+ self.assertEqual(operator.wasb_conn_id, self._config['wasb_conn_id'])
+ self.assertEqual(operator.load_options, {})
+ self.assertEqual(operator.retries, self._config['retries'])
+
+ operator = FileToWasbOperator(
+ task_id='wasb_operator',
+ dag=self.dag,
+ load_options={'timeout': 2},
+ **self._config
+ )
+ self.assertEqual(operator.load_options, {'timeout': 2})
+
+ @mock.patch('airflow.contrib.operators.file_to_wasb.WasbHook',
+ autospec=True)
+ def test_execute(self, mock_hook):
+ mock_instance = mock_hook.return_value
+ operator = FileToWasbOperator(
+ task_id='wasb_sensor',
+ dag=self.dag,
+ load_options={'timeout': 2},
+ **self._config
+ )
+ operator.execute(None)
+ mock_instance.load_file.assert_called_once_with(
+ 'file', 'container', 'blob', timeout=2
+ )
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/contrib/sensors/test_wasb_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_wasb_sensor.py b/tests/contrib/sensors/test_wasb_sensor.py
new file mode 100644
index 0000000..a26ba2d
--- /dev/null
+++ b/tests/contrib/sensors/test_wasb_sensor.py
@@ -0,0 +1,138 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.sensors.wasb_sensor import WasbBlobSensor
+from airflow.contrib.sensors.wasb_sensor import WasbPrefixSensor
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+
+class TestWasbBlobSensor(unittest.TestCase):
+
+ _config = {
+ 'container_name': 'container',
+ 'blob_name': 'blob',
+ 'wasb_conn_id': 'conn_id',
+ 'timeout': 100,
+ }
+
+ def setUp(self):
+ configuration.load_test_config()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': datetime.datetime(2017, 1, 1)
+ }
+ self.dag = DAG('test_dag_id', default_args=args)
+
+ def test_init(self):
+ sensor = WasbBlobSensor(
+ task_id='wasb_sensor',
+ dag=self.dag,
+ **self._config
+ )
+ self.assertEqual(sensor.container_name, self._config['container_name'])
+ self.assertEqual(sensor.blob_name, self._config['blob_name'])
+ self.assertEqual(sensor.wasb_conn_id, self._config['wasb_conn_id'])
+ self.assertEqual(sensor.check_options, {})
+ self.assertEqual(sensor.timeout, self._config['timeout'])
+
+ sensor = WasbBlobSensor(
+ task_id='wasb_sensor',
+ dag=self.dag,
+ check_options={'timeout': 2},
+ **self._config
+ )
+ self.assertEqual(sensor.check_options, {'timeout': 2})
+
+ @mock.patch('airflow.contrib.sensors.wasb_sensor.WasbHook',
+ autospec=True)
+ def test_poke(self, mock_hook):
+ mock_instance = mock_hook.return_value
+ sensor = WasbBlobSensor(
+ task_id='wasb_sensor',
+ dag=self.dag,
+ check_options={'timeout': 2},
+ **self._config
+ )
+ sensor.poke(None)
+ mock_instance.check_for_blob.assert_called_once_with(
+ 'container', 'blob', timeout=2
+ )
+
+
+class TestWasbPrefixSensor(unittest.TestCase):
+
+ _config = {
+ 'container_name': 'container',
+ 'prefix': 'prefix',
+ 'wasb_conn_id': 'conn_id',
+ 'timeout': 100,
+ }
+
+ def setUp(self):
+ configuration.load_test_config()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': datetime.datetime(2017, 1, 1)
+ }
+ self.dag = DAG('test_dag_id', default_args=args)
+
+ def test_init(self):
+ sensor = WasbPrefixSensor(
+ task_id='wasb_sensor',
+ dag=self.dag,
+ **self._config
+ )
+ self.assertEqual(sensor.container_name, self._config['container_name'])
+ self.assertEqual(sensor.prefix, self._config['prefix'])
+ self.assertEqual(sensor.wasb_conn_id, self._config['wasb_conn_id'])
+ self.assertEqual(sensor.check_options, {})
+ self.assertEqual(sensor.timeout, self._config['timeout'])
+
+ sensor = WasbPrefixSensor(
+ task_id='wasb_sensor',
+ dag=self.dag,
+ check_options={'timeout': 2},
+ **self._config
+ )
+ self.assertEqual(sensor.check_options, {'timeout': 2})
+
+ @mock.patch('airflow.contrib.sensors.wasb_sensor.WasbHook',
+ autospec=True)
+ def test_poke(self, mock_hook):
+ mock_instance = mock_hook.return_value
+ sensor = WasbPrefixSensor(
+ task_id='wasb_sensor',
+ dag=self.dag,
+ check_options={'timeout': 2},
+ **self._config
+ )
+ sensor.poke(None)
+ mock_instance.check_for_prefix.assert_called_once_with(
+ 'container', 'prefix', timeout=2
+ )
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 8b3d1b8..a6bf613 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1111,6 +1111,7 @@ class CliTests(unittest.TestCase):
self.assertIn(['mssql_default', 'mssql'], conns)
self.assertIn(['mysql_default', 'mysql'], conns)
self.assertIn(['postgres_default', 'postgres'], conns)
+ self.assertIn(['wasb_default', 'wasb'], conns)
# Attempt to list connections with invalid cli args
with mock.patch('sys.stdout',