You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2018/06/19 17:57:34 UTC
[reef] branch master updated: [REEF-2021]In AzureBatch Runtime,
Enable REEF .NET Client Communication to Driver (#1468)
This is an automated email from the ASF dual-hosted git repository.
motus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/reef.git
The following commit(s) were added to refs/heads/master by this push:
new b96c045 [REEF-2021]In AzureBatch Runtime, Enable REEF .NET Client Communication to Driver (#1468)
b96c045 is described below
commit b96c04526cc9f4ad7554c8aed118799bc644244a
Author: Chenxi Zhao <ch...@microsoft.com>
AuthorDate: Tue Jun 19 10:57:30 2018 -0700
[REEF-2021]In AzureBatch Runtime, Enable REEF .NET Client Communication to Driver (#1468)
This update enables REEF .NET client-driver communication by using the `InboundNATPool` feature provided by Azure Batch pools. More Info: https://docs.microsoft.com/en-us/rest/api/batchservice/pool/add#inboundnatpool
JIRA: [REEF-2021](https://issues.apache.org/jira/browse/REEF-2021)
Pull Request:
This closes #1468
---
.../AvroAzureBatchJobSubmissionParameters.cs | 9 +-
.../AzureBatch/AzureBatchDotNetClient.cs | 37 ++++++--
.../AzureBatch/AzureBatchJobSubmissionResult.cs | 98 ++++++++++++++++++++++
.../AzureBatchRuntimeClientConfiguration.cs | 10 +++
.../Parameters/AzureBatchPoolDriverPortsList.cs | 26 ++++++
.../AzureBatch/Service/AzureBatchService.cs | 11 +++
.../AzureBatch/Util/JobJarMaker.cs | 7 +-
.../Common/AllErrorsTransientStrategy.cs | 30 +++++++
.../Common/DriverFolderPreparationHelper.cs | 17 ----
.../Common/JobSubmissionResult.cs | 34 ++++++--
.../YARN/RESTClient/HttpClientRetryHandler.cs | 9 +-
.../HelloREEF.cs | 6 ++
.../src/main/avro/JobSubmissionParameters.avsc | 3 +-
.../client/AzureBatchBootstrapREEFLauncher.java | 39 +++++++--
lang/java/reef-runtime-azbatch/pom.xml | 1 +
.../wake/remote/ports/ListTcpPortProvider.java | 60 +++++++++++++
.../wake/remote/ports/parameters/TcpPortList.java | 37 ++++++++
17 files changed, 383 insertions(+), 51 deletions(-)
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
index 6409dc8..d0c1e14 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+using System.Collections.Generic;
using System.Runtime.Serialization;
using Org.Apache.REEF.Utilities.Attributes;
@@ -30,7 +31,7 @@ namespace Org.Apache.REEF.Client.Avro.AzureBatch
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroAzureBatchJobSubmissionParameters
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters"",""doc"":""Job submission parameters used by the Azure Batch runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{"" [...]
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters"",""doc"":""Job submission parameters used by the Azure Batch runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{"" [...]
/// <summary>
/// Gets the schema.
@@ -80,6 +81,12 @@ namespace Org.Apache.REEF.Client.Avro.AzureBatch
public string AzureStorageContainerName { get; set; }
/// <summary>
+ /// Gets or sets the AzureBatchPoolDriverPortsList field.
+ /// </summary>
+ [DataMember]
+ public IList<string> AzureBatchPoolDriverPortsList { get; set; }
+
+ /// <summary>
/// Initializes a new instance of the <see cref="AvroAzureBatchJobSubmissionParameters"/> class.
/// </summary>
public AvroAzureBatchJobSubmissionParameters()
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
index 7f9bdb3..d2721a4 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
@@ -27,6 +27,8 @@ using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Client.AzureBatch.Parameters;
+using Org.Apache.REEF.Client.API.Parameters;
namespace Org.Apache.REEF.Client.DotNet.AzureBatch
{
@@ -45,6 +47,8 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
private readonly AzureBatchService _batchService;
private readonly JobJarMaker _jobJarMaker;
private readonly AzureBatchFileNames _azbatchFileNames;
+ private readonly int _retryInterval;
+ private readonly int _numberOfRetries;
[Inject]
private AzureBatchDotNetClient(
@@ -56,7 +60,12 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
AzureBatchFileNames azbatchFileNames,
JobRequestBuilderFactory jobRequestBuilderFactory,
AzureBatchService batchService,
- JobJarMaker jobJarMaker)
+ JobJarMaker jobJarMaker,
+ //// Those parameters are used in AzureBatchJobSubmissionResult, but could not be injected there.
+ //// It introduces circular injection issues, as all classes constructor inherited from JobSubmissionResult has reference to IREEFClient.
+ //// TODO: [REEF-2020] Refactor IJobSubmissionResult Interface and JobSubmissionResult implementation
+ [Parameter(typeof(DriverHTTPConnectionRetryInterval))]int retryInterval,
+ [Parameter(typeof(DriverHTTPConnectionAttempts))] int numberOfRetries)
{
_injector = injector;
_fileNames = fileNames;
@@ -66,6 +75,8 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
_jobRequestBuilderFactory = jobRequestBuilderFactory;
_batchService = batchService;
_jobJarMaker = jobJarMaker;
+ _retryInterval = retryInterval;
+ _numberOfRetries = numberOfRetries;
}
public JobRequestBuilder NewJobRequestBuilder()
@@ -81,6 +92,22 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
public void Submit(JobRequest jobRequest)
{
+            // Callers of Submit do not need the Azure Batch job id, so it is discarded.
+            JobSubmitInternal(jobRequest);
+        }
+
+        /// <summary>
+        /// Submits the job and returns an <see cref="AzureBatchJobSubmissionResult"/> for the created
+        /// Azure Batch job, which is used to locate the driver's HTTP endpoint.
+        /// </summary>
+        public IJobSubmissionResult SubmitAndGetJobStatus(JobRequest jobRequest)
+        {
+            string submittedAzureJobId = JobSubmitInternal(jobRequest);
+
+            // Named arguments: this class holds (retryInterval, numberOfRetries) while the result's
+            // constructor takes (numberOfRetries, retryInterval); naming them prevents a silent swap.
+            return new AzureBatchJobSubmissionResult(
+                reefClient: this,
+                filePath: _fileNames.DriverHttpEndpoint,
+                jobId: submittedAzureJobId,
+                numberOfRetries: _numberOfRetries,
+                retryInterval: _retryInterval,
+                azbatchService: _batchService);
+        }
+
+ private string JobSubmitInternal(JobRequest jobRequest)
+ {
var configModule = AzureBatchRuntimeClientConfiguration.ConfigurationModule;
string jobId = jobRequest.JobIdentifier;
string azureBatchjobId = CreateAzureJobId(jobId);
@@ -90,6 +117,7 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
Uri blobUri = _azureStorageClient.UploadFile(destination, jarPath).Result;
string sasToken = _azureStorageClient.CreateContainerSharedAccessSignature();
_batchService.CreateJob(azureBatchjobId, blobUri, commandLine, sasToken);
+ return azureBatchjobId;
}
private string GetCommand(JobParameters jobParameters)
@@ -121,13 +149,6 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
return command;
}
- public IJobSubmissionResult SubmitAndGetJobStatus(JobRequest jobRequest)
- {
- Submit(jobRequest);
- /// Azure Batch is not able to comminicate to client through driver end point. It behaves the same as Submit(JobRequest jobRequest).
- return null;
- }
-
private string CreateAzureJobId(string jobId)
{
string guid = Guid.NewGuid().ToString();
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchJobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchJobSubmissionResult.cs
new file mode 100644
index 0000000..be84fc6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchJobSubmissionResult.cs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using Microsoft.Azure.Batch;
+using Microsoft.Azure.Batch.Common;
+#if REEF_DOTNET_BUILD
+using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;
+#else
+using Microsoft.Practices.TransientFaultHandling;
+#endif
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.DotNet.AzureBatch;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.AzureBatch
+{
+    /// <summary>
+    /// <see cref="JobSubmissionResult"/> for the Azure Batch runtime. The driver URL is resolved by
+    /// reading the driver HTTP endpoint file from the job manager task's working directory, then
+    /// translating the driver's backend port into the public IP and frontend port of the matching
+    /// inbound endpoint on the driver's compute node.
+    /// </summary>
+    internal class AzureBatchJobSubmissionResult : JobSubmissionResult
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(AzureBatchJobSubmissionResult));
+
+        /// <summary>
+        /// Directory, relative to the task's node files, that holds the driver HTTP endpoint file.
+        /// </summary>
+        private const string AzureBatchTaskWorkDirectory = "wd";
+
+        private readonly AzureBatchService _azurebatchService;
+        private readonly string _jobId;
+        private readonly RetryPolicy _policy;
+
+        /// <summary>
+        /// Creates a result for an already-submitted Azure Batch job.
+        /// </summary>
+        /// <param name="reefClient">Client that submitted the job.</param>
+        /// <param name="filePath">Name of the driver HTTP endpoint file, relative to the task working directory.</param>
+        /// <param name="jobId">Azure Batch job id of the submitted job.</param>
+        /// <param name="numberOfRetries">Number of retries used by the retry policies.</param>
+        /// <param name="retryInterval">Interval between retries, in milliseconds.</param>
+        /// <param name="azbatchService">Service used to query the job's task and compute node.</param>
+        internal AzureBatchJobSubmissionResult(IREEFClient reefClient,
+            string filePath,
+            string jobId,
+            int numberOfRetries,
+            int retryInterval,
+            AzureBatchService azbatchService) : base(reefClient, filePath, numberOfRetries, retryInterval)
+        {
+            _jobId = jobId;
+            _azurebatchService = azbatchService;
+            // Every exception is considered transient, e.g. the endpoint file not existing yet.
+            _policy = new RetryPolicy<AllErrorsTransientStrategy>(numberOfRetries, TimeSpan.FromMilliseconds(retryInterval));
+        }
+
+        /// <summary>
+        /// Resolves the driver URL, retrying any failure according to the configured retry policy.
+        /// </summary>
+        /// <param name="filepath">Name of the driver HTTP endpoint file.</param>
+        /// <returns>
+        /// "http://{publicIp}:{frontendPort}/"; "http://0.0.0.0:0/" when no inbound endpoint maps the
+        /// driver's port; null when no port can be parsed from the endpoint file.
+        /// </returns>
+        protected override string GetDriverUrl(string filepath)
+        {
+            return _policy.ExecuteAction(() => GetDriverUrlInternal(filepath));
+        }
+
+        private string GetDriverUrlInternal(string filepath)
+        {
+            CloudTask driverTask = _azurebatchService.GetJobManagerTaskFromJobId(_jobId);
+
+            NodeFile httpEndPointFile;
+            try
+            {
+                httpEndPointFile = driverTask.GetNodeFile(Path.Combine(AzureBatchTaskWorkDirectory, filepath));
+            }
+            catch (BatchException e)
+            {
+                // The driver may not have written the file yet; throwing lets the retry policy try again.
+                throw new InvalidOperationException("driver http endpoint file is not ready.", e);
+            }
+
+            string driverHost = httpEndPointFile.ReadAsString().TrimEnd('\r', '\n', ' ');
+
+            // The backend port follows the LAST ':' so hosts that themselves contain ':' (e.g. IPv6
+            // literals) are parsed correctly; splitting on every ':' would read the wrong segment.
+            int portSeparator = driverHost.LastIndexOf(':');
+            if (portSeparator < 0 || !int.TryParse(driverHost.Substring(portSeparator + 1), out int backendPort))
+            {
+                LOGGER.Log(Level.Warning, "Unable to get driver http endpoint port from: {0}", driverHost);
+                return null;
+            }
+
+            // Translate the backend port into the public address exposed by the pool's inbound NAT pool.
+            // EndpointConfiguration is null-checked so a pool without endpoint configuration yields the
+            // fallback URL instead of a NullReferenceException that the all-errors policy would retry.
+            string publicIp = "0.0.0.0";
+            int frontEndPort = 0;
+            string driverNodeId = driverTask.ComputeNodeInformation.ComputeNodeId;
+            ComputeNode driverNode = _azurebatchService.GetComputeNodeFromNodeId(driverNodeId);
+            InboundEndpoint endpoint = driverNode.EndpointConfiguration?.InboundEndpoints
+                ?.FirstOrDefault(s => s.BackendPort == backendPort);
+
+            if (endpoint != null)
+            {
+                publicIp = endpoint.PublicIPAddress;
+                frontEndPort = endpoint.FrontendPort;
+            }
+            else
+            {
+                LOGGER.Log(Level.Warning,
+                    "No inbound endpoint on node {0} maps driver port {1}; check the pool's inbound NAT pool configuration.",
+                    driverNodeId,
+                    backendPort);
+            }
+
+            return "http://" + publicIp + ':' + frontEndPort + '/';
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
index 31db860..195e2ce 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
@@ -16,12 +16,14 @@
// under the License.
using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.API.Parameters;
using Org.Apache.REEF.Client.AzureBatch.Parameters;
using Org.Apache.REEF.Client.DotNet.AzureBatch;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using System;
+using System.Collections.Generic;
using System.IO;
namespace Org.Apache.REEF.Client.AzureBatch
@@ -42,6 +44,11 @@ namespace Org.Apache.REEF.Client.AzureBatch
public static readonly RequiredParameter<string> AzureStorageAccountKey = new RequiredParameter<string>();
public static readonly RequiredParameter<string> AzureStorageContainerName = new RequiredParameter<string>();
+ public static readonly OptionalParameter<int> DriverHTTPConnectionRetryInterval = new OptionalParameter<int>();
+ public static readonly OptionalParameter<int> DriverHTTPConnectionAttempts = new OptionalParameter<int>();
+
+ public static readonly OptionalParameter<IList<string>> AzureBatchPoolDriverPortsList = new OptionalParameter<IList<string>>();
+
public static ConfigurationModule ConfigurationModule = new AzureBatchRuntimeClientConfiguration()
.BindImplementation(GenericType<IREEFClient>.Class, GenericType<AzureBatchDotNetClient>.Class)
.BindNamedParameter(GenericType<AzureBatchAccountUri>.Class, AzureBatchAccountUri)
@@ -51,6 +58,9 @@ namespace Org.Apache.REEF.Client.AzureBatch
.BindNamedParameter(GenericType<AzureStorageAccountName>.Class, AzureStorageAccountName)
.BindNamedParameter(GenericType<AzureStorageAccountKey>.Class, AzureStorageAccountKey)
.BindNamedParameter(GenericType<AzureStorageContainerName>.Class, AzureStorageContainerName)
+ .BindNamedParameter(GenericType<DriverHTTPConnectionRetryInterval>.Class, DriverHTTPConnectionRetryInterval)
+ .BindNamedParameter(GenericType<DriverHTTPConnectionAttempts>.Class, DriverHTTPConnectionAttempts)
+ .BindNamedParameter(GenericType<AzureBatchPoolDriverPortsList>.Class, AzureBatchPoolDriverPortsList)
.Build();
public static IConfiguration FromTextFile(string file)
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/AzureBatchPoolDriverPortsList.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/AzureBatchPoolDriverPortsList.cs
new file mode 100644
index 0000000..4d25eb7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/AzureBatchPoolDriverPortsList.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+    /// <summary>
+    /// Ports the driver's HTTP server may bind to on its Azure Batch node. To be reachable from the
+    /// client, each port must be a backend port of the pool's inbound NAT pool.
+    /// </summary>
+    [NamedParameter(Documentation = "The Azure Batch Pool Driver Http Server Ports List")]
+    public sealed class AzureBatchPoolDriverPortsList : Name<IList<string>> { }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
index b3e40da..075ce00 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
@@ -133,6 +133,17 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
return this.Client.JobOperations.GetJobAsync(jobId, detailLevel);
}
+        /// <summary>
+        /// Looks up the job manager task (the task running the REEF driver) of the given job.
+        /// </summary>
+        public CloudTask GetJobManagerTaskFromJobId(string jobId)
+        {
+            var job = this.Client.JobOperations.GetJob(jobId);
+            return this.Client.JobOperations.GetTask(jobId, job.JobManagerTask.Id);
+        }
+
+        /// <summary>
+        /// Looks up the compute node <paramref name="nodeId"/> in this service's pool (<c>PoolId</c>).
+        /// </summary>
+        public ComputeNode GetComputeNodeFromNodeId(string nodeId) =>
+            this.Client.PoolOperations.GetComputeNode(this.PoolId, nodeId);
+
#endregion
}
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
index 4d434d7..4b02691 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
@@ -24,6 +24,7 @@ using Org.Apache.REEF.Common.Avro;
using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.Tang.Annotations;
using System;
+using System.Collections.Generic;
using System.IO;
namespace Org.Apache.REEF.Client.AzureBatch.Util
@@ -44,7 +45,8 @@ namespace Org.Apache.REEF.Client.AzureBatch.Util
[Parameter(typeof(AzureBatchAccountUri))] string azureBatchAccountUri,
[Parameter(typeof(AzureBatchPoolId))] string azureBatchPoolId,
[Parameter(typeof(AzureStorageAccountName))] string azureStorageAccountName,
- [Parameter(typeof(AzureStorageContainerName))] string azureStorageContainerName)
+ [Parameter(typeof(AzureStorageContainerName))] string azureStorageContainerName,
+ [Parameter(typeof(AzureBatchPoolDriverPortsList))] List<string> azureBatchPoolDriverPortsList)
{
_resourceArchiveFileGenerator = resourceArchiveFileGenerator;
_driverFolderPreparationHelper = driverFolderPreparationHelper;
@@ -56,6 +58,7 @@ namespace Org.Apache.REEF.Client.AzureBatch.Util
AzureBatchPoolId = azureBatchPoolId,
AzureStorageAccountName = azureStorageAccountName,
AzureStorageContainerName = azureStorageContainerName,
+ AzureBatchPoolDriverPortsList = azureBatchPoolDriverPortsList,
};
}
@@ -76,7 +79,7 @@ namespace Org.Apache.REEF.Client.AzureBatch.Util
string localDriverFolderPath = CreateDriverFolder(azureBatchjobId);
- _driverFolderPreparationHelper.PrepareDriverFolderWithGlobalBridgeJar(jobRequest.AppParameters, localDriverFolderPath);
+ _driverFolderPreparationHelper.PrepareDriverFolder(jobRequest.AppParameters, localDriverFolderPath);
SerializeJobFile(localDriverFolderPath, _avroAzureBatchJobSubmissionParameters);
return _resourceArchiveFileGenerator.CreateArchiveToUpload(localDriverFolderPath);
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/AllErrorsTransientStrategy.cs b/lang/cs/Org.Apache.REEF.Client/Common/AllErrorsTransientStrategy.cs
new file mode 100644
index 0000000..f2dc63b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Common/AllErrorsTransientStrategy.cs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+#if REEF_DOTNET_BUILD
+using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;
+#else
+using Microsoft.Practices.TransientFaultHandling;
+#endif
+
+namespace Org.Apache.REEF.Client.Common
+{
+    /// <summary>
+    /// Transient error detection strategy that classifies every exception as transient, so a retry
+    /// policy built on it retries any failure until its retry count is exhausted.
+    /// </summary>
+    internal class AllErrorsTransientStrategy : ITransientErrorDetectionStrategy
+    {
+        /// <summary>
+        /// Always returns true, regardless of <paramref name="ex"/>.
+        /// </summary>
+        public bool IsTransient(Exception ex)
+        {
+            return true;
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs b/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
index 90e865d..67f51ba 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
@@ -94,23 +94,6 @@ namespace Org.Apache.REEF.Client.Common
}
/// <summary>
- /// Prepares the working directory for a Driver in driverFolderPath.
- /// </summary>
- /// <param name="appParameters"></param>
- /// <param name="driverFolderPath"></param>
- internal void PrepareDriverFolderWithGlobalBridgeJar(AppParameters appParameters, string driverFolderPath)
- {
- // Add the appParameters into that folder structure
- _fileSets.AddJobFiles(appParameters);
-
- // Add the reef-bridge-client jar to the global files in the manner of JavaClientLauncher.cs.
- _fileSets.AddToGlobalFiles(Directory.GetFiles(JarFolder)
- .Where(jarFile => Path.GetFileName(jarFile).ToLower().StartsWith(ClientConstants.ClientJarFilePrefix)));
-
- InternalPrepareDriverFolder(appParameters, driverFolderPath);
- }
-
- /// <summary>
/// Merges the Configurations in appParameters and serializes them into the right place within driverFolderPath,
/// assuming
/// that points to a Driver's working directory.
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
index 3fc8ff4..0717419 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
@@ -30,7 +30,6 @@ using Microsoft.Practices.TransientFaultHandling;
#endif
using Newtonsoft.Json;
using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.YARN.RestClient;
using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
using Org.Apache.REEF.Utilities.Logging;
using HttpClient = System.Net.Http.HttpClient;
@@ -48,14 +47,24 @@ namespace Org.Apache.REEF.Client.Common
private const string AppKey = "app";
private const string ThisIsStandbyRm = "This is standby RM";
private const string AppJson = "application/json";
+ private const int DriverStatusIntervalInMilliSecond = 4000;
- private string _driverUrl;
protected string _appId;
private readonly HttpClient _client;
private readonly IREEFClient _reefClient;
/// <summary>
+ /// Url of http end point of the web server running in the driver
+ /// </summary>
+ private string _driverUrl;
+
+ /// <summary>
+ /// File path to the driver's http endpoint.
+ /// </summary>
+ private readonly string _filePath;
+
+ /// <summary>
/// Number of retries when connecting to the Driver's HTTP endpoint.
/// </summary>
private readonly int _numberOfRetries;
@@ -74,7 +83,7 @@ namespace Org.Apache.REEF.Client.Common
};
_client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(AppJson));
- _driverUrl = GetDriverUrl(filePath);
+ _filePath = filePath;
_numberOfRetries = numberOfRetries;
_retryInterval = TimeSpan.FromMilliseconds(retryInterval);
@@ -85,7 +94,15 @@ namespace Org.Apache.REEF.Client.Common
/// </summary>
public string DriverUrl
{
- get { return _driverUrl; }
+            get
+            {
+                // Resolved lazily so the constructor does not resolve the URL. A null result is not
+                // cached, so the lookup is attempted again on the next access.
+                return _driverUrl ?? (_driverUrl = GetDriverUrl(_filePath));
+            }
}
/// <summary>
@@ -125,17 +142,22 @@ namespace Org.Apache.REEF.Client.Common
// We were unable to connect to the Driver at least once.
throw new WebException("Unable to connect to the Driver.");
}
-
+
while (status.IsActive())
{
+ // Add sleep in while loop, whose value alligns with default heart beat interval.
+ Thread.Sleep(DriverStatusIntervalInMilliSecond);
+ LOGGER.Log(Level.Info, "DriverStatus is {0}", status);
+
try
{
status = FetchDriverStatus();
}
- catch (WebException)
+ catch (WebException e)
{
// If we no longer can reach the Driver, it must have exited.
status = DriverStatus.UNKNOWN_EXITED;
+ LOGGER.Log(Level.Warning, "Driver unreacheable. Exiting now.", e);
}
}
}
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/HttpClientRetryHandler.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/HttpClientRetryHandler.cs
index 3170042..2fd8ae9 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/HttpClientRetryHandler.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/HttpClientRetryHandler.cs
@@ -19,6 +19,7 @@ using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
+using Org.Apache.REEF.Client.Common;
#if REEF_DOTNET_BUILD
using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;
@@ -62,12 +63,4 @@ namespace Org.Apache.REEF.Client.YARN.RestClient
cancellationToken);
}
}
-
- internal class AllErrorsTransientStrategy : ITransientErrorDetectionStrategy
- {
- public bool IsTransient(Exception ex)
- {
- return true;
- }
- }
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 406fb36..fede636 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -16,6 +16,7 @@
// under the License.
using System;
+using System.Collections.Generic;
using System.Globalization;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.AzureBatch;
@@ -123,6 +124,11 @@ namespace Org.Apache.REEF.Examples.HelloREEF
.Set(AzureBatchRuntimeClientConfiguration.AzureStorageAccountKey, @"##########################################")
.Set(AzureBatchRuntimeClientConfiguration.AzureStorageAccountName, @"############")
.Set(AzureBatchRuntimeClientConfiguration.AzureStorageContainerName, @"###########")
+ //// Extend default retry interval in Azure Batch
+ .Set(AzureBatchRuntimeClientConfiguration.DriverHTTPConnectionRetryInterval, "20000")
+ //// To allow Driver - Client communication, please specify the ports to use to set up driver http server.
+ //// These ports must be defined in Azure Batch InBoundNATPool.
+ .Set(AzureBatchRuntimeClientConfiguration.AzureBatchPoolDriverPortsList, new List<string>(new string[] { "123", "456" }))
.Build();
default:
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index d3bda9f..92f215f 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -77,7 +77,8 @@
{ "name": "AzureBatchAccountUri", "type": "string" },
{ "name": "AzureBatchPoolId", "type": "string" },
{ "name": "AzureStorageAccountName", "type": "string" },
- { "name": "AzureStorageContainerName", "type": "string" }
+ { "name": "AzureStorageContainerName", "type": "string" },
+ { "name": "AzureBatchPoolDriverPortsList", "type": {"type": "array", "items": "string"}}
]
}
]
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
index 4f2a3f6..69e39f3 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
@@ -40,15 +40,20 @@ import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
import org.apache.reef.runtime.common.files.RuntimePathProvider;
import org.apache.reef.runtime.common.launch.REEFErrorHandler;
import org.apache.reef.runtime.common.launch.REEFMessageCodec;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.*;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.remote.ports.ListTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortList;
import org.apache.reef.wake.time.Clock;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -84,23 +89,33 @@ public final class AzureBatchBootstrapREEFLauncher {
throw fatal(message, new IllegalArgumentException(message));
}
- final AvroAzureBatchJobSubmissionParameters avroAzureBatchJobSubmissionParameters =
+ final AvroAzureBatchJobSubmissionParameters jobSubmissionParameters =
readAvroJobSubmissionParameters(new File(args[0]));
final AzureBatchBootstrapDriverConfigGenerator azureBatchBootstrapDriverConfigGenerator =
- TANG.newInjector(generateConfiguration(avroAzureBatchJobSubmissionParameters))
+ TANG.newInjector(generateConfiguration(jobSubmissionParameters))
.getInstance(AzureBatchBootstrapDriverConfigGenerator.class);
- final Configuration launcherConfig =
+ final JavaConfigurationBuilder launcherConfigBuilder =
TANG.newConfigurationBuilder()
.bindNamedParameter(RemoteConfiguration.ManagerName.class, "AzureBatchBootstrapREEFLauncher")
.bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
.bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
- .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class)
- .build();
+ .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class);
+
+ // Check if user has set up preferred ports to use.
+ // If set, we prefer will launch driver that binds those ports.
+ final List<String> preferredPorts = asStringList(jobSubmissionParameters.getAzureBatchPoolDriverPortsList());
+
+ if (preferredPorts.size() > 0) {
+ launcherConfigBuilder.bindList(TcpPortList.class, preferredPorts)
+ .bindImplementation(TcpPortProvider.class, ListTcpPortProvider.class);
+ }
+
+ final Configuration launcherConfig = launcherConfigBuilder.build();
try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(
azureBatchBootstrapDriverConfigGenerator.getDriverConfigurationFromParams(
- avroAzureBatchJobSubmissionParameters), launcherConfig)) {
+ jobSubmissionParameters), launcherConfig)) {
reef.run();
} catch (final InjectionException ex) {
throw fatal("Unable to configure and start REEFEnvironment.", ex);
@@ -144,6 +159,14 @@ public final class AzureBatchBootstrapREEFLauncher {
.build();
}
+  /**
+   * Converts a collection of character sequences (as returned by the Avro-generated
+   * parameter getters) into plain Java strings, preserving order.
+   *
+   * @param list the sequences to convert; {@code null} is treated as an empty collection.
+   * @return a new mutable list with one string per element; empty if {@code list} is null.
+   */
+  private static List<String> asStringList(final Collection<? extends CharSequence> list) {
+    if (list == null) {
+      return new ArrayList<>();
+    }
+    final List<String> result = new ArrayList<>(list.size());
+    for (final CharSequence sequence : list) {
+      result.add(sequence.toString());
+    }
+    return result;
+  }
+
private static RuntimeException fatal(final String msg, final Throwable t) {
LOG.log(Level.SEVERE, msg, t);
return new RuntimeException(msg, t);
diff --git a/lang/java/reef-runtime-azbatch/pom.xml b/lang/java/reef-runtime-azbatch/pom.xml
index 0280e8f..aa1d69b 100644
--- a/lang/java/reef-runtime-azbatch/pom.xml
+++ b/lang/java/reef-runtime-azbatch/pom.xml
@@ -61,6 +61,7 @@ under the License.
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-batch</artifactId>
+ <version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java
new file mode 100644
index 0000000..c28a638
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortList;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A TcpPortProvider that hands out the ports of a fixed, configured list
+ * ({@link TcpPortList}) in the order they were given.
+ */
+public final class ListTcpPortProvider implements TcpPortProvider {
+
+  private static final Logger LOG = Logger.getLogger(ListTcpPortProvider.class.getName());
+
+  /** Read-only snapshot of the configured ports. */
+  private final List<Integer> tcpPortList;
+
+  /**
+   * @param tcpPortList the ports to offer, in order. The list is copied, so the provider neither
+   *                    observes later changes to it nor lets its iterators modify it.
+   */
+  @Inject
+  public ListTcpPortProvider(@Parameter(TcpPortList.class) final List<Integer> tcpPortList) {
+    this.tcpPortList = Collections.unmodifiableList(new ArrayList<>(tcpPortList));
+    LOG.log(Level.FINE, "Instantiating {0}", this);
+  }
+
+  /**
+   * Returns an iterator over the configured ports, in list order.
+   * The iterator is read-only: {@link Iterator#remove()} throws UnsupportedOperationException.
+   *
+   * @return an Iterator.
+   */
+  @Override
+  public Iterator<Integer> iterator() {
+    return this.tcpPortList.iterator();
+  }
+
+  @Override
+  public String toString() {
+    return "ListTcpPortProvider{" + StringUtils.join(this.tcpPortList, ',') + '}';
+  }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
new file mode 100644
index 0000000..c7f4fbf
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.List;
+
+/**
+ * A list of TCP port numbers to try.
+ * Consumed by {@link org.apache.reef.wake.remote.ports.ListTcpPortProvider}, which offers them in list order.
+ */
+@NamedParameter(doc = "An list of tcp port numbers to try")
+public final class TcpPortList implements Name<List<Integer>> {
+
+  /**
+   * Private constructor: this named parameter class only serves as a configuration key and is never instantiated.
+   */
+  private TcpPortList() {
+  }
+}