You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by af...@apache.org on 2015/11/19 02:20:20 UTC
incubator-reef git commit: [REEF-940] Create .NET job resource
uploader using IFileSystem
Repository: incubator-reef
Updated Branches:
refs/heads/master d169b2591 -> b99b96a00
[REEF-940] Create .NET job resource uploader using IFileSystem
JIRA:
[REEF-940](https://issues.apache.org/jira/browse/REEF-940)
Pull Request:
Closes #637
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/b99b96a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/b99b96a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/b99b96a0
Branch: refs/heads/master
Commit: b99b96a00f4a02918876fb7fcf49cff52346a1c4
Parents: d169b25
Author: Anupam <an...@gmail.com>
Authored: Fri Nov 13 11:57:22 2015 -0800
Committer: Andrew Chung <af...@gmail.com>
Committed: Wed Nov 18 17:18:30 2015 -0800
----------------------------------------------------------------------
.../JobResourceUploaderTests.cs | 130 +++++++++++++++++++
.../LegacyJobResourceUploaderTests.cs | 2 +-
.../Org.Apache.REEF.Client.Tests.csproj | 9 +-
.../Org.Apache.REEF.Client.csproj | 5 +
.../YARN/IJobSubmissionDirectoryProvider.cs | 4 +-
.../YARN/JobSubmissionDirectoryProvider.cs | 2 +-
.../YARN/LegacyJobResourceUploader.cs | 2 +-
.../RESTClient/FileSystemJobResourceUploader.cs | 81 ++++++++++++
8 files changed, 228 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
new file mode 100644
index 0000000..3479842
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/JobResourceUploaderTests.cs
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using NSubstitute;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN.RestClient;
+using Org.Apache.REEF.IO.FileSystem;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Client.Tests
+{
+ [TestClass]
+ public class JobResourceUploaderTests
+ {
+ private const string AnyDriverLocalFolderPath = @"Any\Local\Folder\Path\";
+ private const string AnyDriverResourceUploadPath = "/vol1/tmp/";
+ private const string AnyUploadedResourcePath = "/vol1/tmp/Path.zip";
+ private const string AnyHost = "host";
+ private const string AnyScheme = "hdfs://";
+ private const string AnyUploadedResourceAbsoluteUri = AnyScheme + AnyHost + AnyUploadedResourcePath;
+ private const string AnyLocalArchivePath = @"Any\Local\Archive\Path.zip";
+ private const long AnyModificationTime = 1447413621;
+ private const long AnyResourceSize = 53092;
+ private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
+
+ [TestMethod]
+ public void JobResourceUploaderCanInstantiateWithDefaultBindings()
+ {
+ TangFactory.GetTang().NewInjector().GetInstance<FileSystemJobResourceUploader>();
+ }
+
+ [TestMethod]
+ public void UploadJobResourceCreatesResourceArchive()
+ {
+ var testContext = new TestContext();
+ var jobResourceUploader = testContext.GetJobResourceUploader();
+
+ jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
+
+ // Archive file generator received exactly one call with the correct driver local folder path
+ testContext.ResourceArchiveFileGenerator.Received(1).CreateArchiveToUpload(AnyDriverLocalFolderPath);
+ }
+
+ [TestMethod]
+ public void UploadJobResourceReturnsJobResourceDetails()
+ {
+ var testContext = new TestContext();
+ var jobResourceUploader = testContext.GetJobResourceUploader();
+
+ var jobResource = jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
+
+ Assert.AreEqual(AnyModificationTime, jobResource.LastModificationUnixTimestamp);
+ Assert.AreEqual(AnyResourceSize, jobResource.ResourceSize);
+ Assert.AreEqual(AnyUploadedResourceAbsoluteUri, jobResource.RemoteUploadPath);
+ }
+
+ [TestMethod]
+ public void UploadJobResourceMakesCorrectFileSystemCalls()
+ {
+ var testContext = new TestContext();
+ var jobResourceUploader = testContext.GetJobResourceUploader();
+
+ jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
+
+ testContext.FileSystem.Received(1).CreateUriForPath(AnyDriverResourceUploadPath);
+ testContext.FileSystem.Received(1).CreateUriForPath(AnyUploadedResourcePath);
+ testContext.FileSystem.Received(1)
+ .CopyFromLocal(AnyLocalArchivePath, new Uri(AnyUploadedResourceAbsoluteUri));
+ testContext.FileSystem.Received(1)
+ .CreateDirectory(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath));
+ }
+
+ [TestMethod]
+ public void UploadJobResourceCallsJobSubmissionDirProvider()
+ {
+ var testContext = new TestContext();
+ var jobResourceUploader = testContext.GetJobResourceUploader();
+
+ jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
+
+ testContext.JobSubmissionDirectoryProvider.Received(1).GetJobSubmissionRemoteDirectory();
+ }
+
+ private class TestContext
+ {
+
+ public readonly IJobSubmissionDirectoryProvider JobSubmissionDirectoryProvider =
+ Substitute.For<IJobSubmissionDirectoryProvider>();
+ public readonly IResourceArchiveFileGenerator ResourceArchiveFileGenerator =
+ Substitute.For<IResourceArchiveFileGenerator>();
+ public readonly IFileSystem FileSystem = Substitute.For<IFileSystem>();
+
+ public FileSystemJobResourceUploader GetJobResourceUploader()
+ {
+ var injector = TangFactory.GetTang().NewInjector();
+ JobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().Returns(AnyDriverResourceUploadPath);
+ FileSystem.GetFileStatus(new Uri(AnyUploadedResourceAbsoluteUri))
+ .Returns(new FileStatus(Epoch + TimeSpan.FromSeconds(AnyModificationTime), AnyResourceSize));
+ ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath)
+ .Returns(AnyLocalArchivePath);
+ FileSystem.CreateUriForPath(AnyDriverResourceUploadPath)
+ .Returns(new Uri(AnyScheme + AnyHost + AnyDriverResourceUploadPath));
+ FileSystem.CreateUriForPath(AnyUploadedResourcePath)
+ .Returns(new Uri(AnyUploadedResourceAbsoluteUri));
+ injector.BindVolatileInstance(GenericType<IJobSubmissionDirectoryProvider>.Class, JobSubmissionDirectoryProvider);
+ injector.BindVolatileInstance(GenericType<IResourceArchiveFileGenerator>.Class, ResourceArchiveFileGenerator);
+ injector.BindVolatileInstance(GenericType<IFileSystem>.Class, FileSystem);
+ return injector.GetInstance<FileSystemJobResourceUploader>();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
index e76ae66..9b4e4f1 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/LegacyJobResourceUploaderTests.cs
@@ -61,7 +61,7 @@ namespace Org.Apache.REEF.Client.Tests
const string anyLocalArchivePath = @"Any\Local\Archive\Path.zip";
testContext.ResourceArchiveFileGenerator.CreateArchiveToUpload(AnyDriverLocalFolderPath + @"\")
.Returns(anyLocalArchivePath);
- testContext.JobSubmissionDirectoryProvider.GetJobSubmissionDirectory().Returns(AnyDriverResourceUploadPath);
+ testContext.JobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().Returns(AnyDriverResourceUploadPath);
jobResourceUploader.UploadJobResource(AnyDriverLocalFolderPath);
const string javaClassNameForResourceUploader = @"org.apache.reef.bridge.client.JobResourceUploader";
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
index e7af026..8094981 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
@@ -47,6 +47,7 @@ under the License.
<Reference Include="System.ServiceProcess" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="JobResourceUploaderTests.cs" />
<Compile Include="LegacyJobResourceUploaderTests.cs" />
<Compile Include="MultipleRMUrlProviderTests.cs" />
<Compile Include="WindowsHadoopEmulatorYarnClientTests.cs" />
@@ -55,11 +56,15 @@ under the License.
<Compile Include="YarnConfigurationUrlProviderTests.cs" />
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj">
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj">
<Project>{5094c35b-4fdb-4322-ac05-45d684501cbf}</Project>
<Name>Org.Apache.REEF.Client</Name>
</ProjectReference>
- <ProjectReference Include="..\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj">
+ <Project>{DEC0F0A8-DBEF-4EBF-B69C-E2369C15ABF1}</Project>
+ <Name>Org.Apache.REEF.IO</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
<Project>{97DBB573-3994-417A-9F69-FFA25F00D2A6}</Project>
<Name>Org.Apache.REEF.Tang</Name>
</ProjectReference>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index e456716..910ae2d 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -113,6 +113,7 @@ under the License.
<Compile Include="YARN\RestClient\DataModel\Tokens.cs" />
<Compile Include="YARN\RestClient\IRestRequestExecutor.cs" />
<Compile Include="YARN\RestClient\IUrlProvider.cs" />
+ <Compile Include="YARN\RestClient\FileSystemJobResourceUploader.cs" />
<Compile Include="YARN\RestClient\MultipleRMUrlProvider.cs" />
<Compile Include="YARN\RestClient\RestJsonDeserializer.cs" />
<Compile Include="YARN\RestClient\RestJsonSerializer.cs" />
@@ -181,6 +182,10 @@ under the License.
<Project>{75503f90-7b82-4762-9997-94b5c68f15db}</Project>
<Name>Org.Apache.REEF.Examples</Name>
</ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj">
+ <Project>{DEC0F0A8-DBEF-4EBF-B69C-E2369C15ABF1}</Project>
+ <Name>Org.Apache.REEF.IO</Name>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="$(TempResxFile)">
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs
index b702e79..3faa62d 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/IJobSubmissionDirectoryProvider.cs
@@ -26,9 +26,9 @@ namespace Org.Apache.REEF.Client.Yarn
public interface IJobSubmissionDirectoryProvider
{
/// <summary>
- /// Returns path to job submission directory.
+ /// Returns path to job submission directory in DFS.
/// </summary>
/// <returns></returns>
- string GetJobSubmissionDirectory();
+ string GetJobSubmissionRemoteDirectory();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs
index 4a5c59a..fe1b007 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/JobSubmissionDirectoryProvider.cs
@@ -37,7 +37,7 @@ namespace Org.Apache.REEF.Client.Yarn
_jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix;
}
- public string GetJobSubmissionDirectory()
+ public string GetJobSubmissionRemoteDirectory()
{
return string.Format(@"{0}/{1}{2}/",
_jobSubmissionDirectoryPrefix,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
index 22e32a1..0c01709 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/LegacyJobResourceUploader.cs
@@ -59,7 +59,7 @@ namespace Org.Apache.REEF.Client.Yarn
public JobResource UploadJobResource(string driverLocalFolderPath)
{
driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
- string driverUploadPath = _jobSubmissionDirectoryProvider.GetJobSubmissionDirectory().TrimEnd('/') + @"/";
+ string driverUploadPath = _jobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().TrimEnd('/') + @"/";
Log.Log(Level.Info, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath);
var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b99b96a0/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
new file mode 100644
index 0000000..cfa83b0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.IO.FileSystem;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.YARN.RestClient
+{
+ /// <summary>
+ /// Provides FileSystem agnostic job resource uploader.
+ /// Users can provide a custom implementation of
+ /// <see cref="IFileSystem"/> for their choice of DFS.
+ /// </summary>
+ internal sealed class FileSystemJobResourceUploader : IJobResourceUploader
+ {
+ private static readonly Logger Log = Logger.GetLogger(typeof(FileSystemJobResourceUploader));
+ private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
+ private readonly IJobSubmissionDirectoryProvider _jobSubmissionDirectoryProvider;
+ private readonly IResourceArchiveFileGenerator _resourceArchiveFileGenerator;
+ private readonly IFileSystem _fileSystem;
+
+ [Inject]
+ private FileSystemJobResourceUploader(
+ IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider,
+ IResourceArchiveFileGenerator resourceArchiveFileGenerator,
+ IFileSystem fileSystem)
+ {
+ _fileSystem = fileSystem;
+ _resourceArchiveFileGenerator = resourceArchiveFileGenerator;
+ _jobSubmissionDirectoryProvider = jobSubmissionDirectoryProvider;
+ }
+
+ public JobResource UploadJobResource(string driverLocalFolderPath)
+ {
+ driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
+ var driverUploadPath = _jobSubmissionDirectoryProvider.GetJobSubmissionRemoteDirectory().TrimEnd('/') + @"/";
+ Log.Log(Level.Verbose, "DriverFolderPath: {0} DriverUploadPath: {1}", driverLocalFolderPath, driverUploadPath);
+ var archivePath = _resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
+
+ var destinationPath = driverUploadPath + Path.GetFileName(archivePath);
+ var remoteFileUri = _fileSystem.CreateUriForPath(destinationPath);
+ Log.Log(Level.Verbose, @"Copy {0} to {1}", archivePath, remoteFileUri);
+
+ var parentDirectoryUri = _fileSystem.CreateUriForPath(driverUploadPath);
+ _fileSystem.CreateDirectory(parentDirectoryUri);
+ _fileSystem.CopyFromLocal(archivePath, remoteFileUri);
+ var fileStatus = _fileSystem.GetFileStatus(remoteFileUri);
+
+ return new JobResource
+ {
+ LastModificationUnixTimestamp = DateTimeToUnixTimestamp(fileStatus.ModificationTime),
+ RemoteUploadPath = remoteFileUri.AbsoluteUri,
+ ResourceSize = fileStatus.LengthBytes
+ };
+ }
+
+ private long DateTimeToUnixTimestamp(DateTime dateTime)
+ {
+ return (long) (dateTime - Epoch).TotalSeconds;
+ }
+ }
+}
\ No newline at end of file