You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2018/06/19 17:57:34 UTC

[reef] branch master updated: [REEF-2021]In AzureBatch Runtime, Enable REEF .NET Client Communication to Driver (#1468)

This is an automated email from the ASF dual-hosted git repository.

motus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/reef.git


The following commit(s) were added to refs/heads/master by this push:
     new b96c045  [REEF-2021]In AzureBatch Runtime, Enable REEF .NET Client Communication to Driver (#1468)
b96c045 is described below

commit b96c04526cc9f4ad7554c8aed118799bc644244a
Author: Chenxi Zhao <ch...@microsoft.com>
AuthorDate: Tue Jun 19 10:57:30 2018 -0700

    [REEF-2021]In AzureBatch Runtime, Enable REEF .NET Client Communication to Driver (#1468)
    
    This update enables REEF.NET Client-Driver communication by using `InBoundNATPool` features provided by Azure Batch Pool. More Info: https://docs.microsoft.com/en-us/rest/api/batchservice/pool/add#inboundnatpool
    
    JIRA: [REEF-2021](https://issues.apache.org/jira/browse/REEF-2021)
    
    Pull Request:
      This closes #1468
---
 .../AvroAzureBatchJobSubmissionParameters.cs       |  9 +-
 .../AzureBatch/AzureBatchDotNetClient.cs           | 37 ++++++--
 .../AzureBatch/AzureBatchJobSubmissionResult.cs    | 98 ++++++++++++++++++++++
 .../AzureBatchRuntimeClientConfiguration.cs        | 10 +++
 .../Parameters/AzureBatchPoolDriverPortsList.cs    | 26 ++++++
 .../AzureBatch/Service/AzureBatchService.cs        | 11 +++
 .../AzureBatch/Util/JobJarMaker.cs                 |  7 +-
 .../Common/AllErrorsTransientStrategy.cs           | 30 +++++++
 .../Common/DriverFolderPreparationHelper.cs        | 17 ----
 .../Common/JobSubmissionResult.cs                  | 34 ++++++--
 .../YARN/RESTClient/HttpClientRetryHandler.cs      |  9 +-
 .../HelloREEF.cs                                   |  6 ++
 .../src/main/avro/JobSubmissionParameters.avsc     |  3 +-
 .../client/AzureBatchBootstrapREEFLauncher.java    | 39 +++++++--
 lang/java/reef-runtime-azbatch/pom.xml             |  1 +
 .../wake/remote/ports/ListTcpPortProvider.java     | 60 +++++++++++++
 .../wake/remote/ports/parameters/TcpPortList.java  | 37 ++++++++
 17 files changed, 383 insertions(+), 51 deletions(-)

diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
index 6409dc8..d0c1e14 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Collections.Generic;
 using System.Runtime.Serialization;
 using Org.Apache.REEF.Utilities.Attributes;
 
@@ -30,7 +31,7 @@ namespace Org.Apache.REEF.Client.Avro.AzureBatch
     [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
     public sealed class AvroAzureBatchJobSubmissionParameters
     {
-        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters"",""doc"":""Job submission parameters used by the Azure Batch runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{"" [...]
+        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters"",""doc"":""Job submission parameters used by the Azure Batch runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{"" [...]
 
         /// <summary>
         /// Gets the schema.
@@ -80,6 +81,12 @@ namespace Org.Apache.REEF.Client.Avro.AzureBatch
         public string AzureStorageContainerName { get; set; }
 
         /// <summary>
+        /// Gets or sets the AzureBatchPoolDriverPortsList field.
+        /// </summary>
+        [DataMember]
+        public IList<string> AzureBatchPoolDriverPortsList { get; set; }
+
+        /// <summary>
         /// Initializes a new instance of the <see cref="AvroAzureBatchJobSubmissionParameters"/> class.
         /// </summary>
         public AvroAzureBatchJobSubmissionParameters()
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
index 7f9bdb3..d2721a4 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
@@ -27,6 +27,8 @@ using Org.Apache.REEF.Common.Files;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Client.AzureBatch.Parameters;
+using Org.Apache.REEF.Client.API.Parameters;
 
 namespace Org.Apache.REEF.Client.DotNet.AzureBatch
 {
@@ -45,6 +47,8 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
         private readonly AzureBatchService _batchService;
         private readonly JobJarMaker _jobJarMaker;
         private readonly AzureBatchFileNames _azbatchFileNames;
+        private readonly int _retryInterval;
+        private readonly int _numberOfRetries;
 
         [Inject]
         private AzureBatchDotNetClient(
@@ -56,7 +60,12 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
             AzureBatchFileNames azbatchFileNames,
             JobRequestBuilderFactory jobRequestBuilderFactory,
             AzureBatchService batchService,
-            JobJarMaker jobJarMaker)
+            JobJarMaker jobJarMaker,
+            //// Those parameters are used in AzureBatchJobSubmissionResult, but could not be injected there.
+            //// It introduces circular injection issues, as all classes constructor inherited from JobSubmissionResult has reference to IREEFClient. 
+            //// TODO: [REEF-2020] Refactor IJobSubmissionResult Interface and JobSubmissionResult implementation
+            [Parameter(typeof(DriverHTTPConnectionRetryInterval))]int retryInterval,
+            [Parameter(typeof(DriverHTTPConnectionAttempts))] int numberOfRetries)
         {
             _injector = injector;
             _fileNames = fileNames;
@@ -66,6 +75,8 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
             _jobRequestBuilderFactory = jobRequestBuilderFactory;
             _batchService = batchService;
             _jobJarMaker = jobJarMaker;
+            _retryInterval = retryInterval;
+            _numberOfRetries = numberOfRetries;
         }
 
         public JobRequestBuilder NewJobRequestBuilder()
@@ -81,6 +92,22 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
 
         public void Submit(JobRequest jobRequest)
         {
+            JobSubmitInternal(jobRequest);
+        }
+
+        public IJobSubmissionResult SubmitAndGetJobStatus(JobRequest jobRequest)
+        {
+            string azureJobId = JobSubmitInternal(jobRequest);
+            return new AzureBatchJobSubmissionResult(this,
+                _fileNames.DriverHttpEndpoint,
+                azureJobId,
+                _numberOfRetries,
+                _retryInterval,
+                _batchService);
+        }
+
+        private string JobSubmitInternal(JobRequest jobRequest)
+        {
             var configModule = AzureBatchRuntimeClientConfiguration.ConfigurationModule;
             string jobId = jobRequest.JobIdentifier;
             string azureBatchjobId = CreateAzureJobId(jobId);
@@ -90,6 +117,7 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
             Uri blobUri = _azureStorageClient.UploadFile(destination, jarPath).Result;
             string sasToken = _azureStorageClient.CreateContainerSharedAccessSignature();
             _batchService.CreateJob(azureBatchjobId, blobUri, commandLine, sasToken);
+            return azureBatchjobId;
         }
 
         private string GetCommand(JobParameters jobParameters)
@@ -121,13 +149,6 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
             return command;
         }
 
-        public IJobSubmissionResult SubmitAndGetJobStatus(JobRequest jobRequest)
-        {
-            Submit(jobRequest);
-            /// Azure Batch is not able to comminicate to client through driver end point. It behaves the same as Submit(JobRequest jobRequest).
-            return null;
-        }
-
         private string CreateAzureJobId(string jobId)
         {
             string guid = Guid.NewGuid().ToString();
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchJobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchJobSubmissionResult.cs
new file mode 100644
index 0000000..be84fc6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchJobSubmissionResult.cs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using Microsoft.Azure.Batch;
+using Microsoft.Azure.Batch.Common;
+using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.DotNet.AzureBatch;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.AzureBatch
+{
+    internal class AzureBatchJobSubmissionResult : JobSubmissionResult
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(AzureBatchJobSubmissionResult));
+        private const string AzureBatchTaskWorkDirectory = "wd";
+        private readonly AzureBatchService _azurebatchService;
+        private readonly string _jobId;
+        private readonly RetryPolicy _policy;
+
+        internal AzureBatchJobSubmissionResult(IREEFClient reefClient,
+            string filePath,
+            string jobId,
+            int numberOfRetries,
+            int retryInterval,
+            AzureBatchService azbatchService) : base(reefClient, filePath, numberOfRetries, retryInterval)
+        {
+            _jobId = jobId;
+            _azurebatchService = azbatchService;
+            _policy = new RetryPolicy<AllErrorsTransientStrategy>(numberOfRetries, TimeSpan.FromMilliseconds(retryInterval));
+        }
+
+        protected override string GetDriverUrl(string filepath)
+        {
+            return _policy.ExecuteAction(() => GetDriverUrlInternal(filepath));
+        }
+
+        private string GetDriverUrlInternal(string filepath)
+        {
+            CloudTask driverTask = _azurebatchService.GetJobManagerTaskFromJobId(_jobId);
+
+            NodeFile httpEndPointFile;
+            try
+            {
+                httpEndPointFile = driverTask.GetNodeFile(Path.Combine(AzureBatchTaskWorkDirectory, filepath));
+            }
+            catch (BatchException e)
+            {
+                throw new InvalidOperationException("driver http endpoint file is not ready.", e);
+            }
+
+            string driverHost = httpEndPointFile.ReadAsString().TrimEnd('\r', '\n', ' ');
+
+            //// Get port
+            string[] driverIpAndPorts = driverHost.Split(':');
+            if (driverIpAndPorts.Length <= 1 || !int.TryParse(driverIpAndPorts[1], out int backendPort))
+            {
+                LOGGER.Log(Level.Warning, "Unable to get driver http endpoint port from: {0}", driverHost);
+                return null;
+            }
+
+            //// Get public Ip
+            string publicIp = "0.0.0.0";
+            int frontEndPort = 0;
+            string driverNodeId = driverTask.ComputeNodeInformation.ComputeNodeId;
+            ComputeNode driverNode = _azurebatchService.GetComputeNodeFromNodeId(driverNodeId);
+            IReadOnlyList<InboundEndpoint> inboundEndpoints = driverNode.EndpointConfiguration.InboundEndpoints;
+            InboundEndpoint endpoint = inboundEndpoints.FirstOrDefault(s => s.BackendPort == backendPort);
+
+            if (endpoint != null)
+            {
+                publicIp = endpoint.PublicIPAddress;
+                frontEndPort = endpoint.FrontendPort;
+            }
+
+            return "http://" + publicIp + ':' + frontEndPort + '/';
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
index 31db860..195e2ce 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
@@ -16,12 +16,14 @@
 // under the License.
 
 using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.API.Parameters;
 using Org.Apache.REEF.Client.AzureBatch.Parameters;
 using Org.Apache.REEF.Client.DotNet.AzureBatch;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using System;
+using System.Collections.Generic;
 using System.IO;
 
 namespace Org.Apache.REEF.Client.AzureBatch
@@ -42,6 +44,11 @@ namespace Org.Apache.REEF.Client.AzureBatch
         public static readonly RequiredParameter<string> AzureStorageAccountKey = new RequiredParameter<string>();
         public static readonly RequiredParameter<string> AzureStorageContainerName = new RequiredParameter<string>();
 
+        public static readonly OptionalParameter<int> DriverHTTPConnectionRetryInterval = new OptionalParameter<int>();
+        public static readonly OptionalParameter<int> DriverHTTPConnectionAttempts = new OptionalParameter<int>();
+
+        public static readonly OptionalParameter<IList<string>> AzureBatchPoolDriverPortsList = new OptionalParameter<IList<string>>();
+
         public static ConfigurationModule ConfigurationModule = new AzureBatchRuntimeClientConfiguration()
             .BindImplementation(GenericType<IREEFClient>.Class, GenericType<AzureBatchDotNetClient>.Class)
             .BindNamedParameter(GenericType<AzureBatchAccountUri>.Class, AzureBatchAccountUri)
@@ -51,6 +58,9 @@ namespace Org.Apache.REEF.Client.AzureBatch
             .BindNamedParameter(GenericType<AzureStorageAccountName>.Class, AzureStorageAccountName)
             .BindNamedParameter(GenericType<AzureStorageAccountKey>.Class, AzureStorageAccountKey)
             .BindNamedParameter(GenericType<AzureStorageContainerName>.Class, AzureStorageContainerName)
+            .BindNamedParameter(GenericType<DriverHTTPConnectionRetryInterval>.Class, DriverHTTPConnectionRetryInterval)
+            .BindNamedParameter(GenericType<DriverHTTPConnectionAttempts>.Class, DriverHTTPConnectionAttempts)
+            .BindNamedParameter(GenericType<AzureBatchPoolDriverPortsList>.Class, AzureBatchPoolDriverPortsList)
             .Build();
 
         public static IConfiguration FromTextFile(string file)
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/AzureBatchPoolDriverPortsList.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/AzureBatchPoolDriverPortsList.cs
new file mode 100644
index 0000000..4d25eb7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/AzureBatchPoolDriverPortsList.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+    [NamedParameter(Documentation = "The Azure Batch Pool Driver Http Server Ports List")]
+    public sealed class AzureBatchPoolDriverPortsList : Name<IList<string>>
+    {
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
index b3e40da..075ce00 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
@@ -133,6 +133,17 @@ namespace Org.Apache.REEF.Client.DotNet.AzureBatch
             return this.Client.JobOperations.GetJobAsync(jobId, detailLevel);
         }
 
+        public CloudTask GetJobManagerTaskFromJobId(string jobId)
+        {
+            string driverTaskId = this.Client.JobOperations.GetJob(jobId).JobManagerTask.Id;
+            return this.Client.JobOperations.GetTask(jobId, driverTaskId);
+        }
+
+        public ComputeNode GetComputeNodeFromNodeId(string nodeId)
+        {
+            return this.Client.PoolOperations.GetComputeNode(this.PoolId, nodeId);
+        }
+
         #endregion
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
index 4d434d7..4b02691 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
@@ -24,6 +24,7 @@ using Org.Apache.REEF.Common.Avro;
 using Org.Apache.REEF.Common.Files;
 using Org.Apache.REEF.Tang.Annotations;
 using System;
+using System.Collections.Generic;
 using System.IO;
 
 namespace Org.Apache.REEF.Client.AzureBatch.Util
@@ -44,7 +45,8 @@ namespace Org.Apache.REEF.Client.AzureBatch.Util
             [Parameter(typeof(AzureBatchAccountUri))] string azureBatchAccountUri,
             [Parameter(typeof(AzureBatchPoolId))] string azureBatchPoolId,
             [Parameter(typeof(AzureStorageAccountName))] string azureStorageAccountName,
-            [Parameter(typeof(AzureStorageContainerName))] string azureStorageContainerName)
+            [Parameter(typeof(AzureStorageContainerName))] string azureStorageContainerName,
+            [Parameter(typeof(AzureBatchPoolDriverPortsList))] List<string> azureBatchPoolDriverPortsList)
         {
             _resourceArchiveFileGenerator = resourceArchiveFileGenerator;
             _driverFolderPreparationHelper = driverFolderPreparationHelper;
@@ -56,6 +58,7 @@ namespace Org.Apache.REEF.Client.AzureBatch.Util
                 AzureBatchPoolId = azureBatchPoolId,
                 AzureStorageAccountName = azureStorageAccountName,
                 AzureStorageContainerName = azureStorageContainerName,
+                AzureBatchPoolDriverPortsList = azureBatchPoolDriverPortsList,
             };
         }
 
@@ -76,7 +79,7 @@ namespace Org.Apache.REEF.Client.AzureBatch.Util
 
             string localDriverFolderPath = CreateDriverFolder(azureBatchjobId);
 
-            _driverFolderPreparationHelper.PrepareDriverFolderWithGlobalBridgeJar(jobRequest.AppParameters, localDriverFolderPath);
+            _driverFolderPreparationHelper.PrepareDriverFolder(jobRequest.AppParameters, localDriverFolderPath);
             SerializeJobFile(localDriverFolderPath, _avroAzureBatchJobSubmissionParameters);
 
             return _resourceArchiveFileGenerator.CreateArchiveToUpload(localDriverFolderPath);
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/AllErrorsTransientStrategy.cs b/lang/cs/Org.Apache.REEF.Client/Common/AllErrorsTransientStrategy.cs
new file mode 100644
index 0000000..f2dc63b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Common/AllErrorsTransientStrategy.cs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;
+
+namespace Org.Apache.REEF.Client.Common
+{
+    internal class AllErrorsTransientStrategy : ITransientErrorDetectionStrategy
+    {
+        public bool IsTransient(Exception ex)
+        {
+            return true;
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs b/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
index 90e865d..67f51ba 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/DriverFolderPreparationHelper.cs
@@ -94,23 +94,6 @@ namespace Org.Apache.REEF.Client.Common
         }
 
         /// <summary>
-        /// Prepares the working directory for a Driver in driverFolderPath.
-        /// </summary>
-        /// <param name="appParameters"></param>
-        /// <param name="driverFolderPath"></param>
-        internal void PrepareDriverFolderWithGlobalBridgeJar(AppParameters appParameters, string driverFolderPath)
-        {
-            // Add the appParameters into that folder structure
-            _fileSets.AddJobFiles(appParameters);
-
-            // Add the reef-bridge-client jar to the global files in the manner of JavaClientLauncher.cs.
-            _fileSets.AddToGlobalFiles(Directory.GetFiles(JarFolder)
-                .Where(jarFile => Path.GetFileName(jarFile).ToLower().StartsWith(ClientConstants.ClientJarFilePrefix)));
-
-            InternalPrepareDriverFolder(appParameters, driverFolderPath);
-        }
-
-        /// <summary>
         /// Merges the Configurations in appParameters and serializes them into the right place within driverFolderPath,
         /// assuming
         /// that points to a Driver's working directory.
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs b/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
index 3fc8ff4..0717419 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/JobSubmissionResult.cs
@@ -30,7 +30,6 @@ using Microsoft.Practices.TransientFaultHandling;
 #endif
 using Newtonsoft.Json;
 using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.YARN.RestClient;
 using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
 using Org.Apache.REEF.Utilities.Logging;
 using HttpClient = System.Net.Http.HttpClient;
@@ -48,14 +47,24 @@ namespace Org.Apache.REEF.Client.Common
         private const string AppKey = "app";
         private const string ThisIsStandbyRm = "This is standby RM";
         private const string AppJson = "application/json";
+        private const int DriverStatusIntervalInMilliSecond = 4000;
 
-        private string _driverUrl;
         protected string _appId;
 
         private readonly HttpClient _client;
         private readonly IREEFClient _reefClient;
 
         /// <summary>
+        /// Url of http end point of the web server running in the driver
+        /// </summary>
+        private string _driverUrl;
+
+        /// <summary>
+        /// File path to the driver's http endpoint.
+        /// </summary>
+        private readonly string _filePath;
+
+        /// <summary>
         /// Number of retries when connecting to the Driver's HTTP endpoint.
         /// </summary>
         private readonly int _numberOfRetries;
@@ -74,7 +83,7 @@ namespace Org.Apache.REEF.Client.Common
             };
             _client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(AppJson));
 
-            _driverUrl = GetDriverUrl(filePath);
+            _filePath = filePath;
 
             _numberOfRetries = numberOfRetries;
             _retryInterval = TimeSpan.FromMilliseconds(retryInterval);
@@ -85,7 +94,15 @@ namespace Org.Apache.REEF.Client.Common
         /// </summary>
         public string DriverUrl
         {
-            get { return _driverUrl; }
+            get
+            {
+                if (_driverUrl == null)
+                {
+                    _driverUrl = GetDriverUrl(_filePath);
+                }
+
+                return _driverUrl;
+            }
         }
 
         /// <summary>
@@ -125,17 +142,22 @@ namespace Org.Apache.REEF.Client.Common
                 // We were unable to connect to the Driver at least once.
                 throw new WebException("Unable to connect to the Driver.");
             }
-            
+
             while (status.IsActive())
             {
+                // Add sleep in while loop, whose value alligns with default heart beat interval.
+                Thread.Sleep(DriverStatusIntervalInMilliSecond);
+                LOGGER.Log(Level.Info, "DriverStatus is {0}", status);
+
                 try
                 {
                     status = FetchDriverStatus();
                 }
-                catch (WebException)
+                catch (WebException e)
                 {
                     // If we no longer can reach the Driver, it must have exited.
                     status = DriverStatus.UNKNOWN_EXITED;
+                    LOGGER.Log(Level.Warning, "Driver unreacheable. Exiting now.", e);
                 }
             }
         }
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/HttpClientRetryHandler.cs b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/HttpClientRetryHandler.cs
index 3170042..2fd8ae9 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/HttpClientRetryHandler.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/HttpClientRetryHandler.cs
@@ -19,6 +19,7 @@ using System;
 using System.Net.Http;
 using System.Threading;
 using System.Threading.Tasks;
+using Org.Apache.REEF.Client.Common;
 
 #if REEF_DOTNET_BUILD
 using Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling;
@@ -62,12 +63,4 @@ namespace Org.Apache.REEF.Client.YARN.RestClient
                 cancellationToken);
         }
     }
-
-    internal class AllErrorsTransientStrategy : ITransientErrorDetectionStrategy
-    {
-        public bool IsTransient(Exception ex)
-        {
-            return true;
-        }
-    }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 406fb36..fede636 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
 using System.Globalization;
 using Org.Apache.REEF.Client.API;
 using Org.Apache.REEF.Client.AzureBatch;
@@ -123,6 +124,11 @@ namespace Org.Apache.REEF.Examples.HelloREEF
                         .Set(AzureBatchRuntimeClientConfiguration.AzureStorageAccountKey, @"##########################################")
                         .Set(AzureBatchRuntimeClientConfiguration.AzureStorageAccountName, @"############")
                         .Set(AzureBatchRuntimeClientConfiguration.AzureStorageContainerName, @"###########")
+                        //// Extend default retry interval in Azure Batch
+                        .Set(AzureBatchRuntimeClientConfiguration.DriverHTTPConnectionRetryInterval, "20000")
+                        //// To allow Driver - Client communication, please specify the ports to use to set up driver http server.
+                        //// These ports must be defined in Azure Batch InBoundNATPool.
+                        .Set(AzureBatchRuntimeClientConfiguration.AzureBatchPoolDriverPortsList, new List<string>(new string[] { "123", "456" }))
                         .Build();
 
                 default:
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index d3bda9f..92f215f 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -77,7 +77,8 @@
         { "name": "AzureBatchAccountUri", "type": "string" },
         { "name": "AzureBatchPoolId", "type": "string" },
         { "name": "AzureStorageAccountName", "type": "string" },
-        { "name": "AzureStorageContainerName", "type": "string" }
+        { "name": "AzureStorageContainerName", "type": "string" },
+        { "name": "AzureBatchPoolDriverPortsList", "type": {"type": "array", "items": "string"}}
       ]
   }
 ]
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
index 4f2a3f6..69e39f3 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
@@ -40,15 +40,20 @@ import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
 import org.apache.reef.runtime.common.files.RuntimePathProvider;
 import org.apache.reef.runtime.common.launch.REEFErrorHandler;
 import org.apache.reef.runtime.common.launch.REEFMessageCodec;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.*;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.remote.ports.ListTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortList;
 import org.apache.reef.wake.time.Clock;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -84,23 +89,33 @@ public final class AzureBatchBootstrapREEFLauncher {
       throw fatal(message, new IllegalArgumentException(message));
     }
 
-    final AvroAzureBatchJobSubmissionParameters avroAzureBatchJobSubmissionParameters =
+    final AvroAzureBatchJobSubmissionParameters jobSubmissionParameters =
         readAvroJobSubmissionParameters(new File(args[0]));
     final AzureBatchBootstrapDriverConfigGenerator azureBatchBootstrapDriverConfigGenerator =
-        TANG.newInjector(generateConfiguration(avroAzureBatchJobSubmissionParameters))
+        TANG.newInjector(generateConfiguration(jobSubmissionParameters))
             .getInstance(AzureBatchBootstrapDriverConfigGenerator.class);
 
-    final Configuration launcherConfig =
+    final JavaConfigurationBuilder launcherConfigBuilder =
         TANG.newConfigurationBuilder()
             .bindNamedParameter(RemoteConfiguration.ManagerName.class, "AzureBatchBootstrapREEFLauncher")
             .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
             .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
-            .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class)
-            .build();
+            .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class);
+
+    // Check if user has set up preferred ports to use.
+    // If set, we prefer will launch driver that binds those ports.
+    final List<String> preferredPorts = asStringList(jobSubmissionParameters.getAzureBatchPoolDriverPortsList());
+
+    if (preferredPorts.size() > 0) {
+      launcherConfigBuilder.bindList(TcpPortList.class, preferredPorts)
+          .bindImplementation(TcpPortProvider.class, ListTcpPortProvider.class);
+    }
+
+    final Configuration launcherConfig = launcherConfigBuilder.build();
 
     try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(
         azureBatchBootstrapDriverConfigGenerator.getDriverConfigurationFromParams(
-            avroAzureBatchJobSubmissionParameters), launcherConfig)) {
+            jobSubmissionParameters), launcherConfig)) {
       reef.run();
     } catch (final InjectionException ex) {
       throw fatal("Unable to configure and start REEFEnvironment.", ex);
@@ -144,6 +159,14 @@ public final class AzureBatchBootstrapREEFLauncher {
         .build();
   }
 
+  private static List<String> asStringList(final Collection<? extends CharSequence> list) {
+    final List<String> result = new ArrayList<>(list.size());
+    for (final CharSequence sequence : list) {
+      result.add(sequence.toString());
+    }
+    return result;
+  }
+
   private static RuntimeException fatal(final String msg, final Throwable t) {
     LOG.log(Level.SEVERE, msg, t);
     return new RuntimeException(msg, t);
diff --git a/lang/java/reef-runtime-azbatch/pom.xml b/lang/java/reef-runtime-azbatch/pom.xml
index 0280e8f..aa1d69b 100644
--- a/lang/java/reef-runtime-azbatch/pom.xml
+++ b/lang/java/reef-runtime-azbatch/pom.xml
@@ -61,6 +61,7 @@ under the License.
         <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-batch</artifactId>
+            <version>3.2.0</version>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java
new file mode 100644
index 0000000..c28a638
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports;
+
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortList;
+
+import javax.inject.Inject;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A TcpPortProvider which gives out random ports in a range.
+ */
+public final class ListTcpPortProvider implements TcpPortProvider {
+
+  private static final Logger LOG = Logger.getLogger(ListTcpPortProvider.class.getName());
+  private final List<Integer> tcpPortList;
+
+  @Inject
+  public ListTcpPortProvider(@Parameter(TcpPortList.class) final List<Integer> tcpPortList) {
+    this.tcpPortList = tcpPortList;
+    LOG.log(Level.FINE, "Instantiating {0}", this);
+  }
+
+  /**
+   * Returns an iterator over a set of tcp ports.
+   *
+   * @return an Iterator.
+   */
+  @Override
+  public Iterator<Integer> iterator() {
+    return this.tcpPortList.iterator();
+  }
+
+  @Override
+  public String toString() {
+    return "ListTcpPortProvider{" + StringUtils.join(this.tcpPortList, ',') + '}';
+  }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
new file mode 100644
index 0000000..c7f4fbf
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.List;
+
+/**
+ * An list of tcp port numbers to try.
+ */
+@NamedParameter(doc = "An list of tcp port numbers to try")
+public final class TcpPortList implements Name<List<Integer>> {
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private TcpPortList() {
+  }
+}