You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/05 07:56:45 UTC

incubator-airflow git commit: [AIRFLOW-1065] Add functionality for Azure Blob Storage over wasb://

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f5462c78f -> f1bc5f38a


[AIRFLOW-1065] Add functionality for Azure Blob Storage over wasb://

This PR implements a hook to interface with Azure
storage over wasb://
via azure-storage; adds sensors to check for blobs
or prefixes; and
adds an operator to transfer a local file to the
Blob Storage.

Design is similar to that of the S3Hook in
airflow.operators.S3_hook.

Closes #2216 from hgrif/AIRFLOW-1065


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f1bc5f38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f1bc5f38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f1bc5f38

Branch: refs/heads/master
Commit: f1bc5f38ac88054270e8b1863d48e953b58a7c74
Parents: f5462c7
Author: Henk Griffioen <hg...@users.noreply.github.com>
Authored: Wed Apr 5 09:56:23 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Apr 5 09:56:23 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py            |   3 +-
 airflow/contrib/hooks/wasb_hook.py           |  94 +++++++++++++++
 airflow/contrib/operators/__init__.py        |   1 +
 airflow/contrib/operators/file_to_wasb.py    |  61 ++++++++++
 airflow/contrib/sensors/wasb_sensor.py       |  97 +++++++++++++++
 airflow/models.py                            |   4 +
 airflow/utils/db.py                          |   4 +
 docs/code.rst                                |   1 +
 docs/integration.rst                         |  50 ++++++++
 scripts/ci/requirements.txt                  |   1 +
 setup.py                                     |   2 +
 tests/contrib/hooks/test_wasb_hook.py        | 113 ++++++++++++++++++
 tests/contrib/operators/test_file_to_wasb.py |  88 ++++++++++++++
 tests/contrib/sensors/test_wasb_sensor.py    | 138 ++++++++++++++++++++++
 tests/core.py                                |   1 +
 15 files changed, 657 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 19fc2b4..182a49f 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -44,7 +44,8 @@ _hooks = {
     'gcp_dataflow_hook': ['DataFlowHook'],
     'spark_submit_operator': ['SparkSubmitOperator'],
     'cloudant_hook': ['CloudantHook'],
-    'fs_hook': ['FSHook']
+    'fs_hook': ['FSHook'],
+    'wasb_hook': ['WasbHook']
 }
 
 import os as _os

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/hooks/wasb_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/wasb_hook.py b/airflow/contrib/hooks/wasb_hook.py
new file mode 100644
index 0000000..89eaa5b
--- /dev/null
+++ b/airflow/contrib/hooks/wasb_hook.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from airflow.hooks.base_hook import BaseHook
+
+from azure.storage.blob import BlockBlobService
+
+
+class WasbHook(BaseHook):
+    """
+    Interacts with Azure Blob Storage through the wasb:// protocol.
+    
+    Additional options passed in the 'extra' field of the connection will be
+    passed to the `BlockBlockService()` constructor. For example, authenticate
+    using a SAS token by adding {"sas_token": "YOUR_TOKEN"}.
+    
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    """
+
+    def __init__(self, wasb_conn_id='wasb_default'):
+        self.conn_id = wasb_conn_id
+        self.connection = self.get_conn()
+
+    def get_conn(self):
+        """Return the BlockBlobService object."""
+        conn = self.get_connection(self.conn_id)
+        service_options = conn.extra_dejson
+        return BlockBlobService(account_name=conn.login,
+                                account_key=conn.password, **service_options)
+
+    def check_for_blob(self, container_name, blob_name, **kwargs):
+        """
+        Check if a blob exists on Azure Blob Storage.
+        
+        :param container_name: Name of the container.
+        :type container_name: str
+        :param blob_name: Name of the blob.
+        :type blob_name: str
+        :param kwargs: Optional keyword arguments that
+            `BlockBlobService.exists()` takes.
+        :type kwargs: object
+        :return: True if the blob exists, False otherwise.
+        :rtype bool
+        """
+        return self.connection.exists(container_name, blob_name, **kwargs)
+
+    def check_for_prefix(self, container_name, prefix, **kwargs):
+        """
+        Check if a prefix exists on Azure Blob storage.
+        
+        :param container_name: Name of the container.
+        :type container_name: str
+        :param prefix: Prefix of the blob.
+        :type prefix: str
+        :param kwargs: Optional keyword arguments that
+            `BlockBlobService.list_blobs()` takes.
+        :type kwargs: object
+        :return: True if blobs matching the prefix exist, False otherwise.
+        :rtype bool
+        """
+        matches = self.connection.list_blobs(container_name, prefix,
+                                             num_results=1, **kwargs)
+        return len(list(matches)) > 0
+
+    def load_file(self, file_path, container_name, blob_name, **kwargs):
+        """
+        Upload a file to Azure Blob Storage.
+        
+        :param file_path: Path to the file to load.
+        :type file_path: str
+        :param container_name: Name of the container.
+        :type container_name: str
+        :param blob_name: Name of the blob.
+        :type blob_name: str
+        :param kwargs: Optional keyword arguments that
+            `BlockBlobService.create_blob_from_path()` takes.
+        :type kwargs: object
+        """
+        # Reorder the argument order from airflow.hooks.S3_hook.load_file.
+        self.connection.create_blob_from_path(container_name, blob_name,
+                                              file_path, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py
index bef3433..4ea6c17 100644
--- a/airflow/contrib/operators/__init__.py
+++ b/airflow/contrib/operators/__init__.py
@@ -37,6 +37,7 @@ _operators = {
     'vertica_to_hive': ['VerticaToHiveTransfer'],
     'qubole_operator': ['QuboleOperator'],
     'spark_submit_operator': ['SparkSubmitOperator'],
+    'file_to_wasb': ['FileToWasbOperator'],
     'fs_operator': ['FileSensor']
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
new file mode 100644
index 0000000..32e6b29
--- /dev/null
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+
+from airflow.contrib.hooks.wasb_hook import WasbHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class FileToWasbOperator(BaseOperator):
+    """
+    Uploads a file to Azure Blob Storage.
+    
+    :param file_path: Path to the file to load.
+    :type file_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_name: Name of the blob.
+    :type blob_name: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    """
+    template_fields = ('file_path', 'container_name', 'blob_name')
+
+    @apply_defaults
+    def __init__(self, file_path, container_name, blob_name,
+                 wasb_conn_id='wasb_default', load_options=None, *args,
+                 **kwargs):
+        super(FileToWasbOperator, self).__init__(*args, **kwargs)
+        if load_options is None:
+            load_options = {}
+        self.file_path = file_path
+        self.container_name = container_name
+        self.blob_name = blob_name
+        self.wasb_conn_id = wasb_conn_id
+        self.load_options = load_options
+
+    def execute(self, context):
+        """Upload a file to Azure Blob Storage."""
+        hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+        logging.info(
+            'Uploading {self.file_path} to wasb://{self.container_name} as '
+            '{self.blob_name}'.format(**locals()))
+        hook.load_file(self.file_path, self.container_name, self.blob_name,
+                       **self.load_options)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
new file mode 100644
index 0000000..3f3d56c
--- /dev/null
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+
+from airflow.contrib.hooks.wasb_hook import WasbHook
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class WasbBlobSensor(BaseSensorOperator):
+    """
+    Waits for a blob to arrive on Azure Blob Storage.
+    
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_name: Name of the blob.
+    :type blob_name: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param check_options: Optional keyword arguments that
+        `WasbHook.check_for_blob()` takes.
+    :type check_options: dict
+    """
+
+    template_fields = ('container_name', 'blob_name')
+
+    @apply_defaults
+    def __init__(self, container_name, blob_name,
+                 wasb_conn_id='wasb_default', check_options=None, *args,
+                 **kwargs):
+        super(WasbBlobSensor, self).__init__(*args, **kwargs)
+        if check_options is None:
+            check_options = {}
+        self.wasb_conn_id = wasb_conn_id
+        self.container_name = container_name
+        self.blob_name = blob_name
+        self.check_options = check_options
+
+    def poke(self, context):
+        logging.info(
+            'Poking for blob: {self.blob_name}\n'
+            'in wasb://{self.container_name}'.format(**locals())
+        )
+        hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+        return hook.check_for_blob(self.container_name, self.blob_name,
+                                   **self.check_options)
+
+
+class WasbPrefixSensor(BaseSensorOperator):
+    """
+    Waits for blobs matching a prefix to arrive on Azure Blob Storage.
+    
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param prefix: Prefix of the blob.
+    :type prefix: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param check_options: Optional keyword arguments that
+        `WasbHook.check_for_prefix()` takes.
+    :type check_options: dict
+    """
+
+    template_fields = ('container_name', 'prefix')
+
+    @apply_defaults
+    def __init__(self, container_name, prefix, wasb_conn_id='wasb_default',
+                 check_options=None, *args, **kwargs):
+        super(WasbPrefixSensor, self).__init__(*args, **kwargs)
+        if check_options is None:
+            check_options = {}
+        self.wasb_conn_id = wasb_conn_id
+        self.container_name = container_name
+        self.prefix = prefix
+        self.check_options = check_options
+
+    def poke(self, context):
+        logging.info(
+            'Poking for prefix: {self.prefix}\n'
+            'in wasb://{self.container_name}'.format(**locals())
+        )
+        hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+        return hook.check_for_prefix(self.container_name, self.prefix,
+                                     **self.check_options)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7171c05..8628100 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -542,6 +542,7 @@ class Connection(Base):
         ('mesos_framework-id', 'Mesos Framework ID'),
         ('jira', 'JIRA',),
         ('redis', 'Redis',),
+        ('wasb', 'Azure Blob Storage'),
     ]
 
     def __init__(
@@ -674,6 +675,9 @@ class Connection(Base):
             elif self.conn_type == 'redis':
                 from airflow.contrib.hooks.redis_hook import RedisHook
                 return RedisHook(redis_conn_id=self.conn_id)
+            elif self.conn_type == 'wasb':
+                from airflow.contrib.hooks.wasb_hook import WasbHook
+                return WasbHook(wasb_conn_id=self.conn_id)
         except:
             pass
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index a619a41..7da9217 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -171,6 +171,10 @@ def initdb():
             host='localhost', port=5433))
     merge_conn(
         models.Connection(
+            conn_id='wasb_default', conn_type='wasb',
+            extra='{"sas_token": null}'))
+    merge_conn(
+        models.Connection(
             conn_id='webhdfs_default', conn_type='hdfs',
             host='localhost', port=50070))
     merge_conn(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index fabe6db..683e85f 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -98,6 +98,7 @@ Community-contributed Operators
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
 .. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
 .. autoclass:: airflow.contrib.operators.ecs_operator.ECSOperator
+.. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
 .. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
 .. autoclass:: airflow.contrib.operators.QuboleOperator
 .. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 10bc038..4a6b676 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -1,9 +1,59 @@
 Integration
 ===========
 
+- :ref:`Azure`
 - :ref:`AWS`
 - :ref:`GCP`
 
+.. _Azure:
+
+Azure: Microsoft Azure
+----------------------
+
+Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob
+Storage. Note that the Hook, Sensor and Operator are in the contrib section.
+
+Azure Blob Storage
+''''''''''''''''''
+
+All classes communicate via the Window Azure Storage Blob protocol. Make sure that a
+Airflow connection of type `wasb` exists. Authorization can be done by supplying a
+login (=Storage account name) and password (=KEY), or login and SAS token in the extra
+field (see connection `wasb_default` for an example).
+
+- :ref:`WasbBlobSensor`: Checks if a blob is present on Azure Blob storage.
+- :ref:`WasbPrefixSensor`: Checks if blobs matching a prefix are present on Azure Blob storage.
+- :ref:`FileToWasbOperator`: Uploads a local file to a container as a blob.
+- :ref:`WasbHook`: Interface with Azure Blob Storage.
+
+.. _WasbBlobSensor:
+
+WasbBlobSensor
+"""""""""""""""
+
+.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor
+
+.. _WasbPrefixSensor:
+
+WasbPrefixSensor
+"""""""""""""""""
+
+.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor
+
+.. _FileToWasbOperator:
+
+FileToWasbOperator
+"""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
+
+.. _WasbHook:
+
+WasbHook
+"""""""""
+
+.. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook
+
 .. _AWS:
 
 AWS: Amazon Webservices

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index d206f16..1905398 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -1,4 +1,5 @@
 alembic
+azure-storage>=0.34.0
 bcrypt
 bleach
 boto

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index a582499..ea60dca 100644
--- a/setup.py
+++ b/setup.py
@@ -104,6 +104,7 @@ async = [
     'eventlet>= 0.9.7',
     'gevent>=0.13'
 ]
+azure = ['azure-storage>=0.34.0']
 celery = [
     'celery>=3.1.17',
     'flower>=0.7.3'
@@ -237,6 +238,7 @@ def do_setup():
             'all': devel_all,
             'all_dbs': all_dbs,
             'async': async,
+            'azure': azure,
             'celery': celery,
             'cgroups': cgroups,
             'cloudant': cloudant,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/contrib/hooks/test_wasb_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_wasb_hook.py b/tests/contrib/hooks/test_wasb_hook.py
new file mode 100644
index 0000000..aa92937
--- /dev/null
+++ b/tests/contrib/hooks/test_wasb_hook.py
@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import unittest
+
+from airflow import configuration
+from airflow import models
+from airflow.contrib.hooks.wasb_hook import WasbHook
+from airflow.utils import db
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestWasbHook(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='wasb_test_key', conn_type='wasb',
+                login='login', password='key'
+            )
+        )
+        db.merge_conn(
+            models.Connection(
+                conn_id='wasb_test_sas_token', conn_type='wasb',
+                login='login', extra=json.dumps({'sas_token': 'token'})
+            )
+        )
+
+    def test_key(self):
+        from azure.storage.blob import BlockBlobService
+        hook = WasbHook(wasb_conn_id='wasb_test_key')
+        self.assertEqual(hook.conn_id, 'wasb_test_key')
+        self.assertIsInstance(hook.connection, BlockBlobService)
+
+    def test_sas_token(self):
+        from azure.storage.blob import BlockBlobService
+        hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+        self.assertEqual(hook.conn_id, 'wasb_test_sas_token')
+        self.assertIsInstance(hook.connection, BlockBlobService)
+
+    @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+                autospec=True)
+    def test_check_for_blob(self, mock_service):
+        mock_instance = mock_service.return_value
+        mock_instance.exists.return_value = True
+        hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+        self.assertTrue(hook.check_for_blob('container', 'blob', timeout=3))
+        mock_instance.exists.assert_called_once_with(
+            'container', 'blob', timeout=3
+        )
+
+    @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+                autospec=True)
+    def test_check_for_blob_empty(self, mock_service):
+        mock_service.return_value.exists.return_value = False
+        hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+        self.assertFalse(hook.check_for_blob('container', 'blob'))
+
+    @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+                autospec=True)
+    def test_check_for_prefix(self, mock_service):
+        mock_instance = mock_service.return_value
+        mock_instance.list_blobs.return_value = iter(['blob_1'])
+        hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+        self.assertTrue(hook.check_for_prefix('container', 'prefix',
+                                              timeout=3))
+        mock_instance.list_blobs.assert_called_once_with(
+            'container', 'prefix', timeout=3
+        )
+
+    @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+                autospec=True)
+    def test_check_for_prefix_empty(self, mock_service):
+        mock_instance = mock_service.return_value
+        mock_instance.list_blobs.return_value = iter([])
+        hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+        self.assertFalse(hook.check_for_prefix('container', 'prefix'))
+
+    @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+                autospec=True)
+    def test_check_for_prefix(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+        hook.load_file('path', 'container', 'blob', max_connections=1)
+        mock_instance.create_blob_from_path.assert_called_once_with(
+            'container', 'blob', 'path', max_connections=1
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/contrib/operators/test_file_to_wasb.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_file_to_wasb.py b/tests/contrib/operators/test_file_to_wasb.py
new file mode 100644
index 0000000..bdaeb79
--- /dev/null
+++ b/tests/contrib/operators/test_file_to_wasb.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.file_to_wasb import FileToWasbOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestFileToWasbOperator(unittest.TestCase):
+
+    _config = {
+        'file_path': 'file',
+        'container_name': 'container',
+        'blob_name': 'blob',
+        'wasb_conn_id': 'wasb_default',
+        'retries': 3,
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_init(self):
+        operator = FileToWasbOperator(
+            task_id='wasb_operator',
+            dag=self.dag,
+            **self._config
+        )
+        self.assertEqual(operator.file_path, self._config['file_path'])
+        self.assertEqual(operator.container_name,
+                         self._config['container_name'])
+        self.assertEqual(operator.blob_name, self._config['blob_name'])
+        self.assertEqual(operator.wasb_conn_id, self._config['wasb_conn_id'])
+        self.assertEqual(operator.load_options, {})
+        self.assertEqual(operator.retries, self._config['retries'])
+
+        operator = FileToWasbOperator(
+            task_id='wasb_operator',
+            dag=self.dag,
+            load_options={'timeout': 2},
+            **self._config
+        )
+        self.assertEqual(operator.load_options, {'timeout': 2})
+
+    @mock.patch('airflow.contrib.operators.file_to_wasb.WasbHook',
+                autospec=True)
+    def test_execute(self, mock_hook):
+        mock_instance = mock_hook.return_value
+        operator = FileToWasbOperator(
+            task_id='wasb_sensor',
+            dag=self.dag,
+            load_options={'timeout': 2},
+            **self._config
+        )
+        operator.execute(None)
+        mock_instance.load_file.assert_called_once_with(
+            'file', 'container', 'blob', timeout=2
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/contrib/sensors/test_wasb_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_wasb_sensor.py b/tests/contrib/sensors/test_wasb_sensor.py
new file mode 100644
index 0000000..a26ba2d
--- /dev/null
+++ b/tests/contrib/sensors/test_wasb_sensor.py
@@ -0,0 +1,138 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.sensors.wasb_sensor import WasbBlobSensor
+from airflow.contrib.sensors.wasb_sensor import WasbPrefixSensor
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestWasbBlobSensor(unittest.TestCase):
+
+    _config = {
+        'container_name': 'container',
+        'blob_name': 'blob',
+        'wasb_conn_id': 'conn_id',
+        'timeout': 100,
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_init(self):
+        sensor = WasbBlobSensor(
+            task_id='wasb_sensor',
+            dag=self.dag,
+            **self._config
+        )
+        self.assertEqual(sensor.container_name, self._config['container_name'])
+        self.assertEqual(sensor.blob_name, self._config['blob_name'])
+        self.assertEqual(sensor.wasb_conn_id, self._config['wasb_conn_id'])
+        self.assertEqual(sensor.check_options, {})
+        self.assertEqual(sensor.timeout, self._config['timeout'])
+
+        sensor = WasbBlobSensor(
+            task_id='wasb_sensor',
+            dag=self.dag,
+            check_options={'timeout': 2},
+            **self._config
+        )
+        self.assertEqual(sensor.check_options, {'timeout': 2})
+
+    @mock.patch('airflow.contrib.sensors.wasb_sensor.WasbHook',
+                autospec=True)
+    def test_poke(self, mock_hook):
+        mock_instance = mock_hook.return_value
+        sensor = WasbBlobSensor(
+            task_id='wasb_sensor',
+            dag=self.dag,
+            check_options={'timeout': 2},
+            **self._config
+        )
+        sensor.poke(None)
+        mock_instance.check_for_blob.assert_called_once_with(
+            'container', 'blob', timeout=2
+        )
+
+
+class TestWasbPrefixSensor(unittest.TestCase):
+
+    _config = {
+        'container_name': 'container',
+        'prefix': 'prefix',
+        'wasb_conn_id': 'conn_id',
+        'timeout': 100,
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_init(self):
+        sensor = WasbPrefixSensor(
+            task_id='wasb_sensor',
+            dag=self.dag,
+            **self._config
+        )
+        self.assertEqual(sensor.container_name, self._config['container_name'])
+        self.assertEqual(sensor.prefix, self._config['prefix'])
+        self.assertEqual(sensor.wasb_conn_id, self._config['wasb_conn_id'])
+        self.assertEqual(sensor.check_options, {})
+        self.assertEqual(sensor.timeout, self._config['timeout'])
+
+        sensor = WasbPrefixSensor(
+            task_id='wasb_sensor',
+            dag=self.dag,
+            check_options={'timeout': 2},
+            **self._config
+        )
+        self.assertEqual(sensor.check_options, {'timeout': 2})
+
+    @mock.patch('airflow.contrib.sensors.wasb_sensor.WasbHook',
+                autospec=True)
+    def test_poke(self, mock_hook):
+        mock_instance = mock_hook.return_value
+        sensor = WasbPrefixSensor(
+            task_id='wasb_sensor',
+            dag=self.dag,
+            check_options={'timeout': 2},
+            **self._config
+        )
+        sensor.poke(None)
+        mock_instance.check_for_prefix.assert_called_once_with(
+            'container', 'prefix', timeout=2
+        )
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1bc5f38/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 8b3d1b8..a6bf613 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1111,6 +1111,7 @@ class CliTests(unittest.TestCase):
         self.assertIn(['mssql_default', 'mssql'], conns)
         self.assertIn(['mysql_default', 'mysql'], conns)
         self.assertIn(['postgres_default', 'postgres'], conns)
+        self.assertIn(['wasb_default', 'wasb'], conns)
 
         # Attempt to list connections with invalid cli args
         with mock.patch('sys.stdout',