You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2018/06/20 21:20:40 UTC
[reef] branch master updated: [REEF-2022] Fix AzureBlockBlobFileSystem.GetChildren API and unit tests
This is an automated email from the ASF dual-hosted git repository.
motus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/reef.git
The following commit(s) were added to refs/heads/master by this push:
new 396fd82 [REEF-2022] Fix AzureBlockBlobFileSystem.GetChildren API and unit tests
396fd82 is described below
commit 396fd82503a2f8205a0c1d5cb742ce17bee1ad51
Author: sharathmalladi <ms...@live.com>
AuthorDate: Wed Jun 20 14:20:38 2018 -0700
[REEF-2022] Fix AzureBlockBlobFileSystem.GetChildren API and unit tests
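* Fix the blob file system to align with local file system behavior
* Fix the `GetChildren` API (list the containers in the storage account at the root; otherwise list the blobs within the container) and add unit tests.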
JIRA: [REEF-2022](https://issues.apache.org/jira/browse/REEF-2022)
Closes #1464
---
.../Evaluator/EvaluatorRequest.cs | 76 ++++++++++++++++++----
.../TestAzureBlockBlobFileSystemE2E.cs | 43 +++++++++++-
.../AzureBlob/AzureBlockBlobFileSystem.cs | 69 +++++++++++++++-----
.../FileSystem/AzureBlob/AzureCloudBlobClient.cs | 29 +++++++--
.../FileSystem/AzureBlob/ICloudBlobClient.cs | 20 +++++-
5 files changed, 198 insertions(+), 39 deletions(-)
diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
index 0538dfd..c0c1952 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
@@ -28,44 +28,92 @@ namespace Org.Apache.REEF.Driver.Evaluator
[DataContract]
internal class EvaluatorRequest : IEvaluatorRequest
{
- internal EvaluatorRequest()
- : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, String.Empty)
+ internal EvaluatorRequest() : this(number: 0, megaBytes: 0)
{
}
- internal EvaluatorRequest(int number, int megaBytes)
- : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, string.Empty)
+ internal EvaluatorRequest(int number, int megaBytes) : this(number: number, megaBytes: megaBytes, core: 1)
{
}
internal EvaluatorRequest(int number, int megaBytes, int core)
- : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, string.Empty)
+ : this(
+ number: number,
+ megaBytes: megaBytes,
+ core: core,
+ rack: string.Empty)
{
}
internal EvaluatorRequest(int number, int megaBytes, string rack)
- : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, String.Empty)
+ : this(
+ number: number,
+ megaBytes: megaBytes,
+ core: 1,
+ rack: rack)
{
}
internal EvaluatorRequest(int number, int megaBytes, int core, string rack)
- : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, string.Empty)
+ : this(
+ number: number,
+ megaBytes: megaBytes,
+ core: core,
+ rack: rack,
+ evaluatorBatchId: Guid.NewGuid().ToString("N"),
+ nodeNames: new string[] { })
{
}
- internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames)
- : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, true, string.Empty)
-
+ internal EvaluatorRequest(
+ int number,
+ int megaBytes,
+ int core,
+ string rack,
+ string evaluatorBatchId,
+ ICollection<string> nodeNames)
+ : this(
+ number: number,
+ megaBytes: megaBytes,
+ core: core,
+ rack: rack,
+ evaluatorBatchId: evaluatorBatchId,
+ nodeNames: nodeNames,
+ relaxLocality: true)
{
}
- internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames, bool relaxLocality)
- : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, relaxLocality, string.Empty)
-
+ internal EvaluatorRequest(
+ int number,
+ int megaBytes,
+ int core,
+ string rack,
+ string evaluatorBatchId,
+ ICollection<string> nodeNames,
+ bool relaxLocality)
+ : this(
+ number: number,
+ megaBytes: megaBytes,
+ core: core,
+ rack: rack,
+ evaluatorBatchId: evaluatorBatchId,
+ runtimeName: string.Empty,
+ nodeNames: nodeNames,
+ relaxLocality: relaxLocality,
+ nodeLabelExpression: string.Empty)
{
}
- internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName, ICollection<string> nodeNames, bool relaxLocality, string nodeLabelExpression)
+ internal EvaluatorRequest(
+ int number,
+ int megaBytes,
+ int core,
+ string rack,
+ string evaluatorBatchId,
+ string runtimeName,
+ ICollection<string> nodeNames,
+ bool relaxLocality,
+ string nodeLabelExpression)
{
Number = number;
MemoryMegaBytes = megaBytes;
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
index bbe2b9e..852f63c 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
@@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
using System.IO;
+using System.Linq;
using System.Text;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
@@ -43,7 +44,7 @@ namespace Org.Apache.REEF.IO.Tests
public TestAzureBlockBlobFileSystemE2E()
{
// Fill in before running test!
- const string ConnectionString = "DefaultEndpointsProtocol=http;AccountName=myAccount;AccountKey=myKey;";
+ const string ConnectionString = "DefaultEndpointsProtocol=https;AccountName=myAccount;AccountKey=myKey;EndpointSuffix=core.windows.net";
var defaultContainerName = "reef-test-container-" + Guid.NewGuid();
var conf = AzureBlockBlobFileSystemConfiguration.ConfigurationModule
.Set(AzureBlockBlobFileSystemConfiguration.ConnectionString, ConnectionString)
@@ -139,6 +140,46 @@ namespace Org.Apache.REEF.IO.Tests
}
[Fact(Skip = SkipMessage)]
+ public void TestGetChildBlobsInContainerE2E()
+ {
+ // setup
+ string[] fileNames = new string[] { "sample1", "sample2", "sample3", "folder1/sample4", "folder1/sample5" };
+ string[] expectedFolderChildren = new string[] { "folder1/sample4", "folder1/sample5" };
+ string[] expectedRootChildren = new string[] { "sample1", "sample2", "sample3", "folder1/" };
+
+ foreach (string uploadedBlobName in fileNames)
+ {
+ CloudBlockBlob blob = _container.GetBlockBlobReference(uploadedBlobName);
+ UploadFromString(blob, "hello");
+ }
+
+ Uri containerUri = new Uri(_container.Uri.AbsoluteUri + '/');
+
+ // List files in the root level in container
+ ValidateChildren(_container.Uri, expectedRootChildren.Select(child => new Uri(containerUri, child)));
+
+ // List files only in the sub-folder in the container
+ Uri folderUri = _container.GetDirectoryReference("folder1").Uri;
+ ValidateChildren(folderUri, expectedFolderChildren.Select(child => new Uri(containerUri, child)));
+ }
+
+ [Fact(Skip = SkipMessage)]
+ public void TestGetChildContainerInStorageAccountE2E()
+ {
+ // List containers in the storage account
+ Uri rootUri = _fileSystem.CreateUriForPath(string.Empty);
+ ValidateChildren(rootUri, new List<Uri> { _container.Uri });
+ }
+
+ private void ValidateChildren(Uri storageBlobUri, IEnumerable<Uri> expectedChildBlobs)
+ {
+ IEnumerable<Uri> blobs = _fileSystem.GetChildren(storageBlobUri);
+ Assert.Equal(
+ expectedChildBlobs.Select(uri => uri.AbsoluteUri).OrderBy(uri => uri),
+ blobs.Select(uri => uri.AbsoluteUri).OrderBy(uri => uri));
+ }
+
+ [Fact(Skip = SkipMessage)]
public void TestExistsE2E()
{
var helloFilePath = PathToFile(HelloFile);
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
index 9292f42..d1004c3 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
@@ -134,7 +134,9 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
var uriSplit = directoryUri.AbsolutePath.Split(new[] { "/" }, StringSplitOptions.RemoveEmptyEntries);
if (!uriSplit.Any())
{
- throw new StorageException(string.Format("URI {0} must contain at least the container.", directoryUri));
+ throw new StorageException(
+ string.Format("URI {0} must contain at least the container.",
+ directoryUri));
}
var containerName = uriSplit[0];
@@ -152,32 +154,69 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
directory = directory.GetDirectoryReference(uriSplit[i]);
}
- foreach (var blob in directory.ListBlobs(true).OfType<ICloudBlob>())
- {
- blob.DeleteIfExistsAsync().Wait();
- }
+ Task.WaitAll(directory
+ .ListBlobs(true)
+ .OfType<ICloudBlob>()
+ .Select(blob => blob.DeleteIfExistsAsync())
+ .ToArray());
}
/// <summary>
- /// Gets the children of the blob "directory."
+ /// Gets the children of the container (if the uri has segments)
+ /// or containers in the storage account (if the uri has no segments)
/// </summary>
public IEnumerable<Uri> GetChildren(Uri directoryUri)
{
+ string[] segments = directoryUri.Segments;
BlobContinuationToken blobContinuationToken = null;
- var path = directoryUri.AbsolutePath.Trim('/');
- do
+ // If at the root, return all containers
+ if (segments.Length <= 1)
{
- var listing = _client.ListBlobsSegmented(path, false,
- BlobListingDetails.None, null,
- blobContinuationToken, new BlobRequestOptions(), new OperationContext());
-
- if (listing.Results != null)
+ do
{
- foreach (var listBlobItem in listing.Results)
+ ContainerResultSegment containerListing = _client.ListContainersSegmented(blobContinuationToken);
+
+ if (containerListing?.Results != null)
{
- yield return listBlobItem.Uri;
+ foreach (CloudBlobContainer containerItem in containerListing.Results)
+ {
+ yield return containerItem.Uri;
+ }
}
+
+ blobContinuationToken = containerListing?.ContinuationToken;
+ }
+ while (blobContinuationToken != null);
+ yield break;
+ }
+
+ // If not at the root folder, return all blobs within the container
+ string containerName = segments[1];
+ string relativeAddress = directoryUri.PathAndQuery.Substring(containerName.Length + 1);
+
+ do
+ {
+ BlobResultSegment listing = _client.ListBlobsSegmented(
+ containerName,
+ relativeAddress,
+ useFlatListing: false,
+ BlobListingDetails.None,
+ maxResults: null,
+ blobContinuationToken,
+ new BlobRequestOptions(),
+ new OperationContext());
+
+ if (listing == null || listing.Results.Count() == 0)
+ {
+ throw new ArgumentException(
+ "Call to ListBlobsSegmented returned no results. Uri is invalid or does not have children.",
+ nameof(directoryUri));
+ }
+
+ foreach (IListBlobItem listBlobItem in listing.Results)
+ {
+ yield return listBlobItem.Uri;
}
blobContinuationToken = listing.ContinuationToken;
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
index 86d55fa..5b54e68 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
@@ -16,6 +16,7 @@
// under the License.
using System;
+using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
@@ -67,14 +68,30 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
return new AzureCloudBlockBlob(uri, _client.Credentials);
}
- public BlobResultSegment ListBlobsSegmented(string prefix, bool useFlatListing, BlobListingDetails blobListingDetails,
- int? maxResults, BlobContinuationToken continuationToken, BlobRequestOptions blobRequestOptions,
+ public BlobResultSegment ListBlobsSegmented(
+ string containerName,
+ string relativeAddress,
+ bool useFlatListing,
+ BlobListingDetails blobListingDetails,
+ int? maxResults,
+ BlobContinuationToken continuationToken,
+ BlobRequestOptions blobRequestOptions,
OperationContext operationContext)
{
- var task = _client.ListBlobsSegmentedAsync(prefix, useFlatListing, blobListingDetails, maxResults, continuationToken,
- blobRequestOptions, operationContext);
- task.Wait();
- return task.Result;
+ CloudBlobContainer container = _client.GetContainerReference(containerName);
+ CloudBlobDirectory directory = container.GetDirectoryReference(relativeAddress);
+ return directory.ListBlobsSegmentedAsync(
+ useFlatListing,
+ blobListingDetails,
+ maxResults,
+ continuationToken,
+ blobRequestOptions,
+ operationContext).GetAwaiter().GetResult();
+ }
+
+ public ContainerResultSegment ListContainersSegmented(BlobContinuationToken continuationToken)
+ {
+ return _client.ListContainersSegmentedAsync(continuationToken).GetAwaiter().GetResult();
}
}
}
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobClient.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobClient.cs
index 0acc2c3..f2a43aa 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobClient.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobClient.cs
@@ -56,9 +56,23 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
ICloudBlockBlob GetBlockBlobReference(Uri uri);
/// <summary>
- /// Paginates a blob listing with prefix.
+ /// Paginates a blob listing with container and relative path.
/// </summary>
- BlobResultSegment ListBlobsSegmented(string prefix, bool useFlatListing, BlobListingDetails blobListingDetails, int? maxResults,
- BlobContinuationToken continuationToken, BlobRequestOptions blobRequestOptions, OperationContext operationContext);
+ BlobResultSegment ListBlobsSegmented(
+ string containerName,
+ string relativeAddress,
+ bool useFlatListing,
+ BlobListingDetails blobListingDetails,
+ int? maxResults,
+ BlobContinuationToken continuationToken,
+ BlobRequestOptions blobRequestOptions,
+ OperationContext operationContext);
+
+ /// <summary>
+ /// Paginates a container listing.
+ /// </summary>
+ /// <param name="continuationToken">Continuation token for pagination</param>
+ /// <returns>List of containers</returns>
+ ContainerResultSegment ListContainersSegmented(BlobContinuationToken continuationToken);
}
}