You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2018/02/12 21:12:47 UTC

reef git commit: [REEF-1975] Support stream option for Azure Blob storage

Repository: reef
Updated Branches:
  refs/heads/master d45954935 -> 0348e54ec


[REEF-1975] Support stream option for Azure Blob storage

   * Support stream option for Azure Blob storage
   * Updated Open test to assert on the Stream returned for blobs (#8)
   * Implement Create method to create a blob and return a write stream to the blob

JIRA: [REEF-1975](https://issues.apache.org/jira/browse/REEF-1975)

Closes #1426


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/0348e54e
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/0348e54e
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/0348e54e

Branch: refs/heads/master
Commit: 0348e54ec13bd8f267670167af55b8f6797b24bd
Parents: d459549
Author: dwaijam <dw...@gmail.com>
Authored: Thu Jan 18 10:30:16 2018 -0800
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Mon Feb 12 12:57:17 2018 -0800

----------------------------------------------------------------------
 .../TestAzureBlockBlobFileSystem.cs             |  20 +++-
 .../TestAzureBlockBlobFileSystemE2E.cs          | 102 ++++++++++++-------
 .../AzureBlob/AzureBlockBlobFileSystem.cs       |   8 +-
 .../FileSystem/AzureBlob/AzureCloudBlockBlob.cs |  10 ++
 .../FileSystem/AzureBlob/ICloudBlockBlob.cs     |  14 +++
 5 files changed, 111 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
index a1f9b34..dc48d5e 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
@@ -36,18 +36,24 @@ namespace Org.Apache.REEF.IO.Tests
     /// </summary>
     public sealed class TestAzureBlockBlobFileSystem
     {
-        private readonly static Uri FakeUri = new Uri("http://fake.com");
+        private static readonly Uri FakeUri = new Uri("http://fake.com");
 
         [Fact]
-        public void TestCreateNotSupported()
+        public void TestCreate()
         {
-            Assert.Throws<NotSupportedException>(() => new TestContext().GetAzureFileSystem().Create(FakeUri));
+            var testContext = new TestContext();
+            Stream stream = testContext.GetAzureFileSystem().Create(new Uri(FakeUri, "container/file"));
+            testContext.TestCloudBlockBlob.Received(1).Create();
+            Assert.Equal(testContext.TestCreateStream, stream);
         }
 
         [Fact]
-        public void TestOpenNotSupported()
+        public void TestOpen()
         {
-            Assert.Throws<NotSupportedException>(() => new TestContext().GetAzureFileSystem().Open(FakeUri));
+            var testContext = new TestContext();
+            Stream stream = testContext.GetAzureFileSystem().Open(new Uri(FakeUri, "container/file"));
+            testContext.TestCloudBlockBlob.Received(1).Open();
+            Assert.Equal(testContext.TestOpenStream, stream);
         }
 
         [Fact]
@@ -133,6 +139,8 @@ namespace Org.Apache.REEF.IO.Tests
             public readonly ICloudBlockBlob TestCloudBlockBlob = Substitute.For<ICloudBlockBlob>();
             public readonly ICloudBlobContainer TestCloudBlobContainer = Substitute.For<ICloudBlobContainer>();
             public readonly ICloudBlobDirectory TestCloudBlobDirectory = Substitute.For<ICloudBlobDirectory>();
+            public readonly Stream TestOpenStream = Substitute.For<Stream>();
+            public readonly Stream TestCreateStream = Substitute.For<Stream>();
 
             public IFileSystem GetAzureFileSystem()
             {
@@ -144,6 +152,8 @@ namespace Org.Apache.REEF.IO.Tests
                 injector.BindVolatileInstance(TestCloudBlobClient);
                 var fs = injector.GetInstance<AzureBlockBlobFileSystem>();
                 TestCloudBlobClient.BaseUri.ReturnsForAnyArgs(FakeUri);
+                TestCloudBlockBlob.Open().Returns(TestOpenStream);
+                TestCloudBlockBlob.Create().Returns(TestCreateStream);
                 TestCloudBlobClient.GetBlockBlobReference(FakeUri).ReturnsForAnyArgs(TestCloudBlockBlob);
                 TestCloudBlobClient.GetContainerReference("container").ReturnsForAnyArgs(TestCloudBlobContainer);
                 TestCloudBlobContainer.GetDirectoryReference("directory").ReturnsForAnyArgs(TestCloudBlobDirectory);

http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
index 8e8a708..7b749e4 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
@@ -34,6 +34,7 @@ namespace Org.Apache.REEF.IO.Tests
     /// </summary>
     public sealed class TestAzureBlockBlobFileSystemE2E : IDisposable
     {
+        private const string SkipMessage = "Fill in credentials before running test"; // Use null to run tests
         private const string HelloFile = "hello";
         private IFileSystem _fileSystem;
         private CloudBlobContainer _container;
@@ -41,14 +42,14 @@ namespace Org.Apache.REEF.IO.Tests
         public TestAzureBlockBlobFileSystemE2E()
         {
             // Fill in before running test!
-            const string connectionString = "DefaultEndpointsProtocol=http;AccountName=myAccount;AccountKey=myKey;";
+            const string ConnectionString = "DefaultEndpointsProtocol=http;AccountName=myAccount;AccountKey=myKey;";
             var defaultContainerName = "reef-test-container-" + Guid.NewGuid();
             var conf = AzureBlockBlobFileSystemConfiguration.ConfigurationModule
-                .Set(AzureBlockBlobFileSystemConfiguration.ConnectionString, connectionString)
+                .Set(AzureBlockBlobFileSystemConfiguration.ConnectionString, ConnectionString)
                 .Build();
 
             _fileSystem = TangFactory.GetTang().NewInjector(conf).GetInstance<AzureBlockBlobFileSystem>();
-            _container = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient().GetContainerReference(defaultContainerName);
+            _container = CloudStorageAccount.Parse(ConnectionString).CreateCloudBlobClient().GetContainerReference(defaultContainerName);
             _container.CreateIfNotExistsAsync().Wait();
         }
 
@@ -88,7 +89,40 @@ namespace Org.Apache.REEF.IO.Tests
             return task.Result;
         }
 
-        [Fact(Skip = "Fill in credentials before running test")]
+        [Fact(Skip = SkipMessage)]
+        public void TestOpenE2E()
+        {
+            const string Text = "hello";
+            var blob = _container.GetBlockBlobReference(HelloFile);
+            UploadFromString(blob, Text);
+            Assert.True(CheckBlobExists(blob));
+            using (var reader = new StreamReader(_fileSystem.Open(PathToFile(HelloFile))))
+            {
+                string streamText = reader.ReadToEnd();
+                Assert.Equal(Text, streamText);
+            }
+        }
+
+        [Fact(Skip = SkipMessage)]
+        public void TestCreateE2E()
+        {
+            const string Text = "Hello Azure Blob";
+            var blob = _container.GetBlockBlobReference(HelloFile);
+            Assert.False(CheckBlobExists(blob));
+            using (var streamWriter = new StreamWriter(_fileSystem.Create(PathToFile(HelloFile))))
+            {
+                streamWriter.Write(Text);
+            }
+            blob = _container.GetBlockBlobReference(HelloFile);
+            Assert.True(CheckBlobExists(blob));
+            using (var reader = new StreamReader(blob.OpenRead()))
+            {
+                string streamText = reader.ReadToEnd();
+                Assert.Equal(Text, streamText);
+            }
+        }
+
+        [Fact(Skip = SkipMessage)]
         public void TestDeleteE2E()
         {
             var blob = _container.GetBlockBlobReference(HelloFile);
@@ -98,7 +132,7 @@ namespace Org.Apache.REEF.IO.Tests
             Assert.False(CheckBlobExists(blob));
         }
 
-        [Fact(Skip = "Fill in credentials before running test")]
+        [Fact(Skip = SkipMessage)]
         public void TestExistsE2E()
         {
             var helloFilePath = PathToFile(HelloFile);
@@ -109,39 +143,39 @@ namespace Org.Apache.REEF.IO.Tests
             Assert.False(_fileSystem.Exists(helloFilePath));
         }
 
-        [Fact(Skip = "Fill in credentials before running test")]
+        [Fact(Skip = SkipMessage)]
         public void TestCopyE2E()
         {
-            const string srcFileName = "src";
-            const string destFileName = "dest";
-            var srcFilePath = PathToFile(srcFileName);
-            var destFilePath = PathToFile(destFileName);
-            ICloudBlob srcBlob = _container.GetBlockBlobReference(srcFileName);
+            const string SrcFileName = "src";
+            const string DestFileName = "dest";
+            var srcFilePath = PathToFile(SrcFileName);
+            var destFilePath = PathToFile(DestFileName);
+            ICloudBlob srcBlob = _container.GetBlockBlobReference(SrcFileName);
             UploadFromString(srcBlob, "hello");
             Assert.True(CheckBlobExists(srcBlob));
-            ICloudBlob destBlob = _container.GetBlockBlobReference(destFileName);
+            ICloudBlob destBlob = _container.GetBlockBlobReference(DestFileName);
             Assert.False(CheckBlobExists(destBlob));
             _fileSystem.Copy(srcFilePath, destFilePath);
-            destBlob = GetBlobReferenceFromServer(_container, destFileName);
+            destBlob = GetBlobReferenceFromServer(_container, DestFileName);
             Assert.True(CheckBlobExists(destBlob));
-            srcBlob = GetBlobReferenceFromServer(_container, srcFileName);
+            srcBlob = GetBlobReferenceFromServer(_container, SrcFileName);
             Assert.True(CheckBlobExists(srcBlob));
-            Assert.Equal(DownloadText(_container.GetBlockBlobReference(srcFileName)), DownloadText(_container.GetBlockBlobReference(destFileName)));
+            Assert.Equal(DownloadText(_container.GetBlockBlobReference(SrcFileName)), DownloadText(_container.GetBlockBlobReference(DestFileName)));
         }
 
-        [Fact(Skip = "Fill in credentials before running test")]
+        [Fact(Skip = SkipMessage)]
         public void TestCopyToLocalE2E()
         {
             var helloFilePath = PathToFile(HelloFile);
             var blob = _container.GetBlockBlobReference(HelloFile);
             var tempFilePath = GetTempFilePath();
-            const string text = "hello";
+            const string Text = "hello";
             try
             {
-                UploadFromString(blob, text);
+                UploadFromString(blob, Text);
                 _fileSystem.CopyToLocal(helloFilePath, tempFilePath);
                 Assert.True(File.Exists(tempFilePath));
-                Assert.Equal(text, File.ReadAllText(tempFilePath));
+                Assert.Equal(Text, File.ReadAllText(tempFilePath));
             }
             finally
             {
@@ -149,17 +183,17 @@ namespace Org.Apache.REEF.IO.Tests
             }
         }
 
-        [Fact(Skip = "Fill in credentials before running test")]
+        [Fact(Skip = SkipMessage)]
         public void TestCopyFromLocalE2E()
         {
             var helloFilePath = PathToFile(HelloFile);
             ICloudBlob blob = _container.GetBlockBlobReference(HelloFile);
             Assert.False(CheckBlobExists(blob));
             var tempFilePath = GetTempFilePath();
-            const string text = "hello";
+            const string Text = "hello";
             try
             {
-                File.WriteAllText(tempFilePath, text);
+                File.WriteAllText(tempFilePath, Text);
                 _fileSystem.CopyFromLocal(tempFilePath, helloFilePath);
                 blob = GetBlobReferenceFromServer(_container, HelloFile);
                 Assert.True(CheckBlobExists(blob));
@@ -171,7 +205,7 @@ namespace Org.Apache.REEF.IO.Tests
                     using (var sr = new StreamReader(stream))
                     {
                         var matchingText = sr.ReadToEnd();
-                        Assert.Equal(text, matchingText);
+                        Assert.Equal(Text, matchingText);
                     }
                 }
             }
@@ -181,29 +215,29 @@ namespace Org.Apache.REEF.IO.Tests
             }
         }
 
-        [Fact(Skip = "Fill in credentials before running test")]
+        [Fact(Skip = SkipMessage)]
         public void TestDeleteDirectoryAtContainerE2E()
         {
             _fileSystem.DeleteDirectory(_container.Uri);
             Assert.False(CheckContainerExists(_container));
         }
 
-        [Fact(Skip = "Fill in credentials before running test")]
+        [Fact(Skip = SkipMessage)]
         public void TestDeleteDirectoryFirstLevelE2E()
         {
-            const string directory = "dir";
+            const string Directory = "dir";
             var blockBlobs = new List<CloudBlockBlob>(); 
 
             for (var i = 0; i < 3; i++)
             {
-                var filePath = directory + '/' + i;
+                var filePath = Directory + '/' + i;
                 var blockBlob = _container.GetBlockBlobReference(filePath);
                 UploadFromString(blockBlob, "hello");
                 Assert.True(CheckBlobExists(blockBlob));
                 blockBlobs.Add(blockBlob);
             }
 
-            _fileSystem.DeleteDirectory(PathToFile(directory));
+            _fileSystem.DeleteDirectory(PathToFile(Directory));
 
             foreach (var blockBlob in blockBlobs)
             {
@@ -213,18 +247,18 @@ namespace Org.Apache.REEF.IO.Tests
             Assert.True(CheckContainerExists(_container));
         }
 
-        [Fact(Skip = "Fill in credentials before running test")]
+        [Fact(Skip = SkipMessage)]
         public void TestDeleteDirectorySecondLevelE2E()
         {
-            const string directory1 = "dir1";
-            const string directory2 = "dir2";
+            const string Directory1 = "dir1";
+            const string Directory2 = "dir2";
             var blockBlobs1 = new List<CloudBlockBlob>();
             var blockBlobs2 = new List<CloudBlockBlob>();
 
             for (var i = 0; i < 3; i++)
             {
-                var filePath1 = directory1 + '/' + i;
-                var filePath2 = directory1 + '/' + directory2 + '/' + i;
+                var filePath1 = Directory1 + '/' + i;
+                var filePath2 = Directory1 + '/' + Directory2 + '/' + i;
                 var blockBlob1 = _container.GetBlockBlobReference(filePath1);
                 var blockBlob2 = _container.GetBlockBlobReference(filePath2);
                 UploadFromString(blockBlob1, "hello");
@@ -235,7 +269,7 @@ namespace Org.Apache.REEF.IO.Tests
                 blockBlobs2.Add(blockBlob2);
             }
 
-            _fileSystem.DeleteDirectory(PathToFile(directory1 + '/' + directory2));
+            _fileSystem.DeleteDirectory(PathToFile(Directory1 + '/' + Directory2));
 
             foreach (var blockBlob in blockBlobs2)
             {

http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
index b5f659c..81a7c1f 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
@@ -42,19 +42,19 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
         }
 
         /// <summary>
-        /// Not supported for Azure Blobs, will throw <see cref="NotSupportedException"/>.
+        /// Returns a Stream object to the blob specified by the fileUri.
         /// </summary>
         public Stream Open(Uri fileUri)
         {
-            throw new NotSupportedException("Open is not supported for AzureBlockBlobFileSystem.");
+            return _client.GetBlockBlobReference(fileUri).Open();
         }
 
         /// <summary>
-        /// Not supported for Azure Blobs, will throw <see cref="NotSupportedException"/>.
+        /// Creates a blob for the specified fileUri and returns a write Stream object to it.
         /// </summary>
         public Stream Create(Uri fileUri)
         {
-            throw new NotSupportedException("Open is not supported for AzureBlockBlobFileSystem.");
+            return _client.GetBlockBlobReference(fileUri).Create();
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
index c4b9c6d..b2289b6 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
@@ -59,6 +59,16 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
             _blob = new CloudBlockBlob(uri, credentials);
         }
 
+        public Stream Open()
+        {
+            return _blob.OpenRead();
+        }
+
+        public Stream Create()
+        {
+            return _blob.OpenWrite();
+        }
+
         public bool Exists()
         {
             var task = _blob.ExistsAsync();

http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs
index 2941e6e..ec102cc 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs
@@ -46,6 +46,20 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
         CopyState CopyState { get; }
 
         /// <summary>
+        /// Opens a stream to the blob content.
+        /// </summary>
+        /// <returns>System.IO.Stream object.</returns>
+        /// <exception cref="StorageException">If blob does not exist</exception>
+        Stream Open();
+
+        /// <summary>
+        /// Creates a blob and returns a write Stream object to it.
+        /// </summary>
+        /// <returns>System.IO.Stream object.</returns>
+        /// <exception cref="StorageException">If blob cannot be created</exception>
+        Stream Create();
+
+        /// <summary>
         /// Makes a round trip to the server to test if the blob exists.
         /// </summary>
         /// <returns>True if exists. False otherwise.</returns>