You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@reef.apache.org by mo...@apache.org on 2018/06/20 21:20:40 UTC

[reef] branch master updated: [REEF-2022] Fix AzureBlockBlobFileSystem.GetChildren API and unit tests

This is an automated email from the ASF dual-hosted git repository.

motus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/reef.git


The following commit(s) were added to refs/heads/master by this push:
     new 396fd82  [REEF-2022] Fix AzureBlockBlobFileSystem.GetChildren API and unit tests
396fd82 is described below

commit 396fd82503a2f8205a0c1d5cb742ce17bee1ad51
Author: sharathmalladi <ms...@live.com>
AuthorDate: Wed Jun 20 14:20:38 2018 -0700

    [REEF-2022] Fix AzureBlockBlobFileSystem.GetChildren API and unit tests
    
    * Fix blob filesystem to align with local file system behavior
    * Fix the `GetChildren` API and add unit tests.
    
    JIRA: [REEF-2022](https://issues.apache.org/jira/browse/REEF-2022)
    
    Closes #1464
---
 .../Evaluator/EvaluatorRequest.cs                  | 76 ++++++++++++++++++----
 .../TestAzureBlockBlobFileSystemE2E.cs             | 43 +++++++++++-
 .../AzureBlob/AzureBlockBlobFileSystem.cs          | 69 +++++++++++++++-----
 .../FileSystem/AzureBlob/AzureCloudBlobClient.cs   | 29 +++++++--
 .../FileSystem/AzureBlob/ICloudBlobClient.cs       | 20 +++++-
 5 files changed, 198 insertions(+), 39 deletions(-)

diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
index 0538dfd..c0c1952 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
@@ -28,44 +28,92 @@ namespace Org.Apache.REEF.Driver.Evaluator
     [DataContract]
     internal class EvaluatorRequest : IEvaluatorRequest
     {
-        internal EvaluatorRequest()
-            : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, String.Empty)
+        internal EvaluatorRequest() : this(number: 0, megaBytes: 0)
         {
         }
 
-        internal EvaluatorRequest(int number, int megaBytes)
-            : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, string.Empty)
+        internal EvaluatorRequest(int number, int megaBytes) : this(number: number, megaBytes: megaBytes, core: 1)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, int core)
-            : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, string.Empty)
+            : this(
+                  number: number,
+                  megaBytes: megaBytes,
+                  core: core,
+                  rack: string.Empty)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, string rack)
-            : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, String.Empty)
+            : this(
+                  number: number,
+                  megaBytes: megaBytes,
+                  core: 1,
+                  rack: rack)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, int core, string rack)
-            : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, string.Empty)
+            : this(
+                  number: number,
+                  megaBytes: megaBytes,
+                  core: core,
+                  rack: rack,
+                  evaluatorBatchId: Guid.NewGuid().ToString("N"),
+                  nodeNames: new string[] { })
         {
         }
 
-        internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames)
-            : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, true, string.Empty)
-
+        internal EvaluatorRequest(
+            int number,
+            int megaBytes,
+            int core,
+            string rack,
+            string evaluatorBatchId,
+            ICollection<string> nodeNames)
+                : this(
+                number: number,
+                megaBytes: megaBytes,
+                core: core,
+                rack: rack,
+                evaluatorBatchId: evaluatorBatchId,
+                nodeNames: nodeNames,
+                relaxLocality: true)
         {
         }
 
-        internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames, bool relaxLocality)
-           : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, relaxLocality, string.Empty)
-
+        internal EvaluatorRequest(
+            int number,
+            int megaBytes,
+            int core,
+            string rack,
+            string evaluatorBatchId,
+            ICollection<string> nodeNames,
+            bool relaxLocality)
+                : this(
+                number: number,
+                megaBytes: megaBytes,
+                core: core,
+                rack: rack,
+                evaluatorBatchId: evaluatorBatchId,
+                runtimeName: string.Empty,
+                nodeNames: nodeNames,
+                relaxLocality: relaxLocality,
+                nodeLabelExpression: string.Empty)
         {
         }
 
-        internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName, ICollection<string> nodeNames, bool relaxLocality, string nodeLabelExpression)
+        internal EvaluatorRequest(
+            int number,
+            int megaBytes,
+            int core,
+            string rack,
+            string evaluatorBatchId,
+            string runtimeName,
+            ICollection<string> nodeNames,
+            bool relaxLocality,
+            string nodeLabelExpression)
         {
             Number = number;
             MemoryMegaBytes = megaBytes;
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
index bbe2b9e..852f63c 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
@@ -18,6 +18,7 @@
 using System;
 using System.Collections.Generic;
 using System.IO;
+using System.Linq;
 using System.Text;
 using Microsoft.WindowsAzure.Storage;
 using Microsoft.WindowsAzure.Storage.Blob;
@@ -43,7 +44,7 @@ namespace Org.Apache.REEF.IO.Tests
         public TestAzureBlockBlobFileSystemE2E()
         {
             // Fill in before running test!
-            const string ConnectionString = "DefaultEndpointsProtocol=http;AccountName=myAccount;AccountKey=myKey;";
+            const string ConnectionString = "DefaultEndpointsProtocol=https;AccountName=myAccount;AccountKey=myKey;EndpointSuffix=core.windows.net";
             var defaultContainerName = "reef-test-container-" + Guid.NewGuid();
             var conf = AzureBlockBlobFileSystemConfiguration.ConfigurationModule
                 .Set(AzureBlockBlobFileSystemConfiguration.ConnectionString, ConnectionString)
@@ -139,6 +140,46 @@ namespace Org.Apache.REEF.IO.Tests
         }
 
         [Fact(Skip = SkipMessage)]
+        public void TestGetChildBlobsInContainerE2E()
+        {
+            // setup
+            string[] fileNames = new string[] { "sample1", "sample2", "sample3", "folder1/sample4", "folder1/sample5" };
+            string[] expectedFolderChildren = new string[] { "folder1/sample4", "folder1/sample5" };
+            string[] expectedRootChildren = new string[] { "sample1", "sample2", "sample3", "folder1/" };
+
+            foreach (string uploadedBlobName in fileNames)
+            {
+                CloudBlockBlob blob = _container.GetBlockBlobReference(uploadedBlobName);
+                UploadFromString(blob, "hello");
+            }
+
+            Uri containerUri = new Uri(_container.Uri.AbsoluteUri + '/');
+
+            // List files in the root level in container
+            ValidateChildren(_container.Uri, expectedRootChildren.Select(child => new Uri(containerUri, child)));
+
+            // List files only in the sub-folder in the container
+            Uri folderUri = _container.GetDirectoryReference("folder1").Uri;
+            ValidateChildren(folderUri, expectedFolderChildren.Select(child => new Uri(containerUri, child)));
+        }
+
+        [Fact(Skip = SkipMessage)]
+        public void TestGetChildContainerInStorageAccountE2E()
+        {
+            // List containers in the storage account
+            Uri rootUri = _fileSystem.CreateUriForPath(string.Empty);
+            ValidateChildren(rootUri, new List<Uri> { _container.Uri });
+        }
+
+        private void ValidateChildren(Uri storageBlobUri, IEnumerable<Uri> expectedChildBlobs)
+        {
+            IEnumerable<Uri> blobs = _fileSystem.GetChildren(storageBlobUri);
+            Assert.Equal(
+                expectedChildBlobs.Select(uri => uri.AbsoluteUri).OrderBy(uri => uri),
+                blobs.Select(uri => uri.AbsoluteUri).OrderBy(uri => uri));
+        }
+
+        [Fact(Skip = SkipMessage)]
         public void TestExistsE2E()
         {
             var helloFilePath = PathToFile(HelloFile);
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
index 9292f42..d1004c3 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
@@ -134,7 +134,9 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
             var uriSplit = directoryUri.AbsolutePath.Split(new[] { "/" }, StringSplitOptions.RemoveEmptyEntries);
             if (!uriSplit.Any())
             {
-                throw new StorageException(string.Format("URI {0} must contain at least the container.", directoryUri));
+                throw new StorageException(
+                    string.Format("URI {0} must contain at least the container.",
+                    directoryUri));
             }
 
             var containerName = uriSplit[0];
@@ -152,32 +154,69 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
                 directory = directory.GetDirectoryReference(uriSplit[i]);
             }
 
-            foreach (var blob in directory.ListBlobs(true).OfType<ICloudBlob>())
-            {
-                blob.DeleteIfExistsAsync().Wait();
-            }
+            Task.WaitAll(directory
+                .ListBlobs(true)
+                .OfType<ICloudBlob>()
+                .Select(blob => blob.DeleteIfExistsAsync())
+                .ToArray());
         }
 
         /// <summary>
-        /// Gets the children of the blob "directory."
+        /// Gets the children of the container (if the uri has segments)
+        /// or containers in the storage account (if the uri has no segments)
         /// </summary>
         public IEnumerable<Uri> GetChildren(Uri directoryUri)
         {
+            string[] segments = directoryUri.Segments;
             BlobContinuationToken blobContinuationToken = null;
-            var path = directoryUri.AbsolutePath.Trim('/');
 
-            do
+            // If at the root, return all containers
+            if (segments.Length <= 1)
             {
-                var listing = _client.ListBlobsSegmented(path, false,
-                    BlobListingDetails.None, null,
-                    blobContinuationToken, new BlobRequestOptions(), new OperationContext());
-
-                if (listing.Results != null)
+                do
                 {
-                    foreach (var listBlobItem in listing.Results)
+                    ContainerResultSegment containerListing = _client.ListContainersSegmented(blobContinuationToken);
+
+                    if (containerListing?.Results != null)
                     {
-                        yield return listBlobItem.Uri;
+                        foreach (CloudBlobContainer containerItem in containerListing.Results)
+                        {
+                            yield return containerItem.Uri;
+                        }
                     }
+
+                    blobContinuationToken = containerListing?.ContinuationToken;
+                }
+                while (blobContinuationToken != null);
+                yield break;
+            }
+
+            // If not at the root folder, return all blobs within the container
+            string containerName = segments[1];
+            string relativeAddress = directoryUri.PathAndQuery.Substring(containerName.Length + 1);
+
+            do
+            {
+                BlobResultSegment listing = _client.ListBlobsSegmented(
+                    containerName,
+                    relativeAddress,
+                    useFlatListing: false,
+                    BlobListingDetails.None,
+                    maxResults: null,
+                    blobContinuationToken,
+                    new BlobRequestOptions(),
+                    new OperationContext());
+
+                if (listing == null || listing.Results.Count() == 0)
+                {
+                    throw new ArgumentException(
+                        "Call to ListBlobsSegmented returned no results. Uri is invalid or does not have children.",
+                        nameof(directoryUri));
+                }
+
+                foreach (IListBlobItem listBlobItem in listing.Results)
+                {
+                    yield return listBlobItem.Uri;
                 }
 
                 blobContinuationToken = listing.ContinuationToken;
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
index 86d55fa..5b54e68 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Threading.Tasks;
 using Microsoft.WindowsAzure.Storage;
 using Microsoft.WindowsAzure.Storage.Auth;
 using Microsoft.WindowsAzure.Storage.Blob;
@@ -67,14 +68,30 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
             return new AzureCloudBlockBlob(uri, _client.Credentials);
         }
 
-        public BlobResultSegment ListBlobsSegmented(string prefix, bool useFlatListing, BlobListingDetails blobListingDetails,
-            int? maxResults, BlobContinuationToken continuationToken, BlobRequestOptions blobRequestOptions,
+        public BlobResultSegment ListBlobsSegmented(
+            string containerName,
+            string relativeAddress,
+            bool useFlatListing,
+            BlobListingDetails blobListingDetails,
+            int? maxResults,
+            BlobContinuationToken continuationToken,
+            BlobRequestOptions blobRequestOptions,
             OperationContext operationContext)
         {
-            var task = _client.ListBlobsSegmentedAsync(prefix, useFlatListing, blobListingDetails, maxResults,  continuationToken, 
-                                                        blobRequestOptions, operationContext);
-            task.Wait();
-            return task.Result;
+            CloudBlobContainer container = _client.GetContainerReference(containerName);
+            CloudBlobDirectory directory = container.GetDirectoryReference(relativeAddress);
+            return directory.ListBlobsSegmentedAsync(
+                useFlatListing,
+                blobListingDetails,
+                maxResults,
+                continuationToken,
+                blobRequestOptions,
+                operationContext).GetAwaiter().GetResult();
+        }
+
+        public ContainerResultSegment ListContainersSegmented(BlobContinuationToken continuationToken)
+        {
+            return _client.ListContainersSegmentedAsync(continuationToken).GetAwaiter().GetResult();
         }
     }
 }
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobClient.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobClient.cs
index 0acc2c3..f2a43aa 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobClient.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobClient.cs
@@ -56,9 +56,23 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
         ICloudBlockBlob GetBlockBlobReference(Uri uri);
 
         /// <summary>
-        /// Paginates a blob listing with prefix.
+        /// Paginates a blob listing with container and relative path.
         /// </summary>
-        BlobResultSegment ListBlobsSegmented(string prefix, bool useFlatListing, BlobListingDetails blobListingDetails, int? maxResults, 
-            BlobContinuationToken continuationToken, BlobRequestOptions blobRequestOptions, OperationContext operationContext);
+        BlobResultSegment ListBlobsSegmented(
+            string containerName,
+            string relativeAddress,
+            bool useFlatListing,
+            BlobListingDetails blobListingDetails,
+            int? maxResults, 
+            BlobContinuationToken continuationToken,
+            BlobRequestOptions blobRequestOptions,
+            OperationContext operationContext);
+
+        /// <summary>
+        /// Paginates a container listing.
+        /// </summary>
+        /// <param name="continuationToken">Continuation token for pagination</param>
+        /// <returns>List of containers</returns>
+        ContainerResultSegment ListContainersSegmented(BlobContinuationToken continuationToken);
     }
 }