You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2017/10/18 20:25:54 UTC

reef git commit: [REEF-1891] Allow passing job submission environment variables from .Net client

Repository: reef
Updated Branches:
  refs/heads/master e18b5d36a -> 987240cb7


[REEF-1891] Allow passing job submission environment variables from .Net client

* Update AvroYarnClusterJobSubmissionParameters schema for environment map
* Update Java bridge client to deserialize and set the map
* Set the map to ContainerLaunchContext
* Expose it in JobRequest and JobParameters in C#
* Serialize it in YarnREEFParamSerializer.
* Update unit tests in both Java and C#
* Update HelloREEFYarn to set it.

JIRA:
  [REEF-1891](https://issues.apache.org/jira/browse/REEF-1891)

Pull request:
  This closes #1381


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/987240cb
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/987240cb
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/987240cb

Branch: refs/heads/master
Commit: 987240cb7c36befdda1b168c51231002de41c24d
Parents: e18b5d3
Author: Julia Wang <ju...@apache.org>
Authored: Fri Sep 22 16:23:28 2017 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed Oct 18 13:25:01 2017 -0700

----------------------------------------------------------------------
 .../YarnREEFParamSerializerTests.cs             |   9 +-
 .../Org.Apache.REEF.Client/API/JobParameters.cs |  15 +-
 .../API/JobParametersBuilder.cs                 |  16 ++
 .../API/JobRequestBuilder.cs                    |  12 ++
 .../AvroYarnClusterJobSubmissionParameters.cs   |  13 +-
 .../Org.Apache.REEF.Client.csproj               |   1 +
 .../Org.Apache.REEF.Client/YARN/Environment.cs  | 156 +++++++++++++++++++
 .../YARN/YarnREEFParamSerializer.cs             |   3 +
 .../HelloREEFYarn.cs                            |   4 +-
 .../src/main/avro/JobSubmissionParameters.avsc  |   1 +
 .../client/YarnClusterSubmissionFromCS.java     |  26 ++++
 .../bridge/client/YarnJobSubmissionClient.java  |   1 +
 ...SubmissionParametersSerializationFromCS.java |   6 +-
 .../yarn/client/YarnSubmissionHelper.java       |  26 +++-
 .../reef/runtime/yarn/util/YarnTypes.java       |  34 +++-
 15 files changed, 311 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
index 23c8725..040a9f8 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
@@ -159,6 +159,7 @@ namespace Org.Apache.REEF.Client.Tests
                 "\"securityTokenService\":\"{0}\"," +
                 "\"maxApplicationSubmissions\":{1}," +
                 "\"driverMemory\":{1}," +
+                "\"envMap\":{{\"key1\":\"{0}\",\"key2\":\"{0}\"}}," +
                 "\"driverStdoutFilePath\":\"{0}\"," +
                 "\"driverStderrFilePath\":\"{0}\"" +
                 "}}";
@@ -178,15 +179,17 @@ namespace Org.Apache.REEF.Client.Tests
                 .SetJobIdentifier(AnyString)
                 .SetMaxApplicationSubmissions(AnyInt)
                 .SetDriverMemory(AnyInt)
+                .SetJobSubmissionEnvironmentVariable("key1", AnyString)
+                .SetJobSubmissionEnvironmentVariable("key2", AnyString)
                 .SetDriverStderrFilePath(AnyString)
                 .SetDriverStdoutFilePath(AnyString)
                 .Build();
 
             var serializedBytes = serializer.SerializeJobArgsToBytes(jobRequest.JobParameters, AnyString);
-            var expectedString = Encoding.UTF8.GetString(serializedBytes);
-            var jsonObject = JObject.Parse(expectedString);
+            var actualString = Encoding.UTF8.GetString(serializedBytes);
+            var actualJsonObject = JObject.Parse(actualString);
             var expectedJsonObject = JObject.Parse(expectedJson);
-            Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject));
+            Assert.True(JToken.DeepEquals(actualJsonObject, expectedJsonObject));
         }
 
         private sealed class DriverStartHandler : IObserver<IDriverStarted>

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs b/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs
index 6d6c307..5279e3a 100644
--- a/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
+using System.Collections.Generic;
 using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Logging;
 
@@ -30,6 +32,7 @@ namespace Org.Apache.REEF.Client.API
         private readonly string _jobIdentifier;
         private readonly int _maxApplicationSubmissions;
         private readonly int _driverMemory;
+        private IDictionary<string, string> _jobSubmissionEnvMap;
         private readonly Optional<string> _stdoutFilePath;
         private readonly Optional<string> _stderrFilePath;
         private readonly JavaLoggingSetting _logSetting;
@@ -38,6 +41,7 @@ namespace Org.Apache.REEF.Client.API
             string jobIdentifier, 
             int maxApplicationSubmissions, 
             int driverMemory,
+            IDictionary<string, string> jobSubmissionEnvMap, 
             string stdoutFilePath,
             string stderrFilePath,
             JavaLoggingSetting logSetting)
@@ -45,7 +49,8 @@ namespace Org.Apache.REEF.Client.API
             _jobIdentifier = jobIdentifier;
             _maxApplicationSubmissions = maxApplicationSubmissions;
             _driverMemory = driverMemory;
-            
+            _jobSubmissionEnvMap = jobSubmissionEnvMap;
+
             _stdoutFilePath = string.IsNullOrWhiteSpace(stdoutFilePath) ? 
                 Optional<string>.Empty() : Optional<string>.Of(stdoutFilePath);
 
@@ -81,6 +86,14 @@ namespace Org.Apache.REEF.Client.API
         }
 
         /// <summary>
+        /// The job submission environment variable map.
+        /// </summary>
+        public IDictionary<string, string> JobSubmissionEnvMap
+        {
+            get { return new Dictionary<string, string>(_jobSubmissionEnvMap); }
+        }
+
+        /// <summary>
         /// Gets the file path for stdout for the driver.
         /// </summary>
         public Optional<string> StdoutFilePath

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs
index 06ab7ee..39df14e 100644
--- a/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Collections.Generic;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Client.API
@@ -29,6 +30,7 @@ namespace Org.Apache.REEF.Client.API
         private int _driverMemory = 512;
         private string _stdoutFilePath = null;
         private string _stderrFilePath = null;
+        private readonly IDictionary<string, string> _jobSubmissionMap = new Dictionary<string, string>();
         private JavaLoggingSetting _javaLogLevel = JavaLoggingSetting.Info;
 
         private JobParametersBuilder()
@@ -53,6 +55,7 @@ namespace Org.Apache.REEF.Client.API
                 _jobIdentifier, 
                 _maxApplicationSubmissions, 
                 _driverMemory,
+                _jobSubmissionMap,
                 _stdoutFilePath,
                 _stderrFilePath,
                 _javaLogLevel);
@@ -89,6 +92,19 @@ namespace Org.Apache.REEF.Client.API
         }
 
         /// <summary>
+        /// Set job submission environment variable.
+        /// If the variable is already in the map, override it. 
+        /// </summary>
+        /// <param name="key"></param>
+        /// <param name="value"></param>
+        /// <returns></returns>
+        public JobParametersBuilder SetJobSubmissionEnvironmentVariable(string key, string value)
+        {
+            _jobSubmissionMap[key] = value;
+            return this;
+        }        
+
+        /// <summary>
         /// Sets the file path to the stdout file for the driver.
         /// </summary>
         public JobParametersBuilder SetDriverStdoutFilePath(string stdoutFilePath)

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs
index 325ed04..234cd7a 100644
--- a/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs
@@ -148,6 +148,18 @@ namespace Org.Apache.REEF.Client.API
         }
 
         /// <summary>
+        /// Set a job submission environment variable.
+        /// </summary>
+        /// <param name="key">key of the environment variable.</param>
+        /// <param name="value">Value of the environment variable.</param>
+        /// <returns></returns>
+        public JobRequestBuilder SetJobSubmissionEnvironmentVariable(string key, string value)
+        {
+            _jobParametersBuilder.SetJobSubmissionEnvironmentVariable(key, value);
+            return this;
+        }
+
+        /// <summary>
         /// Sets the maximum amount of times a job can be submitted.
         /// </summary>
         public JobRequestBuilder SetMaxApplicationSubmissions(int maxAppSubmissions)

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
index c4e82fb..9bf2a64 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Collections.Generic;
 using System.Runtime.Serialization;
 using Org.Apache.REEF.Utilities.Attributes;
 
@@ -28,7 +29,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN
     [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
     public sealed class AvroYarnClusterJobSubmissionParameters
     {
-        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""fileSystemUrl"",""type"":""string""},{""name"":""jobSubmissi
 onDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}";
+        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""fileSystemUrl"",""type"":""string""},{""name"":""jobSubmissi
 onDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""envMap"",""type"":{""type"":""map"",""values"":""string""}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}";
 
         /// <summary>
         /// Gets the schema.
@@ -66,6 +67,12 @@ namespace Org.Apache.REEF.Client.Avro.YARN
         public int driverMemory { get; set; }
 
         /// <summary>
+        /// Gets or sets the envMap field.
+        /// </summary>
+        [DataMember]
+        public IDictionary<string, string> envMap { get; set; }
+
+        /// <summary>
         /// Gets or sets the maxApplicationSubmissions field.
         /// </summary>
         [DataMember]
@@ -99,15 +106,17 @@ namespace Org.Apache.REEF.Client.Avro.YARN
         /// <param name="securityTokenKind">The securityTokenKind.</param>
         /// <param name="securityTokenService">The securityTokenService.</param>
         /// <param name="driverMemory">The driverMemory.</param>
+        /// <param name="envMap">The envMap.</param>
         /// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param>
         /// <param name="driverStdoutFilePath">The driverStdoutFilePath.</param>
         /// <param name="driverStderrFilePath">The driverStderrFilePath.</param>
-        public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, int maxApplicationSubmissions, string driverStdoutFilePath, string driverStderrFilePath)
+        public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, IDictionary<string, string> envMap, int maxApplicationSubmissions, string driverStdoutFilePath, string driverStderrFilePath)
         {
             this.yarnJobSubmissionParameters = yarnJobSubmissionParameters;
             this.securityTokenKind = securityTokenKind;
             this.securityTokenService = securityTokenService;
             this.driverMemory = driverMemory;
+            this.envMap = envMap;
             this.maxApplicationSubmissions = maxApplicationSubmissions;
             this.driverStdoutFilePath = driverStdoutFilePath;
             this.driverStderrFilePath = driverStderrFilePath;

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index 7748895..a0b4f75 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -100,6 +100,7 @@ under the License.
     <Compile Include="Local\Parameters\NumberOfEvaluators.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="YARN\ApplicationReport.cs" />
+    <Compile Include="YARN\Environment.cs" />
     <Compile Include="YARN\HDI\HDInsightClientConfiguration.cs" />
     <Compile Include="YARN\HDI\HDInsightCommandLineEnvironment.cs" />
     <Compile Include="YARN\HDI\HDInsightCredential.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs b/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs
new file mode 100644
index 0000000..4704f2b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.Client.YARN
+{
+    /// <summary>
+    /// Default environment map keys from YARN.
+    /// <a href="http://hadoop.apache.org/docs/r2.6.0/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html">
+    /// Hadoop RM REST API</a> documentation.
+    /// </summary>
+    public enum Environment
+    {
+        /**
+         * USER
+         */
+        USER,
+
+        /**
+         * LOGNAME
+         */
+        LOGNAME,
+
+        /**
+         * HOME
+         */
+        HOME,
+
+        /**
+         * PWD
+         */
+        PWD,
+
+        /**
+         * PATH
+         */
+        PATH,
+
+        /**
+         * SHELL
+         */
+        SHELL,
+
+        /**
+         * JAVA_HOME
+         */
+        JAVA_HOME,
+
+        /**
+         * CLASSPATH
+         */
+        CLASSPATH,
+
+        /**
+         * APP_CLASSPATH
+         */
+        APP_CLASSPATH,
+
+        /**
+         * HADOOP_CLASSPATH.
+         */
+        HADOOP_CLASSPATH,
+
+        /**
+         * LD_LIBRARY_PATH
+         */
+        LD_LIBRARY_PATH,
+
+        /**
+         * HADOOP_CONF_DIR
+         */
+        HADOOP_CONF_DIR,
+
+        /**
+         * HADOOP_CLIENT_CONF_DIR Final, non-modifiable.
+         */
+        HADOOP_CLIENT_CONF_DIR,
+
+        /**
+         * $HADOOP_COMMON_HOME
+         */
+        HADOOP_COMMON_HOME,
+
+        /**
+         * $HADOOP_HDFS_HOME
+         */
+        HADOOP_HDFS_HOME,
+
+        /**
+         * $MALLOC_ARENA_MAX
+         */
+        MALLOC_ARENA_MAX,
+
+        /**
+         * $HADOOP_YARN_HOME
+         */
+        HADOOP_YARN_HOME,
+
+        /**
+         * $CLASSPATH_PREPEND_DISTCACHE
+         * Private, Windows specific
+         */
+        CLASSPATH_PREPEND_DISTCACHE,
+
+        /**
+         * $CONTAINER_ID
+         * Exported by NodeManager and non-modifiable by users.
+         */
+        CONTAINER_ID,
+
+        /**
+         * $NM_HOST
+         * Exported by NodeManager and non-modifiable by users.
+         */
+        NM_HOST,
+
+        /**
+         * $NM_HTTP_PORT
+         * Exported by NodeManager and non-modifiable by users.
+         */
+        NM_HTTP_PORT,
+
+        /**
+         * $NM_PORT
+         * Exported by NodeManager and non-modifiable by users.
+         */
+        NM_PORT,
+
+        /**
+         * $LOCAL_DIRS
+         * Exported by NodeManager and non-modifiable by users.
+         */
+        LOCAL_DIRS,
+
+        /**
+         * $LOG_DIRS
+         * Exported by NodeManager and non-modifiable by users.
+         * Comma separate list of directories that the container should use for
+         * logging.
+         */
+        LOG_DIRS
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
index c09eaad..8fd6a4f 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Collections.Generic;
+using System.Globalization;
 using System.IO;
 using Org.Apache.REEF.Client.API;
 using Org.Apache.REEF.Client.Avro;
@@ -166,6 +168,7 @@ namespace Org.Apache.REEF.Client.YARN
 
                 yarnJobSubmissionParameters = avroYarnJobSubmissionParameters,
                 driverMemory = jobParameters.DriverMemoryInMB,
+                envMap = jobParameters.JobSubmissionEnvMap,
                 maxApplicationSubmissions = jobParameters.MaxApplicationSubmissions,
                 driverStdoutFilePath = string.IsNullOrWhiteSpace(jobParameters.StdoutFilePath.Value) ?
                     _fileNames.GetDefaultYarnDriverStdoutFilePath() : jobParameters.StdoutFilePath.Value,

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
index 917e3eb..2285a50 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
@@ -94,6 +94,8 @@ namespace Org.Apache.REEF.Examples.HelloREEF
                 .AddDriverConfiguration(driverConfig.Build())
                 .AddGlobalAssemblyForType(typeof(HelloDriverYarn))
                 .SetJobIdentifier("HelloREEF")
+                .SetJobSubmissionEnvironmentVariable(Environment.PATH.ToString(), "value1")
+                .SetJobSubmissionEnvironmentVariable("UserDefineKey", "value2")
                 .SetJavaLogLevel(JavaLoggingSetting.Verbose)
                 .Build();
 
@@ -180,7 +182,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF
 
         /// <summary>
         /// HelloREEF example running on YARN
-        /// Usage: Org.Apache.REEF.Examples.HelloREEF SecurityTokenId SecurityTokenPw [portRangerStart] [portRangeCount] [nodeName1] [nodeName2]...
+        /// Usage: Org.Apache.REEF.Examples.HelloREEF TrustedApplicaitonLLQ SecurityTokenPw [portRangerStart] [portRangeCount] [nodeName1] [nodeName2]...
         /// </summary>
         /// <param name="args"></param>
         public static void MainYarn(string[] args)

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index 62bc757..ecf0043 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -60,6 +60,7 @@
         { "name": "securityTokenKind", "type": "string", "default": "NULL" },
         { "name": "securityTokenService", "type": "string", "default": "NULL" },
         { "name": "driverMemory", "type": "int" },
+        {"name": "environmentVariablesMap", "type": {"type": "map", "values": "string"}},
         { "name": "maxApplicationSubmissions", "type": "int" },
         { "name": "driverStdoutFilePath", "type": "string" },
         { "name": "driverStderrFilePath", "type": "string" }

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
index 71968b9..b45a2f2 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
@@ -28,6 +28,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Represents a job submission from the CS code.
@@ -57,6 +59,7 @@ final class YarnClusterSubmissionFromCS {
   private final String fileSystemUrl;
   private final String yarnDriverStdoutFilePath;
   private final String yarnDriverStderrFilePath;
+  private final Map<String, String> environmentVariablesMap = new HashMap<>();
 
   private final AvroYarnAppSubmissionParameters yarnAppSubmissionParameters;
   private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters;
@@ -89,6 +92,13 @@ final class YarnClusterSubmissionFromCS {
     this.yarnDriverStdoutFilePath = yarnClusterJobSubmissionParameters.getDriverStdoutFilePath().toString();
     this.yarnDriverStderrFilePath = yarnClusterJobSubmissionParameters.getDriverStderrFilePath().toString();
 
+    if (yarnClusterJobSubmissionParameters.getEnvironmentVariablesMap() != null) {
+      for (Map.Entry<java.lang.CharSequence, java.lang.CharSequence> pair :
+          yarnClusterJobSubmissionParameters.getEnvironmentVariablesMap().entrySet()) {
+        this.environmentVariablesMap.put(pair.getKey().toString(), pair.getValue().toString());
+      }
+    }
+
     Validate.notEmpty(jobId, "The job id is null or empty");
     Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 0.");
     Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
@@ -121,9 +131,18 @@ final class YarnClusterSubmissionFromCS {
         ", tokenService='" + tokenService + '\'' +
         ", fileSystemUrl='" + fileSystemUrl + '\'' +
         ", jobSubmissionDirectoryPrefix='" + jobSubmissionDirectoryPrefix + '\'' +
+        envMapString() +
         '}';
   }
 
+  private String envMapString() {
+    final StringBuilder sb = new StringBuilder();
+    for (final Map.Entry<String, String> entry : environmentVariablesMap.entrySet()) {
+      sb.append(", Key:" + entry.getKey() + ", value:" + entry.getValue());
+    }
+    return sb.toString();
+  }
+
   /**
    * @return The local folder where the driver is staged.
    */
@@ -181,6 +200,13 @@ final class YarnClusterSubmissionFromCS {
   }
 
   /**
+   * @return The environment map.
+   */
+  Map<String, String> getEnvironmentVariablesMap() {
+    return new HashMap<>(environmentVariablesMap);
+  }
+
+  /**
    * @return The max amount of times the application can be submitted.
    */
   int getMaxApplicationSubmissions(){

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 0d71517..868201b 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -171,6 +171,7 @@ public final class YarnJobSubmissionClient {
           .setConfigurationFilePaths(confFiles)
           .setDriverStdoutPath(yarnSubmission.getYarnDriverStdoutFilePath())
           .setDriverStderrPath(yarnSubmission.getYarnDriverStderrFilePath())
+          .setJobSubmissionEnvMap(yarnSubmission.getEnvironmentVariablesMap())
           .submit();
       writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
           submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
index b8024f9..f733131 100644
--- a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
+++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
@@ -65,6 +65,10 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
           "\"securityTokenService\":\"" + NULL_REP + "\"," +
           "\"maxApplicationSubmissions\":" + NUMBER_REP + "," +
           "\"driverMemory\":" + NUMBER_REP + "," +
+          "\"environmentVariablesMap\":" +
+          "{" +
+            "\"key\":" + STRING_REP_QUOTED +
+          "}," +
           "\"driverStdoutFilePath\":" + STRING_REP_QUOTED + "," +
           "\"driverStderrFilePath\":" + STRING_REP_QUOTED +
       "}";
@@ -179,7 +183,7 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
     assert yarnClusterSubmissionFromCS.getTokenService().equals(NULL_REP);
     assert yarnClusterSubmissionFromCS.getYarnDriverStderrFilePath().equals(STRING_REP);
     assert yarnClusterSubmissionFromCS.getYarnDriverStdoutFilePath().equals(STRING_REP);
-
+    assert yarnClusterSubmissionFromCS.getEnvironmentVariablesMap().get("key").equals(STRING_REP);
     verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters(),
         yarnClusterSubmissionFromCS.getYarnAppSubmissionParameters());
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index 5df89e7..7eec786 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -63,6 +63,7 @@ public final class YarnSubmissionHelper implements AutoCloseable {
   private String driverStderrFilePath;
   private Class launcherClazz = REEFLauncher.class;
   private List<String> configurationFilePaths;
+  private final Map<String, String> environmentVariablesMap = new HashMap<>();
 
   public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                               final REEFFileNames fileNames,
@@ -241,6 +242,29 @@ public final class YarnSubmissionHelper implements AutoCloseable {
   }
 
   /**
+   * Adds every entry of the given map to the job submission environment variables.
+   * @param map environment variable names mapped to their values; existing names are overwritten.
+   * @return this YarnSubmissionHelper, so calls can be chained.
+   */
+  public YarnSubmissionHelper setJobSubmissionEnvMap(final Map<String, String> map) {
+    for (final Map.Entry<String, String> entry : map.entrySet()) {
+      environmentVariablesMap.put(entry.getKey(), entry.getValue());
+    }
+    return this;
+  }
+
+  /**
+   * Adds a single job submission environment variable, replacing any value already stored for the key.
+   * @param key name of the environment variable.
+   * @param value value of the environment variable.
+   * @return this YarnSubmissionHelper, so calls can be chained.
+   */
+  public YarnSubmissionHelper setJobSubmissionEnvVariable(final String key, final String value) {
+    environmentVariablesMap.put(key, value);
+    return this;
+  }
+
+  /**
    * Sets the Driver stdout file path.
    * @param driverStdoutPath
    * @return
@@ -278,7 +302,7 @@ public final class YarnSubmissionHelper implements AutoCloseable {
     }
 
     final ContainerLaunchContext containerLaunchContext = YarnTypes.getContainerLaunchContext(
-        launchCommand, this.resources, tokenProvider.getTokens());
+        launchCommand, this.resources, tokenProvider.getTokens(), environmentVariablesMap);
     this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);
 
     LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}, driver core: {1}",

http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
index 88b7134..cc22d40 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
@@ -51,27 +51,55 @@ public final class YarnTypes {
       final List<String> commands,
       final Map<String, LocalResource> localResources,
       final byte[] securityTokenBuffer) {
-    return getContainerLaunchContext(commands, localResources, securityTokenBuffer, null);
+    return getContainerLaunchContext(commands, localResources, securityTokenBuffer,
+        new HashMap<String, String>(), null);
+  }
+
+  /**
+   * @return a ContainerLaunchContext with the given commands, LocalResources and environment map (no ApplicationId).
+   */
+  public static ContainerLaunchContext getContainerLaunchContext(
+      final List<String> commands,
+      final Map<String, LocalResource> localResources,
+      final byte[] securityTokenBuffer,
+      final Map<String, String> envMap) {
+    return getContainerLaunchContext(commands, localResources, securityTokenBuffer, envMap, null);
+  }
+
+  /**
+   * Gets a LaunchContext and sets {@link YarnUtilities#REEF_YARN_APPLICATION_ID_ENV_VAR} from the application id.
+   * @return a ContainerLaunchContext with the given commands and LocalResources.
+   */
+  public static ContainerLaunchContext getContainerLaunchContext(
+      final List<String> commands,
+      final Map<String, LocalResource> localResources,
+      final byte[] securityTokenBuffer,
+      final ApplicationId applicationId) {
+    return getContainerLaunchContext(commands, localResources, securityTokenBuffer,
+        new HashMap<String, String>(), applicationId);
   }
 
   /**
    * Gets a LaunchContext and sets the environment variable
    * {@link YarnUtilities#REEF_YARN_APPLICATION_ID_ENV_VAR} for REEF Evaluators.
-   * @return a ContainerLaunchContext with the given commands and LocalResources.
+   * @return a ContainerLaunchContext with the given commands, LocalResources and environment map.
    */
   public static ContainerLaunchContext getContainerLaunchContext(
       final List<String> commands,
       final Map<String, LocalResource> localResources,
       final byte[] securityTokenBuffer,
+      final Map<String, String> envMap,
       final ApplicationId applicationId) {
     final ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
     context.setLocalResources(localResources);
     context.setCommands(commands);
-    final Map<String, String> envMap = new HashMap<>();
     if (applicationId != null) {
       envMap.put(YarnUtilities.REEF_YARN_APPLICATION_ID_ENV_VAR, applicationId.toString());
     }
 
+    for (final Map.Entry entry : envMap.entrySet()) {
+      LOG.log(Level.FINE, "Key : {0}, Value : {1}", new Object[] {entry.getKey(), entry.getValue()});
+    }
     context.setEnvironment(envMap);
     if (securityTokenBuffer != null) {
       context.setTokens(ByteBuffer.wrap(securityTokenBuffer));