You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/11/12 03:10:18 UTC
[2/2] incubator-reef git commit: [REEF-874] return application status
after job is submitted
[REEF-874] return application status after job is submitted
* This PR added GetJobFinalStatus() in IREEFClient and Yarn implementations
* It added MultipleRMUrlProvider to enumerate all the http addresses from env
* ApplicationFinalState is added which matches those from Yarn
* Test cases are added for the change
JIRA:
[REEF-874](https://issues.apache.org/jira/browse/REEF-874)
Pull Request:
This closes #594
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4fbc9038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4fbc9038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4fbc9038
Branch: refs/heads/master
Commit: 4fbc9038e509ef4971fa2cea2456f3ef93c0ce86
Parents: ef5403a
Author: Julia Wang <ju...@microsoft.com>
Authored: Wed Nov 11 17:32:48 2015 -0800
Committer: Markus Weimer <we...@apache.org>
Committed: Wed Nov 11 18:08:39 2015 -0800
----------------------------------------------------------------------
.../MultipleRMUrlProviderTests.cs | 117 +++++++
.../Org.Apache.REEF.Client.Tests.csproj | 1 +
.../YarnClientTests.cs | 60 +++-
.../YarnConfigurationUrlProviderTests.cs | 28 +-
.../Org.Apache.REEF.Client/API/IREEFClient.cs | 16 +-
.../Common/HttpClientHelper.cs | 344 -------------------
.../Common/IDriverHttpEndpoint.cs | 31 --
.../Common/IJobSubmissionResult.cs | 51 +++
.../Common/JobSubmissionResult.cs | 318 +++++++++++++++++
.../Org.Apache.REEF.Client/Local/LocalClient.cs | 26 +-
.../Local/LocalJobSubmissionResult.cs | 84 +++++
.../Org.Apache.REEF.Client.csproj | 8 +-
.../YARN/RESTClient/DataModel/Application.cs | 2 +-
.../DataModel/ApplicationFinalState.cs | 36 ++
.../YARN/RESTClient/IUrlProvider.cs | 9 +-
.../YARN/RESTClient/MultipleRMUrlProvider.cs | 74 ++++
.../YARN/RESTClient/YarnClient.cs | 50 ++-
.../RESTClient/YarnConfigurationUrlProvider.cs | 9 +-
.../YARN/YARNREEFClient.cs | 41 ++-
.../YARN/YarnJobSubmissionResult.cs | 55 +++
.../AllHandlers.cs | 12 +-
.../DriverRestart.cs | 2 +-
.../HelloREEF.cs | 4 +-
lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs | 2 +-
.../InProcess/InProcessIMRUClient.cs | 2 +-
.../OnREEF/Client/REEFIMRUClient.cs | 8 +-
.../FileSystem/Hadoop/HadoopFileSystem.cs | 13 +-
.../BroadcastAndReduceClient.cs | 2 +-
.../Functional/Bridge/TestBridgeClient.cs | 4 +-
.../Bridge/TestSimpleEventHandlers.cs | 2 +-
.../Functional/ReefFunctionalTest.cs | 2 +-
31 files changed, 948 insertions(+), 465 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client.Tests/MultipleRMUrlProviderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/MultipleRMUrlProviderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/MultipleRMUrlProviderTests.cs
new file mode 100644
index 0000000..64c8747
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/MultipleRMUrlProviderTests.cs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Globalization;
+using System.IO;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Client.Yarn.RestClient;
+using Org.Apache.REEF.Client.YARN.RestClient;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Client.Tests
+{
+ [TestClass]
+ public class MultipleRMUrlProviderTests
+ {
+ private const string HadoopConfDirEnvVariable = "HADOOP_CONF_DIR";
+ private const string YarnConfigFileName = "yarn-site.xml";
+ private const string AnyHttpAddressConfig = @"anyhost:8088";
+ private const string AnyHttpAddressConfigUpdated = @"anyhost1:8088";
+ private const string AnyHttpsAddressConfig = @"anyotherhost:9088";
+
+ private const string YarnConfigurationXmlContent = @"<?xml version=""1.0""?>
+<?xml-stylesheet type=""text/xsl"" href=""configuration.xsl""?>
+<!-- Put site-specific property overrides in this file. -->
+<configuration xmlns:xi=""http://www.w3.org/2001/XInclude"">
+ <property>
+ <name>yarn.resourcemanager.webapp.address.rm1</name>
+ <value>" + AnyHttpAddressConfig + @"</value>
+ </property>
+ <property>
+ <name>yarn.resourcemanager.webapp.address.rm2</name>
+ <value>" + AnyHttpAddressConfigUpdated + @"</value>
+ </property>
+ <property>
+ <name>yarn.nodemanager.local-dirs</name>
+ <value>C:\hdpdata\hadoop\local</value>
+ </property>
+</configuration>";
+
+ [TestMethod]
+ public void UrlProviderReadsEnvVarConfiguredConfigFileAndParsesCorrectHttpUrl()
+ {
+ string tempFile = Path.GetTempFileName();
+ string tempDir = Path.GetDirectoryName(tempFile);
+ string yarnConfigFile = Path.Combine(tempDir, YarnConfigFileName);
+
+ using (new YarnConfigurationUrlProviderTests.TempFileWriter(yarnConfigFile, YarnConfigurationXmlContent))
+ using (new YarnConfigurationUrlProviderTests.TemporaryOverrideEnvironmentVariable(HadoopConfDirEnvVariable, tempDir))
+ {
+ IUrlProvider urlProvider = GetYarnConfigurationUrlProvider();
+ var url = urlProvider.GetUrlAsync().GetAwaiter().GetResult();
+
+ int i = 0;
+ foreach (var u in url)
+ {
+ i++;
+ Assert.AreEqual("http", u.Scheme);
+ if (i == 1)
+ {
+ Assert.AreEqual(AnyHttpAddressConfig.Split(':')[0], u.Host);
+ Assert.AreEqual(AnyHttpAddressConfig.Split(':')[1],
+ u.Port.ToString(CultureInfo.InvariantCulture));
+ }
+ else
+ {
+ Assert.AreEqual(AnyHttpAddressConfigUpdated.Split(':')[0], u.Host);
+ Assert.AreEqual(AnyHttpAddressConfigUpdated.Split(':')[1],
+ u.Port.ToString(CultureInfo.InvariantCulture));
+ }
+ }
+ }
+ }
+
+ [TestMethod]
+ public void CannotFindHadoopConfigDirThrowsArgumentException()
+ {
+ using (new YarnConfigurationUrlProviderTests.TemporaryOverrideEnvironmentVariable(HadoopConfDirEnvVariable, string.Empty))
+ {
+ try
+ {
+ IUrlProvider urlProviderNotUsed = GetYarnConfigurationUrlProvider();
+ Assert.Fail("Should throw exception");
+ }
+ catch (InjectionException injectionException)
+ {
+ Assert.IsTrue(injectionException.GetBaseException() is ArgumentException);
+ }
+ }
+ }
+
+ private IUrlProvider GetYarnConfigurationUrlProvider()
+ {
+ var builder = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindImplementation(GenericType<IUrlProvider>.Class, GenericType<MultipleRMUrlProvider>.Class)
+ .Build();
+
+ return TangFactory.GetTang().NewInjector(builder).GetInstance<IUrlProvider>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
index 126647c..261021c 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/Org.Apache.REEF.Client.Tests.csproj
@@ -47,6 +47,7 @@ under the License.
<Reference Include="System.ServiceProcess" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="MultipleRMUrlProviderTests.cs" />
<Compile Include="WindowsHadoopEmulatorYarnClientTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="YarnClientTests.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client.Tests/YarnClientTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnClientTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnClientTests.cs
index 8e83ae4..a5dc5f5 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/YarnClientTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnClientTests.cs
@@ -17,12 +17,14 @@
using System;
using System.Collections.Generic;
+using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using NSubstitute;
+using Org.Apache.REEF.Client.Yarn;
using Org.Apache.REEF.Client.Yarn.RestClient;
using Org.Apache.REEF.Client.YARN.RestClient;
using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
@@ -42,7 +44,7 @@ namespace Org.Apache.REEF.Client.Tests
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
- Uri anyUri = new Uri("anyscheme://anypath");
+ var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anyClusterInfo = new ClusterInfo
{
@@ -55,7 +57,7 @@ namespace Org.Apache.REEF.Client.Tests
req =>
req.Resource == "ws/v1/cluster/info" && req.RootElement == "clusterInfo" &&
req.Method == Method.GET),
- anyUri,
+ anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(anyClusterInfo));
// act
@@ -73,7 +75,7 @@ namespace Org.Apache.REEF.Client.Tests
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
- Uri anyUri = new Uri("anyscheme://anypath");
+ var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anyClusterMetrics = new ClusterMetrics
{
@@ -87,7 +89,7 @@ namespace Org.Apache.REEF.Client.Tests
req =>
req.Resource == "ws/v1/cluster/metrics" && req.RootElement == "clusterMetrics" &&
req.Method == Method.GET),
- anyUri,
+ anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(anyClusterMetrics));
var yarnClient = ctx.GetClient();
@@ -103,7 +105,7 @@ namespace Org.Apache.REEF.Client.Tests
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
- Uri anyUri = new Uri("anyscheme://anypath");
+ var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
const string applicationId = "AnyApplicationId";
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anyApplication = new Application
@@ -122,7 +124,7 @@ namespace Org.Apache.REEF.Client.Tests
req.Resource == "ws/v1/cluster/apps/" + applicationId
&& req.RootElement == "app"
&& req.Method == Method.GET),
- anyUri,
+ anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(anyApplication));
var yarnClient = ctx.GetClient();
@@ -133,12 +135,48 @@ namespace Org.Apache.REEF.Client.Tests
}
[TestMethod]
+ public async Task TestGetApplicationFinalStatus()
+ {
+ var ctx = new TestContext();
+ var urlProvider = ctx.UrlProviderFake;
+ var restReqExecutor = ctx.RestRequestExecutorFake;
+ var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
+ const string applicationId = "AnyApplicationId";
+ urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
+ var anyApplication = new Application
+ {
+ AllocatedMB = 100,
+ AmHostHttpAddress = "http://anyhttpaddress",
+ AmContainerLogs = "SomeLogs",
+ ApplicationType = "AnyYarnApplicationType",
+ State = State.FINISHED,
+ FinalStatus = FinalState.SUCCEEDED,
+ Name = "AnyApplicationName",
+ RunningContainers = 0
+ };
+ restReqExecutor.ExecuteAsync<Application>(
+ Arg.Is<IRestRequest>(
+ req =>
+ req.Resource == "ws/v1/cluster/apps/" + applicationId
+ && req.RootElement == "app"
+ && req.Method == Method.GET),
+ anyUri.First(),
+ CancellationToken.None).Returns(Task.FromResult(anyApplication));
+
+ var yarnClient = ctx.GetClient();
+
+ Application actualApplication = await yarnClient.GetApplicationAsync(applicationId);
+
+ Assert.AreEqual(actualApplication.FinalStatus, FinalState.SUCCEEDED);
+ }
+
+ [TestMethod]
public async Task TestCreateNewApplication()
{
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
- Uri anyUri = new Uri("anyscheme://anypath");
+ var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
const string applicationId = "AnyApplicationId";
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anyNewApplication = new NewApplication
@@ -150,7 +188,7 @@ namespace Org.Apache.REEF.Client.Tests
req =>
req.Resource == "ws/v1/cluster/apps/new-application"
&& req.Method == Method.POST),
- anyUri,
+ anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(anyNewApplication));
var yarnClient = ctx.GetClient();
@@ -166,7 +204,7 @@ namespace Org.Apache.REEF.Client.Tests
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
- Uri anyUri = new Uri("anyscheme://anypath");
+ var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
const string applicationId = "AnyApplicationId";
const string anyApplicationType = "REEFTest";
const string anyApplicationName = "AnyAPP";
@@ -280,7 +318,7 @@ namespace Org.Apache.REEF.Client.Tests
&& req.JsonSerializer is RestJsonSerializer
&& req.Parameters.First().Name == "application/json"
&& B(req, expectedJson)),
- anyUri,
+ anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(response));
restReqExecutor.ExecuteAsync<Application>(
@@ -289,7 +327,7 @@ namespace Org.Apache.REEF.Client.Tests
req.Resource == "ws/v1/cluster/apps/" + applicationId
&& req.RootElement == "app"
&& req.Method == Method.GET),
- anyUri,
+ anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(thisApplication));
var yarnClient = ctx.GetClient();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client.Tests/YarnConfigurationUrlProviderTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnConfigurationUrlProviderTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnConfigurationUrlProviderTests.cs
index 71747a5..9a8223b 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/YarnConfigurationUrlProviderTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnConfigurationUrlProviderTests.cs
@@ -16,10 +16,14 @@
// under the License.
using System;
+using System.Collections.Generic;
using System.Globalization;
using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.REEF.Client.Yarn.RestClient;
+using Org.Apache.REEF.Client.YARN.RestClient;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -83,9 +87,9 @@ namespace Org.Apache.REEF.Client.Tests
YarnConfigurationUrlProvider urlProvider = GetYarnConfigurationUrlProvider();
var url = urlProvider.GetUrlAsync().GetAwaiter().GetResult();
- Assert.AreEqual("http", url.Scheme);
- Assert.AreEqual(AnyHttpAddressConfig.Split(':')[0], url.Host);
- Assert.AreEqual(AnyHttpAddressConfig.Split(':')[1], url.Port.ToString(CultureInfo.InvariantCulture));
+ Assert.AreEqual("http", url.First().Scheme);
+ Assert.AreEqual(AnyHttpAddressConfig.Split(':')[0], url.First().Host);
+ Assert.AreEqual(AnyHttpAddressConfig.Split(':')[1], url.First().Port.ToString(CultureInfo.InvariantCulture));
}
}
@@ -100,11 +104,11 @@ namespace Org.Apache.REEF.Client.Tests
using (new TemporaryOverrideEnvironmentVariable(HadoopConfDirEnvVariable, tempDir))
{
YarnConfigurationUrlProvider urlProvider = GetYarnConfigurationUrlProvider(useHttps: true);
- var url = urlProvider.GetUrlAsync().GetAwaiter().GetResult();
+ IEnumerable<Uri> url = urlProvider.GetUrlAsync().GetAwaiter().GetResult();
- Assert.AreEqual("https", url.Scheme);
- Assert.AreEqual(AnyHttpsAddressConfig.Split(':')[0], url.Host);
- Assert.AreEqual(AnyHttpsAddressConfig.Split(':')[1], url.Port.ToString(CultureInfo.InvariantCulture));
+ Assert.AreEqual("https", url.First().Scheme);
+ Assert.AreEqual(AnyHttpsAddressConfig.Split(':')[0], url.First().Host);
+ Assert.AreEqual(AnyHttpsAddressConfig.Split(':')[1], url.First().Port.ToString(CultureInfo.InvariantCulture));
}
}
@@ -122,9 +126,9 @@ namespace Org.Apache.REEF.Client.Tests
useHttps: true);
var url = urlProvider.GetUrlAsync().GetAwaiter().GetResult();
- Assert.AreEqual("https", url.Scheme);
- Assert.AreEqual(AnyHttpsAddressConfig.Split(':')[0], url.Host);
- Assert.AreEqual(AnyHttpsAddressConfig.Split(':')[1], url.Port.ToString(CultureInfo.InvariantCulture));
+ Assert.AreEqual("https", url.First().Scheme);
+ Assert.AreEqual(AnyHttpsAddressConfig.Split(':')[0], url.First().Host);
+ Assert.AreEqual(AnyHttpsAddressConfig.Split(':')[1], url.First().Port.ToString(CultureInfo.InvariantCulture));
}
}
@@ -165,7 +169,7 @@ namespace Org.Apache.REEF.Client.Tests
/// even in the case of failure.
/// Other tests in the assembly will see changed env var
/// </summary>
- private class TemporaryOverrideEnvironmentVariable : IDisposable
+ internal class TemporaryOverrideEnvironmentVariable : IDisposable
{
private readonly string _variableName;
private readonly string _oldValue;
@@ -188,7 +192,7 @@ namespace Org.Apache.REEF.Client.Tests
/// even in the case of failures.
/// It is a shame we are writing to disk in unit test
/// </summary>
- private class TempFileWriter : IDisposable
+ internal class TempFileWriter : IDisposable
{
private readonly string _filePath;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs b/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs
index 74b8d95..25211d2 100644
--- a/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/API/IREEFClient.cs
@@ -18,7 +18,9 @@
*/
using System;
+using System.Threading.Tasks;
using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
using Org.Apache.REEF.Common.Attributes;
namespace Org.Apache.REEF.Client.API
@@ -37,11 +39,19 @@ namespace Org.Apache.REEF.Client.API
/// <summary>
/// Submit the job described in jobSubmission to the cluster.
- /// Expect IDriverHttpEndpoint returned after the call.
+ /// Expect IJobSubmissionResult returned after the call.
/// </summary>
/// <param name="jobSubmission"></param>
- /// <returns>IDriverHttpEndpoint</returns>
+ /// <returns>IJobSubmissionResult</returns>
[Unstable("0.13", "Working in progress for what to return after submit")]
- IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission jobSubmission);
+ IJobSubmissionResult SubmitAndGetJobStatus(IJobSubmission jobSubmission);
+
+ /// <summary>
+ /// Returns the application status in running the job
+ /// </summary>
+ /// <param name="appId"></param>
+ /// <returns></returns>
+ [Unstable("0.14", "Working in progress for rest API status returned")]
+ Task<FinalState> GetJobFinalStatus(string appId);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs b/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs
deleted file mode 100644
index 1a00493..0000000
--- a/lang/cs/Org.Apache.REEF.Client/Common/HttpClientHelper.cs
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Collections.Generic;
-using Newtonsoft.Json;
-using Org.Apache.REEF.Utilities.Logging;
-using System;
-using System.IO;
-using System.Net;
-using System.Net.Http;
-using System.Net.Http.Headers;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.REEF.Client.Common
-{
- internal class HttpClientHelper : IDriverHttpEndpoint
- {
- private static readonly Logger LOGGER = Logger.GetLogger(typeof (HttpClientHelper));
- private const int MaxConnectAttemptCount = 20;
- private const int MilliSecondsToWaitBeforeNextConnectAttempt = 1000;
- private const int SecondsForHttpClientTimeout = 120;
- private const string UnAssigned = "UNASSIGNED";
- private const string TrackingUrlKey = "trackingUrl";
- private const string AppKey = "app";
- private const string ThisIsStandbyRm = "This is standby RM";
- private const string AppJson = "application/json";
-
- private string _driverUrl;
-
- private readonly HttpClient _client;
-
- internal HttpClientHelper()
- {
- _client = new HttpClient
- {
- Timeout = TimeSpan.FromSeconds(SecondsForHttpClientTimeout),
- };
- _client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(AppJson));
- }
-
- public string DriverUrl { get { return _driverUrl; } }
-
- public string GetUrlResult(string url)
- {
- var task = Task.Run(() => CallUrl(url));
- task.Wait();
- return task.Result;
- }
-
- enum UrlResultKind
- {
- WasNotAbleToTalkToRm,
- BackupRm,
- AppIdNotThereYet,
- UrlNotAssignedYet,
- GotAppIdUrl,
- }
-
- internal static List<string> GetRmUri(string filePath)
- {
- using (var sr = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read)))
- {
- sr.ReadLine(); // appid
- sr.ReadLine(); // trackingUrl
- var rmList = new List<string>();
- var rmUri = sr.ReadLine();
- while (rmUri != null)
- {
- rmList.Add(rmUri);
- rmUri = sr.ReadLine();
- }
- return rmList;
- }
- }
-
- internal static string GetAppId(string filePath)
- {
- using (var sr = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read)))
- {
- var appId = sr.ReadLine();
- return appId;
- }
- }
-
- internal static string GetTrackingUrl(string filePath)
- {
- using (var sr = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read)))
- {
- sr.ReadLine(); // appid
- var trackingUrl = sr.ReadLine();
- return "http://" + trackingUrl + "/";
- }
- }
-
- internal async Task<string> CallUrl (string url)
- {
- var result = await TryGetUri(url);
- if (HasCommandFailed(result))
- {
- return null;
- }
- LOGGER.Log(Level.Warning, "CallUrl result " + result.Item2);
- return result.Item2;
- }
-
- internal string GetDriverUrlForYarn(String filePath)
- {
- _driverUrl = GetTrackingUrl(filePath);
- return _driverUrl;
- }
-
- internal string GetDriverUrlForLocalRuntime(string filePath)
- {
- _driverUrl = null;
- for (int i = 0; i < 10; i++)
- {
- var driverUrl = TryReadHttpServerIpAndPortFromFile(filePath);
- if (!string.IsNullOrEmpty(driverUrl))
- {
- _driverUrl = "http://" + driverUrl + "/";
- break;
- }
- Thread.Sleep(1000);
- }
- return _driverUrl;
- }
-
- private string TryReadHttpServerIpAndPortFromFile(String fileName)
- {
- string httpServerIpAndPort = null;
- try
- {
- LOGGER.Log(Level.Info, "try open " + fileName);
- using (var rdr = new StreamReader(File.OpenRead(fileName)))
- {
- httpServerIpAndPort = rdr.ReadLine();
- LOGGER.Log(Level.Info, "httpServerIpAndPort is " + httpServerIpAndPort);
- }
- }
- catch (FileNotFoundException)
- {
- LOGGER.Log(Level.Info, "File does not exist: " + fileName);
- }
- return httpServerIpAndPort;
- }
-
- internal async Task<string> GetAppIdTrackingUrl(string url)
- {
- var result = await TryGetUri(url);
- if (HasCommandFailed(result) ||
- result.Item2 == null)
- {
- return null;
- }
-
- LOGGER.Log(Level.Info, "GetAppIdTrackingUrl: " + result.Item2);
- return result.Item2;
- }
-
- private static bool ShouldRetry(HttpRequestException httpRequestException)
- {
- var shouldRetry = false;
- if (httpRequestException.Message.IndexOf(((int)(HttpStatusCode.NotFound)).ToString(), StringComparison.Ordinal) != -1 ||
- httpRequestException.Message.IndexOf(((int)(HttpStatusCode.BadGateway)).ToString(), StringComparison.Ordinal) != -1)
- {
- shouldRetry = true;
- }
- else
- {
- var webException = httpRequestException.InnerException as System.Net.WebException;
- if (webException != null)
- {
- if (webException.Status == System.Net.WebExceptionStatus.ConnectFailure)
- {
- shouldRetry = true;
- }
- }
- }
- return shouldRetry;
- }
-
- private static Tuple<bool, string> CommandFailed(String reason)
- {
- return new Tuple<bool, string>(false, null);
- }
-
- private static Tuple<bool, string> CommandSucceeded(string commandResult)
- {
- return new Tuple<bool, string>(true, commandResult);
- }
-
- private bool HasCommandFailed(Tuple<bool, string> httpCallResult)
- {
- return !httpCallResult.Item1;
- }
-
- internal async Task<Tuple<bool, string>> TryGetUri(string commandUri)
- {
- var connectAttemptCount = 0;
- Tuple<bool, string> result;
-
- while (true)
- {
- try
- {
- string strResult = null;
- LOGGER.Log(Level.Warning, "Try url [" + commandUri + "] connectAttemptCount " + connectAttemptCount + ".");
- strResult = await _client.GetStringAsync(commandUri);
- result = CommandSucceeded(strResult);
- LOGGER.Log(Level.Warning, "Connection succeeded. connectAttemptCount was " + connectAttemptCount + ".");
- break;
- }
- catch (HttpRequestException httpRequestException)
- {
- if (!ShouldRetry(httpRequestException))
- {
- LOGGER.Log(Level.Error,
- commandUri + " exception " + httpRequestException.Message + "\n" +
- httpRequestException.StackTrace);
- result = CommandFailed(httpRequestException.Message);
- LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + ".");
- break;
- }
- }
- catch (Exception ex)
- {
- LOGGER.Log(Level.Error, commandUri + " exception " + ex.Message + "\n" + ex.StackTrace);
- result = CommandFailed(ex.Message);
- LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + ".");
- break;
- }
-
- ++connectAttemptCount;
- if (connectAttemptCount >= MaxConnectAttemptCount)
- {
- result = CommandFailed("Could not connect to " + commandUri + " after " + MaxConnectAttemptCount.ToString() + "attempts.");
- LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + ".");
- break;
- }
-
- Thread.Sleep(MilliSecondsToWaitBeforeNextConnectAttempt);
- }
-
- return result;
- }
-
- internal async Task<string> TryUntilNoConnection(string commandUri)
- {
- var connectAttemptCount = 0;
- while (true)
- {
- try
- {
- var strResult = await _client.GetStringAsync(commandUri);
- LOGGER.Log(Level.Info,
- "Connection succeeded. connectAttemptCount was " + connectAttemptCount + ".");
- }
- catch (HttpRequestException httpRequestException)
- {
- LOGGER.Log(Level.Info, httpRequestException.Message);
- break;
- }
- catch (Exception e)
- {
- LOGGER.Log(Level.Info, e.Message);
- break;
- }
-
- ++connectAttemptCount;
- if (connectAttemptCount >= MaxConnectAttemptCount)
- {
- LOGGER.Log(Level.Info, "Can still connect to " + commandUri + " after " + MaxConnectAttemptCount.ToString() + "attempts.");
- break;
- }
-
- Thread.Sleep(MilliSecondsToWaitBeforeNextConnectAttempt);
- }
-
- return null;
- }
-
- private static bool ShouldRetry(HttpStatusCode httpStatusCode)
- {
- return httpStatusCode == HttpStatusCode.NotFound;
- }
-
- private UrlResultKind CheckUrlAttempt(string result)
- {
- UrlResultKind resultKind = UrlResultKind.WasNotAbleToTalkToRm;
- if (string.IsNullOrEmpty(result))
- {
- resultKind = UrlResultKind.WasNotAbleToTalkToRm;
- }
- else if (result.StartsWith(ThisIsStandbyRm))
- {
- resultKind = UrlResultKind.BackupRm;
- }
- else
- {
- dynamic deserializedValue = JsonConvert.DeserializeObject(result);
- var values = deserializedValue[AppKey];
- if (values == null || values[TrackingUrlKey] == null)
- {
- resultKind = UrlResultKind.AppIdNotThereYet;
- }
- else
- {
- _driverUrl = values[TrackingUrlKey].ToString();
- LOGGER.Log(Level.Info, "trackingUrl[" + _driverUrl + "]");
-
- if (0 == String.Compare(_driverUrl, UnAssigned))
- {
- resultKind = UrlResultKind.UrlNotAssignedYet;
- }
- else
- {
- resultKind = UrlResultKind.GotAppIdUrl;
- }
-
- }
- }
-
- LOGGER.Log(Level.Info, "CheckUrlAttempt " + resultKind);
- return resultKind;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs b/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs
deleted file mode 100644
index c128ad0..0000000
--- a/lang/cs/Org.Apache.REEF.Client/Common/IDriverHttpEndpoint.cs
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Common.Attributes;
-
-namespace Org.Apache.REEF.Client.Common
-{
- [Unstable("0.13", "Working in progress for what to return after submit")]
- public interface IDriverHttpEndpoint
- {
- string GetUrlResult(string url);
-
- string DriverUrl { get; }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/Common/IJobSubmissionResult.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/IJobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/Common/IJobSubmissionResult.cs
new file mode 100644
index 0000000..6ecdbb4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Common/IJobSubmissionResult.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+using Org.Apache.REEF.Common.Attributes;
+
+namespace Org.Apache.REEF.Client.Common
+{
+ [Unstable("0.13", "Working in progress. For local runtime, some of the property, such as FinalState and AppId are not implemented yet.")]
+ public interface IJobSubmissionResult
+ {
+ /// <summary>
+ /// Get http response for the given url
+ /// </summary>
+ /// <param name="url"></param>
+ /// <returns></returns>
+ string GetUrlResult(string url);
+
+ /// <summary>
+ /// This method returns the url of http server running inside the driver.
+ /// e.g. http://hostname:port/
+ /// </summary>
+ string DriverUrl { get; }
+
+ /// <summary>
+ /// Get Application final state
+ /// </summary>
+ FinalState FinalState { get; }
+
+ /// <summary>
+ /// Get Yarn application id after Job is submited
+ /// </summary>
+ string AppId { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
new file mode 100644
index 0000000..80bad49
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using Newtonsoft.Json;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.IO;
+using System.Net;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+
+namespace Org.Apache.REEF.Client.Common
+{
+ internal abstract class JobSubmissionResult : IJobSubmissionResult
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof (JobSubmissionResult));
+ private const int MaxConnectAttemptCount = 20;
+ private const int MilliSecondsToWaitBeforeNextConnectAttempt = 1000;
+ private const int SecondsForHttpClientTimeout = 120;
+ private const string UnAssigned = "UNASSIGNED";
+ private const string TrackingUrlKey = "trackingUrl";
+ private const string AppKey = "app";
+ private const string ThisIsStandbyRm = "This is standby RM";
+ private const string AppJson = "application/json";
+
+ private string _driverUrl;
+ protected string _appId;
+
+ private readonly HttpClient _client;
+ private readonly IREEFClient _reefClient;
+
+ internal JobSubmissionResult(IREEFClient reefClient, string filePath)
+ {
+ _reefClient = reefClient;
+ _client = new HttpClient
+ {
+ Timeout = TimeSpan.FromSeconds(SecondsForHttpClientTimeout),
+ };
+ _client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(AppJson));
+
+ _driverUrl = GetDriverUrl(filePath);
+ }
+
+ /// <summary>
+ /// Returns http end point of the web server running in the driver
+ /// </summary>
+ public string DriverUrl { get { return _driverUrl; }
+ }
+
+ /// <summary>
+ /// Get application Id returned from Yarn job submission
+ /// </summary>
+ public string AppId
+ {
+ get { return _appId; }
+ }
+
+ /// <summary>
+ /// Get application final status from Yarn
+ /// </summary>
+ public FinalState FinalState
+ {
+ get { return _reefClient.GetJobFinalStatus(_appId).Result; }
+ }
+
+ /// <summary>
+ /// Return response for a given http request url
+ /// </summary>
+ /// <param name="url"></param>
+ /// <returns></returns>
+ public string GetUrlResult(string url)
+ {
+ var task = Task.Run(() => CallUrl(url));
+ task.Wait();
+ return task.Result;
+ }
+
+ protected abstract string GetDriverUrl(string filepath);
+
+ enum UrlResultKind
+ {
+ WasNotAbleToTalkToRm,
+ BackupRm,
+ AppIdNotThereYet,
+ UrlNotAssignedYet,
+ GotAppIdUrl,
+ }
+
+ internal static List<string> GetRmUri(string filePath)
+ {
+ using (var sr = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read)))
+ {
+ sr.ReadLine(); // appid
+ sr.ReadLine(); // trackingUrl
+ var rmList = new List<string>();
+ var rmUri = sr.ReadLine();
+ while (rmUri != null)
+ {
+ rmList.Add(rmUri);
+ rmUri = sr.ReadLine();
+ }
+ return rmList;
+ }
+ }
+
+ internal async Task<string> CallUrl (string url)
+ {
+ var result = await TryGetUri(url);
+ if (HasCommandFailed(result))
+ {
+ return null;
+ }
+ LOGGER.Log(Level.Warning, "CallUrl result " + result.Item2);
+ return result.Item2;
+ }
+
+ internal async Task<string> GetAppIdTrackingUrl(string url)
+ {
+ var result = await TryGetUri(url);
+ if (HasCommandFailed(result) ||
+ result.Item2 == null)
+ {
+ return null;
+ }
+
+ LOGGER.Log(Level.Info, "GetAppIdTrackingUrl: " + result.Item2);
+ return result.Item2;
+ }
+
+ private static bool ShouldRetry(HttpRequestException httpRequestException)
+ {
+ var shouldRetry = false;
+ if (httpRequestException.Message.IndexOf(((int)(HttpStatusCode.NotFound)).ToString(), StringComparison.Ordinal) != -1 ||
+ httpRequestException.Message.IndexOf(((int)(HttpStatusCode.BadGateway)).ToString(), StringComparison.Ordinal) != -1)
+ {
+ shouldRetry = true;
+ }
+ else
+ {
+ var webException = httpRequestException.InnerException as System.Net.WebException;
+ if (webException != null)
+ {
+ if (webException.Status == System.Net.WebExceptionStatus.ConnectFailure)
+ {
+ shouldRetry = true;
+ }
+ }
+ }
+ return shouldRetry;
+ }
+
+ private static Tuple<bool, string> CommandFailed(String reason)
+ {
+ return new Tuple<bool, string>(false, null);
+ }
+
+ private static Tuple<bool, string> CommandSucceeded(string commandResult)
+ {
+ return new Tuple<bool, string>(true, commandResult);
+ }
+
+ private bool HasCommandFailed(Tuple<bool, string> httpCallResult)
+ {
+ return !httpCallResult.Item1;
+ }
+
+ internal async Task<Tuple<bool, string>> TryGetUri(string commandUri)
+ {
+ var connectAttemptCount = 0;
+ Tuple<bool, string> result;
+
+ while (true)
+ {
+ try
+ {
+ string strResult = null;
+ LOGGER.Log(Level.Warning, "Try url [" + commandUri + "] connectAttemptCount " + connectAttemptCount + ".");
+ strResult = await _client.GetStringAsync(commandUri);
+ result = CommandSucceeded(strResult);
+ LOGGER.Log(Level.Warning, "Connection succeeded. connectAttemptCount was " + connectAttemptCount + ".");
+ break;
+ }
+ catch (HttpRequestException httpRequestException)
+ {
+ if (!ShouldRetry(httpRequestException))
+ {
+ LOGGER.Log(Level.Error,
+ commandUri + " exception " + httpRequestException.Message + "\n" +
+ httpRequestException.StackTrace);
+ result = CommandFailed(httpRequestException.Message);
+ LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + ".");
+ break;
+ }
+ }
+ catch (Exception ex)
+ {
+ LOGGER.Log(Level.Error, commandUri + " exception " + ex.Message + "\n" + ex.StackTrace);
+ result = CommandFailed(ex.Message);
+ LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + ".");
+ break;
+ }
+
+ ++connectAttemptCount;
+ if (connectAttemptCount >= MaxConnectAttemptCount)
+ {
+ result = CommandFailed("Could not connect to " + commandUri + " after " + MaxConnectAttemptCount.ToString() + "attempts.");
+ LOGGER.Log(Level.Warning, "Connection failed. connectAttemptCount was " + connectAttemptCount + ".");
+ break;
+ }
+
+ Thread.Sleep(MilliSecondsToWaitBeforeNextConnectAttempt);
+ }
+
+ return result;
+ }
+
+ internal async Task<string> TryUntilNoConnection(string commandUri)
+ {
+ var connectAttemptCount = 0;
+ while (true)
+ {
+ try
+ {
+ var strResult = await _client.GetStringAsync(commandUri);
+ LOGGER.Log(Level.Info,
+ "Connection succeeded. connectAttemptCount was " + connectAttemptCount + ".");
+ }
+ catch (HttpRequestException httpRequestException)
+ {
+ LOGGER.Log(Level.Info, httpRequestException.Message);
+ break;
+ }
+ catch (Exception e)
+ {
+ LOGGER.Log(Level.Info, e.Message);
+ break;
+ }
+
+ ++connectAttemptCount;
+ if (connectAttemptCount >= MaxConnectAttemptCount)
+ {
+ LOGGER.Log(Level.Info, "Can still connect to " + commandUri + " after " + MaxConnectAttemptCount.ToString() + "attempts.");
+ break;
+ }
+
+ Thread.Sleep(MilliSecondsToWaitBeforeNextConnectAttempt);
+ }
+
+ return null;
+ }
+
+ private static bool ShouldRetry(HttpStatusCode httpStatusCode)
+ {
+ return httpStatusCode == HttpStatusCode.NotFound;
+ }
+
+ private UrlResultKind CheckUrlAttempt(string result)
+ {
+ UrlResultKind resultKind = UrlResultKind.WasNotAbleToTalkToRm;
+ if (string.IsNullOrEmpty(result))
+ {
+ resultKind = UrlResultKind.WasNotAbleToTalkToRm;
+ }
+ else if (result.StartsWith(ThisIsStandbyRm))
+ {
+ resultKind = UrlResultKind.BackupRm;
+ }
+ else
+ {
+ dynamic deserializedValue = JsonConvert.DeserializeObject(result);
+ var values = deserializedValue[AppKey];
+ if (values == null || values[TrackingUrlKey] == null)
+ {
+ resultKind = UrlResultKind.AppIdNotThereYet;
+ }
+ else
+ {
+ _driverUrl = values[TrackingUrlKey].ToString();
+ LOGGER.Log(Level.Info, "trackingUrl[" + _driverUrl + "]");
+
+ if (0 == String.Compare(_driverUrl, UnAssigned))
+ {
+ resultKind = UrlResultKind.UrlNotAssignedYet;
+ }
+ else
+ {
+ resultKind = UrlResultKind.GotAppIdUrl;
+ }
+
+ }
+ }
+
+ LOGGER.Log(Level.Info, "CheckUrlAttempt " + resultKind);
+ return resultKind;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
index 00b33d9..a1e8f91 100644
--- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
@@ -18,6 +18,7 @@
*/
using System;
+using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
@@ -26,6 +27,8 @@ using Org.Apache.REEF.Client.Avro;
using Org.Apache.REEF.Client.Avro.Local;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.Local.Parameters;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+using Org.Apache.REEF.Common.Attributes;
using Org.Apache.REEF.Common.Avro;
using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.Tang.Annotations;
@@ -55,7 +58,6 @@ namespace Org.Apache.REEF.Client.Local
private readonly JavaClientLauncher _javaClientLauncher;
private readonly int _maxNumberOfConcurrentEvaluators;
private readonly string _runtimeFolder;
- private string _driverUrl;
private REEFFileNames _fileNames;
[Inject]
@@ -139,7 +141,7 @@ namespace Org.Apache.REEF.Client.Local
Logger.Log(Level.Info, "Submitted the Driver for execution.");
}
- public IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission jobSubmission)
+ public IJobSubmissionResult SubmitAndGetJobStatus(IJobSubmission jobSubmission)
{
var driverFolder = PrepareDriverFolder(jobSubmission);
var submissionArgsFilePath = CreateBootstrapAvroConfig(jobSubmission, driverFolder);
@@ -147,16 +149,24 @@ namespace Org.Apache.REEF.Client.Local
Task.Run(() => _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath));
var fileName = Path.Combine(driverFolder, _fileNames.DriverHttpEndpoint);
- HttpClientHelper helper = new HttpClientHelper();
- _driverUrl = helper.GetDriverUrlForLocalRuntime(fileName);
+ JobSubmissionResult result = new LocalJobSubmissionResult(this, fileName);
- Logger.Log(Level.Info, "Submitted the Driver for execution. Returned driverUrl is: " + _driverUrl);
- return helper;
+ var msg = string.Format(CultureInfo.CurrentCulture,
+ "Submitted the Driver for execution. Returned driverUrl is: {0}.", result.DriverUrl);
+ Logger.Log(Level.Info, msg);
+ return result;
}
- public string DriverUrl
+ /// <summary>
+ /// Return current Job status
+ /// </summary>
+ /// <returns></returns>
+ /// //TODO: REEF-889
+ [Unstable("0.14", "Working in progress for rest API status returned")]
+ public async Task<FinalState> GetJobFinalStatus(string appId)
{
- get { return _driverUrl; }
+ await Task.Delay(0);
+ return FinalState.SUCCEEDED;
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/Local/LocalJobSubmissionResult.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalJobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalJobSubmissionResult.cs
new file mode 100644
index 0000000..78fb1f7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalJobSubmissionResult.cs
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.IO;
+using System.Threading;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.Local
+{
+ internal class LocalJobSubmissionResult : JobSubmissionResult
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(LocalJobSubmissionResult));
+
+ /// <summary>
+ /// Time interval between the pulling of data from the http end point file
+ /// </summary>
+ private const int PullInterval = 1000;
+
+ private const string UriTemplate = @"http://{0}/";
+
+ internal LocalJobSubmissionResult(IREEFClient reefClient, string filePath)
+ : base(reefClient, filePath)
+ {
+ }
+
+ protected override string GetDriverUrl(string filepath)
+ {
+ return GetDriverUrlForLocalRuntime(filepath);
+ }
+
+ private string GetDriverUrlForLocalRuntime(string filePath)
+ {
+ string fullDriverUrl = null;
+ for (int i = 0; i < 10; i++)
+ {
+ var driverUrl = TryReadHttpServerIpAndPortFromFile(filePath);
+ if (!string.IsNullOrEmpty(driverUrl))
+ {
+ fullDriverUrl = string.Format(UriTemplate, driverUrl);
+ break;
+ }
+ Thread.Sleep(PullInterval);
+ }
+ return fullDriverUrl;
+ }
+
+ private string TryReadHttpServerIpAndPortFromFile(string fileName)
+ {
+ string httpServerIpAndPort = null;
+ try
+ {
+ LOGGER.Log(Level.Info, "try open " + fileName);
+ using (var rdr = new StreamReader(File.OpenRead(fileName)))
+ {
+ httpServerIpAndPort = rdr.ReadLine();
+ LOGGER.Log(Level.Info, "httpServerIpAndPort is " + httpServerIpAndPort);
+ }
+ }
+ catch (FileNotFoundException)
+ {
+ LOGGER.Log(Level.Info, "File does not exist: " + fileName);
+ }
+ return httpServerIpAndPort;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index ece257c..9b41ea5 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -74,11 +74,12 @@ under the License.
<Compile Include="Common\ClientConstants.cs" />
<Compile Include="Common\DriverFolderPreparationHelper.cs" />
<Compile Include="Common\FileSets.cs" />
- <Compile Include="Common\HttpClientHelper.cs" />
- <Compile Include="Common\IDriverHttpEndpoint.cs" />
+ <Compile Include="Common\JobSubmissionResult.cs" />
+ <Compile Include="Common\IJobSubmissionResult.cs" />
<Compile Include="Common\JavaClientLauncher.cs" />
<Compile Include="Common\ResourceHelper.cs" />
<Compile Include="Local\LocalClient.cs" />
+ <Compile Include="Local\LocalJobSubmissionResult.cs" />
<Compile Include="Local\LocalRuntimeClientConfiguration.cs" />
<Compile Include="Local\Parameters\LocalRuntimeDirectory.cs" />
<Compile Include="Local\Parameters\NumberOfEvaluators.cs" />
@@ -87,6 +88,7 @@ under the License.
<Compile Include="YARN\Parameters\SecurityTokenParameters.cs" />
<Compile Include="YARN\RestClient\DataModel\Acls.cs" />
<Compile Include="YARN\RestClient\DataModel\AmContainerSpec.cs" />
+ <Compile Include="YARN\RestClient\DataModel\ApplicationFinalState.cs" />
<Compile Include="YARN\RestClient\DataModel\ApplicationTag.cs" />
<Compile Include="YARN\RestClient\DataModel\Commands.cs" />
<Compile Include="YARN\RestClient\DataModel\Credentials.cs" />
@@ -98,7 +100,9 @@ under the License.
<Compile Include="YARN\RestClient\DataModel\Tokens.cs" />
<Compile Include="YARN\RestClient\IRestRequestExecutor.cs" />
<Compile Include="YARN\RestClient\IUrlProvider.cs" />
+ <Compile Include="YARN\RestClient\MultipleRMUrlProvider.cs" />
<Compile Include="YARN\RestClient\RestJsonSerializer.cs" />
+ <Compile Include="YARN\YarnJobSubmissionResult.cs" />
<Compile Include="YARN\YARNREEFClient.cs" />
<Compile Include="YARN\RestClient\IRestClientFactory.cs" />
<Compile Include="YARN\RestClient\RestRequestExecutor.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/Application.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/Application.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/Application.cs
index 2856efc..d80c551 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/Application.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/Application.cs
@@ -39,7 +39,7 @@ namespace Org.Apache.REEF.Client.YARN.RestClient.DataModel
public State State { get; set; }
- public string FinalStatus { get; set; }
+ public FinalState FinalStatus { get; set; }
public float Progress { get; set; }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/ApplicationFinalState.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/ApplicationFinalState.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/ApplicationFinalState.cs
new file mode 100644
index 0000000..8a49c9e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/DataModel/ApplicationFinalState.cs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.Client.YARN.RestClient.DataModel
+{
+ /// <summary>
+ /// Class generated based on schema provided in
+ /// <see cref="!:http://hadoop.apache.org/docs/r2.6.0/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html">
+ /// Hadoop RM REST API </see> documentation.
+ /// </summary>
+ // valid values are members of the YarnApplicationState enum
+ public enum FinalState
+ {
+ UNDEFINED,
+
+ SUCCEEDED,
+
+ FAILED,
+
+ KILLED
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/IUrlProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/IUrlProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/IUrlProvider.cs
index 85126d3..dd1e8ab 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/IUrlProvider.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/IUrlProvider.cs
@@ -16,14 +16,19 @@
// under the License.
using System;
+using System.Collections.Generic;
using System.Threading.Tasks;
using Org.Apache.REEF.Tang.Annotations;
namespace Org.Apache.REEF.Client.Yarn.RestClient
{
[DefaultImplementation(typeof(YarnConfigurationUrlProvider))]
- internal interface IUrlProvider
+ public interface IUrlProvider
{
- Task<Uri> GetUrlAsync();
+ /// <summary>
+ /// Returns available Yarn resourcemanager web address for the environment
+ /// </summary>
+ /// <returns></returns>
+ Task<IEnumerable<Uri>> GetUrlAsync();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/MultipleRMUrlProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/MultipleRMUrlProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/MultipleRMUrlProvider.cs
new file mode 100644
index 0000000..5ce3bb8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/MultipleRMUrlProvider.cs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+using System.Xml.Linq;
+using Org.Apache.REEF.Client.Yarn.RestClient;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.YARN.RestClient
+{
+ public class MultipleRMUrlProvider : IUrlProvider
+ {
+ private const string RmConfigKeyPrefix = "yarn.resourcemanager.webapp.address.rm";
+ private static readonly string HadoopConfDirEnvVariable = "HADOOP_CONF_DIR";
+ private static readonly string YarnConfigFileName = "yarn-site.xml";
+ private static readonly Logger Logger = Logger.GetLogger(typeof(MultipleRMUrlProvider));
+ private IList<Uri> _yarnRmUri;
+
+ [Inject]
+ private MultipleRMUrlProvider()
+ {
+ var hadoopConfigDir = Environment.GetEnvironmentVariable(HadoopConfDirEnvVariable);
+
+ if (string.IsNullOrEmpty(hadoopConfigDir) || !Directory.Exists(hadoopConfigDir))
+ {
+ throw new ArgumentException(HadoopConfDirEnvVariable + " is not configured or does not exist.",
+ "hadoopConfigDir");
+ }
+
+ Logger.Log(Level.Verbose, "Using {0} as hadoop configuration directory", hadoopConfigDir);
+ string yarnConfigurationFile = Path.Combine(hadoopConfigDir, YarnConfigFileName);
+ LoadYarnConfiguration(yarnConfigurationFile);
+ }
+
+ public Task<IEnumerable<Uri>> GetUrlAsync()
+ {
+ return Task.FromResult((IEnumerable<Uri>)_yarnRmUri);
+ }
+
+ private void LoadYarnConfiguration(string yarnConfigurationFile)
+ {
+ var configRoot = XElement.Load(yarnConfigurationFile);
+ var address = configRoot.Elements("property")
+ .Where(x =>
+ ((string) x.Element("name")).ToUpper().StartsWith(RmConfigKeyPrefix.ToUpper()))
+ .Select(x => (string) x.Element("value"));
+ _yarnRmUri =
+ address.Select(x => x.TrimEnd('/') + @"/")
+ .Select(x => string.Format("http://{0}", x))
+ .Where(x => Uri.IsWellFormedUriString(x, UriKind.Absolute))
+ .Select(x => new Uri(x))
+ .ToList();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnClient.cs
index f54c5f3..a649594 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnClient.cs
@@ -16,6 +16,8 @@
// under the License.
using System;
+using System.Collections.Generic;
+using System.Globalization;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
@@ -23,6 +25,7 @@ using Org.Apache.REEF.Client.YARN.RestClient;
using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.AsyncUtils;
+using Org.Apache.REEF.Utilities.Logging;
using RestSharp;
namespace Org.Apache.REEF.Client.Yarn.RestClient
@@ -35,6 +38,7 @@ namespace Org.Apache.REEF.Client.Yarn.RestClient
/// </summary>
internal sealed partial class YarnClient : IYarnRMClient
{
+ private static readonly Logger Logger = Logger.GetLogger(typeof(YarnClient));
private readonly string _baseResourceString;
private readonly IUrlProvider _yarnRmUrlProviderUri;
private readonly IRestRequestExecutor _restRequestExecutor;
@@ -130,19 +134,49 @@ namespace Org.Apache.REEF.Client.Yarn.RestClient
CancellationToken cancellationToken)
where T : new()
{
- Uri yarnRmUri = await _yarnRmUrlProviderUri.GetUrlAsync();
- return
- await
- _restRequestExecutor.ExecuteAsync<T>(request, yarnRmUri, cancellationToken);
+ IEnumerable<Uri> yarnRmUris = await _yarnRmUrlProviderUri.GetUrlAsync();
+ var exceptions = new List<Exception>();
+ foreach (var yarnRmUri in yarnRmUris)
+ {
+ try
+ {
+ return
+ await
+ _restRequestExecutor.ExecuteAsync<T>(request, yarnRmUri, cancellationToken);
+ }
+ catch (Exception e)
+ {
+ exceptions.Add(e);
+ Logger.Log(Level.Verbose, string.Format(
+ CultureInfo.CurrentCulture, "Possibly transient error in rest call {0}", e.Message));
+ }
+ }
+
+ throw new AggregateException("Failed Rest Request", exceptions);
}
private async Task<IRestResponse> GenerateUrlAndExecuteRequestAsync(IRestRequest request,
CancellationToken cancellationToken)
{
- Uri yarnRmUri = await _yarnRmUrlProviderUri.GetUrlAsync();
- return
- await
- _restRequestExecutor.ExecuteAsync(request, yarnRmUri, cancellationToken);
+ IEnumerable<Uri> yarnRmUris = await _yarnRmUrlProviderUri.GetUrlAsync();
+ var exceptions = new List<Exception>();
+ foreach (var yarnRmUri in yarnRmUris)
+ {
+ try
+ {
+ return
+ await
+ _restRequestExecutor.ExecuteAsync(request, yarnRmUri, cancellationToken);
+ }
+ catch (Exception e)
+ {
+ exceptions.Add(e);
+ Logger.Log(Level.Verbose, string.Format(
+ CultureInfo.CurrentCulture, "Possibly transient error in rest call {0}", e.Message));
+ }
+ }
+
+ throw new AggregateException("Failed Rest Request", exceptions);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnConfigurationUrlProvider.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnConfigurationUrlProvider.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnConfigurationUrlProvider.cs
index 0860f51..d46d73a 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnConfigurationUrlProvider.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/YarnConfigurationUrlProvider.cs
@@ -16,6 +16,7 @@
// under the License.
using System;
+using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
@@ -42,7 +43,7 @@ namespace Org.Apache.REEF.Client.Yarn.RestClient
private static readonly string YarnRmWebappHttpsAddressPropertyName = "yarn.resourcemanager.webapp.https.address";
private static readonly string YarnRmWebappHttpAddressPropertyName = "yarn.resourcemanager.webapp.address";
private static readonly Logger Logger = Logger.GetLogger(typeof(YarnConfigurationUrlProvider));
- private Uri _yarnRmUri;
+ private IList<Uri> _yarnRmUri;
[Inject]
private YarnConfigurationUrlProvider(
@@ -69,9 +70,9 @@ namespace Org.Apache.REEF.Client.Yarn.RestClient
LoadYarnConfiguration(yarnConfigurationFile, useHttps);
}
- public Task<Uri> GetUrlAsync()
+ public Task<IEnumerable<Uri>> GetUrlAsync()
{
- return Task.FromResult(_yarnRmUri);
+ return Task.FromResult((IEnumerable<Uri>)_yarnRmUri);
}
private void LoadYarnConfiguration(string yarnConfigurationFile, bool useHttps)
@@ -91,7 +92,7 @@ namespace Org.Apache.REEF.Client.Yarn.RestClient
address = address.TrimEnd('/') + @"/";
- _yarnRmUri = new Uri(string.Format("{0}://{1}", prefix, address));
+ _yarnRmUri = new List<Uri> { new Uri(string.Format("{0}://{1}", prefix, address)) };
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
index b9f71d1..acec5e7 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
@@ -18,13 +18,18 @@
*/
using System;
+using System.Globalization;
using System.IO;
using System.Linq;
+using System.Threading.Tasks;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.Avro;
using Org.Apache.REEF.Client.Avro.YARN;
using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.Yarn.RestClient;
+using Org.Apache.REEF.Client.YARN;
using Org.Apache.REEF.Client.YARN.Parameters;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
using Org.Apache.REEF.Common.Avro;
using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.Driver.Bridge;
@@ -48,14 +53,15 @@ namespace Org.Apache.REEF.Client.Yarn
private readonly string _securityTokenKind;
private readonly string _securityTokenService;
private readonly string _jobSubmissionPrefix;
- private String _driverUrl;
- private REEFFileNames _fileNames;
+ private readonly REEFFileNames _fileNames;
+ private readonly IYarnRMClient _yarnClient;
[Inject]
internal YarnREEFClient(JavaClientLauncher javaClientLauncher,
DriverFolderPreparationHelper driverFolderPreparationHelper,
REEFFileNames fileNames,
YarnCommandLineEnvironment yarn,
+ IYarnRMClient yarnClient,
[Parameter(typeof(SecurityTokenKindParameter))] string securityTokenKind,
[Parameter(typeof(SecurityTokenServiceParameter))] string securityTokenService,
[Parameter(typeof(JobSubmissionDirectoryPrefixParameter))] string jobSubmissionPrefix)
@@ -67,6 +73,7 @@ namespace Org.Apache.REEF.Client.Yarn
_javaClientLauncher.AddToClassPath(yarn.GetYarnClasspathList());
_driverFolderPreparationHelper = driverFolderPreparationHelper;
_fileNames = fileNames;
+ _yarnClient = yarnClient;
}
public void Submit(IJobSubmission jobSubmission)
@@ -78,7 +85,7 @@ namespace Org.Apache.REEF.Client.Yarn
Launch(jobSubmission, driverFolderPath);
}
- public IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission jobSubmission)
+ public IJobSubmissionResult SubmitAndGetJobStatus(IJobSubmission jobSubmission)
{
// Prepare the job submission folder
var driverFolderPath = CreateDriverFolder(jobSubmission.JobIdentifier);
@@ -87,11 +94,28 @@ namespace Org.Apache.REEF.Client.Yarn
Launch(jobSubmission, driverFolderPath);
var pointerFileName = Path.Combine(driverFolderPath, _fileNames.DriverHttpEndpoint);
+ var jobSubmitionResultImpl = new YarnJobSubmissionResult(this, pointerFileName);
- var httpClient = new HttpClientHelper();
- _driverUrl = httpClient.GetDriverUrlForYarn(pointerFileName);
+ var msg = string.Format(CultureInfo.CurrentCulture,
+ "Submitted the Driver for execution. Returned driverUrl is: {0}, appId is {1}.",
+ jobSubmitionResultImpl.DriverUrl, jobSubmitionResultImpl.AppId);
+ Logger.Log(Level.Info, msg);
- return httpClient;
+ return jobSubmitionResultImpl;
+ }
+
+ /// <summary>
+ /// Pull Job status from Yarn for the given appId
+ /// </summary>
+ /// <returns></returns>
+ public async Task<FinalState> GetJobFinalStatus(string appId)
+ {
+ var application = await _yarnClient.GetApplicationAsync(appId);
+
+ Logger.Log(Level.Verbose, string.Format("application status {0}, Progress: {1}, trackingUri: {2}, Name: {3}, ApplicationId: {4}, State {5}.",
+ application.FinalStatus, application.Progress, application.TrackingUI, application.Name, application.Id, application.State));
+
+ return application.FinalStatus;
}
private void Launch(IJobSubmission jobSubmission, string driverFolderPath)
@@ -139,11 +163,6 @@ namespace Org.Apache.REEF.Client.Yarn
Logger.Log(Level.Info, "Submitted the Driver for execution." + jobSubmission.JobIdentifier);
}
- public string DriverUrl
- {
- get { return _driverUrl; }
- }
-
/// <summary>
/// Creates the temporary directory to hold the job submission.
/// </summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Client/YARN/YarnJobSubmissionResult.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnJobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnJobSubmissionResult.cs
new file mode 100644
index 0000000..9512103
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnJobSubmissionResult.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+ internal class YarnJobSubmissionResult : JobSubmissionResult
+ {
+ internal YarnJobSubmissionResult(IREEFClient reefClient, string filePath)
+ : base(reefClient, filePath)
+ {
+ }
+
+ protected override string GetDriverUrl(string filepath)
+ {
+ return GetTrackingUrlAppId(filepath);
+ }
+
+ private string GetTrackingUrlAppId(string filepath)
+ {
+ if (!File.Exists(filepath))
+ {
+ throw new ApplicationException(string.Format("File {0} deosn't exist while trying to get tracking Uri", filepath));
+ }
+
+ using (var sr = new StreamReader(File.Open(filepath, FileMode.Open, FileAccess.Read, FileShare.Read)))
+ {
+ _appId = sr.ReadLine();
+ var trackingUrl = sr.ReadLine();
+ return "http://" + trackingUrl + "/";
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
index 57d01e4..127603b 100644
--- a/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.AllHandlers/AllHandlers.cs
@@ -51,7 +51,7 @@ namespace Org.Apache.REEF.Examples.AllHandlers
_jobSubmissionBuilderFactory = jobSubmissionBuilderFactory;
}
- private IDriverHttpEndpoint Run()
+ private IJobSubmissionResult Run()
{
var helloDriverConfiguration = DriverConfiguration.ConfigurationModule
.Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<HelloAllocatedEvaluatorHandler>.Class)
@@ -84,8 +84,8 @@ namespace Org.Apache.REEF.Examples.AllHandlers
.SetJobIdentifier("HelloDriver")
.Build();
- IDriverHttpEndpoint driverHttpEndpoint = _reefClient.SubmitAndGetDriverUrl(helloJobSubmission);
- return driverHttpEndpoint;
+ IJobSubmissionResult jobSubmissionResult = _reefClient.SubmitAndGetJobStatus(helloJobSubmission);
+ return jobSubmissionResult;
}
/// <summary></summary>
@@ -125,12 +125,12 @@ namespace Org.Apache.REEF.Examples.AllHandlers
/// args[0] specify either running local or YARN. Default is local
/// args[1] specify running folder. Default is REEF_LOCAL_RUNTIME
/// </remarks>
- public static IDriverHttpEndpoint Run(string[] args)
+ public static IJobSubmissionResult Run(string[] args)
{
string runOnYarn = args.Length > 0 ? args[0] : Local;
string runtimeFolder = args.Length > 1 ? args[1] : "REEF_LOCAL_RUNTIME";
- IDriverHttpEndpoint driverEndPoint = TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, runtimeFolder)).GetInstance<AllHandlers>().Run();
- return driverEndPoint;
+ IJobSubmissionResult jobSubmissionResult = TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, runtimeFolder)).GetInstance<AllHandlers>().Run();
+ return jobSubmissionResult;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
index 59e3f3a..ae40b9c 100644
--- a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
@@ -76,7 +76,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart
.SetJobIdentifier("DriverRestart")
.Build();
- _reefClient.SubmitAndGetDriverUrl(restartJobSubmission);
+ _reefClient.SubmitAndGetJobStatus(restartJobSubmission);
}
public static void Main(string[] args)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 803d360..f759005 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -26,6 +26,7 @@ using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.Examples.HelloREEF
{
@@ -34,6 +35,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF
/// </summary>
public sealed class HelloREEF
{
+ private static readonly Logger Logger = Logger.GetLogger(typeof(HelloREEF));
private const string Local = "local";
private const string YARN = "yarn";
private readonly IREEFClient _reefClient;
@@ -63,7 +65,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF
.SetJobIdentifier("HelloREEF")
.Build();
- _reefClient.SubmitAndGetDriverUrl(helloJobSubmission);
+ _reefClient.SubmitAndGetJobStatus(helloJobSubmission);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
index 74b3efa..bf8315b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
@@ -41,6 +41,6 @@ namespace Org.Apache.REEF.IMRU.API
/// For example: see InProcessIMRUCLient.cs
/// </summary>
[Unstable("0.13", "This depends on IREEFClient API which itself in unstable ")]
- IDriverHttpEndpoint DriverHttpEndpoint { get; }
+ IJobSubmissionResult JobSubmissionResult { get; }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4fbc9038/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
index 54c2f23..54aa4a1 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
@@ -102,7 +102,7 @@ namespace Org.Apache.REEF.IMRU.InProcess
/// <summary>
/// DriverHttpEndPoint returned by IReefClient after job submission
/// </summary>
- public IDriverHttpEndpoint DriverHttpEndpoint
+ public IJobSubmissionResult JobSubmissionResult
{
get { return null; }
}