You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by sh...@apache.org on 2017/08/09 02:21:16 UTC

reef git commit: [REEF-1820] Specify node names and relaxLocality flag in Evaluator request

Repository: reef
Updated Branches:
  refs/heads/master 2d0246926 -> 18c24b1c7


[REEF-1820] Specify node names and relaxLocality flag in Evaluator request

Addresses the following:
  * Specify node names and relaxLocality flag in Evaluator request in .Net
  * Specify node names and relaxLocality flag in Evaluator request in Java
  * Modify Bridge to patch changes from .Net to Java
  * Extends Hello Reef for yarn as sample/e2e test for the current changes

JIRA:
  [REEF-1820](https://issues.apache.org/jira/browse/REEF-1820)

Pull request:
  This closes #1360


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/18c24b1c
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/18c24b1c
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/18c24b1c

Branch: refs/heads/master
Commit: 18c24b1c760ef9afc1ea45d4c54cce8e97f3b29f
Parents: 2d02469
Author: Julia Wang <ju...@apache.org>
Authored: Mon Aug 7 19:59:06 2017 -0700
Committer: Shravan Narayanamurthy <sh...@apache.org>
Committed: Tue Aug 8 19:17:17 2017 -0700

----------------------------------------------------------------------
 .../EvaluatorRequestorClr2Java.cpp              |   6 +-
 lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp  |  15 ++
 lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h    |   4 +
 .../Bridge/Events/EvaluatorRequestor.cs         |   4 +-
 .../Evaluator/EvaluatorRequest.cs               |  32 +++-
 .../Evaluator/EvaluatorRequestBuilder.cs        |  45 +++++-
 .../Evaluator/IEvaluatorRequest.cs              |  23 ++-
 .../HelloDriverYarn.cs                          | 122 ++++++++++++++
 .../HelloREEF.cs                                |   2 +-
 .../HelloREEFYarn.cs                            | 161 +++++++++++++++++++
 .../Org.Apache.REEF.Examples.HelloREEF.csproj   |   3 +
 .../Org.Apache.REEF.Examples.HelloREEF/Run.cs   |  38 +++++
 .../javabridge/EvaluatorRequestorBridge.java    |   6 +-
 .../reef/driver/evaluator/EvaluatorRequest.java |  56 ++++++-
 .../common/driver/EvaluatorRequestorImpl.java   |  20 +--
 15 files changed, 510 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
index 7616c2c..90fe5d4 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
@@ -53,7 +53,7 @@ namespace Org {
               ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit");
               JNIEnv *env = RetrieveEnv(_jvm);
               jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor);
-              jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIILjava/lang/String;Ljava/lang/String;)V");
+              jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;)V");
 
               if (jmidSubmit == NULL) {
                 fprintf(stdout, " jmidSubmit is NULL\n");
@@ -66,8 +66,10 @@ namespace Org {
                 request->Number,
                 request->MemoryMegaBytes,
                 request->VirtualCore,
+                request->RelaxLocality,
                 JavaStringFromManagedString(env, request->Rack),
-                JavaStringFromManagedString(env, request->RuntimeName));
+                JavaStringFromManagedString(env, request->RuntimeName),
+                JavaArrayListFromManagedList(env, request->NodeNames));
               ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Submit");
             }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
index 628e7ba..4402c44 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
@@ -70,6 +70,21 @@ jstring JavaStringFromManagedString(
   return env->NewString((const jchar*)wch, managedString->Length);
 }
 
+jobject JavaArrayListFromManagedList(
+    JNIEnv *env,
+    System::Collections::Generic::ICollection<String^>^ managedNodeNames) {
+
+    jclass arrayListClazz = (*env).FindClass("java/util/ArrayList");
+    jobject arrayListObj = (*env).NewObject(arrayListClazz, (*env).GetMethodID(arrayListClazz, "<init>", "()V"));
+
+    for each (String^ nodeName in managedNodeNames)
+    {
+        jstring nodeNamestr = JavaStringFromManagedString(env, nodeName);
+        (*env).CallBooleanMethod(arrayListObj, (*env).GetMethodID(arrayListClazz, "add", "(Ljava/lang/Object;)Z"), nodeNamestr);
+    }
+    return arrayListObj;
+}
+
 void HandleClr2JavaError(
   JNIEnv *env,
   String^ errorMessage,

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
index c06377c..bbaf369 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
@@ -48,6 +48,10 @@ jstring JavaStringFromManagedString(
   JNIEnv *env,
   String^ managedString);
 
+jobject JavaArrayListFromManagedList(
+    JNIEnv *env,
+    System::Collections::Generic::ICollection<String^>^ managedNodeNames);
+
 array<byte>^ ManagedByteArrayFromJavaByteArray(
   JNIEnv *env,
   jbyteArray javaByteArray);

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs
index 31cb36f..504ae17 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs
@@ -18,6 +18,7 @@
 using System;
 using System.Collections.Generic;
 using System.Globalization;
+using System.Linq;
 using System.Runtime.Serialization;
 using Org.Apache.REEF.Common.Catalog;
 using Org.Apache.REEF.Common.Evaluator;
@@ -64,7 +65,8 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
 
         public void Submit(IEvaluatorRequest request)
         {
-            LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and  {2} core to rack {3} and runtime {4}.", request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName);
+            LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and  {2} core to rack {3} runtime {4}, nodeNames to schedule {5} and RelaxLocality is {6}.",
+                request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName, string.Join(",", request.NodeNames.ToArray()), request.RelaxLocality);
             lock (Evaluators)
             {
                 for (var i = 0; i < request.Number; i++)

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
index 685da50..2c8365d 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
@@ -16,6 +16,8 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
+using System.Linq;
 using System.Runtime.Serialization;
 
 namespace Org.Apache.REEF.Driver.Evaluator
@@ -27,37 +29,43 @@ namespace Org.Apache.REEF.Driver.Evaluator
     internal class EvaluatorRequest : IEvaluatorRequest
     {
         internal EvaluatorRequest()
-            : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty)
+            : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes)
-            : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty)
+            : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, int core)
-            : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty)
+            : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, string rack)
-            : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty)
+            : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, int core, string rack)
-            : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty)
+            : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
         {
         }
 
-        internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId)
-            : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty)
+        internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames)
+            : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, true)
 
         {
         }
 
-        internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName)
+        internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames, bool relaxLocality)
+           : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, relaxLocality)
+
+        {
+        }
+
+        internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName, ICollection<string> nodeNames, bool relaxLocality)
         {
             Number = number;
             MemoryMegaBytes = megaBytes;
@@ -65,6 +73,8 @@ namespace Org.Apache.REEF.Driver.Evaluator
             Rack = rack;
             EvaluatorBatchId = evaluatorBatchId;
             RuntimeName = runtimeName;
+            NodeNames = nodeNames;
+            RelaxLocality = relaxLocality;
         }
 
         [DataMember]
@@ -85,6 +95,12 @@ namespace Org.Apache.REEF.Driver.Evaluator
         [DataMember]
         public string RuntimeName { get; private set; }
 
+        [DataMember]
+        public ICollection<string> NodeNames { get; private set; }
+
+        [DataMember]
+        public bool RelaxLocality { get; private set; }
+
         internal static EvaluatorRequestBuilder NewBuilder()
         {
             return new EvaluatorRequestBuilder();

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
index d6e5a63..6ccd1fb 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
@@ -16,6 +16,8 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
+using System.Linq;
 using Org.Apache.REEF.Common.Runtime;
 
 namespace Org.Apache.REEF.Driver.Evaluator
@@ -25,6 +27,8 @@ namespace Org.Apache.REEF.Driver.Evaluator
         private string _evaluatorBatchId;
         private string _rackName;
         private string _runtimeName;
+        private ICollection<string> _nodeNames;
+        private bool _relaxLocality;
 
         internal EvaluatorRequestBuilder(IEvaluatorRequest request)
         {
@@ -34,6 +38,8 @@ namespace Org.Apache.REEF.Driver.Evaluator
             _evaluatorBatchId = request.EvaluatorBatchId;
             _rackName = request.Rack;
             _runtimeName = request.RuntimeName;
+            _nodeNames = request.NodeNames;
+            _relaxLocality = request.RelaxLocality;
         }
 
         internal EvaluatorRequestBuilder()
@@ -44,6 +50,8 @@ namespace Org.Apache.REEF.Driver.Evaluator
             _rackName = string.Empty;
             _evaluatorBatchId = Guid.NewGuid().ToString("N");
             _runtimeName = string.Empty;
+            _nodeNames = Enumerable.Empty<string>().ToList();
+            _relaxLocality = true;
         }
 
         public int Number { get; private set; }
@@ -95,7 +103,29 @@ namespace Org.Apache.REEF.Driver.Evaluator
         }
 
         /// <summary>
-        /// Sets the batch ID for requested evaluators in the same request. The batch of Evaluators requested in the 
+        /// Set desired node names for the Evaluator to be allocated on.
+        /// </summary>
+        /// <param name="nodeNames"></param>
+        /// <returns>this</returns>
+        public EvaluatorRequestBuilder AddNodeNames(ICollection<string> nodeNames)
+        {
+            _nodeNames = nodeNames;
+            return this;
+        }
+
+        /// <summary>
+        /// Set a desired node name for evaluator to be allocated
+        /// </summary>
+        /// <param name="nodeName"></param>
+        /// <returns></returns>
+        public EvaluatorRequestBuilder AddNodeName(string nodeName)
+        {
+            _nodeNames.Add(nodeName);
+            return this;
+        }
+
+        /// <summary>
+        /// Sets the batch ID for requested evaluators in the same request. The batch of Evaluators requested in the
         /// same request will have the same Evaluator Batch ID.
         /// </summary>
         /// <param name="evaluatorBatchId">The batch ID for the Evaluator request.</param>
@@ -117,12 +147,23 @@ namespace Org.Apache.REEF.Driver.Evaluator
         }
 
         /// <summary>
+        /// Set the relax locality for requesting evaluator with specified node names
+        /// </summary>
+        /// <param name="relaxLocality">Locality relax flag.</param>
+        /// <returns>this</returns>
+        public EvaluatorRequestBuilder SetRelaxLocality(bool relaxLocality)
+        {
+            _relaxLocality = relaxLocality;
+            return this;
+        }
+
+        /// <summary>
         /// Build the EvaluatorRequest.
         /// </summary>
         /// <returns></returns>
         public IEvaluatorRequest Build()
         {
-            return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName);
+            return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, nodeNames: _nodeNames, relaxLocality: _relaxLocality);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
index 6b2ce21..357ffe3 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Collections.Generic;
+
 namespace Org.Apache.REEF.Driver.Evaluator
 {
     /// <summary>
@@ -43,14 +45,33 @@ namespace Org.Apache.REEF.Driver.Evaluator
         string Rack { get; }
 
         /// <summary>
+        /// The desired node names for the Evaluator to be allocated on.
+        /// </summary>
+        ICollection<string> NodeNames { get; }
+
+        /// <summary>
         /// The batch ID for requested evaluators. Evaluators requested in the same batch
         /// will have the same Batch ID.
         /// </summary>
         string EvaluatorBatchId { get; }
 
         /// <summary>
-        /// The name of the runtime to allocate teh evaluator on
+        /// The name of the runtime to allocate the evaluator on
         /// </summary>
         string RuntimeName { get; }
+
+        /// <summary>
+        /// For a request at a network hierarchy level, set whether locality can be relaxed to that level and beyond.
+        /// If the flag is off on a rack-level ResourceRequest, containers at that request's priority
+        /// will not be assigned to nodes on that request's rack unless requests specifically for
+        /// those nodes have also been submitted.
+        /// If the flag is off on an ANY-level ResourceRequest, containers at that request's priority
+        /// will only be assigned on racks for which specific requests have also been submitted.
+        /// For example, to request a container strictly on a specific node, the corresponding rack-level
+        /// and any-level requests should have locality relaxation set to false. Similarly,
+        /// to request a container strictly on a specific rack,
+        /// the corresponding any-level request should have locality relaxation set to false.
+        /// </summary>
+        bool RelaxLocality { get; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs
new file mode 100644
index 0000000..9c84d33
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.HelloREEF
+{
+    /// <summary>
+    /// The Driver for HelloREEF: It requests a single Evaluator and then submits the HelloTask to it.
+    /// </summary>
+    public sealed class HelloDriverYarn : IObserver<IAllocatedEvaluator>, IObserver<IDriverStarted>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(HelloDriver));
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
+
+        /// <summary>
+        /// List of node names for desired evaluators
+        /// </summary>
+        private readonly IList<string> _nodeNames;
+
+        /// <summary>
+        /// Specify if the desired node names is relaxed
+        /// </summary>
+        private readonly bool _relaxLocality;
+
+        /// <summary>
+        /// Constructor of the driver
+        /// </summary>
+        /// <param name="evaluatorRequestor">Evaluator Requestor</param>
+        /// <param name="nodeNames">Node names for evaluators</param>
+        /// <param name="relaxLocality">Relax indicator of evaluator node request</param>
+        [Inject]
+        private HelloDriverYarn(IEvaluatorRequestor evaluatorRequestor,
+            [Parameter(typeof(NodeNames))] ISet<string> nodeNames,
+            [Parameter(typeof(RelaxLocality))] bool relaxLocality)
+        {
+            _evaluatorRequestor = evaluatorRequestor;
+            _nodeNames = nodeNames.ToList();
+            _relaxLocality = relaxLocality;
+        }
+
+        /// <summary>
+        /// Submits the HelloTask to the Evaluator.
+        /// </summary>
+        /// <param name="allocatedEvaluator"></param>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            Logger.Log(Level.Info, "Received allocatedEvaluator-HostName: " + allocatedEvaluator.GetEvaluatorDescriptor().NodeDescriptor.HostName);
+            var taskConfiguration = TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, "HelloTask")
+                .Set(TaskConfiguration.Task, GenericType<HelloTask>.Class)
+                .Build();
+            allocatedEvaluator.SubmitTask(taskConfiguration);
+        }
+
+        public void OnError(Exception error)
+        {
+            throw error;
+        }
+
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Called to start the user mode driver
+        /// </summary>
+        /// <param name="driverStarted"></param>
+        public void OnNext(IDriverStarted driverStarted)
+        {
+            Logger.Log(Level.Info, string.Format("HelloDriver started at {0}", driverStarted.StartTime));
+
+            if (_nodeNames != null && _nodeNames.Count > 0)
+            {
+                _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder()
+                    .AddNodeNames(_nodeNames)
+                    .SetMegabytes(64)
+                    .SetNumber(_nodeNames.Count)
+                    .SetRelaxLocality(_relaxLocality)
+                    .Build());
+            }
+            else
+            {
+                _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder()
+                    .SetMegabytes(64)
+                    .Build());
+            }
+        }
+    }
+
+    [NamedParameter(documentation: "Set of node names for evaluators")]
+    internal class NodeNames : Name<ISet<string>>
+    {    
+    }
+
+    [NamedParameter(documentation: "RelaxLocality for specifying evaluator node names", shortName: "RelaxLocality", defaultValue: "true")]
+    internal class RelaxLocality : Name<bool>
+    {        
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 575a13f..20c80fa 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -103,7 +103,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF
             }
         }
 
-        public static void Main(string[] args)
+        public static void MainSimple(string[] args)
         {
             TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(args.Length > 0 ? args[0] : Local)).GetInstance<HelloREEF>().Run();
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
new file mode 100644
index 0000000..fbccdec
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.HelloREEF
+{
+    /// <summary>
+    /// A Tool that submits HelloREEFDriver for execution on YARN.
+    /// </summary>
+    public sealed class HelloREEFYarn
+    {
+        private const int ReTryCounts = 200;
+        private const int SleepTime = 2000;
+        private const string DefaultPortRangeStart = "2000";
+        private const string DefaultPortRangeCount = "20";
+        private const string TrustedApplicationTokenIdentifier = "TrustedApplicationTokenIdentifier";
+        private const string SecurityTokenId = "SecurityTokenId";
+        private const string SecurityTokenPwd = "SecurityTokenPwd";
+
+        private readonly IREEFClient _reefClient;
+        private readonly JobRequestBuilder _jobRequestBuilder;
+
+        private static readonly Logger Logger = Logger.GetLogger(typeof(HelloREEFYarn));
+
+        /// <summary>
+        /// List of node names for evaluators
+        /// </summary>
+        private readonly IList<string> _nodeNames;
+
+        [Inject]
+        private HelloREEFYarn(IREEFClient reefClient, 
+            JobRequestBuilder jobRequestBuilder,
+            [Parameter(typeof(NodeNames))] ISet<string> nodeNames)
+        {
+            _reefClient = reefClient;
+            _jobRequestBuilder = jobRequestBuilder;
+            _nodeNames = nodeNames.ToList();
+        }
+
+        /// <summary>
+        /// Runs HelloREEF using the IREEFClient passed into the constructor.
+        /// </summary>
+        private void Run()
+        {
+            // The driver configuration contains all the needed handler bindings
+            var helloDriverConfiguration = DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<HelloDriverYarn>.Class)
+                .Set(DriverConfiguration.OnDriverStarted, GenericType<HelloDriverYarn>.Class)              
+                .Build();
+
+            var driverConfig = TangFactory.GetTang()
+                .NewConfigurationBuilder(helloDriverConfiguration);
+
+            foreach (var n in _nodeNames)
+            {
+                driverConfig.BindSetEntry<NodeNames, string>(GenericType<NodeNames>.Class, n);
+            }
+            
+            // The JobSubmission contains the Driver configuration as well as the files needed on the Driver.
+            var helloJobRequest = _jobRequestBuilder
+                .AddDriverConfiguration(driverConfig.Build())
+                .AddGlobalAssemblyForType(typeof(HelloDriverYarn))
+                .SetJobIdentifier("HelloREEF")
+                .SetJavaLogLevel(JavaLoggingSetting.Verbose)
+                .Build();
+
+            var result = _reefClient.SubmitAndGetJobStatus(helloJobRequest);
+            var state = PullFinalJobStatus(result);
+            Logger.Log(Level.Info, "Application state : {0}.", state);
+        }
+
+        /// <summary>
+        /// This is to pull job final status until the Job is done
+        /// </summary>
+        /// <param name="jobSubmitionResult"></param>
+        /// <returns></returns>
+        private FinalState PullFinalJobStatus(IJobSubmissionResult jobSubmitionResult)
+        {
+            int n = 0;
+            var state = jobSubmitionResult.FinalState;
+            while (state.Equals(FinalState.UNDEFINED) && n++ < ReTryCounts)
+            {
+                Thread.Sleep(SleepTime);
+                state = jobSubmitionResult.FinalState;
+            }
+            return state;
+        }
+
+        /// <summary>
+        /// Get runtime configuration
+        /// </summary>
+        /// <returns></returns>
+        private static IConfiguration GetRuntimeConfiguration(string[] args)
+        {
+            var c = YARNClientConfiguration.ConfigurationModule
+                .Set(YARNClientConfiguration.SecurityTokenKind, TrustedApplicationTokenIdentifier)
+                .Set(YARNClientConfiguration.SecurityTokenService, TrustedApplicationTokenIdentifier)
+                .Build();
+
+            File.WriteAllText(SecurityTokenId, args[0]);
+            File.WriteAllText(SecurityTokenPwd, args[1]);
+
+            IConfiguration tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule
+                .Set(TcpPortConfigurationModule.PortRangeStart, args.Length > 2 ? args[2] : DefaultPortRangeStart)
+                .Set(TcpPortConfigurationModule.PortRangeCount, args.Length > 3 ? args[3] : DefaultPortRangeCount)
+                .Build();
+
+            return Configurations.Merge(c, tcpPortConfig);
+        }
+
+        /// <summary>
+        /// HelloREEF example running on YARN
+        /// Usage: Org.Apache.REEF.Examples.HelloREEF SecurityTokenId SecurityTokenPw [portRangerStart] [portRangeCount] [nodeName1] [nodeName2]...
+        /// </summary>
+        /// <param name="args"></param>
+        public static void MainYarn(string[] args)
+        {
+            var configBuilder = TangFactory.GetTang()
+                .NewConfigurationBuilder(GetRuntimeConfiguration(args));
+
+            if (args.Length > 4)
+            {
+                for (int i = 4; i < args.Length; i++)
+                {
+                    configBuilder.BindSetEntry<NodeNames, string>(GenericType<NodeNames>.Class, args[i]);
+                }
+            }
+
+            TangFactory.GetTang().NewInjector(configBuilder.Build()).GetInstance<HelloREEFYarn>().Run();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj
index d60e48c..be7fed1 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj
@@ -31,9 +31,12 @@
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
     <Compile Include="HelloDriver.cs" />
+    <Compile Include="HelloDriverYarn.cs" />
     <Compile Include="HelloREEF.cs" />
+    <Compile Include="HelloREEFYarn.cs" />
     <Compile Include="HelloTask.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Run.cs" />
   </ItemGroup>
   <ItemGroup>
     <None Include="$(SolutionDir)\App.config">

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs
new file mode 100644
index 0000000..f8aa715
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.Examples.HelloREEF
+{
+    public sealed class Run
+    {
+        /// <summary>
+        /// Program that runs hello reef
+        /// </summary>
+        /// <param name="args"></param>
+        public static void Main(string[] args)
+        {
+            if (args.Length < 2)
+            {
+                HelloREEF.MainSimple(args);
+            }
+            else
+            {
+                HelloREEFYarn.MainYarn(args);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
index 0db4388..518537d 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
@@ -66,8 +66,10 @@ public final class EvaluatorRequestorBridge extends NativeBridge {
   public void submit(final int evaluatorsNumber,
                      final int memory,
                      final int virtualCore,
+                     final boolean relaxLocality,
                      final String rack,
-                     final String runtimeName) {
+                     final String runtimeName,
+                     final ArrayList<String> nodeNames) {
     if (this.isBlocked) {
       throw new RuntimeException("Cannot request additional Evaluator, this is probably because " +
           "the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433.");
@@ -85,6 +87,8 @@ public final class EvaluatorRequestorBridge extends NativeBridge {
           .setMemory(memory)
           .setNumberOfCores(virtualCore)
           .setRuntimeName(runtimeName)
+          .setRelaxLocality(relaxLocality)
+          .addNodeNames(nodeNames)
           .build();
 
       LOG.log(Level.FINE, "submitting evaluator request {0}", request);

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
index 38494ac..dfed8f6 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
@@ -40,6 +40,7 @@ public final class EvaluatorRequest {
   private final List<String> nodeNames;
   private final List<String> rackNames;
   private final String runtimeName;
+  private final boolean relaxLocality;
 
   EvaluatorRequest(final int number,
                    final int megaBytes,
@@ -55,12 +56,24 @@ public final class EvaluatorRequest {
                    final List<String> nodeNames,
                    final List<String> rackNames,
                    final String runtimeName) {
+    this(number, megaBytes, cores, nodeNames, rackNames, runtimeName, true);
+  }
+
+
+  EvaluatorRequest(final int number,
+                   final int megaBytes,
+                   final int cores,
+                   final List<String> nodeNames,
+                   final List<String> rackNames,
+                   final String runtimeName,
+                   final boolean relaxLocality) {
     this.number = number;
     this.megaBytes = megaBytes;
     this.cores = cores;
     this.nodeNames = nodeNames;
     this.rackNames = rackNames;
     this.runtimeName = runtimeName;
+    this.relaxLocality = relaxLocality;
   }
 
   /**
@@ -137,6 +150,16 @@ public final class EvaluatorRequest {
   }
 
   /**
+   * Access the locality relax flag.
+   *
+   * @return the value of relaxLocality. If not set default is true.
+   */
+  public boolean getRelaxLocality() {
+    return relaxLocality;
+  }
+
+
+  /**
    * {@link EvaluatorRequest}s are build using this Builder.
    */
   public static class Builder<T extends Builder> implements org.apache.reef.util.Builder<EvaluatorRequest> {
@@ -147,6 +170,7 @@ public final class EvaluatorRequest {
     private final List<String> nodeNames = new ArrayList<>();
     private final List<String> rackNames = new ArrayList<>();
     private String runtimeName = "";
+    private boolean relaxLocality = true; //if not set, default to true
 
     @Private
     public Builder() {
@@ -163,6 +187,7 @@ public final class EvaluatorRequest {
       setMemory(request.getMegaBytes());
       setNumberOfCores(request.getNumberOfCores());
       setRuntimeName(request.getRuntimeName());
+      setRelaxLocality(request.getRelaxLocality());
       for (final String nodeName : request.getNodeNames()) {
         addNodeName(nodeName);
       }
@@ -233,6 +258,21 @@ public final class EvaluatorRequest {
     }
 
     /**
+     * Adds node names.They are the preferred locations where the evaluator should
+     * run on. If any of the node is available, the RM will try to allocate the
+     * evaluator there
+     *
+     * @param nodeNamesList preferred node names
+     * @return this Builder.
+     */
+    public T addNodeNames(final List<String> nodeNamesList) {
+      if(nodeNamesList != null) {
+        this.nodeNames.addAll(nodeNamesList);
+      }
+      return (T) this;
+    }
+
+    /**
      * Adds a rack name. It is the preferred location where the evaluator should
      * run on. If the rack is available, the RM will try to allocate the
      * evaluator in one of its nodes. The RM will try to match node names first,
@@ -247,11 +287,25 @@ public final class EvaluatorRequest {
     }
 
     /**
+     * A boolean relaxLocality flag defaulting to true, which tells the ResourceManager
+     * if the application wants locality to be loose (i.e. allows fall-through to rack or any)
+     * or strict (i.e. specify hard constraint on resource allocation).
+     *
+     * @param relaxLocalityFlg locality relaxation is enabled with this ResourceRequest
+     * @return this Builder.
+     */
+    public T setRelaxLocality(final boolean relaxLocalityFlg) {
+      this.relaxLocality = relaxLocalityFlg;
+      return (T) this;
+    }
+
+    /**
      * Builds the {@link EvaluatorRequest}.
      */
     @Override
     public EvaluatorRequest build() {
-      return new EvaluatorRequest(this.n, this.megaBytes, this.cores, this.nodeNames, this.rackNames, this.runtimeName);
+      return new EvaluatorRequest(this.n, this.megaBytes, this.cores, this.nodeNames,
+                                  this.rackNames, this.runtimeName, this.relaxLocality);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/18c24b1c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
index b2faf98..992659f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
@@ -29,6 +29,7 @@ import org.apache.reef.util.logging.LoggingScope;
 import org.apache.reef.util.logging.LoggingScopeFactory;
 
 import javax.inject.Inject;
+import java.util.Arrays;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -60,8 +61,11 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor {
 
   @Override
   public synchronized void submit(final EvaluatorRequest req) {
-    LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.",
-        new Object[] {req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()});
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.",
+          new Object[] {req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()});
+      LOG.log(Level.FINEST, "Node names: " + Arrays.toString(req.getNodeNames().toArray()));
+    }
 
     if (req.getMegaBytes() <= 0) {
       throw new IllegalArgumentException("Given an unsupported memory size: " + req.getMegaBytes());
@@ -82,22 +86,18 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor {
       throw new IllegalArgumentException("Runtime name cannot be null");
     }
     // for backwards compatibility, we will always set the relax locality flag
-    // to true unless the user configured racks, in which case we will check for
-    // the ANY modifier (*), if not there, then we won't relax the locality
-    boolean relaxLocality = true;
+    // to true unless the user has set it to false in the request, in which case
+    // we will check for the ANY modifier (*), if there, then we relax the
+    // locality regardless of the value set in the request.
+    boolean relaxLocality = req.getRelaxLocality();
     if (!req.getRackNames().isEmpty()) {
       for (final String rackName : req.getRackNames()) {
         if (Constants.ANY_RACK.equals(rackName)) {
           relaxLocality = true;
           break;
         }
-        relaxLocality = false;
       }
     }
-    // if the user specified any node, then we assume they do not want to relax locality
-    if (!req.getNodeNames().isEmpty()) {
-      relaxLocality = false;
-    }
 
     try (LoggingScope ls = this.loggingScopeFactory.evaluatorSubmit(req.getNumber())) {
       final ResourceRequestEvent request = ResourceRequestEventImpl