Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/07 02:14:33 UTC

[GitHub] [beam] AldairCoronel opened a new pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

AldairCoronel opened a new pull request #12492:
URL: https://github.com/apache/beam/pull/12492


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- | Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   ![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r468877314



##########
File path: sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
##########
@@ -0,0 +1,315 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for Azure Blob Storage File System."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+
+# patches unittest.TestCase to be python3 compatible.
+import future.tests.base  # pylint: disable=unused-import
+import mock
+
+from apache_beam.io.azure import blobstorageio

Review comment:
       the import error occurs here, so you should move this import to line 40
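   (For context, the pattern Beam's other optional-dependency tests commonly use is to wrap the import in a try/except at the top of the module and skip the tests when it fails. A minimal sketch of that pattern, where the module layout and class name are illustrative rather than this PR's final code:)

   ```python
   # Sketch: guard the optional azure import so this test module still loads
   # when the azure dependencies are not installed, and skip tests otherwise.
   import unittest

   try:
     from apache_beam.io.azure import blobstorageio
   except ImportError:
     blobstorageio = None


   @unittest.skipIf(blobstorageio is None, 'Azure dependencies are not installed')
   class BlobStorageFileSystemTest(unittest.TestCase):
     def test_import_guard(self):
       # Placeholder assertion; the real tests exercise BlobStorageFileSystem.
       self.assertIsNotNone(blobstorageio)
   ```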







[GitHub] [beam] AldairCoronel removed a comment on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
AldairCoronel removed a comment on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-680433635


   @pabloem Let's see:
   
   - **Authentication.** At the moment the only way to authenticate is with a connection string obtained from environment variables; a usage sketch follows after this list. (The code is here: 
   
   - **Integration tests with Azurite.** Integration tests with Azurite are practically ready. The only thing left is to define a function in `build.gradle` that starts and stops Azurite. (You can find the branch here: https://github.com/AldairCoronel/beam/commits/azurite).
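   
   As a usage sketch for the connection-string path mentioned above (the endpoint, account name, and key below are placeholders for a local Azurite setup, not values from this PR):
   
   ```python
   # Sketch only: point AZURE_STORAGE_CONNECTION_STRING at a local Azurite
   # endpoint before constructing the client. The account name, key and port
   # are placeholders.
   import os
   
   os.environ['AZURE_STORAGE_CONNECTION_STRING'] = (
       'DefaultEndpointsProtocol=http;'
       'AccountName=devstoreaccount1;'
       'AccountKey=<account-key>;'
       'BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;')
   
   from apache_beam.io.azure import blobstorageio
   
   client = blobstorageio.BlobStorageIO()
   print(client.exists('azfs://devstoreaccount1/container/blob.txt'))
   ```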





[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-671609611


   Some formatting complaints - https://ci-beam.apache.org/job/beam_PreCommit_PythonFormatter_Commit/3163/console - you can run `tox -e py3-yapf` to fix them automatically.





[GitHub] [beam] codecov[bot] edited a comment on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-680434016


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=h1) Report
   > Merging [#12492](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/e164d170eb6b5ec1dddd99f09e79dfb0147b84ae?el=desc) will **decrease** coverage by `0.18%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12492/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12492      +/-   ##
   ==========================================
   - Coverage   34.47%   34.28%   -0.19%     
   ==========================================
     Files         684      699      +15     
     Lines       81483    82775    +1292     
     Branches     9180     9361     +181     
   ==========================================
   + Hits        28090    28382     +292     
   - Misses      52972    53970     +998     
   - Partials      421      423       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [typehints/typecheck\_test\_py3.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVja190ZXN0X3B5My5weQ==) | `31.54% <0.00%> (-16.00%)` | :arrow_down: |
   | [typehints/typecheck.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVjay5weQ==) | `29.44% <0.00%> (-6.18%)` | :arrow_down: |
   | [utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `30.95% <0.00%> (-2.39%)` | :arrow_down: |
   | [runners/worker/opcounters.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-cnVubmVycy93b3JrZXIvb3Bjb3VudGVycy5weQ==) | `33.81% <0.00%> (-0.87%)` | :arrow_down: |
   | [pipeline.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-cGlwZWxpbmUucHk=) | `22.04% <0.00%> (-0.28%)` | :arrow_down: |
   | [dataframe/transforms\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-ZGF0YWZyYW1lL3RyYW5zZm9ybXNfdGVzdC5weQ==) | `25.00% <0.00%> (-0.21%)` | :arrow_down: |
   | [io/gcp/bigquery\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-aW8vZ2NwL2JpZ3F1ZXJ5X3Rlc3QucHk=) | `27.39% <0.00%> (-0.18%)` | :arrow_down: |
   | [io/filesystems.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-aW8vZmlsZXN5c3RlbXMucHk=) | `55.00% <0.00%> (-0.18%)` | :arrow_down: |
   | [options/pipeline\_options.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-b3B0aW9ucy9waXBlbGluZV9vcHRpb25zLnB5) | `52.99% <0.00%> (-0.16%)` | :arrow_down: |
   | [transforms/ptransform\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHJhbnNmb3Jtcy9wdHJhbnNmb3JtX3Rlc3QucHk=) | `18.37% <0.00%> (-0.09%)` | :arrow_down: |
   | ... and [37 more](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=footer). Last update [086b985...4c5ab4c](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] AldairCoronel commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
AldairCoronel commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r477004099



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio.py
##########
@@ -0,0 +1,664 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure Blob Storage client.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import errno
+import io
+import logging
+import os
+import re
+import tempfile
+import time
+from builtins import object
+
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.utils import retry
+
+_LOGGER = logging.getLogger(__name__)
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from azure.core.exceptions import ResourceNotFoundError
+  from azure.storage.blob import (
+      BlobServiceClient,
+      ContentSettings,
+  )
+  AZURE_DEPS_INSTALLED = True
+except ImportError:
+  AZURE_DEPS_INSTALLED = False
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+MAX_BATCH_OPERATION_SIZE = 100
+
+
+def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
+  """Return the storage account, the container and
+  blob names of the given azfs:// path.
+  """
+  match = re.match(
+      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
+      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
+      azfs_path)
+  if match is None or (match.group(3) == '' and not blob_optional):
+    raise ValueError(
+        'Azure Blob Storage path must be in the form '
+        'azfs://<storage-account>/<container>/<path>.')
+  result = None
+  if get_account:
+    result = match.group(1), match.group(2), match.group(3)
+  else:
+    result = match.group(2), match.group(3)
+  return result
+
+
+def get_azfs_url(storage_account, container, blob=''):
+  """Returns the url in the form of
+   https://account.blob.core.windows.net/container/blob-name
+  """
+  return 'https://' + storage_account + '.blob.core.windows.net/' + \
+          container + '/' + blob
+
+
+class Blob():
+  """A Blob in Azure Blob Storage."""
+  def __init__(self, etag, name, last_updated, size, mime_type):
+    self.etag = etag
+    self.name = name
+    self.last_updated = last_updated
+    self.size = size
+    self.mime_type = mime_type
+
+
+class BlobStorageIOError(IOError, retry.PermanentException):
+  """Blob Strorage IO error that should not be retried."""
+  pass
+
+
+class BlobStorageError(Exception):
+  """Blob Storage client error."""
+  def __init__(self, message=None, code=None):
+    self.message = message
+    self.code = code
+
+
+class BlobStorageIO(object):
+  """Azure Blob Storage I/O client."""
+  def __init__(self, client=None):
+    if not AZURE_DEPS_INSTALLED:
+      raise RuntimeError('Azure dependencies are not installed. Unable to run.')
+    if client is None:
+      connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      self.client = BlobServiceClient.from_connection_string(connect_str)
+    else:
+      self.client = client
+
+  def open(
+      self,
+      filename,
+      mode='r',
+      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
+      mime_type='application/octet-stream'):
+    """Open an Azure Blob Storage file path for reading or writing.
+
+    Args:
+      filename (str): Azure Blob Storage file path in the form
+                      ``azfs://<storage-account>/<container>/<path>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
+
+    Returns:
+      Azure Blob Storage file object.
+    Raises:
+      ValueError: Invalid open file mode.
+    """
+    if mode == 'r' or mode == 'rb':
+      downloader = BlobStorageDownloader(
+          self.client, filename, buffer_size=read_buffer_size)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
+    elif mode == 'w' or mode == 'wb':
+      uploader = BlobStorageUploader(self.client, filename, mime_type)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
+    else:
+      raise ValueError('Invalid file open mode: %s.' % mode)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def copy(self, src, dest):
+    """Copies a single Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Raises:
+      TimeoutError: on timeout.
+    """
+    src_storage_account, src_container, src_blob = parse_azfs_path(
+        src, get_account=True)
+    dest_container, dest_blob = parse_azfs_path(dest)
+
+    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
+    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
+
+    try:
+      copied_blob.start_copy_from_url(source_blob)
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_tree(self, src, dest):
+    """Renames the given Azure Blob storage directory and its contents
+    recursively from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      List of tuples of (src, dest, exception) where exception is None if the
+      operation succeeded or the relevant exception if the operation failed.
+    """
+    assert src.endswith('/')
+    assert dest.endswith('/')
+
+    results = []
+    for entry in self.list_prefix(src):
+      rel_path = entry[len(src):]
+      try:
+        self.copy(entry, dest + rel_path)
+        results.append((entry, dest + rel_path, None))
+      except BlobStorageError as e:
+        results.append((entry, dest + rel_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_paths(self, src_dest_pairs):
+    """Copies the given Azure Blob Storage blobs from src to dest. This can
+    handle directory or file paths.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name] file paths
+                      to copy from src to dest.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is None if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    results = []
+
+    for src_path, dest_path in src_dest_pairs:
+      # Case 1. They are directories.
+      if src_path.endswith('/') and dest_path.endswith('/'):
+        try:
+          results += self.copy_tree(src_path, dest_path)
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Case 2. They are individual blobs.
+      elif not src_path.endswith('/') and not dest_path.endswith('/'):
+        try:
+          self.copy(src_path, dest_path)
+          results.append((src_path, dest_path, None))
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Mismatched paths (one directory, one non-directory) get an error.
+      else:
+        e = BlobStorageError(
+            "Unable to copy mismatched paths" +
+            "(directory, non-directory): %s, %s" % (src_path, dest_path),
+            400)
+        results.append((src_path, dest_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename(self, src, dest):
+    """Renames the given Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    self.copy(src, dest)
+    self.delete(src)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename_files(self, src_dest_pairs):
+    """Renames the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name]
+                      file paths to rename from src to dest.
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    for src, dest in src_dest_pairs:
+      if src.endswith('/') or dest.endswith('/'):
+        raise ValueError('Unable to rename a directory.')
+
+    # Results from copy operation.
+    copy_results = self.copy_paths(src_dest_pairs)
+    paths_to_delete = \
+        [src for (src, _, error) in copy_results if error is None]
+    # Results from delete operation.
+    delete_results = self.delete_files(paths_to_delete)
+
+    # Get rename file results (list of tuples).
+    results = []
+
+    # Using a dictionary will make the operation faster.
+    delete_results_dict = {src: error for (src, error) in delete_results}
+
+    for src, dest, error in copy_results:
+      # If there was an error in the copy operation.
+      if error is not None:
+        results.append((src, dest, error))
+      # If there was an error in the delete operation.
+      elif delete_results_dict[src] is not None:
+        results.append((src, dest, delete_results_dict[src]))
+      # If there was no error in the operations.
+      else:
+        results.append((src, dest, None))
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def exists(self, path):
+    """Returns whether the given Azure Blob Storage blob exists.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_check.get_blob_properties()
+      return True
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # HTTP 404 indicates that the file did not exist.
+        return False
+      else:
+        # We re-raise all other exceptions.
+        raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def size(self, path):
+    """Returns the size of a single Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Blob Storage blob.
+
+    Returns: size of the Blob Storage blob in bytes.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single
+    Azure Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Azure Blob Storage blob.
+
+    Returns: last updated time of the Azure Blob Storage blob
+    in seconds.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    datatime = properties.last_modified
+    return (
+        time.mktime(datatime.timetuple()) - time.timezone +
+        datatime.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def checksum(self, path):
+    """Looks up the checksum of an Azure Blob Storage blob.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.etag
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def delete(self, path):
+    """Deletes a single blob at the given Azure Blob Storage path.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_delete = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_delete.delete_blob()
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      else:
+        logging.error('HTTP error while deleting file %s', path)
+        raise e
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_paths(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+    This can handle directory or file paths.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      Dictionary of the form {path: error}, where error is 202 if the
+      operation succeeded or the relevant error if the operation failed.
+    """
+    directories, blobs = [], []
+
+    # Retrieve directories and not directories.
+    for path in paths:
+      if path.endswith('/'):
+        directories.append(path)
+      else:
+        blobs.append(path)
+
+    results = {}
+
+    for directory in directories:
+      directory_result = dict(self.delete_tree(directory))
+      results.update(directory_result)
+
+    blobs_results = dict(self.delete_files(blobs))
+    results.update(blobs_results)
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_tree(self, root):
+    """Deletes all blobs under the given Azure BlobStorage virtual
+    directory.
+
+    Args:
+      root: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name]
+            (ending with a "/").
+
+    Returns:
+      List of tuples of (path, exception), where each path is a blob
+      under the given root. exception is 202 if the operation succeeded
+      or the relevant exception if the operation failed.
+    """
+    assert root.endswith('/')
+
+    # Get the blob under the root directory.
+    paths_to_delete = self.list_prefix(root)
+
+    return self.delete_files(paths_to_delete)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_files(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      List of tuples of (path, error) in the same order as the paths
+      argument, where error is 202 if the operation succeeded or the
+      relevant error if the operation failed.
+    """
+    if not paths:
+      return []
+
+    # Group blobs into containers.
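+    # parse_azfs_path returns (container, blob) tuples here, so zip(*...)
+    # transposes them into one sequence of containers and one of blobs.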
+    containers, blobs = zip(*[parse_azfs_path(path, get_account=False) \
+        for path in paths])
+
+    grouped_blobs = {container: [] for container in containers}
+
+    # Fill dictionary.
+    for container, blob in zip(containers, blobs):
+      grouped_blobs[container].append(blob)
+
+    results = {}
+
+    # Delete minibatches of blobs for each container.
+    for container, blobs in grouped_blobs.items():
+      for i in range(0, len(blobs), MAX_BATCH_OPERATION_SIZE):
+        blobs_to_delete = blobs[i:i + MAX_BATCH_OPERATION_SIZE]
+        results.update(self._delete_batch(container, blobs_to_delete))
+
+    final_results = \
+        [(path, results[parse_azfs_path(path, get_account=False)]) \
+        for path in paths]
+
+    return final_results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _delete_batch(self, container, blobs):
+    """A helper method. Azure Blob Storage Python Client allows batch
+    deletions for blobs within the same container.
+
+    Args:
+      container: container name.
+      blobs: list of blobs to be deleted.
+
+    Returns:
+      Dictionary of the form {(container, blob): error}, where error is
+      202 if the operation succeeded.
+    """
+    container_client = self.client.get_container_client(container)
+    results = {}
+
+    try:
+      response = container_client.delete_blobs(
+          *blobs, raise_on_any_failure=False)
+
+      for blob, error in zip(blobs, response):
+        results[(container, blob)] = error.status_code
+
+    except BlobStorageError as e:
+      for blob in blobs:
+        results[(container, blob)] = e.message
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def list_prefix(self, path):
+    """Lists files matching the prefix.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      Dictionary of file name -> size.
+    """
+    storage_account, container, blob = parse_azfs_path(
+        path, blob_optional=True, get_account=True)
+    file_sizes = {}
+    counter = 0
+    start_time = time.time()
+
+    logging.info("Starting the size estimation of the input")
+    container_client = self.client.get_container_client(container)
+
+    while True:
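+      # list_blobs returns an auto-paging iterable, so a single pass over
+      # the response visits every matching blob; the loop exits after it.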
+      response = container_client.list_blobs(name_starts_with=blob)
+      for item in response:
+        file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
+        file_sizes[file_name] = item.size
+        counter += 1
+        if counter % 10000 == 0:
+          logging.info("Finished computing size of: %s files", len(file_sizes))
+      break
+
+    logging.info(
+        "Finished listing %s files in %s seconds.",
+        counter,
+        time.time() - start_time)
+    return file_sizes
+
+
+class BlobStorageDownloader(Downloader):
+  def __init__(self, client, path, buffer_size):
+    self._client = client
+    self._path = path
+    self._container, self._blob = parse_azfs_path(path)
+    self._buffer_size = buffer_size
+
+    self._blob_to_download = self._client.get_blob_client(
+        self._container, self._blob)
+
+    try:
+      properties = self._get_object_properties()
+    except ResourceNotFoundError as http_error:
+      if http_error.status_code == 404:
+        raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
+      else:
+        _LOGGER.error(
+            'HTTP error while requesting file %s: %s', self._path, http_error)
+        raise
+
+    self._size = properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _get_object_properties(self):
+    return self._blob_to_download.get_blob_properties()
+
+  @property
+  def size(self):
+    return self._size
+
+  def get_range(self, start, end):
+    # download_blob takes an offset and a length; end is exclusive, so the
+    # length to read is end - start.
+    blob_data = self._blob_to_download.download_blob(start, end - start)
+    # Returns the content as bytes.
+    return blob_data.readall()
+
+
+class BlobStorageUploader(Uploader):
+  def __init__(self, client, path, mime_type='application/octet-stream'):
+    self._client = client
+    self._path = path
+    self._container, self._blob = parse_azfs_path(path)
+    self._content_settings = ContentSettings(mime_type)
+
+    self._blob_to_upload = self._client.get_blob_client(
+        self._container, self._blob)
+
+    # Buffer writes to a local temporary file; finish() uploads it in one call.
+    self._temporary_file = tempfile.NamedTemporaryFile()
+
+  def put(self, data):
+    self._temporary_file.write(data.tobytes())
+
+  def finish(self):
+    self._temporary_file.seek(0)
+    # The temporary file is deleted immediately after the operation.
+    with open(self._temporary_file.name, "rb") as f:
+      self._blob_to_upload.upload_blob(
+          f.read(), overwrite=True, content_settings=self._content_settings)

Review comment:
       @pabloem What happens when we're trying to upload a large file?
   
   Azure complains when you try to upload large files although in the documentation it states: ***Calls to write a blob, write a block, or write a page are permitted 10 minutes per megabyte to complete. If an operation is taking longer than 10 minutes per megabyte on average, it will time out.***
    
   Refer to this issue as well: https://github.com/Azure/azure-sdk-for-python/issues/12166
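   
   (One direction worth trying for large blobs, sketched below rather than taken from this PR: pass `upload_blob` the open file object instead of `f.read()`, so the SDK can split the transfer into blocks itself. `max_concurrency` is an optional `upload_blob` argument in azure-storage-blob v12; the class wrapper and attribute names simply mirror the uploader above.)
   
   ```python
   # Sketch: stream the temporary file instead of loading it into memory with
   # f.read(), letting azure-storage-blob split a large upload into blocks.
   class _StreamingFinishMixin(object):
     def finish(self):
       self._temporary_file.seek(0)
       with open(self._temporary_file.name, "rb") as f:
         self._blob_to_upload.upload_blob(
             f,  # a file-like object; the SDK uploads it in chunks
             overwrite=True,
             content_settings=self._content_settings,
             max_concurrency=4)
   ```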







[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-671501004


   retest this please





[GitHub] [beam] epicfaace commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
epicfaace commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r477285941



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio.py
##########
@@ -0,0 +1,664 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure Blob Storage client.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import errno
+import io
+import logging
+import os
+import re
+import tempfile
+import time
+from builtins import object
+
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.utils import retry
+
+_LOGGER = logging.getLogger(__name__)
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from azure.core.exceptions import ResourceNotFoundError
+  from azure.storage.blob import (
+      BlobServiceClient,
+      ContentSettings,
+  )
+  AZURE_DEPS_INSTALLED = True
+except ImportError:
+  AZURE_DEPS_INSTALLED = False
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+MAX_BATCH_OPERATION_SIZE = 100
+
+
+def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
+  """Return the storage account, the container and
+  blob names of the given azfs:// path.
+  """
+  match = re.match(
+      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
+      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
+      azfs_path)
+  if match is None or (match.group(3) == '' and not blob_optional):
+    raise ValueError(
+        'Azure Blob Storage path must be in the form '
+        'azfs://<storage-account>/<container>/<path>.')
+  result = None
+  if get_account:
+    result = match.group(1), match.group(2), match.group(3)
+  else:
+    result = match.group(2), match.group(3)
+  return result
+
+
+def get_azfs_url(storage_account, container, blob=''):
+  """Returns the url in the form of
+   https://account.blob.core.windows.net/container/blob-name
+  """
+  return 'https://' + storage_account + '.blob.core.windows.net/' + \
+          container + '/' + blob
+
+
+class Blob():
+  """A Blob in Azure Blob Storage."""
+  def __init__(self, etag, name, last_updated, size, mime_type):
+    self.etag = etag
+    self.name = name
+    self.last_updated = last_updated
+    self.size = size
+    self.mime_type = mime_type
+
+
+class BlobStorageIOError(IOError, retry.PermanentException):
+  """Blob Strorage IO error that should not be retried."""
+  pass
+
+
+class BlobStorageError(Exception):
+  """Blob Storage client error."""
+  def __init__(self, message=None, code=None):
+    self.message = message
+    self.code = code
+
+
+class BlobStorageIO(object):
+  """Azure Blob Storage I/O client."""
+  def __init__(self, client=None):
+    if not AZURE_DEPS_INSTALLED:
+      raise RuntimeError('Azure dependencies are not installed. Unable to run.')
+    if client is None:
+      connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      self.client = BlobServiceClient.from_connection_string(connect_str)
+    else:
+      self.client = client
+
+  def open(
+      self,
+      filename,
+      mode='r',
+      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
+      mime_type='application/octet-stream'):
+    """Open an Azure Blob Storage file path for reading or writing.
+
+    Args:
+      filename (str): Azure Blob Storage file path in the form
+                      ``azfs://<storage-account>/<container>/<path>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
+
+    Returns:
+      Azure Blob Storage file object.
+    Raises:
+      ValueError: Invalid open file mode.
+    """
+    if mode == 'r' or mode == 'rb':
+      downloader = BlobStorageDownloader(
+          self.client, filename, buffer_size=read_buffer_size)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
+    elif mode == 'w' or mode == 'wb':
+      uploader = BlobStorageUploader(self.client, filename, mime_type)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
+    else:
+      raise ValueError('Invalid file open mode: %s.' % mode)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def copy(self, src, dest):
+    """Copies a single Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Raises:
+      TimeoutError: on timeout.
+    """
+    src_storage_account, src_container, src_blob = parse_azfs_path(
+        src, get_account=True)
+    dest_container, dest_blob = parse_azfs_path(dest)
+
+    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
+    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
+
+    try:
+      copied_blob.start_copy_from_url(source_blob)
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_tree(self, src, dest):
+    """Renames the given Azure Blob storage directory and its contents
+    recursively from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      List of tuples of (src, dest, exception) where exception is None if the
+      operation succeeded or the relevant exception if the operation failed.
+    """
+    assert src.endswith('/')
+    assert dest.endswith('/')
+
+    results = []
+    for entry in self.list_prefix(src):
+      rel_path = entry[len(src):]
+      try:
+        self.copy(entry, dest + rel_path)
+        results.append((entry, dest + rel_path, None))
+      except BlobStorageError as e:
+        results.append((entry, dest + rel_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_paths(self, src_dest_pairs):
+    """Copies the given Azure Blob Storage blobs from src to dest. This can
+    handle directory or file paths.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name] file paths
+                      to copy from src to dest.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is None if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    results = []
+
+    for src_path, dest_path in src_dest_pairs:
+      # Case 1. They are directories.
+      if src_path.endswith('/') and dest_path.endswith('/'):
+        try:
+          results += self.copy_tree(src_path, dest_path)
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Case 2. They are individual blobs.
+      elif not src_path.endswith('/') and not dest_path.endswith('/'):
+        try:
+          self.copy(src_path, dest_path)
+          results.append((src_path, dest_path, None))
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Mismatched paths (one directory, one non-directory) get an error.
+      else:
+        e = BlobStorageError(
+            "Unable to copy mismatched paths" +
+            "(directory, non-directory): %s, %s" % (src_path, dest_path),
+            400)
+        results.append((src_path, dest_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename(self, src, dest):
+    """Renames the given Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    self.copy(src, dest)
+    self.delete(src)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename_files(self, src_dest_pairs):
+    """Renames the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name]
+                      file paths to rename from src to dest.
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    for src, dest in src_dest_pairs:
+      if src.endswith('/') or dest.endswith('/'):
+        raise ValueError('Unable to rename a directory.')
+
+    # Results from copy operation.
+    copy_results = self.copy_paths(src_dest_pairs)
+    paths_to_delete = \
+        [src for (src, _, error) in copy_results if error is None]
+    # Results from delete operation.
+    delete_results = self.delete_files(paths_to_delete)
+
+    # Get rename file results (list of tuples).
+    results = []
+
+    # Using a dictionary will make the operation faster.
+    delete_results_dict = {src: error for (src, error) in delete_results}
+
+    for src, dest, error in copy_results:
+      # If there was an error in the copy operation.
+      if error is not None:
+        results.append((src, dest, error))
+      # If there was an error in the delete operation.
+      elif delete_results_dict[src] is not None:
+        results.append((src, dest, delete_results_dict[src]))
+      # If there was no error in the operations.
+      else:
+        results.append((src, dest, None))
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def exists(self, path):
+    """Returns whether the given Azure Blob Storage blob exists.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_check.get_blob_properties()
+      return True
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # HTTP 404 indicates that the file did not exist.
+        return False
+      else:
+        # We re-raise all other exceptions.
+        raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def size(self, path):
+    """Returns the size of a single Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Blob Storage blob.
+
+    Returns: size of the Blob Storage blob in bytes.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single
+    Azure Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Azure Blob Storage blob.
+
+    Returns: last updated time of the Azure Blob Storage blob
+    in seconds.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    last_modified = properties.last_modified
+    return (
+        time.mktime(last_modified.timetuple()) - time.timezone +
+        last_modified.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def checksum(self, path):
+    """Looks up the checksum of an Azure Blob Storage blob.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.etag
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def delete(self, path):
+    """Deletes a single blob at the given Azure Blob Storage path.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_delete = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_delete.delete_blob()
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      else:
+        logging.error('HTTP error while deleting file %s', path)
+        raise e
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_paths(self, paths):

Review comment:
       Interesting. To be clear, I think using `delete_blobs` would be ideal, since we would only require a single batch request, rather than having to call `delete_blob` over and over again (which is just a workaround for the error I mentioned above). If it's not supported by Azurite, though, it might be fine to just change it to use the `delete_blob` workaround.
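
   As a rough sketch of the two approaches being compared (names such as `connect_str`, `my-container` and `blob_names` are placeholders, not values from this PR), the batch call issues a single request while the workaround loops over individual deletes:

   ```python
   from azure.storage.blob import BlobServiceClient

   # Assumed setup (placeholders only).
   connect_str = '<AZURE_STORAGE_CONNECTION_STRING>'
   blob_names = ['path/to/blob1', 'path/to/blob2']

   service = BlobServiceClient.from_connection_string(connect_str)
   container_client = service.get_container_client('my-container')

   # Preferred: one batch request (when the service/emulator supports it).
   container_client.delete_blobs(*blob_names, raise_on_any_failure=False)

   # Workaround: one request per blob via delete_blob.
   for name in blob_names:
     container_client.delete_blob(name)
   ```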




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem merged pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #12492:
URL: https://github.com/apache/beam/pull/12492


   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-680434016


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=h1) Report
   > Merging [#12492](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/e164d170eb6b5ec1dddd99f09e79dfb0147b84ae?el=desc) will **decrease** coverage by `0.18%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12492/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12492      +/-   ##
   ==========================================
   - Coverage   34.47%   34.28%   -0.19%     
   ==========================================
     Files         684      699      +15     
     Lines       81483    82775    +1292     
     Branches     9180     9361     +181     
   ==========================================
   + Hits        28090    28382     +292     
   - Misses      52972    53970     +998     
   - Partials      421      423       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [typehints/typecheck\_test\_py3.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVja190ZXN0X3B5My5weQ==) | `31.54% <0.00%> (-16.00%)` | :arrow_down: |
   | [typehints/typecheck.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVjay5weQ==) | `29.44% <0.00%> (-6.18%)` | :arrow_down: |
   | [utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `30.95% <0.00%> (-2.39%)` | :arrow_down: |
   | [runners/worker/opcounters.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-cnVubmVycy93b3JrZXIvb3Bjb3VudGVycy5weQ==) | `33.81% <0.00%> (-0.87%)` | :arrow_down: |
   | [pipeline.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-cGlwZWxpbmUucHk=) | `22.04% <0.00%> (-0.28%)` | :arrow_down: |
   | [dataframe/transforms\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-ZGF0YWZyYW1lL3RyYW5zZm9ybXNfdGVzdC5weQ==) | `25.00% <0.00%> (-0.21%)` | :arrow_down: |
   | [io/gcp/bigquery\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-aW8vZ2NwL2JpZ3F1ZXJ5X3Rlc3QucHk=) | `27.39% <0.00%> (-0.18%)` | :arrow_down: |
   | [io/filesystems.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-aW8vZmlsZXN5c3RlbXMucHk=) | `55.00% <0.00%> (-0.18%)` | :arrow_down: |
   | [options/pipeline\_options.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-b3B0aW9ucy9waXBlbGluZV9vcHRpb25zLnB5) | `52.99% <0.00%> (-0.16%)` | :arrow_down: |
   | [transforms/ptransform\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHJhbnNmb3Jtcy9wdHJhbnNmb3JtX3Rlc3QucHk=) | `18.37% <0.00%> (-0.09%)` | :arrow_down: |
   | ... and [37 more](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=footer). Last update [086b985...4c5ab4c](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] pabloem commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r468878930



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio_test.py
##########
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for Azure Blob Storage client.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+
+from apache_beam.io.azure import blobstorageio

Review comment:
       there's also an import error happening here. you need to catch it and skip the test
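
   A minimal sketch of the guarded-import-and-skip pattern being requested here (the test class and assertion below are illustrative, not the PR's actual tests):

   ```python
   import unittest

   try:
     from apache_beam.io.azure import blobstorageio
   except ImportError:
     blobstorageio = None  # Azure dependencies are not installed.


   @unittest.skipIf(blobstorageio is None, 'Azure dependencies are not installed.')
   class TestAZFSPathParser(unittest.TestCase):
     def test_parse_azfs_path(self):
       self.assertEqual(
           blobstorageio.parse_azfs_path('azfs://account/container/name'),
           ('container', 'name'))


   if __name__ == '__main__':
     unittest.main()
   ```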







[GitHub] [beam] pabloem commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r477619250



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio.py
##########
@@ -0,0 +1,664 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure Blob Storage client.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import errno
+import io
+import logging
+import os
+import re
+import tempfile
+import time
+from builtins import object
+
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.utils import retry
+
+_LOGGER = logging.getLogger(__name__)
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from azure.core.exceptions import ResourceNotFoundError
+  from azure.storage.blob import (
+      BlobServiceClient,
+      ContentSettings,
+  )
+  AZURE_DEPS_INSTALLED = True
+except ImportError:
+  AZURE_DEPS_INSTALLED = False
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+MAX_BATCH_OPERATION_SIZE = 100
+
+
+def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
+  """Return the storage account, the container and
+  blob names of the given azfs:// path.
+  """
+  match = re.match(
+      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
+      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
+      azfs_path)
+  if match is None or (match.group(3) == '' and not blob_optional):
+    raise ValueError(
+        'Azure Blob Storage path must be in the form '
+        'azfs://<storage-account>/<container>/<path>.')
+  result = None
+  if get_account:
+    result = match.group(1), match.group(2), match.group(3)
+  else:
+    result = match.group(2), match.group(3)
+  return result
+
+
+def get_azfs_url(storage_account, container, blob=''):
+  """Returns the url in the form of
+   https://account.blob.core.windows.net/container/blob-name
+  """
+  return 'https://' + storage_account + '.blob.core.windows.net/' + \
+          container + '/' + blob
+
+
+class Blob():
+  """A Blob in Azure Blob Storage."""
+  def __init__(self, etag, name, last_updated, size, mime_type):
+    self.etag = etag
+    self.name = name
+    self.last_updated = last_updated
+    self.size = size
+    self.mime_type = mime_type
+
+
+class BlobStorageIOError(IOError, retry.PermanentException):
+  """Blob Strorage IO error that should not be retried."""
+  pass
+
+
+class BlobStorageError(Exception):
+  """Blob Storage client error."""
+  def __init__(self, message=None, code=None):
+    self.message = message
+    self.code = code
+
+
+class BlobStorageIO(object):
+  """Azure Blob Storage I/O client."""
+  def __init__(self, client=None):
+    # Check dependencies before touching any azure.* name; otherwise a
+    # missing package would surface as a NameError instead of this error.
+    if not AZURE_DEPS_INSTALLED:
+      raise RuntimeError('Azure dependencies are not installed. Unable to run.')
+    if client is None:
+      connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      self.client = BlobServiceClient.from_connection_string(connect_str)
+    else:
+      self.client = client
+
+  def open(
+      self,
+      filename,
+      mode='r',
+      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
+      mime_type='application/octet-stream'):
+    """Open an Azure Blob Storage file path for reading or writing.
+
+    Args:
+      filename (str): Azure Blob Storage file path in the form
+                      ``azfs://<storage-account>/<container>/<path>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
+
+    Returns:
+      Azure Blob Storage file object.
+    Raises:
+      ValueError: Invalid open file mode.
+    """
+    if mode == 'r' or mode == 'rb':
+      downloader = BlobStorageDownloader(
+          self.client, filename, buffer_size=read_buffer_size)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
+    elif mode == 'w' or mode == 'wb':
+      uploader = BlobStorageUploader(self.client, filename, mime_type)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
+    else:
+      raise ValueError('Invalid file open mode: %s.' % mode)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def copy(self, src, dest):
+    """Copies a single Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Raises:
+      TimeoutError: on timeout.
+    """
+    src_storage_account, src_container, src_blob = parse_azfs_path(
+        src, get_account=True)
+    dest_container, dest_blob = parse_azfs_path(dest)
+
+    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
+    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
+
+    try:
+      copied_blob.start_copy_from_url(source_blob)
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_tree(self, src, dest):
+    """Renames the given Azure Blob storage directory and its contents
+    recursively from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      List of tuples of (src, dest, exception) where exception is None if the
+      operation succeeded or the relevant exception if the operation failed.
+    """
+    assert src.endswith('/')
+    assert dest.endswith('/')
+
+    results = []
+    for entry in self.list_prefix(src):
+      rel_path = entry[len(src):]
+      try:
+        self.copy(entry, dest + rel_path)
+        results.append((entry, dest + rel_path, None))
+      except BlobStorageError as e:
+        results.append((entry, dest + rel_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_paths(self, src_dest_pairs):
+    """Copies the given Azure Blob Storage blobs from src to dest. This can
+    handle directory or file paths.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name] file paths
+                      to copy from src to dest.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is None if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    results = []
+
+    for src_path, dest_path in src_dest_pairs:
+      # Case 1. They are directories.
+      if src_path.endswith('/') and dest_path.endswith('/'):
+        try:
+          results += self.copy_tree(src_path, dest_path)
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Case 2. They are individual blobs.
+      elif not src_path.endswith('/') and not dest_path.endswith('/'):
+        try:
+          self.copy(src_path, dest_path)
+          results.append((src_path, dest_path, None))
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Mismatched paths (one directory, one non-directory) get an error.
+      else:
+        e = BlobStorageError(
+            "Unable to copy mismatched paths" +
+            "(directory, non-directory): %s, %s" % (src_path, dest_path),
+            400)
+        results.append((src_path, dest_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename(self, src, dest):
+    """Renames the given Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    self.copy(src, dest)
+    self.delete(src)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename_files(self, src_dest_pairs):
+    """Renames the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name]
+                      file paths to rename from src to dest.
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    for src, dest in src_dest_pairs:
+      if src.endswith('/') or dest.endswith('/'):
+        raise ValueError('Unable to rename a directory.')
+
+    # Results from copy operation.
+    copy_results = self.copy_paths(src_dest_pairs)
+    paths_to_delete = \
+        [src for (src, _, error) in copy_results if error is None]
+    # Results from delete operation.
+    delete_results = self.delete_files(paths_to_delete)
+
+    # Get rename file results (list of tuples).
+    results = []
+
+    # Index delete results by source path for O(1) lookups.
+    delete_results_dict = {src: error for (src, error) in delete_results}
+
+    for src, dest, error in copy_results:
+      # If there was an error in the copy operation.
+      if error is not None:
+        results.append((src, dest, error))
+      # If there was an error in the delete operation.
+      elif delete_results_dict[src] is not None:
+        results.append((src, dest, delete_results_dict[src]))
+      # If there was no error in the operations.
+      else:
+        results.append((src, dest, None))
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def exists(self, path):
+    """Returns whether the given Azure Blob Storage blob exists.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_check.get_blob_properties()
+      return True
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # HTTP 404 indicates that the file did not exist.
+        return False
+      else:
+        # We re-raise all other exceptions.
+        raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def size(self, path):
+    """Returns the size of a single Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Blob Storage blob.
+
+    Returns: size of the Blob Storage blob in bytes.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single
+    Azure Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Azure Blob Storage blob.
+
+    Returns: last updated time of the Azure Blob Storage blob
+    in seconds.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    last_modified = properties.last_modified
+    return (
+        time.mktime(last_modified.timetuple()) - time.timezone +
+        last_modified.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def checksum(self, path):
+    """Looks up the checksum of an Azure Blob Storage blob.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.etag
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def delete(self, path):
+    """Deletes a single blob at the given Azure Blob Storage path.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_delete = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_delete.delete_blob()
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      else:
+        logging.error('HTTP error while deleting file %s', path)
+        raise e
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_paths(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+    This can handle directory or file paths.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is 202 if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    directories, blobs = [], []
+
+    # Retrieve directories and not directories.
+    for path in paths:
+      if path.endswith('/'):
+        directories.append(path)
+      else:
+        blobs.append(path)
+
+    results = {}
+
+    for directory in directories:
+      directory_result = dict(self.delete_tree(directory))
+      results.update(directory_result)
+
+    blobs_results = dict(self.delete_files(blobs))
+    results.update(blobs_results)
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_tree(self, root):
+    """Deletes all blobs under the given Azure BlobStorage virtual
+    directory.
+
+    Args:
+      root: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name]
+            (ending with a "/").
+
+    Returns:
+      List of tuples of (path, error), where each path is a blob
+      under the given root and error is 202 (accepted) if the operation
+      succeeded or the relevant error if it failed.
+    """
+    assert root.endswith('/')
+
+    # Get the blob under the root directory.
+    paths_to_delete = self.list_prefix(root)
+
+    return self.delete_files(paths_to_delete)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_files(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      List of tuples of (path, error) in the same order as the paths
+      argument, where error is 202 (accepted) if the operation succeeded
+      or the relevant error if it failed.
+    """
+    if not paths:
+      return []
+
+    # Group blobs into containers.
+    containers, blobs = zip(*[parse_azfs_path(path, get_account=False) \
+        for path in paths])
+
+    grouped_blobs = {container: [] for container in containers}
+
+    # Fill dictionary.
+    for container, blob in zip(containers, blobs):
+      grouped_blobs[container].append(blob)
+
+    results = {}
+
+    # Delete minibatches of blobs for each container.
+    for container, blobs in grouped_blobs.items():
+      for i in range(0, len(blobs), MAX_BATCH_OPERATION_SIZE):
+        blobs_to_delete = blobs[i:i + MAX_BATCH_OPERATION_SIZE]
+        results.update(self._delete_batch(container, blobs_to_delete))
+
+    final_results = \
+        [(path, results[parse_azfs_path(path, get_account=False)]) \
+        for path in paths]
+
+    return final_results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _delete_batch(self, container, blobs):
+    """A helper method. Azure Blob Storage Python Client allows batch
+    deletions for blobs within the same container.
+
+    Args:
+      container: container name.
+      blobs: list of blobs to be deleted.
+
+    Returns:
+      Dictionary of the form {(container, blob): error}, where error is
+      202 if the operation succeeded.
+    """
+    container_client = self.client.get_container_client(container)
+    results = {}
+
+    try:
+      response = container_client.delete_blobs(
+          *blobs, raise_on_any_failure=False)
+
+      for blob, error in zip(blobs, response):
+        results[(container, blob)] = error.status_code
+
+    except BlobStorageError as e:
+      for blob in blobs:
+        results[(container, blob)] = e.message
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def list_prefix(self, path):
+    """Lists files matching the prefix.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      Dictionary of file name -> size.
+    """
+    storage_account, container, blob = parse_azfs_path(
+        path, blob_optional=True, get_account=True)
+    file_sizes = {}
+    counter = 0
+    start_time = time.time()
+
+    logging.info("Starting the size estimation of the input")
+    container_client = self.client.get_container_client(container)
+
+    response = container_client.list_blobs(name_starts_with=blob)
+    for item in response:
+      file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
+      file_sizes[file_name] = item.size
+      counter += 1
+      if counter % 10000 == 0:
+        logging.info("Finished computing size of: %s files", len(file_sizes))
+
+    logging.info(
+        "Finished listing %s files in %s seconds.",
+        counter,
+        time.time() - start_time)
+    return file_sizes
+
+
+class BlobStorageDownloader(Downloader):
+  def __init__(self, client, path, buffer_size):
+    self._client = client
+    self._path = path
+    self._container, self._blob = parse_azfs_path(path)
+    self._buffer_size = buffer_size
+
+    self._blob_to_download = self._client.get_blob_client(
+        self._container, self._blob)
+
+    try:
+      properties = self._get_object_properties()
+    except ResourceNotFoundError as http_error:
+      if http_error.status_code == 404:
+        raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
+      else:
+        _LOGGER.error(
+            'HTTP error while requesting file %s: %s', self._path, http_error)
+        raise
+
+    self._size = properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _get_object_properties(self):
+    return self._blob_to_download.get_blob_properties()
+
+  @property
+  def size(self):
+    return self._size
+
+  def get_range(self, start, end):
+    # download_blob takes an offset and a length (number of bytes to read).
+    blob_data = self._blob_to_download.download_blob(start, end - start)
+    # Returns the content as bytes.
+    return blob_data.readall()
+
+
+class BlobStorageUploader(Uploader):
+  def __init__(self, client, path, mime_type='application/octet-stream'):
+    self._client = client
+    self._path = path
+    self._container, self._blob = parse_azfs_path(path)
+    self._content_settings = ContentSettings(mime_type)
+
+    self._blob_to_upload = self._client.get_blob_client(
+        self._container, self._blob)
+
+    # Temporary file.
+    self._temporary_file = tempfile.NamedTemporaryFile()
+
+  def put(self, data):
+    self._temporary_file.write(data.tobytes())
+
+  def finish(self):
+    self._temporary_file.seek(0)
+    # The temporary file is deleted immediately after the operation.
+    with open(self._temporary_file.name, "rb") as f:
+      self._blob_to_upload.upload_blob(
+          f.read(), overwrite=True, content_settings=self._content_settings)

Review comment:
       > Authentication. At the moment the only way to authenticate is with a connection string obtained from environment variables.
   
   Okay this is not acceptable. We need to enable authentication via pipeline options as we already discussed privately. This PR is ready to go, but we need to enable pipelineoptions-based authentication in a follow up, okay?
   
   Also, please address comments by @epicfaace to catch `PartialBatchErrorException`, and then we can move forward to merge this change.
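
   For the `PartialBatchErrorException` point, a hedged sketch of what the batch-delete error handling could look like, assuming `PartialBatchErrorException` is importable from `azure.storage.blob` and that its `parts` attribute yields the per-blob sub-responses (the helper below is illustrative, not the merged code):

   ```python
   from azure.storage.blob import PartialBatchErrorException

   def delete_batch(container_client, blobs):
     """Illustrative helper: delete a batch of blobs, reporting per-blob status."""
     results = {}
     try:
       responses = container_client.delete_blobs(*blobs, raise_on_any_failure=True)
       for blob, response in zip(blobs, responses):
         results[blob] = response.status_code
     except PartialBatchErrorException as error:
       # Some sub-requests failed; error.parts holds the individual responses.
       for blob, part in zip(blobs, error.parts):
         results[blob] = part.status_code
     return results
   ```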







[GitHub] [beam] tanya-borisova commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
tanya-borisova commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-692624622


   This is some great functionality in this PR. Is it expected to be in a release soon?





[GitHub] [beam] AldairCoronel commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
AldairCoronel commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-670282714


   R: @pabloem 





[GitHub] [beam] AldairCoronel closed pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
AldairCoronel closed pull request #12492:
URL: https://github.com/apache/beam/pull/12492


   





[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-672273518


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] AldairCoronel removed a comment on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
AldairCoronel removed a comment on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-673697021


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] epicfaace commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
epicfaace commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r476774438



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio.py
##########
@@ -0,0 +1,664 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure Blob Storage client.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import errno
+import io
+import logging
+import os
+import re
+import tempfile
+import time
+from builtins import object
+
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.utils import retry
+
+_LOGGER = logging.getLogger(__name__)
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from azure.core.exceptions import ResourceNotFoundError
+  from azure.storage.blob import (
+      BlobServiceClient,
+      ContentSettings,
+  )
+  AZURE_DEPS_INSTALLED = True
+except ImportError:
+  AZURE_DEPS_INSTALLED = False
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+MAX_BATCH_OPERATION_SIZE = 100
+
+
+def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
+  """Return the storage account, the container and
+  blob names of the given azfs:// path.
+  """
+  match = re.match(
+      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
+      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
+      azfs_path)
+  if match is None or (match.group(3) == '' and not blob_optional):
+    raise ValueError(
+        'Azure Blob Storage path must be in the form '
+        'azfs://<storage-account>/<container>/<path>.')
+  result = None
+  if get_account:
+    result = match.group(1), match.group(2), match.group(3)
+  else:
+    result = match.group(2), match.group(3)
+  return result
+
+
+def get_azfs_url(storage_account, container, blob=''):
+  """Returns the url in the form of
+   https://account.blob.core.windows.net/container/blob-name
+  """
+  return 'https://' + storage_account + '.blob.core.windows.net/' + \
+          container + '/' + blob
+
+
+class Blob():
+  """A Blob in Azure Blob Storage."""
+  def __init__(self, etag, name, last_updated, size, mime_type):
+    self.etag = etag
+    self.name = name
+    self.last_updated = last_updated
+    self.size = size
+    self.mime_type = mime_type
+
+
+class BlobStorageIOError(IOError, retry.PermanentException):
+  """Blob Strorage IO error that should not be retried."""
+  pass
+
+
+class BlobStorageError(Exception):
+  """Blob Storage client error."""
+  def __init__(self, message=None, code=None):
+    self.message = message
+    self.code = code
+
+
+class BlobStorageIO(object):
+  """Azure Blob Storage I/O client."""
+  def __init__(self, client=None):
+    # Check dependencies before touching any azure.* name; otherwise a
+    # missing package would surface as a NameError instead of this error.
+    if not AZURE_DEPS_INSTALLED:
+      raise RuntimeError('Azure dependencies are not installed. Unable to run.')
+    if client is None:
+      connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      self.client = BlobServiceClient.from_connection_string(connect_str)
+    else:
+      self.client = client
+
+  def open(
+      self,
+      filename,
+      mode='r',
+      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
+      mime_type='application/octet-stream'):
+    """Open an Azure Blob Storage file path for reading or writing.
+
+    Args:
+      filename (str): Azure Blob Storage file path in the form
+                      ``azfs://<storage-account>/<container>/<path>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
+
+    Returns:
+      Azure Blob Storage file object.
+    Raises:
+      ValueError: Invalid open file mode.
+    """
+    if mode == 'r' or mode == 'rb':
+      downloader = BlobStorageDownloader(
+          self.client, filename, buffer_size=read_buffer_size)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
+    elif mode == 'w' or mode == 'wb':
+      uploader = BlobStorageUploader(self.client, filename, mime_type)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
+    else:
+      raise ValueError('Invalid file open mode: %s.' % mode)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def copy(self, src, dest):
+    """Copies a single Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Raises:
+      TimeoutError: on timeout.
+    """
+    src_storage_account, src_container, src_blob = parse_azfs_path(
+        src, get_account=True)
+    dest_container, dest_blob = parse_azfs_path(dest)
+
+    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
+    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
+
+    try:
+      copied_blob.start_copy_from_url(source_blob)
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_tree(self, src, dest):
+    """Renames the given Azure Blob storage directory and its contents
+    recursively from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      List of tuples of (src, dest, exception) where exception is None if the
+      operation succeeded or the relevant exception if the operation failed.
+    """
+    assert src.endswith('/')
+    assert dest.endswith('/')
+
+    results = []
+    for entry in self.list_prefix(src):
+      rel_path = entry[len(src):]
+      try:
+        self.copy(entry, dest + rel_path)
+        results.append((entry, dest + rel_path, None))
+      except BlobStorageError as e:
+        results.append((entry, dest + rel_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_paths(self, src_dest_pairs):
+    """Copies the given Azure Blob Storage blobs from src to dest. This can
+    handle directory or file paths.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name] file paths
+                      to copy from src to dest.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is None if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    results = []
+
+    for src_path, dest_path in src_dest_pairs:
+      # Case 1. They are directories.
+      if src_path.endswith('/') and dest_path.endswith('/'):
+        try:
+          results += self.copy_tree(src_path, dest_path)
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Case 2. They are individual blobs.
+      elif not src_path.endswith('/') and not dest_path.endswith('/'):
+        try:
+          self.copy(src_path, dest_path)
+          results.append((src_path, dest_path, None))
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Mismatched paths (one directory, one non-directory) get an error.
+      else:
+        e = BlobStorageError(
+            "Unable to copy mismatched paths" +
+            "(directory, non-directory): %s, %s" % (src_path, dest_path),
+            400)
+        results.append((src_path, dest_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename(self, src, dest):
+    """Renames the given Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    self.copy(src, dest)
+    self.delete(src)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename_files(self, src_dest_pairs):
+    """Renames the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name]
+                      file paths to rename from src to dest.
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    for src, dest in src_dest_pairs:
+      if src.endswith('/') or dest.endswith('/'):
+        raise ValueError('Unable to rename a directory.')
+
+    # Results from copy operation.
+    copy_results = self.copy_paths(src_dest_pairs)
+    paths_to_delete = \
+        [src for (src, _, error) in copy_results if error is None]
+    # Results from delete operation.
+    delete_results = self.delete_files(paths_to_delete)
+
+    # Get rename file results (list of tuples).
+    results = []
+
+    # Index delete results by source path for O(1) lookups.
+    delete_results_dict = {src: error for (src, error) in delete_results}
+
+    for src, dest, error in copy_results:
+      # If there was an error in the copy operation.
+      if error is not None:
+        results.append((src, dest, error))
+      # If there was an error in the delete operation.
+      elif delete_results_dict[src] is not None:
+        results.append((src, dest, delete_results_dict[src]))
+      # If there was no error in the operations.
+      else:
+        results.append((src, dest, None))
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def exists(self, path):
+    """Returns whether the given Azure Blob Storage blob exists.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_check.get_blob_properties()
+      return True
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # HTTP 404 indicates that the file did not exist.
+        return False
+      else:
+        # We re-raise all other exceptions.
+        raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def size(self, path):
+    """Returns the size of a single Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Blob Storage blob.
+
+    Returns: size of the Blob Storage blob in bytes.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single
+    Azure Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Azure Blob Storage blob.
+
+    Returns: last updated time of the Azure Blob Storage blob
+    in seconds.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    last_modified = properties.last_modified
+    return (
+        time.mktime(last_modified.timetuple()) - time.timezone +
+        last_modified.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def checksum(self, path):
+    """Looks up the checksum of an Azure Blob Storage blob.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.etag
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def delete(self, path):
+    """Deletes a single blob at the given Azure Blob Storage path.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_delete = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_delete.delete_blob()
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      else:
+        logging.error('HTTP error while deleting file %s', path)
+        raise e
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_paths(self, paths):

Review comment:
       @AldairCoronel not sure if you've faced this issue when testing yourself, but when I tried using this code in my own project, I ran into this error: https://github.com/Azure/azure-sdk-for-python/issues/13183
   
   I had to work around it by calling `delete_blob` instead of `delete_blobs`: https://github.com/codalab/codalab-worksheets/pull/2769/commits/1e3dd3046e6435185ae09ec9903b74b163b7f143.
   
   Not sure if you faced a similar issue, but adding this here in case it's helpful.
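
   To make that workaround concrete, a sketch of a per-blob fallback (the function name and status codes here are illustrative; `ResourceNotFoundError` comes from `azure.core.exceptions`):

   ```python
   from azure.core.exceptions import ResourceNotFoundError

   def delete_blobs_one_by_one(container_client, blob_names):
     """Illustrative fallback when batch deletion is unavailable."""
     results = {}
     for name in blob_names:
       try:
         container_client.delete_blob(name)
         results[name] = 202  # Mirror the batch API's "accepted" status.
       except ResourceNotFoundError:
         results[name] = 404  # Already gone; treated as success for idempotency.
     return results
   ```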







[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-682293505


   Run Spotless PreCommit





[GitHub] [beam] codecov[bot] edited a comment on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-680434016


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=h1) Report
   > Merging [#12492](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/e164d170eb6b5ec1dddd99f09e79dfb0147b84ae?el=desc) will **decrease** coverage by `0.18%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12492/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12492      +/-   ##
   ==========================================
   - Coverage   34.47%   34.28%   -0.19%     
   ==========================================
     Files         684      699      +15     
     Lines       81483    82775    +1292     
     Branches     9180     9361     +181     
   ==========================================
   + Hits        28090    28382     +292     
   - Misses      52972    53970     +998     
   - Partials      421      423       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [typehints/typecheck\_test\_py3.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVja190ZXN0X3B5My5weQ==) | `31.54% <0.00%> (-16.00%)` | :arrow_down: |
   | [typehints/typecheck.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVjay5weQ==) | `29.44% <0.00%> (-6.18%)` | :arrow_down: |
   | [utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `30.95% <0.00%> (-2.39%)` | :arrow_down: |
   | [runners/worker/opcounters.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-cnVubmVycy93b3JrZXIvb3Bjb3VudGVycy5weQ==) | `33.81% <0.00%> (-0.87%)` | :arrow_down: |
   | [pipeline.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-cGlwZWxpbmUucHk=) | `22.04% <0.00%> (-0.28%)` | :arrow_down: |
   | [dataframe/transforms\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-ZGF0YWZyYW1lL3RyYW5zZm9ybXNfdGVzdC5weQ==) | `25.00% <0.00%> (-0.21%)` | :arrow_down: |
   | [io/gcp/bigquery\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-aW8vZ2NwL2JpZ3F1ZXJ5X3Rlc3QucHk=) | `27.39% <0.00%> (-0.18%)` | :arrow_down: |
   | [io/filesystems.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-aW8vZmlsZXN5c3RlbXMucHk=) | `55.00% <0.00%> (-0.18%)` | :arrow_down: |
   | [options/pipeline\_options.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-b3B0aW9ucy9waXBlbGluZV9vcHRpb25zLnB5) | `52.99% <0.00%> (-0.16%)` | :arrow_down: |
   | [transforms/ptransform\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHJhbnNmb3Jtcy9wdHJhbnNmb3JtX3Rlc3QucHk=) | `18.37% <0.00%> (-0.09%)` | :arrow_down: |
   | ... and [37 more](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=footer). Last update [086b985...4c5ab4c](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] epicfaace commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
epicfaace commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r476776994



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio.py
##########
@@ -0,0 +1,664 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure Blob Storage client.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import errno
+import io
+import logging
+import os
+import re
+import tempfile
+import time
+from builtins import object
+
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.utils import retry
+
+_LOGGER = logging.getLogger(__name__)
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from azure.core.exceptions import ResourceNotFoundError
+  from azure.storage.blob import (
+      BlobServiceClient,
+      ContentSettings,
+  )
+  AZURE_DEPS_INSTALLED = True
+except ImportError:
+  AZURE_DEPS_INSTALLED = False
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+MAX_BATCH_OPERATION_SIZE = 100
+
+
+def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
+  """Return the storage account, the container and
+  blob names of the given azfs:// path.
+  """
+  match = re.match(
+      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
+      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
+      azfs_path)
+  if match is None or (match.group(3) == '' and not blob_optional):
+    raise ValueError(
+        'Azure Blob Storage path must be in the form '
+        'azfs://<storage-account>/<container>/<path>.')
+  result = None
+  if get_account:
+    result = match.group(1), match.group(2), match.group(3)
+  else:
+    result = match.group(2), match.group(3)
+  return result
+
+
+def get_azfs_url(storage_account, container, blob=''):
+  """Returns the url in the form of
+   https://account.blob.core.windows.net/container/blob-name
+  """
+  return 'https://' + storage_account + '.blob.core.windows.net/' + \
+          container + '/' + blob
+
+
+class Blob():
+  """A Blob in Azure Blob Storage."""
+  def __init__(self, etag, name, last_updated, size, mime_type):
+    self.etag = etag
+    self.name = name
+    self.last_updated = last_updated
+    self.size = size
+    self.mime_type = mime_type
+
+
+class BlobStorageIOError(IOError, retry.PermanentException):
+  """Blob Strorage IO error that should not be retried."""
+  pass
+
+
+class BlobStorageError(Exception):
+  """Blob Storage client error."""
+  def __init__(self, message=None, code=None):
+    self.message = message
+    self.code = code
+
+
+class BlobStorageIO(object):
+  """Azure Blob Storage I/O client."""
+  def __init__(self, client=None):
+    if not AZURE_DEPS_INSTALLED:
+      raise RuntimeError('Azure dependencies are not installed. Unable to run.')
+    if client is None:
+      connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      self.client = BlobServiceClient.from_connection_string(connect_str)
+    else:
+      self.client = client
+
+  def open(
+      self,
+      filename,
+      mode='r',
+      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
+      mime_type='application/octet-stream'):
+    """Open an Azure Blob Storage file path for reading or writing.
+
+    Args:
+      filename (str): Azure Blob Storage file path in the form
+                      ``azfs://<storage-account>/<container>/<path>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
+
+    Returns:
+      Azure Blob Storage file object.
+    Raises:
+      ValueError: Invalid open file mode.
+    """
+    if mode == 'r' or mode == 'rb':
+      downloader = BlobStorageDownloader(
+          self.client, filename, buffer_size=read_buffer_size)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
+    elif mode == 'w' or mode == 'wb':
+      uploader = BlobStorageUploader(self.client, filename, mime_type)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
+    else:
+      raise ValueError('Invalid file open mode: %s.' % mode)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def copy(self, src, dest):
+    """Copies a single Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Raises:
+      TimeoutError: on timeout.
+    """
+    src_storage_account, src_container, src_blob = parse_azfs_path(
+        src, get_account=True)
+    dest_container, dest_blob = parse_azfs_path(dest)
+
+    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
+    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
+
+    try:
+      copied_blob.start_copy_from_url(source_blob)
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_tree(self, src, dest):
+    """Renames the given Azure Blob storage directory and its contents
+    recursively from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      List of tuples of (src, dest, exception) where exception is None if the
+      operation succeeded or the relevant exception if the operation failed.
+    """
+    assert src.endswith('/')
+    assert dest.endswith('/')
+
+    results = []
+    for entry in self.list_prefix(src):
+      rel_path = entry[len(src):]
+      try:
+        self.copy(entry, dest + rel_path)
+        results.append((entry, dest + rel_path, None))
+      except BlobStorageError as e:
+        results.append((entry, dest + rel_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_paths(self, src_dest_pairs):
+    """Copies the given Azure Blob Storage blobs from src to dest. This can
+    handle directory or file paths.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name] file paths
+                      to copy from src to dest.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is None if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    results = []
+
+    for src_path, dest_path in src_dest_pairs:
+      # Case 1. They are directories.
+      if src_path.endswith('/') and dest_path.endswith('/'):
+        try:
+          results += self.copy_tree(src_path, dest_path)
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Case 2. They are individual blobs.
+      elif not src_path.endswith('/') and not dest_path.endswith('/'):
+        try:
+          self.copy(src_path, dest_path)
+          results.append((src_path, dest_path, None))
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Mismatched paths (one directory, one non-directory) get an error.
+      else:
+        e = BlobStorageError(
+            "Unable to copy mismatched paths" +
+            "(directory, non-directory): %s, %s" % (src_path, dest_path),
+            400)
+        results.append((src_path, dest_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename(self, src, dest):
+    """Renames the given Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    self.copy(src, dest)
+    self.delete(src)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename_files(self, src_dest_pairs):
+    """Renames the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name]
+                      file paths to rename from src to dest.
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    for src, dest in src_dest_pairs:
+      if src.endswith('/') or dest.endswith('/'):
+        raise ValueError('Unable to rename a directory.')
+
+    # Results from copy operation.
+    copy_results = self.copy_paths(src_dest_pairs)
+    paths_to_delete = \
+        [src for (src, _, error) in copy_results if error is None]
+    # Results from delete operation.
+    delete_results = self.delete_files(paths_to_delete)
+
+    # Get rename file results (list of tuples).
+    results = []
+
+    # Using a dictionary will make the operation faster.
+    delete_results_dict = {src: error for (src, error) in delete_results}
+
+    for src, dest, error in copy_results:
+      # If there was an error in the copy operation.
+      if error is not None:
+        results.append((src, dest, error))
+      # If there was an error in the delete operation.
+      elif delete_results_dict[src] is not None:
+        results.append((src, dest, delete_results_dict[src]))
+      # If there was no error in the operations.
+      else:
+        results.append((src, dest, None))
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def exists(self, path):
+    """Returns whether the given Azure Blob Storage blob exists.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_check.get_blob_properties()
+      return True
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # HTTP 404 indicates that the file did not exist.
+        return False
+      else:
+        # We re-raise all other exceptions.
+        raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def size(self, path):
+    """Returns the size of a single Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Blob Storage blob.
+
+    Returns: size of the Blob Storage blob in bytes.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single
+    Azure Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Azure Blob Storage blob.
+
+    Returns: last updated time of the Azure Blob Storage blob
+    in seconds.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    datatime = properties.last_modified
+    return (
+        time.mktime(datatime.timetuple()) - time.timezone +
+        datatime.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def checksum(self, path):
+    """Looks up the checksum of an Azure Blob Storage blob.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.etag
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def delete(self, path):
+    """Deletes a single blob at the given Azure Blob Storage path.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_delete = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_delete.delete_blob()
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      else:
+        _LOGGER.error('HTTP error while deleting file %s', path)
+        raise e
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_paths(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+    This can handle directory or file paths.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is 202 if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    directories, blobs = [], []
+
+    # Retrieve directories and not directories.
+    for path in paths:
+      if path.endswith('/'):
+        directories.append(path)
+      else:
+        blobs.append(path)
+
+    results = {}
+
+    for directory in directories:
+      directory_result = dict(self.delete_tree(directory))
+      results.update(directory_result)
+
+    blobs_results = dict(self.delete_files(blobs))
+    results.update(blobs_results)
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_tree(self, root):
+    """Deletes all blobs under the given Azure BlobStorage virtual
+    directory.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name]
+            (ending with a "/").
+
+    Returns:
+      List of tuples of (path, exception), where each path is a blob
+      under the given root. exception is 202 if the operation succeeded
+      or the relevant exception if the operation failed.
+    """
+    assert root.endswith('/')
+
+    # Get the blob under the root directory.
+    paths_to_delete = self.list_prefix(root)
+
+    return self.delete_files(paths_to_delete)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_files(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is 202 if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not paths:
+      return []
+
+    # Group blobs into containers.
+    containers, blobs = zip(*[parse_azfs_path(path, get_account=False) \
+        for path in paths])
+
+    grouped_blobs = {container: [] for container in containers}
+
+    # Fill dictionary.
+    for container, blob in zip(containers, blobs):
+      grouped_blobs[container].append(blob)
+
+    results = {}
+
+    # Delete minibatches of blobs for each container.
+    for container, blobs in grouped_blobs.items():
+      for i in range(0, len(blobs), MAX_BATCH_OPERATION_SIZE):
+        blobs_to_delete = blobs[i:i + MAX_BATCH_OPERATION_SIZE]
+        results.update(self._delete_batch(container, blobs_to_delete))
+
+    final_results = \
+        [(path, results[parse_azfs_path(path, get_account=False)]) \
+        for path in paths]
+
+    return final_results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _delete_batch(self, container, blobs):
+    """A helper method. Azure Blob Storage Python Client allows batch
+    deletions for blobs within the same container.
+
+    Args:
+      container: container name.
+      blobs: list of blobs to be deleted.
+
+    Returns:
+      Dictionary of the form {(container, blob): error}, where error is the
+      HTTP status code 202 (Accepted) if the operation succeeded.
+    """
+    container_client = self.client.get_container_client(container)
+    results = {}
+
+    try:
+      response = container_client.delete_blobs(
+          *blobs, raise_on_any_failure=False)
+
+      for blob, error in zip(blobs, response):
+        results[(container, blob)] = error.status_code
+
+    except BlobStorageError as e:

Review comment:
       I think you should handle both `BlobStorageError` and `PartialBatchErrorException` on all blob storage operations (PartialBatchErrorException is raised in, for example, https://github.com/Azure/azure-sdk-for-python/issues/13183) -- otherwise, what ends up happening is that only the status code from PartialBatchErrorException is retrieved, but the message is silenced and not logged at all.
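   
   A rough sketch of that suggestion follows, assuming `PartialBatchErrorException` is importable from `azure.storage.blob` (as in azure-storage-blob 12.x); the helper name and logging wording are illustrative, not the PR's code:
   
   ```python
   import logging
   
   from azure.storage.blob import PartialBatchErrorException
   
   _LOGGER = logging.getLogger(__name__)
   
   
   def delete_batch(container_client, container, blobs):
     """Batch-delete blobs, logging partial-batch failures instead of hiding them."""
     results = {}
     try:
       response = container_client.delete_blobs(*blobs, raise_on_any_failure=False)
       for blob, sub_response in zip(blobs, response):
         results[(container, blob)] = sub_response.status_code
     except PartialBatchErrorException as e:
       # Surface the batch-level message so it is not silently swallowed.
       _LOGGER.error('Partial batch failure while deleting blobs: %s', e)
       raise
     return results
   ```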







[GitHub] [beam] codecov[bot] edited a comment on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-680434016


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=h1) Report
   > Merging [#12492](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/e164d170eb6b5ec1dddd99f09e79dfb0147b84ae?el=desc) will **decrease** coverage by `0.18%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12492/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12492      +/-   ##
   ==========================================
   - Coverage   34.47%   34.28%   -0.19%     
   ==========================================
     Files         684      699      +15     
     Lines       81483    82775    +1292     
     Branches     9180     9361     +181     
   ==========================================
   + Hits        28090    28382     +292     
   - Misses      52972    53970     +998     
   - Partials      421      423       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [typehints/typecheck\_test\_py3.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVja190ZXN0X3B5My5weQ==) | `31.54% <0.00%> (-16.00%)` | :arrow_down: |
   | [typehints/typecheck.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVjay5weQ==) | `29.44% <0.00%> (-6.18%)` | :arrow_down: |
   | [utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `30.95% <0.00%> (-2.39%)` | :arrow_down: |
   | [runners/worker/opcounters.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-cnVubmVycy93b3JrZXIvb3Bjb3VudGVycy5weQ==) | `33.81% <0.00%> (-0.87%)` | :arrow_down: |
   | [pipeline.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-cGlwZWxpbmUucHk=) | `22.04% <0.00%> (-0.28%)` | :arrow_down: |
   | [dataframe/transforms\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-ZGF0YWZyYW1lL3RyYW5zZm9ybXNfdGVzdC5weQ==) | `25.00% <0.00%> (-0.21%)` | :arrow_down: |
   | [io/gcp/bigquery\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-aW8vZ2NwL2JpZ3F1ZXJ5X3Rlc3QucHk=) | `27.39% <0.00%> (-0.18%)` | :arrow_down: |
   | [io/filesystems.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-aW8vZmlsZXN5c3RlbXMucHk=) | `55.00% <0.00%> (-0.18%)` | :arrow_down: |
   | [options/pipeline\_options.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-b3B0aW9ucy9waXBlbGluZV9vcHRpb25zLnB5) | `52.99% <0.00%> (-0.16%)` | :arrow_down: |
   | [transforms/ptransform\_test.py](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree#diff-dHJhbnNmb3Jtcy9wdHJhbnNmb3JtX3Rlc3QucHk=) | `18.37% <0.00%> (-0.09%)` | :arrow_down: |
   | ... and [37 more](https://codecov.io/gh/apache/beam/pull/12492/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=footer). Last update [086b985...3884527](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-680434016


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12492?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request head (`BEAM-6807@4c5ab4c`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-head-commit).
   > The diff coverage is `n/a`.
   





[GitHub] [beam] epicfaace commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
epicfaace commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r478798927



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio.py
##########
@@ -0,0 +1,664 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure Blob Storage client.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import errno
+import io
+import logging
+import os
+import re
+import tempfile
+import time
+from builtins import object
+
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.utils import retry
+
+_LOGGER = logging.getLogger(__name__)
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from azure.core.exceptions import ResourceNotFoundError
+  from azure.storage.blob import (
+      BlobServiceClient,
+      ContentSettings,
+  )
+  AZURE_DEPS_INSTALLED = True
+except ImportError:
+  AZURE_DEPS_INSTALLED = False
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+MAX_BATCH_OPERATION_SIZE = 100
+
+
+def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
+  """Return the storage account, the container and
+  blob names of the given azfs:// path.
+  """
+  match = re.match(
+      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
+      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
+      azfs_path)
+  if match is None or (match.group(3) == '' and not blob_optional):
+    raise ValueError(
+        'Azure Blob Storage path must be in the form '
+        'azfs://<storage-account>/<container>/<path>.')
+  result = None
+  if get_account:
+    result = match.group(1), match.group(2), match.group(3)
+  else:
+    result = match.group(2), match.group(3)
+  return result
+
+
+def get_azfs_url(storage_account, container, blob=''):
+  """Returns the url in the form of
+   https://account.blob.core.windows.net/container/blob-name
+  """
+  return 'https://' + storage_account + '.blob.core.windows.net/' + \
+          container + '/' + blob
+
+
+class Blob():
+  """A Blob in Azure Blob Storage."""
+  def __init__(self, etag, name, last_updated, size, mime_type):
+    self.etag = etag
+    self.name = name
+    self.last_updated = last_updated
+    self.size = size
+    self.mime_type = mime_type
+
+
+class BlobStorageIOError(IOError, retry.PermanentException):
+  """Blob Strorage IO error that should not be retried."""
+  pass
+
+
+class BlobStorageError(Exception):
+  """Blob Storage client error."""
+  def __init__(self, message=None, code=None):
+    self.message = message
+    self.code = code
+
+
+class BlobStorageIO(object):
+  """Azure Blob Storage I/O client."""
+  def __init__(self, client=None):
+    if not AZURE_DEPS_INSTALLED:
+      raise RuntimeError('Azure dependencies are not installed. Unable to run.')
+    if client is None:
+      connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      self.client = BlobServiceClient.from_connection_string(connect_str)
+    else:
+      self.client = client
+
+  def open(
+      self,
+      filename,
+      mode='r',
+      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
+      mime_type='application/octet-stream'):
+    """Open an Azure Blob Storage file path for reading or writing.
+
+    Args:
+      filename (str): Azure Blob Storage file path in the form
+                      ``azfs://<storage-account>/<container>/<path>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
+
+    Returns:
+      Azure Blob Storage file object.
+    Raises:
+      ValueError: Invalid open file mode.
+    """
+    if mode == 'r' or mode == 'rb':
+      downloader = BlobStorageDownloader(
+          self.client, filename, buffer_size=read_buffer_size)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
+    elif mode == 'w' or mode == 'wb':
+      uploader = BlobStorageUploader(self.client, filename, mime_type)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
+    else:
+      raise ValueError('Invalid file open mode: %s.' % mode)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def copy(self, src, dest):
+    """Copies a single Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Raises:
+      TimeoutError: on timeout.
+    """
+    src_storage_account, src_container, src_blob = parse_azfs_path(
+        src, get_account=True)
+    dest_container, dest_blob = parse_azfs_path(dest)
+
+    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
+    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
+
+    try:
+      copied_blob.start_copy_from_url(source_blob)
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_tree(self, src, dest):
+    """Renames the given Azure Blob storage directory and its contents
+    recursively from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      List of tuples of (src, dest, exception) where exception is None if the
+      operation succeeded or the relevant exception if the operation failed.
+    """
+    assert src.endswith('/')
+    assert dest.endswith('/')
+
+    results = []
+    for entry in self.list_prefix(src):
+      rel_path = entry[len(src):]
+      try:
+        self.copy(entry, dest + rel_path)
+        results.append((entry, dest + rel_path, None))
+      except BlobStorageError as e:
+        results.append((entry, dest + rel_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_paths(self, src_dest_pairs):
+    """Copies the given Azure Blob Storage blobs from src to dest. This can
+    handle directory or file paths.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name] file paths
+                      to copy from src to dest.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is None if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    results = []
+
+    for src_path, dest_path in src_dest_pairs:
+      # Case 1. They are directories.
+      if src_path.endswith('/') and dest_path.endswith('/'):
+        try:
+          results += self.copy_tree(src_path, dest_path)
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Case 2. They are individual blobs.
+      elif not src_path.endswith('/') and not dest_path.endswith('/'):
+        try:
+          self.copy(src_path, dest_path)
+          results.append((src_path, dest_path, None))
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Mismatched paths (one directory, one non-directory) get an error.
+      else:
+        e = BlobStorageError(
+            "Unable to copy mismatched paths" +
+            "(directory, non-directory): %s, %s" % (src_path, dest_path),
+            400)
+        results.append((src_path, dest_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename(self, src, dest):
+    """Renames the given Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    self.copy(src, dest)
+    self.delete(src)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename_files(self, src_dest_pairs):
+    """Renames the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name]
+                      file paths to rename from src to dest.
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    for src, dest in src_dest_pairs:
+      if src.endswith('/') or dest.endswith('/'):
+        raise ValueError('Unable to rename a directory.')
+
+    # Results from copy operation.
+    copy_results = self.copy_paths(src_dest_pairs)
+    paths_to_delete = \
+        [src for (src, _, error) in copy_results if error is None]
+    # Results from delete operation.
+    delete_results = self.delete_files(paths_to_delete)
+
+    # Get rename file results (list of tuples).
+    results = []
+
+    # Using a dictionary will make the operation faster.
+    delete_results_dict = {src: error for (src, error) in delete_results}
+
+    for src, dest, error in copy_results:
+      # If there was an error in the copy operation.
+      if error is not None:
+        results.append((src, dest, error))
+      # If there was an error in the delete operation.
+      elif delete_results_dict[src] is not None:
+        results.append((src, dest, delete_results_dict[src]))
+      # If there was no error in the operations.
+      else:
+        results.append((src, dest, None))
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def exists(self, path):
+    """Returns whether the given Azure Blob Storage blob exists.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_check.get_blob_properties()
+      return True
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # HTTP 404 indicates that the file did not exist.
+        return False
+      else:
+        # We re-raise all other exceptions.
+        raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def size(self, path):
+    """Returns the size of a single Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Blob Storage blob.
+
+    Returns: size of the Blob Storage blob in bytes.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single
+    Azure Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Azure Blob Storage blob.
+
+    Returns: last updated time of the Azure Blob Storage blob
+    in seconds.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    datatime = properties.last_modified
+    return (
+        time.mktime(datatime.timetuple()) - time.timezone +
+        datatime.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def checksum(self, path):
+    """Looks up the checksum of an Azure Blob Storage blob.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.etag
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def delete(self, path):
+    """Deletes a single blob at the given Azure Blob Storage path.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_delete = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_delete.delete_blob()
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      else:
+        _LOGGER.error('HTTP error while deleting file %s', path)
+        raise e
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_paths(self, paths):

Review comment:
       FYI, it appears that Microsoft might have fixed the `delete_blobs` issue: https://github.com/Azure/azure-sdk-for-python/issues/13183







[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-671610791


   https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/14420/#showFailuresLink - the dependency is missing because it was not added to the install list. You can add the azure dependency in BeamModulePlugin.groovy and tox.ini like in this PR: https://github.com/apache/beam/pull/11149/files
   
   And skip the tests whenever the dependency is missing, like here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/aws/s3filesystem_test.py#L37-L44
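   
   A minimal sketch of that skip-when-missing pattern, with an illustrative test class (the class and test names are assumptions, not the actual Beam test file):
   
   ```python
   import unittest
   
   # Guard the import so the test module loads even without the Azure extras.
   try:
     from apache_beam.io.azure import blobstorageio
   except ImportError:
     blobstorageio = None
   
   
   @unittest.skipIf(blobstorageio is None, 'Azure dependencies are not installed')
   class BlobStorageIOTest(unittest.TestCase):
     def test_parse_azfs_path(self):
       self.assertEqual(
           ('container', 'path/to/blob'),
           blobstorageio.parse_azfs_path('azfs://account/container/path/to/blob'))
   ```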





[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-682300724


   Thanks @AldairCoronel ! To conclude:
   - Let's figure out the authentication story via pipeline options
   - Let's set up integration tests using Azurite (a rough connection sketch follows below)
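   
   A rough sketch of pointing the client at a local Azurite instance, using Azurite's well-known development-account defaults; the environment variable matches what `blobstorageio` reads, while the rest is an assumption about the eventual integration-test setup:
   
   ```python
   import os
   
   from azure.storage.blob import BlobServiceClient
   
   # Azurite's published development-storage defaults (account devstoreaccount1).
   AZURITE_CONNECTION_STRING = (
       'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;'
       'AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6'
       'tq/K1SZFPTOtr/KBHBeksoGMGw==;'
       'BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;')
   
   # BlobStorageIO() falls back to this environment variable when no client is given.
   os.environ['AZURE_STORAGE_CONNECTION_STRING'] = AZURITE_CONNECTION_STRING
   client = BlobServiceClient.from_connection_string(AZURITE_CONNECTION_STRING)
   ```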





[GitHub] [beam] ibzib commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-693640293


   @tanya-borisova This change should be included in the next Beam release (2.25.0), which will begin a week from today, and will probably be finalized some weeks after.
   
   https://beam.apache.org/contribute/#when-will-my-change-show-up-in-an-apache-beam-release





[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-680283866


   also, fwiw, sorry about the delay in reviewing this :(





[GitHub] [beam] AldairCoronel commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
AldairCoronel commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r476976566



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio.py
##########
@@ -0,0 +1,664 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure Blob Storage client.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import errno
+import io
+import logging
+import os
+import re
+import tempfile
+import time
+from builtins import object
+
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.utils import retry
+
+_LOGGER = logging.getLogger(__name__)
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from azure.core.exceptions import ResourceNotFoundError
+  from azure.storage.blob import (
+      BlobServiceClient,
+      ContentSettings,
+  )
+  AZURE_DEPS_INSTALLED = True
+except ImportError:
+  AZURE_DEPS_INSTALLED = False
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+MAX_BATCH_OPERATION_SIZE = 100
+
+
+def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
+  """Return the storage account, the container and
+  blob names of the given azfs:// path.
+  """
+  match = re.match(
+      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
+      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
+      azfs_path)
+  if match is None or (match.group(3) == '' and not blob_optional):
+    raise ValueError(
+        'Azure Blob Storage path must be in the form '
+        'azfs://<storage-account>/<container>/<path>.')
+  result = None
+  if get_account:
+    result = match.group(1), match.group(2), match.group(3)
+  else:
+    result = match.group(2), match.group(3)
+  return result
+
+
+def get_azfs_url(storage_account, container, blob=''):
+  """Returns the url in the form of
+   https://account.blob.core.windows.net/container/blob-name
+  """
+  return 'https://' + storage_account + '.blob.core.windows.net/' + \
+          container + '/' + blob
+
+
+class Blob():
+  """A Blob in Azure Blob Storage."""
+  def __init__(self, etag, name, last_updated, size, mime_type):
+    self.etag = etag
+    self.name = name
+    self.last_updated = last_updated
+    self.size = size
+    self.mime_type = mime_type
+
+
+class BlobStorageIOError(IOError, retry.PermanentException):
+  """Blob Strorage IO error that should not be retried."""
+  pass
+
+
+class BlobStorageError(Exception):
+  """Blob Storage client error."""
+  def __init__(self, message=None, code=None):
+    self.message = message
+    self.code = code
+
+
+class BlobStorageIO(object):
+  """Azure Blob Storage I/O client."""
+  def __init__(self, client=None):
+    if not AZURE_DEPS_INSTALLED:
+      raise RuntimeError('Azure dependencies are not installed. Unable to run.')
+    if client is None:
+      connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      self.client = BlobServiceClient.from_connection_string(connect_str)
+    else:
+      self.client = client
+
+  def open(
+      self,
+      filename,
+      mode='r',
+      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
+      mime_type='application/octet-stream'):
+    """Open an Azure Blob Storage file path for reading or writing.
+
+    Args:
+      filename (str): Azure Blob Storage file path in the form
+                      ``azfs://<storage-account>/<container>/<path>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
+
+    Returns:
+      Azure Blob Storage file object.
+    Raises:
+      ValueError: Invalid open file mode.
+    """
+    if mode == 'r' or mode == 'rb':
+      downloader = BlobStorageDownloader(
+          self.client, filename, buffer_size=read_buffer_size)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
+    elif mode == 'w' or mode == 'wb':
+      uploader = BlobStorageUploader(self.client, filename, mime_type)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
+    else:
+      raise ValueError('Invalid file open mode: %s.' % mode)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def copy(self, src, dest):
+    """Copies a single Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Raises:
+      TimeoutError: on timeout.
+    """
+    src_storage_account, src_container, src_blob = parse_azfs_path(
+        src, get_account=True)
+    dest_container, dest_blob = parse_azfs_path(dest)
+
+    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
+    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
+
+    try:
+      copied_blob.start_copy_from_url(source_blob)
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_tree(self, src, dest):
+    """Renames the given Azure Blob storage directory and its contents
+    recursively from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      List of tuples of (src, dest, exception) where exception is None if the
+      operation succeeded or the relevant exception if the operation failed.
+    """
+    assert src.endswith('/')
+    assert dest.endswith('/')
+
+    results = []
+    for entry in self.list_prefix(src):
+      rel_path = entry[len(src):]
+      try:
+        self.copy(entry, dest + rel_path)
+        results.append((entry, dest + rel_path, None))
+      except BlobStorageError as e:
+        results.append((entry, dest + rel_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_paths(self, src_dest_pairs):
+    """Copies the given Azure Blob Storage blobs from src to dest. This can
+    handle directory or file paths.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name] file paths
+                      to copy from src to dest.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is None if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    results = []
+
+    for src_path, dest_path in src_dest_pairs:
+      # Case 1. They are directories.
+      if src_path.endswith('/') and dest_path.endswith('/'):
+        try:
+          results += self.copy_tree(src_path, dest_path)
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Case 2. They are individual blobs.
+      elif not src_path.endswith('/') and not dest_path.endswith('/'):
+        try:
+          self.copy(src_path, dest_path)
+          results.append((src_path, dest_path, None))
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Mismatched paths (one directory, one non-directory) get an error.
+      else:
+        e = BlobStorageError(
+            "Unable to copy mismatched paths" +
+            "(directory, non-directory): %s, %s" % (src_path, dest_path),
+            400)
+        results.append((src_path, dest_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename(self, src, dest):
+    """Renames the given Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    self.copy(src, dest)
+    self.delete(src)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename_files(self, src_dest_pairs):
+    """Renames the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name]
+                      file paths to rename from src to dest.
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    for src, dest in src_dest_pairs:
+      if src.endswith('/') or dest.endswith('/'):
+        raise ValueError('Unable to rename a directory.')
+
+    # Results from copy operation.
+    copy_results = self.copy_paths(src_dest_pairs)
+    paths_to_delete = \
+        [src for (src, _, error) in copy_results if error is None]
+    # Results from delete operation.
+    delete_results = self.delete_files(paths_to_delete)
+
+    # Get rename file results (list of tuples).
+    results = []
+
+    # Use a dictionary for fast lookup of the delete results by path.
+    delete_results_dict = {src: error for (src, error) in delete_results}
+
+    for src, dest, error in copy_results:
+      # If there was an error in the copy operation.
+      if error is not None:
+        results.append((src, dest, error))
+      # If there was an error in the delete operation.
+      elif delete_results_dict[src] is not None:
+        results.append((src, dest, delete_results_dict[src]))
+      # If there was no error in the operations.
+      else:
+        results.append((src, dest, None))
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def exists(self, path):
+    """Returns whether the given Azure Blob Storage blob exists.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_check.get_blob_properties()
+      return True
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # HTTP 404 indicates that the file did not exist.
+        return False
+      else:
+        # We re-raise all other exceptions.
+        raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def size(self, path):
+    """Returns the size of a single Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Blob Storage blob.
+
+    Returns: size of the Blob Storage blob in bytes.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single
+    Azure Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Azure Blob Storage blob.
+
+    Returns: last updated time of the Azure Blob Storage blob
+    in seconds.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    last_modified = properties.last_modified
+    return (
+        time.mktime(last_modified.timetuple()) - time.timezone +
+        last_modified.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def checksum(self, path):
+    """Looks up the checksum of an Azure Blob Storage blob.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.etag
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def delete(self, path):
+    """Deletes a single blob at the given Azure Blob Storage path.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_delete = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_delete.delete_blob()
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      else:
+        logging.error('HTTP error while deleting file %s', path)
+        raise e
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_paths(self, paths):

Review comment:
       @epicfaace It did not give me problems when testing with my Azure account. The only drawback was when testing with Azurite (emulator) because `delete_blobs` is not implemented yet.
   
   I will make the changes from `delete_blobs` to `delete_blob` in another PR when I add the tests with Azurite.
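   
   For context, a minimal sketch (an assumption, not the final change) of how `_delete_batch` could switch between the batch API and per-blob `delete_blob` calls; the `use_batch` flag and the hard-coded 202/404 statuses mirror the existing convention but are illustrative:
   
   ```python
   from azure.core.exceptions import ResourceNotFoundError
   
   
   def delete_batch(service_client, container, blobs, use_batch=True):
     """Deletes the given blobs in `container`, optionally without batching."""
     container_client = service_client.get_container_client(container)
     results = {}
     if use_batch:
       # Batch path, used against real Azure Blob Storage.
       response = container_client.delete_blobs(
           *blobs, raise_on_any_failure=False)
       for blob, sub_response in zip(blobs, response):
         results[(container, blob)] = sub_response.status_code
     else:
       # Per-blob path for emulators (e.g. Azurite) without batch support.
       for blob in blobs:
         try:
           container_client.delete_blob(blob)
           results[(container, blob)] = 202
         except ResourceNotFoundError:
           # Treat a missing blob as already deleted, mirroring delete().
           results[(container, blob)] = 404
     return results
   ```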
   
   **Thank you very much!**




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-671501174


   (the previous comment was to start running automated tests)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] AldairCoronel commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
AldairCoronel commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-673697021


   Run Python2_PVR_Flink PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-671609374


   https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Commit/5464/console - trailing whitespace in blobstorageio ; )


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] AldairCoronel commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
AldairCoronel commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-680433635


   @pabloem Let's see:
   
   - Authentication. At the moment the only way to authenticate is with a connection string obtained from environment variables. (The code is here: 
   
   - **Integration tests with Azurite.** Integration tests with Azurite are practically ready. The only thing left is to define a function in `build.gradle` that starts and stops Azurite; a minimal sketch of pointing the client at the emulator is shown below. (You can find the branch here: [https://github.com/AldairCoronel/beam/commits/azurite](url)).
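   
   For illustration, a minimal sketch of that connection-string flow: the string is read from the environment exactly as `BlobStorageIO.__init__` does, and for the Azurite tests it would simply point at the local emulator endpoint (the container listing at the end is only a sanity check, not part of the PR):
   
   ```python
   import os
   
   from azure.storage.blob import BlobServiceClient
   
   # Same environment variable BlobStorageIO reads; for Azurite this would be
   # a connection string whose blob endpoint looks like
   # http://127.0.0.1:10000/<account>.
   connect_str = os.environ['AZURE_STORAGE_CONNECTION_STRING']
   client = BlobServiceClient.from_connection_string(connect_str)
   
   # Quick check that the credential works.
   for container in client.list_containers():
     print(container.name)
   ```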


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r476759950



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio.py
##########
@@ -0,0 +1,664 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure Blob Storage client.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import errno
+import io
+import logging
+import os
+import re
+import tempfile
+import time
+from builtins import object
+
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.utils import retry
+
+_LOGGER = logging.getLogger(__name__)
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from azure.core.exceptions import ResourceNotFoundError
+  from azure.storage.blob import (
+      BlobServiceClient,
+      ContentSettings,
+  )
+  AZURE_DEPS_INSTALLED = True
+except ImportError:
+  AZURE_DEPS_INSTALLED = False
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+MAX_BATCH_OPERATION_SIZE = 100
+
+
+def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
+  """Return the storage account, the container and
+  blob names of the given azfs:// path.
+  """
+  match = re.match(
+      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
+      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
+      azfs_path)
+  if match is None or (match.group(3) == '' and not blob_optional):
+    raise ValueError(
+        'Azure Blob Storage path must be in the form '
+        'azfs://<storage-account>/<container>/<path>.')
+  result = None
+  if get_account:
+    result = match.group(1), match.group(2), match.group(3)
+  else:
+    result = match.group(2), match.group(3)
+  return result
+
+
+def get_azfs_url(storage_account, container, blob=''):
+  """Returns the url in the form of
+   https://account.blob.core.windows.net/container/blob-name
+  """
+  return 'https://' + storage_account + '.blob.core.windows.net/' + \
+          container + '/' + blob
+
+
+class Blob(object):
+  """A Blob in Azure Blob Storage."""
+  def __init__(self, etag, name, last_updated, size, mime_type):
+    self.etag = etag
+    self.name = name
+    self.last_updated = last_updated
+    self.size = size
+    self.mime_type = mime_type
+
+
+class BlobStorageIOError(IOError, retry.PermanentException):
+  """Blob Strorage IO error that should not be retried."""
+  pass
+
+
+class BlobStorageError(Exception):
+  """Blob Storage client error."""
+  def __init__(self, message=None, code=None):
+    self.message = message
+    self.code = code
+
+
+class BlobStorageIO(object):
+  """Azure Blob Storage I/O client."""
+  def __init__(self, client=None):
+    if not AZURE_DEPS_INSTALLED:
+      raise RuntimeError(
+          'Azure dependencies are not installed. Unable to run.')
+    if client is None:
+      connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      self.client = BlobServiceClient.from_connection_string(connect_str)
+    else:
+      self.client = client
+
+  def open(
+      self,
+      filename,
+      mode='r',
+      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
+      mime_type='application/octet-stream'):
+    """Open an Azure Blob Storage file path for reading or writing.
+
+    Args:
+      filename (str): Azure Blob Storage file path in the form
+                      ``azfs://<storage-account>/<container>/<path>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
+
+    Returns:
+      Azure Blob Storage file object.
+    Raises:
+      ValueError: Invalid open file mode.
+    """
+    if mode == 'r' or mode == 'rb':
+      downloader = BlobStorageDownloader(
+          self.client, filename, buffer_size=read_buffer_size)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
+    elif mode == 'w' or mode == 'wb':
+      uploader = BlobStorageUploader(self.client, filename, mime_type)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
+    else:
+      raise ValueError('Invalid file open mode: %s.' % mode)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def copy(self, src, dest):
+    """Copies a single Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Raises:
+      TimeoutError: on timeout.
+    """
+    src_storage_account, src_container, src_blob = parse_azfs_path(
+        src, get_account=True)
+    dest_container, dest_blob = parse_azfs_path(dest)
+
+    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
+    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
+
+    try:
+      copied_blob.start_copy_from_url(source_blob)
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_tree(self, src, dest):
+    """Renames the given Azure Blob storage directory and its contents
+    recursively from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      List of tuples of (src, dest, exception) where exception is None if the
+      operation succeeded or the relevant exception if the operation failed.
+    """
+    assert src.endswith('/')
+    assert dest.endswith('/')
+
+    results = []
+    for entry in self.list_prefix(src):
+      rel_path = entry[len(src):]
+      try:
+        self.copy(entry, dest + rel_path)
+        results.append((entry, dest + rel_path, None))
+      except BlobStorageError as e:
+        results.append((entry, dest + rel_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_paths(self, src_dest_pairs):
+    """Copies the given Azure Blob Storage blobs from src to dest. This can
+    handle directory or file paths.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name] file paths
+                      to copy from src to dest.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is None if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    results = []
+
+    for src_path, dest_path in src_dest_pairs:
+      # Case 1. They are directories.
+      if src_path.endswith('/') and dest_path.endswith('/'):
+        try:
+          results += self.copy_tree(src_path, dest_path)
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Case 2. They are individual blobs.
+      elif not src_path.endswith('/') and not dest_path.endswith('/'):
+        try:
+          self.copy(src_path, dest_path)
+          results.append((src_path, dest_path, None))
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Mismatched paths (one directory, one non-directory) get an error.
+      else:
+        e = BlobStorageError(
+            "Unable to copy mismatched paths" +
+            "(directory, non-directory): %s, %s" % (src_path, dest_path),
+            400)
+        results.append((src_path, dest_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename(self, src, dest):
+    """Renames the given Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    self.copy(src, dest)
+    self.delete(src)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename_files(self, src_dest_pairs):
+    """Renames the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name]
+                      file paths to rename from src to dest.
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    for src, dest in src_dest_pairs:
+      if src.endswith('/') or dest.endswith('/'):
+        raise ValueError('Unable to rename a directory.')
+
+    # Results from copy operation.
+    copy_results = self.copy_paths(src_dest_pairs)
+    paths_to_delete = \
+        [src for (src, _, error) in copy_results if error is None]
+    # Results from delete operation.
+    delete_results = self.delete_files(paths_to_delete)
+
+    # Get rename file results (list of tuples).
+    results = []
+
+    # Use a dictionary for fast lookup of the delete results by path.
+    delete_results_dict = {src: error for (src, error) in delete_results}
+
+    for src, dest, error in copy_results:
+      # If there was an error in the copy operation.
+      if error is not None:
+        results.append((src, dest, error))
+      # If there was an error in the delete operation.
+      elif delete_results_dict[src] is not None:
+        results.append((src, dest, delete_results_dict[src]))
+      # If there was no error in the operations.
+      else:
+        results.append((src, dest, None))
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def exists(self, path):
+    """Returns whether the given Azure Blob Storage blob exists.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_check.get_blob_properties()
+      return True
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # HTTP 404 indicates that the file did not exist.
+        return False
+      else:
+        # We re-raise all other exceptions.
+        raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def size(self, path):
+    """Returns the size of a single Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Blob Storage blob.
+
+    Returns: size of the Blob Storage blob in bytes.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single
+    Azure Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Azure Blob Storage blob.
+
+    Returns: last updated time of the Azure Blob Storage blob
+    in seconds.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    last_modified = properties.last_modified
+    return (
+        time.mktime(last_modified.timetuple()) - time.timezone +
+        last_modified.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def checksum(self, path):
+    """Looks up the checksum of an Azure Blob Storage blob.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.etag
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def delete(self, path):
+    """Deletes a single blob at the given Azure Blob Storage path.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_delete = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_delete.delete_blob()
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      else:
+        logging.error('HTTP error while deleting file %s', path)
+        raise e
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_paths(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+    This can handle directory or file paths.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is 202 if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    directories, blobs = [], []
+
+    # Retrieve directories and not directories.
+    for path in paths:
+      if path.endswith('/'):
+        directories.append(path)
+      else:
+        blobs.append(path)
+
+    results = {}
+
+    for directory in directories:
+      directory_result = dict(self.delete_tree(directory))
+      results.update(directory_result)
+
+    blobs_results = dict(self.delete_files(blobs))
+    results.update(blobs_results)
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_tree(self, root):
+    """Deletes all blobs under the given Azure BlobStorage virtual
+    directory.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name]
+            (ending with a "/").
+
+    Returns:
+      List of tuples of (path, exception), where each path is a blob
+      under the given root. exception is 202 if the operation succeeded
+      or the relevant exception if the operation failed.
+    """
+    assert root.endswith('/')
+
+    # Get the blobs under the root directory.
+    paths_to_delete = self.list_prefix(root)
+
+    return self.delete_files(paths_to_delete)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_files(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is 202 if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not paths:
+      return []
+
+    # Group blobs into containers.
+    containers, blobs = zip(*[parse_azfs_path(path, get_account=False) \
+        for path in paths])
+
+    grouped_blobs = {container: [] for container in containers}
+
+    # Fill dictionary.
+    for container, blob in zip(containers, blobs):
+      grouped_blobs[container].append(blob)
+
+    results = {}
+
+    # Delete minibatches of blobs for each container.
+    for container, blobs in grouped_blobs.items():
+      for i in range(0, len(blobs), MAX_BATCH_OPERATION_SIZE):
+        blobs_to_delete = blobs[i:i + MAX_BATCH_OPERATION_SIZE]
+        results.update(self._delete_batch(container, blobs_to_delete))
+
+    final_results = \
+        [(path, results[parse_azfs_path(path, get_account=False)]) \
+        for path in paths]
+
+    return final_results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _delete_batch(self, container, blobs):
+    """A helper method. Azure Blob Storage Python Client allows batch
+    deletions for blobs within the same container.
+
+    Args:
+      container: container name.
+      blobs: list of blobs to be deleted.
+
+    Returns:
+      Dictionary of the form {(container, blob): error}, where error is
+      202 if the operation succeeded.
+    """
+    container_client = self.client.get_container_client(container)
+    results = {}
+
+    try:
+      response = container_client.delete_blobs(
+          *blobs, raise_on_any_failure=False)
+
+      for blob, error in zip(blobs, response):
+        results[(container, blob)] = error.status_code
+
+    except BlobStorageError as e:
+      for blob in blobs:
+        results[(container, blob)] = e.message
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def list_prefix(self, path):
+    """Lists files matching the prefix.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      Dictionary of file name -> size.
+    """
+    storage_account, container, blob = parse_azfs_path(
+        path, blob_optional=True, get_account=True)
+    file_sizes = {}
+    counter = 0
+    start_time = time.time()
+
+    logging.info("Starting the size estimation of the input")
+    container_client = self.client.get_container_client(container)
+
+    response = container_client.list_blobs(name_starts_with=blob)
+    for item in response:
+      file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
+      file_sizes[file_name] = item.size
+      counter += 1
+      if counter % 10000 == 0:
+        logging.info("Finished computing size of: %s files", len(file_sizes))
+
+    logging.info(
+        "Finished listing %s files in %s seconds.",
+        counter,
+        time.time() - start_time)
+    return file_sizes
+
+
+class BlobStorageDownloader(Downloader):
+  def __init__(self, client, path, buffer_size):
+    self._client = client
+    self._path = path
+    self._container, self._blob = parse_azfs_path(path)
+    self._buffer_size = buffer_size
+
+    self._blob_to_download = self._client.get_blob_client(
+        self._container, self._blob)
+
+    try:
+      properties = self._get_object_properties()
+    except ResourceNotFoundError as http_error:
+      if http_error.status_code == 404:
+        raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
+      else:
+        _LOGGER.error(
+            'HTTP error while requesting file %s: %s', self._path, http_error)
+        raise
+
+    self._size = properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _get_object_properties(self):
+    return self._blob_to_download.get_blob_properties()
+
+  @property
+  def size(self):
+    return self._size
+
+  def get_range(self, start, end):
+    # download_blob takes an offset and a length (number of bytes to read).
+    blob_data = self._blob_to_download.download_blob(start, end - start)
+    # Returns the content as bytes.
+    return blob_data.readall()
+
+
+class BlobStorageUploader(Uploader):
+  def __init__(self, client, path, mime_type='application/octet-stream'):
+    self._client = client
+    self._path = path
+    self._container, self._blob = parse_azfs_path(path)
+    self._content_settings = ContentSettings(mime_type)
+
+    self._blob_to_upload = self._client.get_blob_client(
+        self._container, self._blob)
+
+    # Temporary file.
+    self._temporary_file = tempfile.NamedTemporaryFile()
+
+  def put(self, data):
+    self._temporary_file.write(data.tobytes())
+
+  def finish(self):
+    self._temporary_file.seek(0)
+    # The temporary file is deleted immediately after the operation.
+    with open(self._temporary_file.name, "rb") as f:
+      self._blob_to_upload.upload_blob(
+          f.read(), overwrite=True, content_settings=self._content_settings)

Review comment:
       I recall an issue related to very large files. What happens when we're trying to upload a large file?
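   
   For reference, one way to avoid holding the whole blob in memory would be to hand `upload_blob` the file object itself and let the SDK chunk the upload. A rough sketch of the uploader's `finish` method under that assumption (the `max_concurrency` value is only an example):
   
   ```python
   def finish(self):
     self._temporary_file.seek(0)
     # Passing the file object (rather than f.read()) lets the SDK stream the
     # upload in chunks instead of buffering the entire blob in memory.
     self._blob_to_upload.upload_blob(
         self._temporary_file,
         overwrite=True,
         content_settings=self._content_settings,
         max_concurrency=4)
     self._temporary_file.close()
   ```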




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12492:
URL: https://github.com/apache/beam/pull/12492#issuecomment-682300779


   Thanks for taking a look @epicfaace ! : )


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] AldairCoronel commented on a change in pull request #12492: [BEAM-6807] Implement an Azure blobstore filesystem for Python SDK

Posted by GitBox <gi...@apache.org>.
AldairCoronel commented on a change in pull request #12492:
URL: https://github.com/apache/beam/pull/12492#discussion_r477000643



##########
File path: sdks/python/apache_beam/io/azure/blobstorageio.py
##########
@@ -0,0 +1,664 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure Blob Storage client.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import errno
+import io
+import logging
+import os
+import re
+import tempfile
+import time
+from builtins import object
+
+from apache_beam.io.filesystemio import Downloader
+from apache_beam.io.filesystemio import DownloaderStream
+from apache_beam.io.filesystemio import Uploader
+from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.utils import retry
+
+_LOGGER = logging.getLogger(__name__)
+
+try:
+  # pylint: disable=wrong-import-order, wrong-import-position
+  # pylint: disable=ungrouped-imports
+  from azure.core.exceptions import ResourceNotFoundError
+  from azure.storage.blob import (
+      BlobServiceClient,
+      ContentSettings,
+  )
+  AZURE_DEPS_INSTALLED = True
+except ImportError:
+  AZURE_DEPS_INSTALLED = False
+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+MAX_BATCH_OPERATION_SIZE = 100
+
+
+def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
+  """Return the storage account, the container and
+  blob names of the given azfs:// path.
+  """
+  match = re.match(
+      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
+      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$',
+      azfs_path)
+  if match is None or (match.group(3) == '' and not blob_optional):
+    raise ValueError(
+        'Azure Blob Storage path must be in the form '
+        'azfs://<storage-account>/<container>/<path>.')
+  result = None
+  if get_account:
+    result = match.group(1), match.group(2), match.group(3)
+  else:
+    result = match.group(2), match.group(3)
+  return result
+
+
+def get_azfs_url(storage_account, container, blob=''):
+  """Returns the url in the form of
+   https://account.blob.core.windows.net/container/blob-name
+  """
+  return 'https://' + storage_account + '.blob.core.windows.net/' + \
+          container + '/' + blob
+
+
+class Blob(object):
+  """A Blob in Azure Blob Storage."""
+  def __init__(self, etag, name, last_updated, size, mime_type):
+    self.etag = etag
+    self.name = name
+    self.last_updated = last_updated
+    self.size = size
+    self.mime_type = mime_type
+
+
+class BlobStorageIOError(IOError, retry.PermanentException):
+  """Blob Strorage IO error that should not be retried."""
+  pass
+
+
+class BlobStorageError(Exception):
+  """Blob Storage client error."""
+  def __init__(self, message=None, code=None):
+    self.message = message
+    self.code = code
+
+
+class BlobStorageIO(object):
+  """Azure Blob Storage I/O client."""
+  def __init__(self, client=None):
+    if not AZURE_DEPS_INSTALLED:
+      raise RuntimeError(
+          'Azure dependencies are not installed. Unable to run.')
+    if client is None:
+      connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      self.client = BlobServiceClient.from_connection_string(connect_str)
+    else:
+      self.client = client
+
+  def open(
+      self,
+      filename,
+      mode='r',
+      read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
+      mime_type='application/octet-stream'):
+    """Open an Azure Blob Storage file path for reading or writing.
+
+    Args:
+      filename (str): Azure Blob Storage file path in the form
+                      ``azfs://<storage-account>/<container>/<path>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
+
+    Returns:
+      Azure Blob Storage file object.
+    Raises:
+      ValueError: Invalid open file mode.
+    """
+    if mode == 'r' or mode == 'rb':
+      downloader = BlobStorageDownloader(
+          self.client, filename, buffer_size=read_buffer_size)
+      return io.BufferedReader(
+          DownloaderStream(
+              downloader, read_buffer_size=read_buffer_size, mode=mode),
+          buffer_size=read_buffer_size)
+    elif mode == 'w' or mode == 'wb':
+      uploader = BlobStorageUploader(self.client, filename, mime_type)
+      return io.BufferedWriter(
+          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
+    else:
+      raise ValueError('Invalid file open mode: %s.' % mode)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def copy(self, src, dest):
+    """Copies a single Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Raises:
+      TimeoutError: on timeout.
+    """
+    src_storage_account, src_container, src_blob = parse_azfs_path(
+        src, get_account=True)
+    dest_container, dest_blob = parse_azfs_path(dest)
+
+    source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
+    copied_blob = self.client.get_blob_client(dest_container, dest_blob)
+
+    try:
+      copied_blob.start_copy_from_url(source_blob)
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_tree(self, src, dest):
+    """Renames the given Azure Blob storage directory and its contents
+    recursively from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      List of tuples of (src, dest, exception) where exception is None if the
+      operation succeeded or the relevant exception if the operation failed.
+    """
+    assert src.endswith('/')
+    assert dest.endswith('/')
+
+    results = []
+    for entry in self.list_prefix(src):
+      rel_path = entry[len(src):]
+      try:
+        self.copy(entry, dest + rel_path)
+        results.append((entry, dest + rel_path, None))
+      except BlobStorageError as e:
+        results.append((entry, dest + rel_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy operation is already an idempotent operation protected
+  # by retry decorators.
+  def copy_paths(self, src_dest_pairs):
+    """Copies the given Azure Blob Storage blobs from src to dest. This can
+    handle directory or file paths.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name] file paths
+                      to copy from src to dest.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is None if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    results = []
+
+    for src_path, dest_path in src_dest_pairs:
+      # Case 1. They are directories.
+      if src_path.endswith('/') and dest_path.endswith('/'):
+        try:
+          results += self.copy_tree(src_path, dest_path)
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Case 2. They are individual blobs.
+      elif not src_path.endswith('/') and not dest_path.endswith('/'):
+        try:
+          self.copy(src_path, dest_path)
+          results.append((src_path, dest_path, None))
+        except BlobStorageError as e:
+          results.append((src_path, dest_path, e))
+
+      # Mismatched paths (one directory, one non-directory) get an error.
+      else:
+        e = BlobStorageError(
+            "Unable to copy mismatched paths" +
+            "(directory, non-directory): %s, %s" % (src_path, dest_path),
+            400)
+        results.append((src_path, dest_path, e))
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename(self, src, dest):
+    """Renames the given Azure Blob Storage blob from src to dest.
+
+    Args:
+      src: Blob Storage file path pattern in the form
+           azfs://<storage-account>/<container>/[name].
+      dest: Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    self.copy(src, dest)
+    self.delete(src)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def rename_files(self, src_dest_pairs):
+    """Renames the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      src_dest_pairs: List of (src, dest) tuples of
+                      azfs://<storage-account>/<container>/[name]
+                      file paths to rename from src to dest.
+    Returns: List of tuples of (src, dest, exception) in the same order as the
+             src_dest_pairs argument, where exception is None if the operation
+             succeeded or the relevant exception if the operation failed.
+    """
+    if not src_dest_pairs:
+      return []
+
+    for src, dest in src_dest_pairs:
+      if src.endswith('/') or dest.endswith('/'):
+        raise ValueError('Unable to rename a directory.')
+
+    # Results from copy operation.
+    copy_results = self.copy_paths(src_dest_pairs)
+    paths_to_delete = \
+        [src for (src, _, error) in copy_results if error is None]
+    # Results from delete operation.
+    delete_results = self.delete_files(paths_to_delete)
+
+    # Get rename file results (list of tuples).
+    results = []
+
+    # Use a dictionary for fast lookup of the delete results by path.
+    delete_results_dict = {src: error for (src, error) in delete_results}
+
+    for src, dest, error in copy_results:
+      # If there was an error in the copy operation.
+      if error is not None:
+        results.append((src, dest, error))
+      # If there was an error in the delete operation.
+      elif delete_results_dict[src] is not None:
+        results.append((src, dest, delete_results_dict[src]))
+      # If there was no error in the operations.
+      else:
+        results.append((src, dest, None))
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def exists(self, path):
+    """Returns whether the given Azure Blob Storage blob exists.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_check.get_blob_properties()
+      return True
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # HTTP 404 indicates that the file did not exist.
+        return False
+      else:
+        # We re-raise all other exceptions.
+        raise
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def size(self, path):
+    """Returns the size of a single Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Blob Storage blob.
+
+    Returns: size of the Blob Storage blob in bytes.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single
+    Azure Blob Storage blob.
+
+    This method does not perform glob expansion. Hence the
+    given path must be for a single Azure Blob Storage blob.
+
+    Returns: last updated time of the Azure Blob Storage blob
+    in seconds.
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    last_modified = properties.last_modified
+    return (
+        time.mktime(last_modified.timetuple()) - time.timezone +
+        last_modified.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def checksum(self, path):
+    """Looks up the checksum of an Azure Blob Storage blob.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_check = self.client.get_blob_client(container, blob)
+    try:
+      properties = blob_to_check.get_blob_properties()
+    except ResourceNotFoundError as e:
+      message = e.reason
+      code = e.status_code
+      raise BlobStorageError(message, code)
+
+    return properties.etag
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def delete(self, path):
+    """Deletes a single blob at the given Azure Blob Storage path.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+    """
+    container, blob = parse_azfs_path(path)
+    blob_to_delete = self.client.get_blob_client(container, blob)
+    try:
+      blob_to_delete.delete_blob()
+    except ResourceNotFoundError as e:
+      if e.status_code == 404:
+        # Return success when the file doesn't exist anymore for idempotency.
+        return
+      else:
+        logging.error('HTTP error while deleting file %s', path)
+        raise e
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_paths(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+    This can handle directory or file paths.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is 202 if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    directories, blobs = [], []
+
+    # Retrieve directories and not directories.
+    for path in paths:
+      if path.endswith('/'):
+        directories.append(path)
+      else:
+        blobs.append(path)
+
+    results = {}
+
+    for directory in directories:
+      directory_result = dict(self.delete_tree(directory))
+      results.update(directory_result)
+
+    blobs_results = dict(self.delete_files(blobs))
+    results.update(blobs_results)
+
+    return results
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_tree(self, root):
+    """Deletes all blobs under the given Azure BlobStorage virtual
+    directory.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name]
+            (ending with a "/").
+
+    Returns:
+      List of tuples of (path, exception), where each path is a blob
+      under the given root. exception is 202 if the operation succeeded
+      or the relevant exception if the operation failed.
+    """
+    assert root.endswith('/')
+
+    # Get the blobs under the root directory.
+    paths_to_delete = self.list_prefix(root)
+
+    return self.delete_files(paths_to_delete)
+
+  # We intentionally do not decorate this method with a retry, since the
+  # underlying copy and delete operations are already idempotent operations
+  # protected by retry decorators.
+  def delete_files(self, paths):
+    """Deletes the given Azure Blob Storage blobs from src to dest.
+
+    Args:
+      paths: list of Azure Blob Storage paths in the form
+             azfs://<storage-account>/<container>/[name] that give the
+             file blobs to be deleted.
+
+    Returns:
+      List of tuples of (src, dest, exception) in the same order as the
+      src_dest_pairs argument, where exception is 202 if the operation
+      succeeded or the relevant exception if the operation failed.
+    """
+    if not paths:
+      return []
+
+    # Group blobs into containers.
+    containers, blobs = zip(*[parse_azfs_path(path, get_account=False) \
+        for path in paths])
+
+    grouped_blobs = {container: [] for container in containers}
+
+    # Fill dictionary.
+    for container, blob in zip(containers, blobs):
+      grouped_blobs[container].append(blob)
+
+    results = {}
+
+    # Delete minibatches of blobs for each container.
+    for container, blobs in grouped_blobs.items():
+      for i in range(0, len(blobs), MAX_BATCH_OPERATION_SIZE):
+        blobs_to_delete = blobs[i:i + MAX_BATCH_OPERATION_SIZE]
+        results.update(self._delete_batch(container, blobs_to_delete))
+
+    final_results = \
+        [(path, results[parse_azfs_path(path, get_account=False)]) \
+        for path in paths]
+
+    return final_results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _delete_batch(self, container, blobs):
+    """A helper method. Azure Blob Storage Python Client allows batch
+    deletions for blobs within the same container.
+
+    Args:
+      container: container name.
+      blobs: list of blobs to be deleted.
+
+    Returns:
+      Dictionary of the form {(container, blob): error}, where error is
+      202 if the operation succeeded.
+    """
+    container_client = self.client.get_container_client(container)
+    results = {}
+
+    try:
+      response = container_client.delete_blobs(
+          *blobs, raise_on_any_failure=False)
+
+      for blob, error in zip(blobs, response):
+        results[(container, blob)] = error.status_code
+
+    except BlobStorageError as e:
+      for blob in blobs:
+        results[(container, blob)] = e.message
+
+    return results
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def list_prefix(self, path):
+    """Lists files matching the prefix.
+
+    Args:
+      path: Azure Blob Storage file path pattern in the form
+            azfs://<storage-account>/<container>/[name].
+
+    Returns:
+      Dictionary of file name -> size.
+    """
+    storage_account, container, blob = parse_azfs_path(
+        path, blob_optional=True, get_account=True)
+    file_sizes = {}
+    counter = 0
+    start_time = time.time()
+
+    logging.info("Starting the size estimation of the input")
+    container_client = self.client.get_container_client(container)
+
+    response = container_client.list_blobs(name_starts_with=blob)
+    for item in response:
+      file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
+      file_sizes[file_name] = item.size
+      counter += 1
+      if counter % 10000 == 0:
+        logging.info("Finished computing size of: %s files", len(file_sizes))
+
+    logging.info(
+        "Finished listing %s files in %s seconds.",
+        counter,
+        time.time() - start_time)
+    return file_sizes
+
+
+class BlobStorageDownloader(Downloader):
+  def __init__(self, client, path, buffer_size):
+    self._client = client
+    self._path = path
+    self._container, self._blob = parse_azfs_path(path)
+    self._buffer_size = buffer_size
+
+    self._blob_to_download = self._client.get_blob_client(
+        self._container, self._blob)
+
+    try:
+      properties = self._get_object_properties()
+    except ResourceNotFoundError as http_error:
+      if http_error.status_code == 404:
+        raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
+      else:
+        _LOGGER.error(
+            'HTTP error while requesting file %s: %s', self._path, http_error)
+        raise
+
+    self._size = properties.size
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_beam_io_error_filter)
+  def _get_object_properties(self):
+    return self._blob_to_download.get_blob_properties()
+
+  @property
+  def size(self):
+    return self._size
+
+  def get_range(self, start, end):
+    # download_blob takes an offset and a length (number of bytes to read).
+    blob_data = self._blob_to_download.download_blob(start, end - start)
+    # Returns the content as bytes.
+    return blob_data.readall()
+
+
+class BlobStorageUploader(Uploader):
+  def __init__(self, client, path, mime_type='application/octet-stream'):
+    self._client = client
+    self._path = path
+    self._container, self._blob = parse_azfs_path(path)
+    self._content_settings = ContentSettings(mime_type)
+
+    self._blob_to_upload = self._client.get_blob_client(
+        self._container, self._blob)
+
+    # Temporary file.
+    self._temporary_file = tempfile.NamedTemporaryFile()
+
+  def put(self, data):
+    self._temporary_file.write(data.tobytes())
+
+  def finish(self):
+    self._temporary_file.seek(0)
+    # The temporary file is deleted immediately after the operation.
+    with open(self._temporary_file.name, "rb") as f:
+      self._blob_to_upload.upload_blob(
+          f.read(), overwrite=True, content_settings=self._content_settings)

Review comment:
       @pabloem Let's see:
   
   - Authentication. At the moment the only way to authenticate is with a connection string obtained from environment variables. 
   - **Integration tests with Azurite.** Integration tests with Azurite are practically ready. The only thing left is to define a function in `build.gradle` that starts and stops Azurite; a rough sketch of what such a test could look like follows below. (You can find the branch here: [https://github.com/AldairCoronel/beam/commits/azurite](url)).
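   
   For context, a rough sketch of what one of those Azurite-backed tests might look like; the class name, container, and blob path are illustrative, and the container is assumed to already exist in the emulator:
   
   ```python
   import os
   import unittest
   
   from apache_beam.io.azure import blobstorageio
   
   
   class BlobStorageIOAzuriteTest(unittest.TestCase):
   
     @unittest.skipUnless(
         os.getenv('AZURE_STORAGE_CONNECTION_STRING'),
         'Requires a running Azurite instance and its connection string.')
     def test_blob_round_trip(self):
       azfs = blobstorageio.BlobStorageIO()
       path = 'azfs://devstoreaccount1/test-container/round_trip.txt'
       with azfs.open(path, 'wb') as f:
         f.write(b'hello azure')
       self.assertTrue(azfs.exists(path))
       self.assertEqual(azfs.size(path), len(b'hello azure'))
       azfs.delete(path)
   
   
   if __name__ == '__main__':
     unittest.main()
   ```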




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org