You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2018/02/12 21:12:47 UTC
reef git commit: [REEF-1975] Support stream option for Azure Blob
storage
Repository: reef
Updated Branches:
refs/heads/master d45954935 -> 0348e54ec
[REEF-1975] Support stream option for Azure Blob storage
* Support stream option for Azure Blob storage
* Updated Open test to assert on stream Stream to blobs (#8)
* Implement Create method to create a blob and return a write stream to the blob
JIRA: [REEF-1975](https://issues.apache.org/jira/browse/REEF-1975)
Closes #1426
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/0348e54e
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/0348e54e
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/0348e54e
Branch: refs/heads/master
Commit: 0348e54ec13bd8f267670167af55b8f6797b24bd
Parents: d459549
Author: dwaijam <dw...@gmail.com>
Authored: Thu Jan 18 10:30:16 2018 -0800
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Mon Feb 12 12:57:17 2018 -0800
----------------------------------------------------------------------
.../TestAzureBlockBlobFileSystem.cs | 20 +++-
.../TestAzureBlockBlobFileSystemE2E.cs | 102 ++++++++++++-------
.../AzureBlob/AzureBlockBlobFileSystem.cs | 8 +-
.../FileSystem/AzureBlob/AzureCloudBlockBlob.cs | 10 ++
.../FileSystem/AzureBlob/ICloudBlockBlob.cs | 14 +++
5 files changed, 111 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
index a1f9b34..dc48d5e 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
@@ -36,18 +36,24 @@ namespace Org.Apache.REEF.IO.Tests
/// </summary>
public sealed class TestAzureBlockBlobFileSystem
{
- private readonly static Uri FakeUri = new Uri("http://fake.com");
+ private static readonly Uri FakeUri = new Uri("http://fake.com");
[Fact]
- public void TestCreateNotSupported()
+ public void TestCreate()
{
- Assert.Throws<NotSupportedException>(() => new TestContext().GetAzureFileSystem().Create(FakeUri));
+ var testContext = new TestContext();
+ Stream stream = testContext.GetAzureFileSystem().Create(new Uri(FakeUri, "container/file"));
+ testContext.TestCloudBlockBlob.Received(1).Create();
+ Assert.Equal(testContext.TestCreateStream, stream);
}
[Fact]
- public void TestOpenNotSupported()
+ public void TestOpen()
{
- Assert.Throws<NotSupportedException>(() => new TestContext().GetAzureFileSystem().Open(FakeUri));
+ var testContext = new TestContext();
+ Stream stream = testContext.GetAzureFileSystem().Open(new Uri(FakeUri, "container/file"));
+ testContext.TestCloudBlockBlob.Received(1).Open();
+ Assert.Equal(testContext.TestOpenStream, stream);
}
[Fact]
@@ -133,6 +139,8 @@ namespace Org.Apache.REEF.IO.Tests
public readonly ICloudBlockBlob TestCloudBlockBlob = Substitute.For<ICloudBlockBlob>();
public readonly ICloudBlobContainer TestCloudBlobContainer = Substitute.For<ICloudBlobContainer>();
public readonly ICloudBlobDirectory TestCloudBlobDirectory = Substitute.For<ICloudBlobDirectory>();
+ public readonly Stream TestOpenStream = Substitute.For<Stream>();
+ public readonly Stream TestCreateStream = Substitute.For<Stream>();
public IFileSystem GetAzureFileSystem()
{
@@ -144,6 +152,8 @@ namespace Org.Apache.REEF.IO.Tests
injector.BindVolatileInstance(TestCloudBlobClient);
var fs = injector.GetInstance<AzureBlockBlobFileSystem>();
TestCloudBlobClient.BaseUri.ReturnsForAnyArgs(FakeUri);
+ TestCloudBlockBlob.Open().Returns(TestOpenStream);
+ TestCloudBlockBlob.Create().Returns(TestCreateStream);
TestCloudBlobClient.GetBlockBlobReference(FakeUri).ReturnsForAnyArgs(TestCloudBlockBlob);
TestCloudBlobClient.GetContainerReference("container").ReturnsForAnyArgs(TestCloudBlobContainer);
TestCloudBlobContainer.GetDirectoryReference("directory").ReturnsForAnyArgs(TestCloudBlobDirectory);
http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
index 8e8a708..7b749e4 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
@@ -34,6 +34,7 @@ namespace Org.Apache.REEF.IO.Tests
/// </summary>
public sealed class TestAzureBlockBlobFileSystemE2E : IDisposable
{
+ private const string SkipMessage = "Fill in credentials before running test"; // Use null to run tests
private const string HelloFile = "hello";
private IFileSystem _fileSystem;
private CloudBlobContainer _container;
@@ -41,14 +42,14 @@ namespace Org.Apache.REEF.IO.Tests
public TestAzureBlockBlobFileSystemE2E()
{
// Fill in before running test!
- const string connectionString = "DefaultEndpointsProtocol=http;AccountName=myAccount;AccountKey=myKey;";
+ const string ConnectionString = "DefaultEndpointsProtocol=http;AccountName=myAccount;AccountKey=myKey;";
var defaultContainerName = "reef-test-container-" + Guid.NewGuid();
var conf = AzureBlockBlobFileSystemConfiguration.ConfigurationModule
- .Set(AzureBlockBlobFileSystemConfiguration.ConnectionString, connectionString)
+ .Set(AzureBlockBlobFileSystemConfiguration.ConnectionString, ConnectionString)
.Build();
_fileSystem = TangFactory.GetTang().NewInjector(conf).GetInstance<AzureBlockBlobFileSystem>();
- _container = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient().GetContainerReference(defaultContainerName);
+ _container = CloudStorageAccount.Parse(ConnectionString).CreateCloudBlobClient().GetContainerReference(defaultContainerName);
_container.CreateIfNotExistsAsync().Wait();
}
@@ -88,7 +89,40 @@ namespace Org.Apache.REEF.IO.Tests
return task.Result;
}
- [Fact(Skip = "Fill in credentials before running test")]
+ [Fact(Skip = SkipMessage)]
+ public void TestOpenE2E()
+ {
+ const string Text = "hello";
+ var blob = _container.GetBlockBlobReference(HelloFile);
+ UploadFromString(blob, Text);
+ Assert.True(CheckBlobExists(blob));
+ using (var reader = new StreamReader(_fileSystem.Open(PathToFile(HelloFile))))
+ {
+ string streamText = reader.ReadToEnd();
+ Assert.Equal(Text, streamText);
+ }
+ }
+
+ [Fact(Skip = SkipMessage)]
+ public void TestCreateE2E()
+ {
+ const string Text = "Hello Azure Blob";
+ var blob = _container.GetBlockBlobReference(HelloFile);
+ Assert.False(CheckBlobExists(blob));
+ using (var streamWriter = new StreamWriter(_fileSystem.Create(PathToFile(HelloFile))))
+ {
+ streamWriter.Write(Text);
+ }
+ blob = _container.GetBlockBlobReference(HelloFile);
+ Assert.True(CheckBlobExists(blob));
+ using (var reader = new StreamReader(blob.OpenRead()))
+ {
+ string streamText = reader.ReadToEnd();
+ Assert.Equal(Text, streamText);
+ }
+ }
+
+ [Fact(Skip = SkipMessage)]
public void TestDeleteE2E()
{
var blob = _container.GetBlockBlobReference(HelloFile);
@@ -98,7 +132,7 @@ namespace Org.Apache.REEF.IO.Tests
Assert.False(CheckBlobExists(blob));
}
- [Fact(Skip = "Fill in credentials before running test")]
+ [Fact(Skip = SkipMessage)]
public void TestExistsE2E()
{
var helloFilePath = PathToFile(HelloFile);
@@ -109,39 +143,39 @@ namespace Org.Apache.REEF.IO.Tests
Assert.False(_fileSystem.Exists(helloFilePath));
}
- [Fact(Skip = "Fill in credentials before running test")]
+ [Fact(Skip = SkipMessage)]
public void TestCopyE2E()
{
- const string srcFileName = "src";
- const string destFileName = "dest";
- var srcFilePath = PathToFile(srcFileName);
- var destFilePath = PathToFile(destFileName);
- ICloudBlob srcBlob = _container.GetBlockBlobReference(srcFileName);
+ const string SrcFileName = "src";
+ const string DestFileName = "dest";
+ var srcFilePath = PathToFile(SrcFileName);
+ var destFilePath = PathToFile(DestFileName);
+ ICloudBlob srcBlob = _container.GetBlockBlobReference(SrcFileName);
UploadFromString(srcBlob, "hello");
Assert.True(CheckBlobExists(srcBlob));
- ICloudBlob destBlob = _container.GetBlockBlobReference(destFileName);
+ ICloudBlob destBlob = _container.GetBlockBlobReference(DestFileName);
Assert.False(CheckBlobExists(destBlob));
_fileSystem.Copy(srcFilePath, destFilePath);
- destBlob = GetBlobReferenceFromServer(_container, destFileName);
+ destBlob = GetBlobReferenceFromServer(_container, DestFileName);
Assert.True(CheckBlobExists(destBlob));
- srcBlob = GetBlobReferenceFromServer(_container, srcFileName);
+ srcBlob = GetBlobReferenceFromServer(_container, SrcFileName);
Assert.True(CheckBlobExists(srcBlob));
- Assert.Equal(DownloadText(_container.GetBlockBlobReference(srcFileName)), DownloadText(_container.GetBlockBlobReference(destFileName)));
+ Assert.Equal(DownloadText(_container.GetBlockBlobReference(SrcFileName)), DownloadText(_container.GetBlockBlobReference(DestFileName)));
}
- [Fact(Skip = "Fill in credentials before running test")]
+ [Fact(Skip = SkipMessage)]
public void TestCopyToLocalE2E()
{
var helloFilePath = PathToFile(HelloFile);
var blob = _container.GetBlockBlobReference(HelloFile);
var tempFilePath = GetTempFilePath();
- const string text = "hello";
+ const string Text = "hello";
try
{
- UploadFromString(blob, text);
+ UploadFromString(blob, Text);
_fileSystem.CopyToLocal(helloFilePath, tempFilePath);
Assert.True(File.Exists(tempFilePath));
- Assert.Equal(text, File.ReadAllText(tempFilePath));
+ Assert.Equal(Text, File.ReadAllText(tempFilePath));
}
finally
{
@@ -149,17 +183,17 @@ namespace Org.Apache.REEF.IO.Tests
}
}
- [Fact(Skip = "Fill in credentials before running test")]
+ [Fact(Skip = SkipMessage)]
public void TestCopyFromLocalE2E()
{
var helloFilePath = PathToFile(HelloFile);
ICloudBlob blob = _container.GetBlockBlobReference(HelloFile);
Assert.False(CheckBlobExists(blob));
var tempFilePath = GetTempFilePath();
- const string text = "hello";
+ const string Text = "hello";
try
{
- File.WriteAllText(tempFilePath, text);
+ File.WriteAllText(tempFilePath, Text);
_fileSystem.CopyFromLocal(tempFilePath, helloFilePath);
blob = GetBlobReferenceFromServer(_container, HelloFile);
Assert.True(CheckBlobExists(blob));
@@ -171,7 +205,7 @@ namespace Org.Apache.REEF.IO.Tests
using (var sr = new StreamReader(stream))
{
var matchingText = sr.ReadToEnd();
- Assert.Equal(text, matchingText);
+ Assert.Equal(Text, matchingText);
}
}
}
@@ -181,29 +215,29 @@ namespace Org.Apache.REEF.IO.Tests
}
}
- [Fact(Skip = "Fill in credentials before running test")]
+ [Fact(Skip = SkipMessage)]
public void TestDeleteDirectoryAtContainerE2E()
{
_fileSystem.DeleteDirectory(_container.Uri);
Assert.False(CheckContainerExists(_container));
}
- [Fact(Skip = "Fill in credentials before running test")]
+ [Fact(Skip = SkipMessage)]
public void TestDeleteDirectoryFirstLevelE2E()
{
- const string directory = "dir";
+ const string Directory = "dir";
var blockBlobs = new List<CloudBlockBlob>();
for (var i = 0; i < 3; i++)
{
- var filePath = directory + '/' + i;
+ var filePath = Directory + '/' + i;
var blockBlob = _container.GetBlockBlobReference(filePath);
UploadFromString(blockBlob, "hello");
Assert.True(CheckBlobExists(blockBlob));
blockBlobs.Add(blockBlob);
}
- _fileSystem.DeleteDirectory(PathToFile(directory));
+ _fileSystem.DeleteDirectory(PathToFile(Directory));
foreach (var blockBlob in blockBlobs)
{
@@ -213,18 +247,18 @@ namespace Org.Apache.REEF.IO.Tests
Assert.True(CheckContainerExists(_container));
}
- [Fact(Skip = "Fill in credentials before running test")]
+ [Fact(Skip = SkipMessage)]
public void TestDeleteDirectorySecondLevelE2E()
{
- const string directory1 = "dir1";
- const string directory2 = "dir2";
+ const string Directory1 = "dir1";
+ const string Directory2 = "dir2";
var blockBlobs1 = new List<CloudBlockBlob>();
var blockBlobs2 = new List<CloudBlockBlob>();
for (var i = 0; i < 3; i++)
{
- var filePath1 = directory1 + '/' + i;
- var filePath2 = directory1 + '/' + directory2 + '/' + i;
+ var filePath1 = Directory1 + '/' + i;
+ var filePath2 = Directory1 + '/' + Directory2 + '/' + i;
var blockBlob1 = _container.GetBlockBlobReference(filePath1);
var blockBlob2 = _container.GetBlockBlobReference(filePath2);
UploadFromString(blockBlob1, "hello");
@@ -235,7 +269,7 @@ namespace Org.Apache.REEF.IO.Tests
blockBlobs2.Add(blockBlob2);
}
- _fileSystem.DeleteDirectory(PathToFile(directory1 + '/' + directory2));
+ _fileSystem.DeleteDirectory(PathToFile(Directory1 + '/' + Directory2));
foreach (var blockBlob in blockBlobs2)
{
http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
index b5f659c..81a7c1f 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
@@ -42,19 +42,19 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
}
/// <summary>
- /// Not supported for Azure Blobs, will throw <see cref="NotSupportedException"/>.
+ /// Returns a Stream object to the blob specified by the fileUri.
/// </summary>
public Stream Open(Uri fileUri)
{
- throw new NotSupportedException("Open is not supported for AzureBlockBlobFileSystem.");
+ return _client.GetBlockBlobReference(fileUri).Open();
}
/// <summary>
- /// Not supported for Azure Blobs, will throw <see cref="NotSupportedException"/>.
+ /// Creates a blob for the specified fileUri and returns a write Stream object to it.
/// </summary>
public Stream Create(Uri fileUri)
{
- throw new NotSupportedException("Open is not supported for AzureBlockBlobFileSystem.");
+ return _client.GetBlockBlobReference(fileUri).Create();
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
index c4b9c6d..b2289b6 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
@@ -59,6 +59,16 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
_blob = new CloudBlockBlob(uri, credentials);
}
+ public Stream Open()
+ {
+ return _blob.OpenRead();
+ }
+
+ public Stream Create()
+ {
+ return _blob.OpenWrite();
+ }
+
public bool Exists()
{
var task = _blob.ExistsAsync();
http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs
index 2941e6e..ec102cc 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs
@@ -46,6 +46,20 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
CopyState CopyState { get; }
/// <summary>
+ /// Opens a stream to the blob content.
+ /// </summary>
+ /// <returns>System.IO.Stream object.</returns>
+ /// <exception cref="StorageException">If blob does not exist</exception>
+ Stream Open();
+
+ /// <summary>
+ /// Creates a blob and returns a write Stream object to it.
+ /// </summary>
+ /// <returns>System.IO.Stream object.</returns>
+ /// <exception cref="StorageException">If blob cannot be created</exception>
+ Stream Create();
+
+ /// <summary>
/// Makes a round trip to the server to test if the blob exists.
/// </summary>
/// <returns>True if exists. False otherwise.</returns>