You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by af...@apache.org on 2015/11/19 02:20:20 UTC

incubator-reef git commit: [REEF-940] Create .NET job resource uploader using IFileSystem

Repository: incubator-reef
Updated Branches:
  refs/heads/master d169b2591 -> b99b96a00


[REEF-940] Create .NET job resource uploader using IFileSystem

JIRA:
 [REEF-940](https://issues.apache.org/jira/browse/REEF-940)

Pull Request:
  Closes #637


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/b99b96a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/b99b96a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/b99b96a0

Branch: refs/heads/master
Commit: b99b96a00f4a02918876fb7fcf49cff52346a1c4
Parents: d169b25
Author: Anupam <an...@gmail.com>
Authored: Fri Nov 13 11:57:22 2015 -0800
Committer: Andrew Chung <af...@gmail.com>
Committed: Wed Nov 18 17:18:30 2015 -0800

----------------------------------------------------------------------
 .../JobResourceUploaderTests.cs                 | 130 +++++++++++++++++++
 .../LegacyJobResourceUploaderTests.cs           |   2 +-
 .../Org.Apache.REEF.Client.Tests.csproj         |   9 +-
 .../Org.Apache.REEF.Client.csproj               |   5 +
 .../YARN/IJobSubmissionDirectoryProvider.cs     |   4 +-
 .../YARN/JobSubmissionDirectoryProvider.cs      |   2 +-
 .../YARN/LegacyJobResourceUploader.cs           |   2 +-
 .../RESTClient/FileSystemJobResourceUploader.cs |  81 ++++++++++++
 8 files changed, 228 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
new file mode 100644
index 0000000..3479842
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using NSubstitute;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN.RestClient;
+using Org.Apache.REEF.IO.FileSystem;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Client.Tests
+{
+    [TestClass]
+    public class JobResourceUploaderTests
+    {
+        private const string AnyDriverLocalFolderPath = @"Any\Local\Folder\Path\";
+        private const string AnyDriverResourceUploadPath = "/vol1/tmp/";
+        private const string AnyUploadedResourcePath = "/vol1/tmp/Path.zip";
+        private const string AnyHost = "host";
+        private const string AnyScheme = "hdfs://";
+        private const string AnyUploadedResourceAbsoluteUri = AnyScheme + AnyHost + AnyUploadedResourcePath;
+        private const string AnyLocalArchivePath = @"Any\Local\Archive\Path.zip";
+        private const long AnyModificationTime = 1447413621;
+        private const long AnyResourceSize = 53092;
+        private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
+
+        [TestMethod]
+        public void JobResourceUploaderCanInstantiateWithDefaultBindings()
+        {
+            TangFactory.GetTang().NewInjector().GetInstance<FileSystemJobResourceUploader>();
+        }
+
+        [TestMethod]
+        public void UploadJobResourceCreatesResourceArchive()
+        {
+            var testContext = new TestContext();
+            var jobResourceUploader = testContext.GetJobResourceUploader();
+
+            jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
+
+            // Archive file generator recieved exactly one call with correct driver local folder path
+            testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath);
+        }
+
+        [TestMethod]
+        public void UploadJobResourceReturnsJobResourceDetails()
+        {
+            var testContext = new TestContext();
+            var jobResourceUploader = testContext.GetJobResourceUploader();
+
+            var jobResource = jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
+
+            Assert.AreEqual(AnyModificationTime, jobResource.LastModificationUnixTimestamp);
+            Assert.AreEqual(AnyResourceSize, jobResource.ResourceSize);
+            Assert.AreEqual(AnyUploadedResourceAbsoluteUri, jobResource.RemoteUploadPath);
+        }
+
+        [TestMethod]
+        public void UploadJobResourceMakesCorrectFileSystemCalls()
+        {
+            var testContext = new TestContext();
+            var jobResourceUploader = testContext.GetJobResourceUploader();
+
+            jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
+
+            testContext.FileSystem.Received(1).CreateUriForPath(AnyDriverResourceUploadPath);
+            testContext.FileSystem.Received(1).CreateUriForPath(AnyUploadedResourcePath);
+            testContext.FileSystem.Received(1)
+                .CopyFromLocal(AnyLocalArchivePath, new Uri(AnyUploadedResourceAbsoluteUri));
+            testContext.FileSystem.Received(1)
+                .CreateDirectory(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath));
+        }
+
+        [TestMethod]
+        public void UploadJobResourceCallsJobSubmissionDirProvider()
+        {
+            var testContext = new TestContext();
+            var jobResourceUploader = testContext.GetJobResourceUploader();
+
+            jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
+
+            testContext.JobSubmissionDirectoryProvider.Received(1).GetJobSubmissionRemoteDirectory();
+        }
+
+        private class TestContext
+        {
+
+            public readonly IJobSubmissionDirectoryProvider JobSubmissionDirectoryProvider =
+                Substitute.For<IJobSubmissionDirectoryProvider>();
+            public readonly IResourceArchiveFileGenerator ResourceArchiveFileGenerator =
+                Substitute.For<IResourceArchiveFileGenerator>();
+            public readonly IFileSystem FileSystem = Substitute.For<IFileSystem>();
+
+            public FileSystemJobResourceUploader GetJobResourceUploader()
+            {
+                var injector = TangFactory.GetTang().NewInjector();
+                JobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().Returns(AnyDriverResourceUploadPath);
+                FileSystem.GetFileStatus(new Uri(AnyUploadedResourceAbsoluteUri))
+                    .Returns(new FileStatus(Epoch + TimeSpan.FromSeconds(AnyModificationTime), AnyResourceSize));
+                ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath)
+                    .Returns(AnyLocalArchivePath);
+                FileSystem.CreateUriForPath(AnyDriverResourceUploadPath)
+                    .Returns(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath));
+                FileSystem.CreateUriForPath(AnyUploadedResourcePath)
+                    .Returns(new Uri(AnyUploadedResourceAbsoluteUri));
+                injector.BindVolatileInstance(GenericType<IJobSubmissionDirectoryProvider>.Class, JobSubmissionDirectoryProvider);
+                injector.BindVolatileInstance(GenericType<IResourceArchiveFileGenerator>.Class, ResourceArchiveFileGenerator);
+                injector.BindVolatileInstance(GenericType<IFileSystem>.Class, FileSystem);
+                return injector.GetInstance<FileSystemJobResourceUploader>();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
index e76ae66..9b4e4f1 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
@@ -61,7 +61,7 @@ namespace Org.Apache.REEF.Client.Tests
             const string anyLocalArchivePath = @"Any\Local\Archive\Path.zip";
             testContext.ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\")
                 .Returns(anyLocalArchivePath);
-            testContext.JobSubmissionDirectoryProvider.GetJobSubmissionDirectory().Returns(AnyDriverResourceUploadPath);
+            testContext.JobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().Returns(AnyDriverResourceUploadPath);
             jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
 
             const string javaClassNameForResourceUploader = @"org.apache.reef.bridge.client.JobResourceUploader";

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
index e7af026..8094981 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
@@ -47,6 +47,7 @@ under the License.
     <Reference Include="System.ServiceProcess" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="JobResourceUploaderTests.cs" />
     <Compile Include="LegacyJobResourceUploaderTests.cs" />
     <Compile Include="MultipleRMUrlProviderTests.cs" />
     <Compile Include="WindowsHadoopEmulatorYarnClientTests.cs" />
@@ -55,11 +56,15 @@ under the License.
     <Compile Include="YarnConfigurationUrlProviderTests.cs" />
   </ItemGroup>
   <ItemGroup>
-    <ProjectReference Include="..\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj">
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj">
       <Project>{5094c35b-4fdb-4322-ac05-45d684501cbf}</Project>
       <Name>Org.Apache.REEF.Client</Name>
     </ProjectReference>
-    <ProjectReference Include="..\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj">
+      <Project>{DEC0F0A8-DBEF-4EBF-B69C-E2369C15ABF1}</Project>
+      <Name>Org.Apache.REEF.IO</Name>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
       <Project>{97DBB573-3994-417A-9F69-FFA25F00D2A6}</Project>
       <Name>Org.Apache.REEF.Tang</Name>
     </ProjectReference>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index e456716..910ae2d 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -113,6 +113,7 @@ under the License.
     <Compile Include="YARN\RestClient\DataModel\Tokens.cs" />
     <Compile Include="YARN\RestClient\IRestRequestExecutor.cs" />
     <Compile Include="YARN\RestClient\IUrlProvider.cs" />
+    <Compile Include="YARN\RestClient\FileSystemJobResourceUploader.cs" />
     <Compile Include="YARN\RestClient\MultipleRMUrlProvider.cs" />
     <Compile Include="YARN\RestClient\RestJsonDeserializer.cs" />
     <Compile Include="YARN\RestClient\RestJsonSerializer.cs" />
@@ -181,6 +182,10 @@ under the License.
       <Project>{75503f90-7b82-4762-9997-94b5c68f15db}</Project>
       <Name>Org.Apache.REEF.Examples</Name>
     </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj">
+      <Project>{DEC0F0A8-DBEF-4EBF-B69C-E2369C15ABF1}</Project>
+      <Name>Org.Apache.REEF.IO</Name>
+    </ProjectReference>
   </ItemGroup>
   <ItemGroup>
     <EmbeddedResource Include="$(TempResxFile)">

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs
index b702e79..3faa62d 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs
@@ -26,9 +26,9 @@ namespace Org.Apache.REEF.Client.Yarn
     public interface IJobSubmissionDirectoryProvider
     {
         /// <summary>
-        /// Returns path to job submission directory.
+        /// Returns path to job submission directory in DFS.
         /// </summary>
         /// <returns></returns>
-        string GetJobSubmissionDirectory();
+        string GetJobSubmissionRemoteDirectory();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs
index 4a5c59a..fe1b007 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs
@@ -37,7 +37,7 @@ namespace Org.Apache.REEF.Client.Yarn
             _jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix;
         }
 
-        public string GetJobSubmissionDirectory()
+        public string GetJobSubmissionRemoteDirectory()
         {
             return string.Format(@"{0}/{1}{2}/",
                 _jobSubmissionDirectoryPrefix,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
index 22e32a1..0c01709 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
@@ -59,7 +59,7 @@ namespace Org.Apache.REEF.Client.Yarn
         public JobResource UploadJobResource(string driverLocalFolderPath)
         {
             driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
-            string driverUploadPath = _jobSubmissionDirectoryProvider.GetJobSubmissionDirectory().TrimEnd('/') + @"/";
+            string driverUploadPath = _jobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().TrimEnd('/') + @"/";
             Log.Log(Level.Info, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath);
 
             var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
new file mode 100644
index 0000000..cfa83b0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.IO.FileSystem;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.YARN.RestClient
+{
+    /// <summary>
+    /// Provides FileSystem agnostic job resource uploader.
+    /// User can provide custome implementation of 
+    /// <see cref="IFileSystem"/> for their choice of DFS.
+    /// </summary>
+    internal sealed class FileSystemJobResourceUploader : IJobResourceUploader
+    {
+        private static readonly Logger Log = Logger.GetLogger(typeof(FileSystemJobResourceUploader));
+        private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
+        private readonly IJobSubmissionDirectoryProvider _jobSubmissionDirectoryProvider;
+        private readonly IResourceArchiveFileGenerator _resourceArchiveFileGenerator;
+        private readonly IFileSystem _fileSystem;
+
+        [Inject]
+        private FileSystemJobResourceUploader(
+            IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider,
+            IResourceArchiveFileGenerator resourceArchiveFileGenerator,
+            IFileSystem fileSystem)
+        {
+            _fileSystem = fileSystem;
+            _resourceArchiveFileGenerator = resourceArchiveFileGenerator;
+            _jobSubmissionDirectoryProvider = jobSubmissionDirectoryProvider;
+        }
+
+        public JobResource UploadJobResource(string driverLocalFolderPath)
+        {
+            driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
+            var driverUploadPath = _jobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().TrimEnd('/') + @"/";
+            Log.Log(Level.Verbose, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath);
+            var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
+
+            var destinationPath = driverUploadPath + Path.GetFileName(archivePath);
+            var remoteFileUri = _fileSystem.CreateUriForPath(destinationPath);
+            Log.Log(Level.Verbose, @"Copy {0} to {1}", archivePath, remoteFileUri);
+
+            var parentDirectoryUri = _fileSystem.CreateUriForPath(driverUploadPath);
+            _fileSystem.CreateDirectory(parentDirectoryUri);
+            _fileSystem.CopyFromLocal(archivePath, remoteFileUri);
+            var fileStatus = _fileSystem.GetFileStatus(remoteFileUri);
+
+            return new JobResource
+            {
+                LastModificationUnixTimestamp = DateTimeToUnixTimestamp(fileStatus.ModificationTime),
+                RemoteUploadPath = remoteFileUri.AbsoluteUri,
+                ResourceSize = fileStatus.LengthBytes
+            };
+        }
+
+        private long DateTimeToUnixTimestamp(DateTime dateTime)
+        {
+            return (long) (dateTime - Epoch).TotalSeconds;
+        }
+    }
+}
\ No newline at end of file