You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2017/10/18 20:25:54 UTC
reef git commit: [REEF-1891] Allow passing job submission environment
variables from .Net client
Repository: reef
Updated Branches:
refs/heads/master e18b5d36a -> 987240cb7
[REEF-1891] Allow passing job submission environment variables from .Net client
* Update AvroYarnClusterJobSubmissionParameters schema for environment map
* Update Java bridge client to deserialize and set the map
* Set the map to ContainerLaunchContext
* Expose it in JobRequest and JobParameters in C#
* Serialize it in YarnREEFParamSerializer.
* Update unit tests in both Java and C#
* Updated HelloREEFYarn to set it.
JIRA:
[REEF-1891](https://issues.apache.org/jira/browse/REEF-1891)
Pull request:
This closes #1381
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/987240cb
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/987240cb
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/987240cb
Branch: refs/heads/master
Commit: 987240cb7c36befdda1b168c51231002de41c24d
Parents: e18b5d3
Author: Julia Wang <ju...@apache.org>
Authored: Fri Sep 22 16:23:28 2017 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed Oct 18 13:25:01 2017 -0700
----------------------------------------------------------------------
.../YarnREEFParamSerializerTests.cs | 9 +-
.../Org.Apache.REEF.Client/API/JobParameters.cs | 15 +-
.../API/JobParametersBuilder.cs | 16 ++
.../API/JobRequestBuilder.cs | 12 ++
.../AvroYarnClusterJobSubmissionParameters.cs | 13 +-
.../Org.Apache.REEF.Client.csproj | 1 +
.../Org.Apache.REEF.Client/YARN/Environment.cs | 156 +++++++++++++++++++
.../YARN/YarnREEFParamSerializer.cs | 3 +
.../HelloREEFYarn.cs | 4 +-
.../src/main/avro/JobSubmissionParameters.avsc | 1 +
.../client/YarnClusterSubmissionFromCS.java | 26 ++++
.../bridge/client/YarnJobSubmissionClient.java | 1 +
...SubmissionParametersSerializationFromCS.java | 6 +-
.../yarn/client/YarnSubmissionHelper.java | 26 +++-
.../reef/runtime/yarn/util/YarnTypes.java | 34 +++-
15 files changed, 311 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
index 23c8725..040a9f8 100644
--- a/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
+++ b/lang/cs/Org.Apache.REEF.Client.Tests/YarnREEFParamSerializerTests.cs
@@ -159,6 +159,7 @@ namespace Org.Apache.REEF.Client.Tests
"\"securityTokenService\":\"{0}\"," +
"\"maxApplicationSubmissions\":{1}," +
"\"driverMemory\":{1}," +
+ "\"envMap\":{{\"key1\":\"{0}\",\"key2\":\"{0}\"}}," +
"\"driverStdoutFilePath\":\"{0}\"," +
"\"driverStderrFilePath\":\"{0}\"" +
"}}";
@@ -178,15 +179,17 @@ namespace Org.Apache.REEF.Client.Tests
.SetJobIdentifier(AnyString)
.SetMaxApplicationSubmissions(AnyInt)
.SetDriverMemory(AnyInt)
+ .SetJobSubmissionEnvironmentVariable("key1", AnyString)
+ .SetJobSubmissionEnvironmentVariable("key2", AnyString)
.SetDriverStderrFilePath(AnyString)
.SetDriverStdoutFilePath(AnyString)
.Build();
var serializedBytes = serializer.SerializeJobArgsToBytes(jobRequest.JobParameters, AnyString);
- var expectedString = Encoding.UTF8.GetString(serializedBytes);
- var jsonObject = JObject.Parse(expectedString);
+ var actualString = Encoding.UTF8.GetString(serializedBytes);
+ var actualJsonObject = JObject.Parse(actualString);
var expectedJsonObject = JObject.Parse(expectedJson);
- Assert.True(JToken.DeepEquals(jsonObject, expectedJsonObject));
+ Assert.True(JToken.DeepEquals(actualJsonObject, expectedJsonObject));
}
private sealed class DriverStartHandler : IObserver<IDriverStarted>
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs b/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs
index 6d6c307..5279e3a 100644
--- a/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/API/JobParameters.cs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+using System;
+using System.Collections.Generic;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;
@@ -30,6 +32,7 @@ namespace Org.Apache.REEF.Client.API
private readonly string _jobIdentifier;
private readonly int _maxApplicationSubmissions;
private readonly int _driverMemory;
+ private IDictionary<string, string> _jobSubmissionEnvMap;
private readonly Optional<string> _stdoutFilePath;
private readonly Optional<string> _stderrFilePath;
private readonly JavaLoggingSetting _logSetting;
@@ -38,6 +41,7 @@ namespace Org.Apache.REEF.Client.API
string jobIdentifier,
int maxApplicationSubmissions,
int driverMemory,
+ IDictionary<string, string> jobSubmissionEnvMap,
string stdoutFilePath,
string stderrFilePath,
JavaLoggingSetting logSetting)
@@ -45,7 +49,8 @@ namespace Org.Apache.REEF.Client.API
_jobIdentifier = jobIdentifier;
_maxApplicationSubmissions = maxApplicationSubmissions;
_driverMemory = driverMemory;
-
+ _jobSubmissionEnvMap = jobSubmissionEnvMap;
+
_stdoutFilePath = string.IsNullOrWhiteSpace(stdoutFilePath) ?
Optional<string>.Empty() : Optional<string>.Of(stdoutFilePath);
@@ -81,6 +86,14 @@ namespace Org.Apache.REEF.Client.API
}
/// <summary>
+ /// Gets a new copy of the job submission environment variable map; edits to the returned dictionary are not stored.
+ /// </summary>
+ public IDictionary<string, string> JobSubmissionEnvMap
+ {
+ get { return new Dictionary<string, string>(_jobSubmissionEnvMap); }
+ }
+
+ /// <summary>
/// Gets the file path for stdout for the driver.
/// </summary>
public Optional<string> StdoutFilePath
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs
index 06ab7ee..39df14e 100644
--- a/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/API/JobParametersBuilder.cs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+using System.Collections.Generic;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.Client.API
@@ -29,6 +30,7 @@ namespace Org.Apache.REEF.Client.API
private int _driverMemory = 512;
private string _stdoutFilePath = null;
private string _stderrFilePath = null;
+ private readonly IDictionary<string, string> _jobSubmissionMap = new Dictionary<string, string>();
private JavaLoggingSetting _javaLogLevel = JavaLoggingSetting.Info;
private JobParametersBuilder()
@@ -53,6 +55,7 @@ namespace Org.Apache.REEF.Client.API
_jobIdentifier,
_maxApplicationSubmissions,
_driverMemory,
+ _jobSubmissionMap,
_stdoutFilePath,
_stderrFilePath,
_javaLogLevel);
@@ -89,6 +92,19 @@ namespace Org.Apache.REEF.Client.API
}
/// <summary>
+ /// Set job submission environment variable.
+ /// If the variable is already in the map, override it.
+ /// </summary>
+ /// <param name="key">Name of the environment variable.</param>
+ /// <param name="value">Value of the environment variable.</param>
+ /// <returns>This builder.</returns>
+ public JobParametersBuilder SetJobSubmissionEnvironmentVariable(string key, string value)
+ {
+ _jobSubmissionMap[key] = value;
+ return this;
+ }
+
+ /// <summary>
/// Sets the file path to the stdout file for the driver.
/// </summary>
public JobParametersBuilder SetDriverStdoutFilePath(string stdoutFilePath)
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs
index 325ed04..234cd7a 100644
--- a/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/API/JobRequestBuilder.cs
@@ -148,6 +148,18 @@ namespace Org.Apache.REEF.Client.API
}
/// <summary>
+ /// Set a job submission environment variable by forwarding it to the job parameters builder.
+ /// </summary>
+ /// <param name="key">Key of the environment variable.</param>
+ /// <param name="value">Value of the environment variable.</param>
+ /// <returns>This builder.</returns>
+ public JobRequestBuilder SetJobSubmissionEnvironmentVariable(string key, string value)
+ {
+ _jobParametersBuilder.SetJobSubmissionEnvironmentVariable(key, value);
+ return this;
+ }
+
+ /// <summary>
/// Sets the maximum amount of times a job can be submitted.
/// </summary>
public JobRequestBuilder SetMaxApplicationSubmissions(int maxAppSubmissions)
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
index c4e82fb..9bf2a64 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+using System.Collections.Generic;
using System.Runtime.Serialization;
using Org.Apache.REEF.Utilities.Attributes;
@@ -28,7 +29,7 @@ namespace Org.Apache.REEF.Client.Avro.YARN
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroYarnClusterJobSubmissionParameters
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""fileSystemUrl"",""type"":""string""},{""name"":""jobSubmissi
onDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}";
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""dfsJobSubmissionFolder"",""type"":""string""},{""name"":""fileSystemUrl"",""type"":""string""},{""name"":""jobSubmissi
onDirectoryPrefix"",""type"":""string""}]}},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""},{""name"":""driverMemory"",""type"":""int""},{""name"":""envMap"",""type"":{""type"":""map"",""values"":""string""}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""driverStdoutFilePath"",""type"":""string""},{""name"":""driverStderrFilePath"",""type"":""string""}]}";
/// <summary>
/// Gets the schema.
@@ -66,6 +67,12 @@ namespace Org.Apache.REEF.Client.Avro.YARN
public int driverMemory { get; set; }
/// <summary>
+ /// Gets or sets the envMap field.
+ /// </summary>
+ [DataMember]
+ public IDictionary<string, string> envMap { get; set; }
+
+ /// <summary>
/// Gets or sets the maxApplicationSubmissions field.
/// </summary>
[DataMember]
@@ -99,15 +106,17 @@ namespace Org.Apache.REEF.Client.Avro.YARN
/// <param name="securityTokenKind">The securityTokenKind.</param>
/// <param name="securityTokenService">The securityTokenService.</param>
/// <param name="driverMemory">The driverMemory.</param>
+ /// <param name="envMap">The envMap.</param>
/// <param name="maxApplicationSubmissions">The maxApplicationSubmissions.</param>
/// <param name="driverStdoutFilePath">The driverStdoutFilePath.</param>
/// <param name="driverStderrFilePath">The driverStderrFilePath.</param>
- public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, int maxApplicationSubmissions, string driverStdoutFilePath, string driverStderrFilePath)
+ public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, string securityTokenKind, string securityTokenService, int driverMemory, IDictionary<string, string> envMap, int maxApplicationSubmissions, string driverStdoutFilePath, string driverStderrFilePath)
{
this.yarnJobSubmissionParameters = yarnJobSubmissionParameters;
this.securityTokenKind = securityTokenKind;
this.securityTokenService = securityTokenService;
this.driverMemory = driverMemory;
+ this.envMap = envMap;
this.maxApplicationSubmissions = maxApplicationSubmissions;
this.driverStdoutFilePath = driverStdoutFilePath;
this.driverStderrFilePath = driverStderrFilePath;
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index 7748895..a0b4f75 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -100,6 +100,7 @@ under the License.
<Compile Include="Local\Parameters\NumberOfEvaluators.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="YARN\ApplicationReport.cs" />
+ <Compile Include="YARN\Environment.cs" />
<Compile Include="YARN\HDI\HDInsightClientConfiguration.cs" />
<Compile Include="YARN\HDI\HDInsightCommandLineEnvironment.cs" />
<Compile Include="YARN\HDI\HDInsightCredential.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs b/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs
new file mode 100644
index 0000000..4704f2b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/Environment.cs
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.Client.YARN
+{
+ /// <summary>
+ /// Default environment map keys from YARN. Each member is named after the variable it stands for.
+ /// <a href="http://hadoop.apache.org/docs/r2.6.0/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html">
+ /// Hadoop RM REST API</a> documentation.
+ /// </summary>
+ public enum Environment
+ {
+ /// <summary>
+ /// USER
+ /// </summary>
+ USER,
+
+ /// <summary>
+ /// LOGNAME
+ /// </summary>
+ LOGNAME,
+
+ /// <summary>
+ /// HOME
+ /// </summary>
+ HOME,
+
+ /// <summary>
+ /// PWD
+ /// </summary>
+ PWD,
+
+ /// <summary>
+ /// PATH
+ /// </summary>
+ PATH,
+
+ /// <summary>
+ /// SHELL
+ /// </summary>
+ SHELL,
+
+ /// <summary>
+ /// JAVA_HOME
+ /// </summary>
+ JAVA_HOME,
+
+ /// <summary>
+ /// CLASSPATH
+ /// </summary>
+ CLASSPATH,
+
+ /// <summary>
+ /// APP_CLASSPATH
+ /// </summary>
+ APP_CLASSPATH,
+
+ /// <summary>
+ /// HADOOP_CLASSPATH.
+ /// </summary>
+ HADOOP_CLASSPATH,
+
+ /// <summary>
+ /// LD_LIBRARY_PATH
+ /// </summary>
+ LD_LIBRARY_PATH,
+
+ /// <summary>
+ /// HADOOP_CONF_DIR
+ /// </summary>
+ HADOOP_CONF_DIR,
+
+ /// <summary>
+ /// HADOOP_CLIENT_CONF_DIR Final, non-modifiable.
+ /// </summary>
+ HADOOP_CLIENT_CONF_DIR,
+
+ /// <summary>
+ /// $HADOOP_COMMON_HOME
+ /// </summary>
+ HADOOP_COMMON_HOME,
+
+ /// <summary>
+ /// $HADOOP_HDFS_HOME
+ /// </summary>
+ HADOOP_HDFS_HOME,
+
+ /// <summary>
+ /// $MALLOC_ARENA_MAX
+ /// </summary>
+ MALLOC_ARENA_MAX,
+
+ /// <summary>
+ /// $HADOOP_YARN_HOME
+ /// </summary>
+ HADOOP_YARN_HOME,
+
+ /// <summary>
+ /// $CLASSPATH_PREPEND_DISTCACHE
+ /// Private, Windows specific
+ /// </summary>
+ CLASSPATH_PREPEND_DISTCACHE,
+
+ /// <summary>
+ /// $CONTAINER_ID
+ /// Exported by NodeManager and non-modifiable by users.
+ /// </summary>
+ CONTAINER_ID,
+
+ /// <summary>
+ /// $NM_HOST
+ /// Exported by NodeManager and non-modifiable by users.
+ /// </summary>
+ NM_HOST,
+
+ /// <summary>
+ /// $NM_HTTP_PORT
+ /// Exported by NodeManager and non-modifiable by users.
+ /// </summary>
+ NM_HTTP_PORT,
+
+ /// <summary>
+ /// $NM_PORT
+ /// Exported by NodeManager and non-modifiable by users.
+ /// </summary>
+ NM_PORT,
+
+ /// <summary>
+ /// $LOCAL_DIRS
+ /// Exported by NodeManager and non-modifiable by users.
+ /// </summary>
+ LOCAL_DIRS,
+
+ /// <summary>
+ /// $LOG_DIRS
+ /// Exported by NodeManager and non-modifiable by users.
+ /// Comma-separated list of directories that the container should use for
+ /// logging.
+ /// </summary>
+ LOG_DIRS
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
index c09eaad..8fd6a4f 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+using System.Collections.Generic;
+using System.Globalization;
using System.IO;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.Avro;
@@ -166,6 +168,7 @@ namespace Org.Apache.REEF.Client.YARN
yarnJobSubmissionParameters = avroYarnJobSubmissionParameters,
driverMemory = jobParameters.DriverMemoryInMB,
+ envMap = jobParameters.JobSubmissionEnvMap,
maxApplicationSubmissions = jobParameters.MaxApplicationSubmissions,
driverStdoutFilePath = string.IsNullOrWhiteSpace(jobParameters.StdoutFilePath.Value) ?
_fileNames.GetDefaultYarnDriverStdoutFilePath() : jobParameters.StdoutFilePath.Value,
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
index 917e3eb..2285a50 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
@@ -94,6 +94,8 @@ namespace Org.Apache.REEF.Examples.HelloREEF
.AddDriverConfiguration(driverConfig.Build())
.AddGlobalAssemblyForType(typeof(HelloDriverYarn))
.SetJobIdentifier("HelloREEF")
+ .SetJobSubmissionEnvironmentVariable(Environment.PATH.ToString(), "value1")
+ .SetJobSubmissionEnvironmentVariable("UserDefineKey", "value2")
.SetJavaLogLevel(JavaLoggingSetting.Verbose)
.Build();
@@ -180,7 +182,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF
/// <summary>
/// HelloREEF example running on YARN
- /// Usage: Org.Apache.REEF.Examples.HelloREEF SecurityTokenId SecurityTokenPw [portRangerStart] [portRangeCount] [nodeName1] [nodeName2]...
+ /// Usage: Org.Apache.REEF.Examples.HelloREEF TrustedApplicaitonLLQ SecurityTokenPw [portRangerStart] [portRangeCount] [nodeName1] [nodeName2]...
/// </summary>
/// <param name="args"></param>
public static void MainYarn(string[] args)
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index 62bc757..ecf0043 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -60,6 +60,7 @@
{ "name": "securityTokenKind", "type": "string", "default": "NULL" },
{ "name": "securityTokenService", "type": "string", "default": "NULL" },
{ "name": "driverMemory", "type": "int" },
+ {"name": "environmentVariablesMap", "type": {"type": "map", "values": "string"}},
{ "name": "maxApplicationSubmissions", "type": "int" },
{ "name": "driverStdoutFilePath", "type": "string" },
{ "name": "driverStderrFilePath", "type": "string" }
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
index 71968b9..b45a2f2 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
@@ -28,6 +28,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
/**
* Represents a job submission from the CS code.
@@ -57,6 +59,7 @@ final class YarnClusterSubmissionFromCS {
private final String fileSystemUrl;
private final String yarnDriverStdoutFilePath;
private final String yarnDriverStderrFilePath;
+ private final Map<String, String> environmentVariablesMap = new HashMap<>();
private final AvroYarnAppSubmissionParameters yarnAppSubmissionParameters;
private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters;
@@ -89,6 +92,13 @@ final class YarnClusterSubmissionFromCS {
this.yarnDriverStdoutFilePath = yarnClusterJobSubmissionParameters.getDriverStdoutFilePath().toString();
this.yarnDriverStderrFilePath = yarnClusterJobSubmissionParameters.getDriverStderrFilePath().toString();
+ if (yarnClusterJobSubmissionParameters.getEnvironmentVariablesMap() != null) {
+ for (Map.Entry<java.lang.CharSequence, java.lang.CharSequence> pair :
+ yarnClusterJobSubmissionParameters.getEnvironmentVariablesMap().entrySet()) {
+ this.environmentVariablesMap.put(pair.getKey().toString(), pair.getValue().toString());
+ }
+ }
+
Validate.notEmpty(jobId, "The job id is null or empty");
Validate.isTrue(driverMemory > 0, "The amount of driver memory given is <= 0.");
Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
@@ -121,9 +131,18 @@ final class YarnClusterSubmissionFromCS {
", tokenService='" + tokenService + '\'' +
", fileSystemUrl='" + fileSystemUrl + '\'' +
", jobSubmissionDirectoryPrefix='" + jobSubmissionDirectoryPrefix + '\'' +
+ envMapString() +
'}';
}
+ private String envMapString() {
+ final StringBuilder text = new StringBuilder();
+ for (final Map.Entry<String, String> envVar : environmentVariablesMap.entrySet()) {
+ text.append(", Key:").append(envVar.getKey()).append(", value:").append(envVar.getValue());
+ }
+ return text.toString();
+ }
+
/**
* @return The local folder where the driver is staged.
*/
@@ -181,6 +200,13 @@ final class YarnClusterSubmissionFromCS {
}
/**
+ * @return A new copy of the environment variables map; changing it does not affect this submission.
+ */
+ Map<String, String> getEnvironmentVariablesMap() {
+ return new HashMap<>(environmentVariablesMap);
+ }
+
+ /**
* @return The max amount of times the application can be submitted.
*/
int getMaxApplicationSubmissions(){
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 0d71517..868201b 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -171,6 +171,7 @@ public final class YarnJobSubmissionClient {
.setConfigurationFilePaths(confFiles)
.setDriverStdoutPath(yarnSubmission.getYarnDriverStdoutFilePath())
.setDriverStderrPath(yarnSubmission.getYarnDriverStderrFilePath())
+ .setJobSubmissionEnvMap(yarnSubmission.getEnvironmentVariablesMap())
.submit();
writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
index b8024f9..f733131 100644
--- a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
+++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java
@@ -65,6 +65,10 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
"\"securityTokenService\":\"" + NULL_REP + "\"," +
"\"maxApplicationSubmissions\":" + NUMBER_REP + "," +
"\"driverMemory\":" + NUMBER_REP + "," +
+ "\"environmentVariablesMap\":" +
+ "{" +
+ "\"key\":" + STRING_REP_QUOTED +
+ "}," +
"\"driverStdoutFilePath\":" + STRING_REP_QUOTED + "," +
"\"driverStderrFilePath\":" + STRING_REP_QUOTED +
"}";
@@ -179,7 +183,7 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS {
assert yarnClusterSubmissionFromCS.getTokenService().equals(NULL_REP);
assert yarnClusterSubmissionFromCS.getYarnDriverStderrFilePath().equals(STRING_REP);
assert yarnClusterSubmissionFromCS.getYarnDriverStdoutFilePath().equals(STRING_REP);
-
+ assert yarnClusterSubmissionFromCS.getEnvironmentVariablesMap().get("key").equals(STRING_REP);
verifyYarnJobSubmissionParams(yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters(),
yarnClusterSubmissionFromCS.getYarnAppSubmissionParameters());
}
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index 5df89e7..7eec786 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -63,6 +63,7 @@ public final class YarnSubmissionHelper implements AutoCloseable {
private String driverStderrFilePath;
private Class launcherClazz = REEFLauncher.class;
private List<String> configurationFilePaths;
+ private final Map<String, String> environmentVariablesMap = new HashMap<>();
public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
@@ -241,6 +242,29 @@ public final class YarnSubmissionHelper implements AutoCloseable {
}
/**
+ * Sets environment variable map. Entries are copied in; a key that is already present is overwritten.
+ * @param map the environment variables to add, keyed by variable name.
+ * @return this YarnSubmissionHelper.
+ */
+ public YarnSubmissionHelper setJobSubmissionEnvMap(final Map<String, String> map) {
+ for (final Map.Entry<String, String> entry : map.entrySet()) {
+ environmentVariablesMap.put(entry.getKey(), entry.getValue());
+ }
+ return this;
+ }
+
+ /**
+ * Adds a job submission environment variable. A key that is already present is overwritten.
+ * @param key name of the environment variable.
+ * @param value value of the environment variable.
+ * @return this YarnSubmissionHelper.
+ */
+ public YarnSubmissionHelper setJobSubmissionEnvVariable(final String key, final String value) {
+ environmentVariablesMap.put(key, value);
+ return this;
+ }
+
+ /**
* Sets the Driver stdout file path.
* @param driverStdoutPath
* @return
@@ -278,7 +302,7 @@ public final class YarnSubmissionHelper implements AutoCloseable {
}
final ContainerLaunchContext containerLaunchContext = YarnTypes.getContainerLaunchContext(
- launchCommand, this.resources, tokenProvider.getTokens());
+ launchCommand, this.resources, tokenProvider.getTokens(), environmentVariablesMap);
this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);
LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}, driver core: {1}",
http://git-wip-us.apache.org/repos/asf/reef/blob/987240cb/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
index 88b7134..cc22d40 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
@@ -51,27 +51,55 @@ public final class YarnTypes {
final List<String> commands,
final Map<String, LocalResource> localResources,
final byte[] securityTokenBuffer) {
- return getContainerLaunchContext(commands, localResources, securityTokenBuffer, null);
+ return getContainerLaunchContext(commands, localResources, securityTokenBuffer,
+ new HashMap<String, String>(), null);
+ }
+
+ /**
+ * @return a ContainerLaunchContext with the given commands, LocalResources and environment map.
+ */
+ public static ContainerLaunchContext getContainerLaunchContext(
+ final List<String> commands,
+ final Map<String, LocalResource> localResources,
+ final byte[] securityTokenBuffer,
+ final Map<String, String> envMap) {
+ return getContainerLaunchContext(commands, localResources, securityTokenBuffer, envMap, null);
+ }
+
+ /**
+ * Gets a LaunchContext with {@link YarnUtilities#REEF_YARN_APPLICATION_ID_ENV_VAR} set from a non-null applicationId.
+ * @return a ContainerLaunchContext with the given commands and LocalResources.
+ */
+ public static ContainerLaunchContext getContainerLaunchContext(
+ final List<String> commands,
+ final Map<String, LocalResource> localResources,
+ final byte[] securityTokenBuffer,
+ final ApplicationId applicationId) {
+ return getContainerLaunchContext(commands, localResources, securityTokenBuffer,
+ new HashMap<String, String>(), applicationId);
+ }
/**
* Gets a LaunchContext and sets the environment variable
* {@link YarnUtilities#REEF_YARN_APPLICATION_ID_ENV_VAR} for REEF Evaluators.
- * @return a ContainerLaunchContext with the given commands and LocalResources.
+ * @return a ContainerLaunchContext with the given commands, LocalResources and environment map.
*/
public static ContainerLaunchContext getContainerLaunchContext(
final List<String> commands,
final Map<String, LocalResource> localResources,
final byte[] securityTokenBuffer,
+ final Map<String, String> envMap,
final ApplicationId applicationId) {
final ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
context.setLocalResources(localResources);
context.setCommands(commands);
- final Map<String, String> envMap = new HashMap<>();
if (applicationId != null) {
envMap.put(YarnUtilities.REEF_YARN_APPLICATION_ID_ENV_VAR, applicationId.toString());
}
+ for (final Map.Entry entry : envMap.entrySet()) {
+ LOG.log(Level.FINE, "Key : {0}, Value : {1}", new Object[] {entry.getKey(), entry.getValue()});
+ }
context.setEnvironment(envMap);
if (securityTokenBuffer != null) {
context.setTokens(ByteBuffer.wrap(securityTokenBuffer));