You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/11/11 02:33:41 UTC
incubator-reef git commit: [REEF-927] Switch Java .NET submission to
take an Avro configuration file for submission parameters
Repository: incubator-reef
Updated Branches:
refs/heads/master 2515c5377 -> ef5403a75
[REEF-927] Switch Java .NET submission to take an Avro configuration file for submission parameters
This addressed the issue by
* Creating Avro schemas to take the place of ordered arguments to Java from .NET submission.
* Removing ClrClient2JavaClientCuratedParameters.cs.
JIRA:
[REEF-927](https://issues.apache.org/jira/browse/REEF-927)
Pull Request:
This closes #624
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ef5403a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ef5403a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ef5403a7
Branch: refs/heads/master
Commit: ef5403a75df33c009f9c534f2a40b9b9b07af8da
Parents: 2515c53
Author: Andrew Chung <af...@gmail.com>
Authored: Mon Nov 9 18:29:47 2015 -0800
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Nov 10 17:31:14 2015 -0800
----------------------------------------------------------------------
.../Avro/AvroJobSubmissionParameters.cs | 98 ++++++++++++++++++
.../Local/AvroLocalJobSubmissionParameters.cs | 74 ++++++++++++++
lang/cs/Org.Apache.REEF.Client/Avro/README.md | 19 ++++
.../AvroYarnClusterJobSubmissionParameters.cs | 90 +++++++++++++++++
.../YARN/AvroYarnJobSubmissionParameters.cs | 90 +++++++++++++++++
.../ClrClient2JavaClientCuratedParameters.cs | 61 -----------
.../Org.Apache.REEF.Client/Local/LocalClient.cs | 81 +++++++++------
.../Org.Apache.REEF.Client.csproj | 9 +-
.../YARN/YARNREEFClient.cs | 58 +++++++----
.../Files/REEFFileNames.cs | 10 ++
lang/java/reef-bridge-client/pom.xml | 16 +++
.../src/main/avro/JobSubmissionParameters.avsc | 67 +++++++++++++
.../apache/reef/bridge/client/LocalClient.java | 10 +-
...ocalRuntimeDriverConfigurationGenerator.java | 11 +-
.../bridge/client/LocalSubmissionFromCS.java | 70 ++++++-------
.../YarnDriverConfigurationGenerator.java | 11 +-
.../bridge/client/YarnJobSubmissionClient.java | 11 +-
.../bridge/client/YarnSubmissionFromCS.java | 100 ++++++++-----------
18 files changed, 674 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs
new file mode 100644
index 0000000..6d545a0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/AvroJobSubmissionParameters.cs
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Client.Avro
+{
+ /// <summary>
+ /// Data contract for the Avro record org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters: the cross-language submission parameters shared by all runtimes.
+ /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the same folder.
+ /// </summary>
+ [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
+ public sealed class AvroJobSubmissionParameters
+ {
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}";
+
+ /// <summary>
+ /// Gets the Avro schema of this record as a JSON string (the value of the JsonSchema constant).
+ /// </summary>
+ public static string Schema
+ {
+ get
+ {
+ return JsonSchema;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the jobId field (Avro type: string).
+ /// </summary>
+ [DataMember]
+ public string jobId { get; set; }
+
+ /// <summary>
+ /// Gets or sets the tcpBeginPort field (Avro type: int).
+ /// </summary>
+ [DataMember]
+ public int tcpBeginPort { get; set; }
+
+ /// <summary>
+ /// Gets or sets the tcpRangeCount field (Avro type: int).
+ /// </summary>
+ [DataMember]
+ public int tcpRangeCount { get; set; }
+
+ /// <summary>
+ /// Gets or sets the tcpTryCount field (Avro type: int).
+ /// </summary>
+ [DataMember]
+ public int tcpTryCount { get; set; }
+
+ /// <summary>
+ /// Gets or sets the jobSubmissionFolder field (Avro type: string).
+ /// </summary>
+ [DataMember]
+ public string jobSubmissionFolder { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroJobSubmissionParameters"/> class with every field left at its default value.
+ /// </summary>
+ public AvroJobSubmissionParameters()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroJobSubmissionParameters"/> class with every field set from the matching argument.
+ /// </summary>
+ /// <param name="jobId">Value assigned to the jobId field.</param>
+ /// <param name="tcpBeginPort">Value assigned to the tcpBeginPort field.</param>
+ /// <param name="tcpRangeCount">Value assigned to the tcpRangeCount field.</param>
+ /// <param name="tcpTryCount">Value assigned to the tcpTryCount field.</param>
+ /// <param name="jobSubmissionFolder">Value assigned to the jobSubmissionFolder field.</param>
+ public AvroJobSubmissionParameters(string jobId, int tcpBeginPort, int tcpRangeCount, int tcpTryCount, string jobSubmissionFolder)
+ {
+ this.jobId = jobId;
+ this.tcpBeginPort = tcpBeginPort;
+ this.tcpRangeCount = tcpRangeCount;
+ this.tcpTryCount = tcpTryCount;
+ this.jobSubmissionFolder = jobSubmissionFolder;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs
new file mode 100644
index 0000000..7010eb0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/Local/AvroLocalJobSubmissionParameters.cs
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Client.Avro.Local
+{
+ /// <summary>
+ /// Data contract for the Avro record org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters: the cross-language submission parameters to the Local runtime.
+ /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the parent Avro folder.
+ /// </summary>
+ [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
+ public sealed class AvroLocalJobSubmissionParameters
+ {
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the Local runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""maxNumberOfConcurrentEvaluators"",""type"":""int""}]}";
+
+ /// <summary>
+ /// Gets the Avro schema of this record as a JSON string (the value of the JsonSchema constant).
+ /// </summary>
+ public static string Schema
+ {
+ get
+ {
+ return JsonSchema;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the sharedJobSubmissionParameters field (nested AvroJobSubmissionParameters record).
+ /// </summary>
+ [DataMember]
+ public AvroJobSubmissionParameters sharedJobSubmissionParameters { get; set; }
+
+ /// <summary>
+ /// Gets or sets the maxNumberOfConcurrentEvaluators field (Avro type: int).
+ /// </summary>
+ [DataMember]
+ public int maxNumberOfConcurrentEvaluators { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class with every field left at its default value.
+ /// </summary>
+ public AvroLocalJobSubmissionParameters()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroLocalJobSubmissionParameters"/> class with every field set from the matching argument.
+ /// </summary>
+ /// <param name="sharedJobSubmissionParameters">Value assigned to the sharedJobSubmissionParameters field.</param>
+ /// <param name="maxNumberOfConcurrentEvaluators">Value assigned to the maxNumberOfConcurrentEvaluators field.</param>
+ public AvroLocalJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int maxNumberOfConcurrentEvaluators)
+ {
+ this.sharedJobSubmissionParameters = sharedJobSubmissionParameters;
+ this.maxNumberOfConcurrentEvaluators = maxNumberOfConcurrentEvaluators;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Client/Avro/README.md
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/README.md b/lang/cs/Org.Apache.REEF.Client/Avro/README.md
new file mode 100644
index 0000000..185b3de
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/README.md
@@ -0,0 +1,19 @@
+C# Avro Code-Generated Files
+============================
+The files here are code-generated and then modified by hand. The code
+generation is based on the Avro schemas defined in the folder
+lang/java/reef-bridge-client/src/main/avro. Please do not modify them
+unless it is absolutely necessary!
+
+Instructions On C# Code-Generation
+----------------------------------
+To regenerate these files, please follow the instructions on how
+to use Microsoft.Hadoop.Avro.Tools.exe as provided
+[here](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-dotnet-avro-serialization/)
+and run it on the schema files in lang/java/reef-bridge-client/src/main/avro.
+Note that as of the time of writing (11/10/2015), the
+Microsoft Azure HDInsight Avro Library NuGet does not include
+Microsoft.Hadoop.Avro.Tools.exe. To build it directly from source,
+please download the source code [here](http://hadoopsdk.codeplex.com/SourceControl/latest)
+and run msbuild. More information on how to build is available
+on the official documentation provided above.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
new file mode 100644
index 0000000..f31874c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnClusterJobSubmissionParameters.cs
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Client.Avro.YARN
+{
+ /// <summary>
+ /// Used to serialize and deserialize Avro record org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters.
+ /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the parent Avro folder.
+ /// </summary>
+ [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
+ public sealed class AvroYarnClusterJobSubmissionParameters
+ {
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters"",""doc"":""Cross-language submission parameters to the YARN runtime using Hadoop's submission client"",""fields"":[{""name"":""yarnJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}},{""name"":""maxApplicationSubmissions"",""type"":""int""},{""name"":""securityTokenKind"",""type"":""string""},{""name"":""securityTokenService"",""type"":""string""}]}";
+
+ /// <summary>
+ /// Gets the Avro schema of this record as a JSON string (the value of the JsonSchema constant).
+ /// </summary>
+ public static string Schema
+ {
+ get
+ {
+ return JsonSchema;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the yarnJobSubmissionParameters field (nested AvroYarnJobSubmissionParameters record).
+ /// </summary>
+ [DataMember]
+ public AvroYarnJobSubmissionParameters yarnJobSubmissionParameters { get; set; }
+
+ /// <summary>
+ /// Gets or sets the maxApplicationSubmissions field (Avro type: int).
+ /// </summary>
+ [DataMember]
+ public int maxApplicationSubmissions { get; set; }
+
+ /// <summary>
+ /// Gets or sets the securityTokenKind field (Avro type: string).
+ /// </summary>
+ [DataMember]
+ public string securityTokenKind { get; set; }
+
+ /// <summary>
+ /// Gets or sets the securityTokenService field (Avro type: string).
+ /// </summary>
+ [DataMember]
+ public string securityTokenService { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroYarnClusterJobSubmissionParameters"/> class with every field left at its default value.
+ /// </summary>
+ public AvroYarnClusterJobSubmissionParameters()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroYarnClusterJobSubmissionParameters"/> class with every field set from the matching argument.
+ /// </summary>
+ /// <param name="yarnJobSubmissionParameters">Value assigned to the yarnJobSubmissionParameters field.</param>
+ /// <param name="maxApplicationSubmissions">Value assigned to the maxApplicationSubmissions field.</param>
+ /// <param name="securityTokenKind">Value assigned to the securityTokenKind field.</param>
+ /// <param name="securityTokenService">Value assigned to the securityTokenService field.</param>
+ public AvroYarnClusterJobSubmissionParameters(AvroYarnJobSubmissionParameters yarnJobSubmissionParameters, int maxApplicationSubmissions, string securityTokenKind, string securityTokenService)
+ {
+ this.yarnJobSubmissionParameters = yarnJobSubmissionParameters;
+ this.maxApplicationSubmissions = maxApplicationSubmissions;
+ this.securityTokenKind = securityTokenKind;
+ this.securityTokenService = securityTokenService;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
new file mode 100644
index 0000000..a3be9b4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/YARN/AvroYarnJobSubmissionParameters.cs
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Client.Avro.YARN
+{
+ /// <summary>
+ /// Data contract for the Avro record org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters: the general cross-language submission parameters to the YARN runtime.
+ /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the parent Avro folder.
+ /// </summary>
+ [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
+ public sealed class AvroYarnJobSubmissionParameters
+ {
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters"",""doc"":""General cross-language submission parameters to the YARN runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""tcpBeginPort"",""type"":""int""},{""name"":""tcpRangeCount"",""type"":""int""},{""name"":""tcpTryCount"",""type"":""int""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""driverMemory"",""type"":""int""},{""name"":""driverRecoveryTimeout"",""type"":""int""},{""name"":""jobSubmissionDirectoryPrefix"",""type"":""string""}]}";
+
+ /// <summary>
+ /// Gets the Avro schema of this record as a JSON string (the value of the JsonSchema constant).
+ /// </summary>
+ public static string Schema
+ {
+ get
+ {
+ return JsonSchema;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the sharedJobSubmissionParameters field (nested AvroJobSubmissionParameters record).
+ /// </summary>
+ [DataMember]
+ public AvroJobSubmissionParameters sharedJobSubmissionParameters { get; set; }
+
+ /// <summary>
+ /// Gets or sets the driverMemory field (Avro type: int).
+ /// </summary>
+ [DataMember]
+ public int driverMemory { get; set; }
+
+ /// <summary>
+ /// Gets or sets the driverRecoveryTimeout field (Avro type: int).
+ /// </summary>
+ [DataMember]
+ public int driverRecoveryTimeout { get; set; }
+
+ /// <summary>
+ /// Gets or sets the jobSubmissionDirectoryPrefix field (Avro type: string).
+ /// </summary>
+ [DataMember]
+ public string jobSubmissionDirectoryPrefix { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroYarnJobSubmissionParameters"/> class with every field left at its default value.
+ /// </summary>
+ public AvroYarnJobSubmissionParameters()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AvroYarnJobSubmissionParameters"/> class with every field set from the matching argument.
+ /// </summary>
+ /// <param name="sharedJobSubmissionParameters">Value assigned to the sharedJobSubmissionParameters field.</param>
+ /// <param name="driverMemory">Value assigned to the driverMemory field.</param>
+ /// <param name="driverRecoveryTimeout">Value assigned to the driverRecoveryTimeout field.</param>
+ /// <param name="jobSubmissionDirectoryPrefix">Value assigned to the jobSubmissionDirectoryPrefix field.</param>
+ public AvroYarnJobSubmissionParameters(AvroJobSubmissionParameters sharedJobSubmissionParameters, int driverMemory, int driverRecoveryTimeout, string jobSubmissionDirectoryPrefix)
+ {
+ this.sharedJobSubmissionParameters = sharedJobSubmissionParameters;
+ this.driverMemory = driverMemory;
+ this.driverRecoveryTimeout = driverRecoveryTimeout;
+ this.jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
deleted file mode 100644
index 9aa025d..0000000
--- a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using Org.Apache.REEF.Driver;
-using Org.Apache.REEF.Driver.Bridge;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote.Parameters;
-
-namespace Org.Apache.REEF.Client.Common
-{
- /// <summary>
- /// Curated parameters for CLR to Java. Passes a set of command line parameters to YarnJobSubmissionClient on
- /// the Java side. The command line parameters should be strictly ordered.
- /// </summary>
- internal class ClrClient2JavaClientCuratedParameters
- {
- public int TcpPortRangeStart { get; private set; }
- public int TcpPortRangeCount { get; private set; }
- public int TcpPortRangeTryCount { get; private set; }
- public int TcpPortRangeSeed { get; private set; }
- public int MaxApplicationSubmissions { get; private set; }
- public int DriverRestartEvaluatorRecoverySeconds { get; private set; }
-
-
- [Inject]
- private ClrClient2JavaClientCuratedParameters(
- [Parameter(typeof(TcpPortRangeStart))] int tcpPortRangeStart,
- [Parameter(typeof(TcpPortRangeCount))] int tcpPortRangeCount,
- [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount,
- [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed,
- [Parameter(typeof(DriverBridgeConfigurationOptions.MaxApplicationSubmissions))] int maxApplicationSubmissions,
- [Parameter(typeof(DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds))] int driverRestartEvaluatorRecoverySeconds)
- {
- TcpPortRangeStart = tcpPortRangeStart;
- TcpPortRangeCount = tcpPortRangeCount;
- TcpPortRangeTryCount = tcpPortRangeTryCount;
- TcpPortRangeSeed = tcpPortRangeSeed;
- MaxApplicationSubmissions = maxApplicationSubmissions;
- this.DriverRestartEvaluatorRecoverySeconds = driverRestartEvaluatorRecoverySeconds;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
index af2d47b..00b33d9 100644
--- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs
@@ -22,12 +22,16 @@ using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Avro;
+using Org.Apache.REEF.Client.Avro.Local;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.Local.Parameters;
+using Org.Apache.REEF.Common.Avro;
using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Client.Local
{
@@ -49,7 +53,7 @@ namespace Org.Apache.REEF.Client.Local
private static readonly Logger Logger = Logger.GetLogger(typeof(LocalClient));
private readonly DriverFolderPreparationHelper _driverFolderPreparationHelper;
private readonly JavaClientLauncher _javaClientLauncher;
- private readonly int _numberOfEvaluators;
+ private readonly int _maxNumberOfConcurrentEvaluators;
private readonly string _runtimeFolder;
private string _driverUrl;
private REEFFileNames _fileNames;
@@ -57,13 +61,13 @@ namespace Org.Apache.REEF.Client.Local
[Inject]
private LocalClient(DriverFolderPreparationHelper driverFolderPreparationHelper,
[Parameter(typeof(LocalRuntimeDirectory))] string runtimeFolder,
- [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators,
+ [Parameter(typeof(NumberOfEvaluators))] int maxNumberOfConcurrentEvaluators,
JavaClientLauncher javaClientLauncher,
REEFFileNames fileNames)
{
_driverFolderPreparationHelper = driverFolderPreparationHelper;
_runtimeFolder = runtimeFolder;
- _numberOfEvaluators = numberOfEvaluators;
+ _maxNumberOfConcurrentEvaluators = maxNumberOfConcurrentEvaluators;
_javaClientLauncher = javaClientLauncher;
_fileNames = fileNames;
}
@@ -86,7 +90,36 @@ namespace Org.Apache.REEF.Client.Local
// Intentionally left blank.
}
- public void Submit(IJobSubmission jobSubmission)
+ private string CreateBootstrapAvroConfig(IJobSubmission jobSubmission, string driverFolder) // Writes the Local-runtime submission parameters to a file under driverFolder and returns that file's path.
+ {
+ var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray()); // Injector over the driver configurations; the TCP port settings below are read from it.
+
+ var bootstrapArgs = new AvroJobSubmissionParameters // Parameters shared by all runtimes.
+ {
+ jobSubmissionFolder = driverFolder,
+ jobId = jobSubmission.JobIdentifier,
+ tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
+ tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
+ tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(),
+ };
+
+ var avroLocalBootstrapArgs = new AvroLocalJobSubmissionParameters // Local-runtime record: the shared parameters plus the evaluator limit injected into this client.
+ {
+ sharedJobSubmissionParameters = bootstrapArgs,
+ maxNumberOfConcurrentEvaluators = _maxNumberOfConcurrentEvaluators
+ };
+
+ var submissionArgsFilePath = Path.Combine(driverFolder, _fileNames.GetJobSubmissionParametersFile());
+ using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew)) // FileMode.CreateNew throws an IOException if the file already exists.
+ {
+ var serializedArgs = AvroJsonSerializer<AvroLocalJobSubmissionParameters>.ToBytes(avroLocalBootstrapArgs);
+ argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
+ }
+
+ return submissionArgsFilePath; // Callers in this diff pass this path to the Java client launcher.
+ }
+
+ private string PrepareDriverFolder(IJobSubmission jobSubmission)
{
// Prepare the job submission folder
var jobFolder = CreateJobFolder(jobSubmission.JobIdentifier);
@@ -95,41 +128,23 @@ namespace Org.Apache.REEF.Client.Local
_driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolder);
- //TODO: Remove this when we have a generalized way to pass config to java
- var javaParams = TangFactory.GetTang()
- .NewInjector(jobSubmission.DriverConfigurations.ToArray())
- .GetInstance<ClrClient2JavaClientCuratedParameters>();
-
- _javaClientLauncher.Launch(JavaClassName, driverFolder, jobSubmission.JobIdentifier,
- _numberOfEvaluators.ToString(),
- javaParams.TcpPortRangeStart.ToString(),
- javaParams.TcpPortRangeCount.ToString(),
- javaParams.TcpPortRangeTryCount.ToString()
- );
+ return driverFolder;
+ }
+
+ public void Submit(IJobSubmission jobSubmission)
+ {
+ var driverFolder = PrepareDriverFolder(jobSubmission);
+ var submissionArgsFilePath = CreateBootstrapAvroConfig(jobSubmission, driverFolder);
+ _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath);
Logger.Log(Level.Info, "Submitted the Driver for execution.");
}
public IDriverHttpEndpoint SubmitAndGetDriverUrl(IJobSubmission jobSubmission)
{
- // Prepare the job submission folder
- var jobFolder = CreateJobFolder(jobSubmission.JobIdentifier);
- var driverFolder = Path.Combine(jobFolder, DriverFolderName);
- Logger.Log(Level.Info, "Preparing driver folder in " + driverFolder);
-
- _driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolder);
+ var driverFolder = PrepareDriverFolder(jobSubmission);
+ var submissionArgsFilePath = CreateBootstrapAvroConfig(jobSubmission, driverFolder);
- //TODO: Remove this when we have a generalized way to pass config to java
- var javaParams = TangFactory.GetTang()
- .NewInjector(jobSubmission.DriverConfigurations.ToArray())
- .GetInstance<ClrClient2JavaClientCuratedParameters>();
-
- Task.Run(() =>
- _javaClientLauncher.Launch(JavaClassName, driverFolder, jobSubmission.JobIdentifier,
- _numberOfEvaluators.ToString(),
- javaParams.TcpPortRangeStart.ToString(),
- javaParams.TcpPortRangeCount.ToString(),
- javaParams.TcpPortRangeTryCount.ToString()
- ));
+ Task.Run(() => _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath));
var fileName = Path.Combine(driverFolder, _fileNames.DriverHttpEndpoint);
HttpClientHelper helper = new HttpClientHelper();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index 5f31ded..ece257c 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -44,6 +44,9 @@ under the License.
<Reference Include="RestSharp, Version=100.0.0.0, Culture=neutral, PublicKeyToken=598062e77f915f75, processorArchitecture=MSIL">
<HintPath>$(PackagesDir)\RestSharpSigned.105.2.3\lib\net45\RestSharp.dll</HintPath>
</Reference>
+ <Reference Include="Microsoft.Hadoop.Avro">
+ <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+ </Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.IO.Compression" />
@@ -51,6 +54,7 @@ under the License.
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
<Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Runtime.Serialization" />
</ItemGroup>
<ItemGroup>
<Compile Include="API\ClientFactory.cs" />
@@ -63,8 +67,11 @@ under the License.
<Compile Include="API\JobSubmissionBuilder.cs" />
<Compile Include="API\JobSubmissionBuilderFactory.cs" />
<Compile Include="API\TcpPortConfigurationModule.cs" />
+ <Compile Include="Avro\AvroJobSubmissionParameters.cs" />
+ <Compile Include="Avro\Local\AvroLocalJobSubmissionParameters.cs" />
+ <Compile Include="Avro\YARN\AvroYarnJobSubmissionParameters.cs" />
+ <Compile Include="Avro\YARN\AvroYarnClusterJobSubmissionParameters.cs" />
<Compile Include="Common\ClientConstants.cs" />
- <Compile Include="Common\ClrClient2JavaClientCuratedParameters.cs" />
<Compile Include="Common\DriverFolderPreparationHelper.cs" />
<Compile Include="Common\FileSets.cs" />
<Compile Include="Common\HttpClientHelper.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
index 75a1834..b9f71d1 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
@@ -21,13 +21,17 @@ using System;
using System.IO;
using System.Linq;
using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Avro;
+using Org.Apache.REEF.Client.Avro.YARN;
using Org.Apache.REEF.Client.Common;
-using Org.Apache.REEF.Client.YARN;
using Org.Apache.REEF.Client.YARN.Parameters;
+using Org.Apache.REEF.Common.Avro;
using Org.Apache.REEF.Common.Files;
+using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Parameters;
namespace Org.Apache.REEF.Client.Yarn
{
@@ -95,25 +99,43 @@ namespace Org.Apache.REEF.Client.Yarn
_driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolderPath);
//TODO: Remove this when we have a generalized way to pass config to java
- var javaParams = TangFactory.GetTang()
- .NewInjector(jobSubmission.DriverConfigurations.ToArray())
- .GetInstance<ClrClient2JavaClientCuratedParameters>();
+ var paramInjector = TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray());
+
+
+ var avroJobSubmissionParameters = new AvroJobSubmissionParameters
+ {
+ jobId = jobSubmission.JobIdentifier,
+ tcpBeginPort = paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
+ tcpRangeCount = paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
+ tcpTryCount = paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(),
+ jobSubmissionFolder = driverFolderPath
+ };
+
+ var avroYarnJobSubmissionParameters = new AvroYarnJobSubmissionParameters
+ {
+ driverMemory = jobSubmission.DriverMemory,
+ driverRecoveryTimeout = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds, int>(),
+ jobSubmissionDirectoryPrefix = _jobSubmissionPrefix,
+ sharedJobSubmissionParameters = avroJobSubmissionParameters
+ };
+
+ var avroYarnClusterJobSubmissionParameters = new AvroYarnClusterJobSubmissionParameters
+ {
+ maxApplicationSubmissions = paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions, int>(),
+ securityTokenKind = _securityTokenKind,
+ securityTokenService = _securityTokenService,
+ yarnJobSubmissionParameters = avroYarnJobSubmissionParameters
+ };
+
+ var submissionArgsFilePath = Path.Combine(driverFolderPath, _fileNames.GetJobSubmissionParametersFile());
+ using (var argsFileStream = new FileStream(submissionArgsFilePath, FileMode.CreateNew))
+ {
+ var serializedArgs = AvroJsonSerializer<AvroYarnClusterJobSubmissionParameters>.ToBytes(avroYarnClusterJobSubmissionParameters);
+ argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
+ }
// Submit the driver
- _javaClientLauncher.Launch(
- JavaClassName,
- driverFolderPath, // arg: 0
- jobSubmission.JobIdentifier, // arg: 1
- jobSubmission.DriverMemory.ToString(), // arg: 2
- javaParams.TcpPortRangeStart.ToString(), // arg: 3
- javaParams.TcpPortRangeCount.ToString(), // arg: 4
- javaParams.TcpPortRangeTryCount.ToString(), // arg: 5
- javaParams.MaxApplicationSubmissions.ToString(), // arg: 6
- javaParams.DriverRestartEvaluatorRecoverySeconds.ToString(), // arg: 7
- _securityTokenKind, // arg: 8
- _securityTokenService, // arg: 9
- _jobSubmissionPrefix // arg: 10
- );
+ _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath);
Logger.Log(Level.Info, "Submitted the Driver for execution." + jobSubmission.JobIdentifier);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
index 72a1b65..4cfba30 100644
--- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
@@ -54,6 +54,7 @@ namespace Org.Apache.REEF.Common.Files
private const string BRIDGE_EXE_CONFIG_NAME = "Org.Apache.REEF.Bridge.exe.config";
private const string SECURITY_TOKEN_IDENTIFIER_FILE = "SecurityTokenId";
private const string SECURITY_TOKEN_PASSWORD_FILE = "SecurityTokenPwd";
+ private const string JOB_SUBMISSION_PARAMETERS_FILE = "job-submission-params.json";
[Inject]
public REEFFileNames()
@@ -234,6 +235,15 @@ namespace Org.Apache.REEF.Common.Files
}
/// <summary>
+ /// The Job Submission parameters file that is used to submit a job through Java,
+ /// either directly or via a "bootstrap" method.
+ /// </summary>
+ public string GetJobSubmissionParametersFile()
+ {
+ return JOB_SUBMISSION_PARAMETERS_FILE;
+ }
+
+ /// <summary>
/// The filename for security token identifier
/// </summary>
/// <returns>filename which contains raw bytes of security token identifier</returns>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/java/reef-bridge-client/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/pom.xml b/lang/java/reef-bridge-client/pom.xml
index f0da5c3..e0b31c0 100644
--- a/lang/java/reef-bridge-client/pom.xml
+++ b/lang/java/reef-bridge-client/pom.xml
@@ -115,6 +115,22 @@ under the License.
</pluginManagement>
<plugins>
<plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+ <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
new file mode 100644
index 0000000..1bf830d
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+ {
+ "namespace": "org.apache.reef.reef.bridge.client.avro",
+ "type": "record",
+ "name": "AvroJobSubmissionParameters",
+ "doc": "General cross-language submission parameters shared by all runtimes",
+ "fields": [
+ { "name": "jobId", "type": "string" },
+ { "name": "tcpBeginPort", "type": "int" },
+ { "name": "tcpRangeCount", "type": "int" },
+ { "name": "tcpTryCount", "type": "int" },
+ { "name": "jobSubmissionFolder", "type": "string" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.reef.bridge.client.avro",
+ "type": "record",
+ "name": "AvroLocalJobSubmissionParameters",
+ "doc": "Cross-language submission parameters to the Local runtime",
+ "fields": [
+ { "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" },
+ { "name": "maxNumberOfConcurrentEvaluators", "type": "int" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.reef.bridge.client.avro",
+ "type": "record",
+ "name": "AvroYarnJobSubmissionParameters",
+ "doc": "General cross-language submission parameters to the YARN runtime",
+ "fields": [
+ { "name": "sharedJobSubmissionParameters", "type": "AvroJobSubmissionParameters" },
+ { "name": "driverMemory", "type": "int" },
+ { "name": "driverRecoveryTimeout", "type": "int" },
+ { "name": "jobSubmissionDirectoryPrefix", "type": "string" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.reef.bridge.client.avro",
+ "type": "record",
+ "name": "AvroYarnClusterJobSubmissionParameters",
+ "doc": "Cross-language submission parameters to the YARN runtime using Hadoop's submission client",
+ "fields": [
+ { "name": "yarnJobSubmissionParameters", "type": "AvroYarnJobSubmissionParameters" },
+ { "name": "maxApplicationSubmissions", "type": "int" },
+ { "name": "securityTokenKind", "type": "string" },
+ { "name": "securityTokenService", "type": "string" }
+ ]
+ }
+]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
index e80164b..cc308ab 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
@@ -59,8 +59,14 @@ public final class LocalClient {
launcher.launch(driverFolder, localSubmissionFromCS.getJobId(), CLIENT_REMOTE_ID);
}
- public static void main(final String[] args) throws InjectionException, IOException {
- final LocalSubmissionFromCS localSubmissionFromCS = LocalSubmissionFromCS.fromCommandLine(args);
+ public static void main(final String[] args) throws IOException, InjectionException {
+ final File jobSubmissionParametersFile = new File(args[0]);
+ if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
+ throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
+ }
+
+ final LocalSubmissionFromCS localSubmissionFromCS =
+ LocalSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
LOG.log(Level.INFO, "Local job submission received from C#: {0}", localSubmissionFromCS);
final Configuration runtimeConfiguration = localSubmissionFromCS.getRuntimeConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
index 70d4ae3..93a8f2c 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
@@ -92,8 +92,15 @@ final class LocalRuntimeDriverConfigurationGenerator {
}
public static void main(final String[] args) throws InjectionException, IOException {
- final LocalSubmissionFromCS localSubmission = LocalSubmissionFromCS.fromCommandLine(args);
- LOG.log(Level.INFO, "Local driver config generation received from C#: {0}", localSubmission);
+ final File jobSubmissionParametersFile = new File(args[0]);
+ if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
+ throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
+ }
+
+ final LocalSubmissionFromCS localSubmission =
+ LocalSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+
+ LOG.log(Level.FINE, "Local driver config generation received from C#: {0}", localSubmission);
final Configuration localRuntimeConfiguration = localSubmission.getRuntimeConfiguration();
final LocalRuntimeDriverConfigurationGenerator localConfigurationGenerator = Tang.Factory.getTang()
.newInjector(localRuntimeConfiguration)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
index 7e926ea..e8d8118 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
@@ -18,9 +18,14 @@
*/
package org.apache.reef.bridge.client;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.lang.Validate;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
@@ -33,6 +38,8 @@ import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.ArrayList;
/**
@@ -46,32 +53,30 @@ final class LocalSubmissionFromCS {
private final File jobFolder;
private final File runtimeRootFolder;
private final String jobId;
- private final int numberOfEvaluators;
+ private final int maxNumberOfConcurrentEvaluators;
private final int tcpBeginPort;
private final int tcpRangeCount;
private final int tcpTryCount;
- private LocalSubmissionFromCS(final File driverFolder,
- final String jobId,
- final int numberOfEvaluators,
- final int tcpBeginPort,
- final int tcpRangeCount,
- final int tcpTryCount) {
+ private LocalSubmissionFromCS(final AvroLocalJobSubmissionParameters avroLocalJobSubmissionParameters) {
+ // We assume the given path is the driver folder. The job folder is one level up from there.
+ final AvroJobSubmissionParameters jobSubmissionParameters =
+ avroLocalJobSubmissionParameters.getSharedJobSubmissionParameters();
+ this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString());
+ this.jobId = jobSubmissionParameters.getJobId().toString();
+ this.maxNumberOfConcurrentEvaluators = avroLocalJobSubmissionParameters.getMaxNumberOfConcurrentEvaluators();
+ this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
+ this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
+ this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
+ this.jobFolder = driverFolder.getParentFile();
+ this.runtimeRootFolder = jobFolder.getParentFile();
+
Validate.isTrue(driverFolder.exists(), "The driver folder does not exist.");
Validate.notEmpty(jobId, "The job is is null or empty.");
- Validate.isTrue(numberOfEvaluators >= 0, "The number of evaluators is < 0.");
+ Validate.isTrue(maxNumberOfConcurrentEvaluators >= 0, "The number of evaluators is < 0.");
Validate.isTrue(tcpBeginPort >= 0, "The tcp start port given is < 0.");
Validate.isTrue(tcpRangeCount > 0, "The tcp range given is <= 0.");
Validate.isTrue(tcpTryCount > 0, "The tcp retry count given is <= 0.");
- // We assume the given path to be the one of the driver. The job folder is one level up from there.
- this.driverFolder = driverFolder;
- this.jobFolder = driverFolder.getParentFile();
- this.runtimeRootFolder = jobFolder.getParentFile();
- this.jobId = jobId;
- this.numberOfEvaluators = numberOfEvaluators;
- this.tcpBeginPort = tcpBeginPort;
- this.tcpRangeCount = tcpRangeCount;
- this.tcpTryCount = tcpTryCount;
}
/**
@@ -79,7 +84,7 @@ final class LocalSubmissionFromCS {
*/
Configuration getRuntimeConfiguration() {
final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
- .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, Integer.toString(numberOfEvaluators))
+ .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, Integer.toString(maxNumberOfConcurrentEvaluators))
.set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, runtimeRootFolder.getAbsolutePath())
.build();
@@ -108,7 +113,7 @@ final class LocalSubmissionFromCS {
", jobFolder=" + jobFolder +
", runtimeRootFolder=" + runtimeRootFolder +
", jobId='" + jobId + '\'' +
- ", numberOfEvaluators=" + numberOfEvaluators +
+ ", maxNumberOfConcurrentEvaluators=" + maxNumberOfConcurrentEvaluators +
", tcpBeginPort=" + tcpBeginPort +
", tcpRangeCount=" + tcpRangeCount +
", tcpTryCount=" + tcpTryCount +
@@ -130,23 +135,18 @@ final class LocalSubmissionFromCS {
}
/**
- * Gets parameters from C#:
- * <p>
- * args[0]: Driver folder.
- * args[1]: Job ID.
- * args[2]: Number of Evaluators.
- * args[3]: First port to open.
- * args[4]: Port range size.
- * args[5]: Port open trial count.
+ * Takes the local job submission configuration file, deserializes it, and creates a submission object.
*/
- static LocalSubmissionFromCS fromCommandLine(final String[] args) {
- final File driverFolder = new File(args[0]);
- final String jobId = args[1];
- final int numberOfEvaluators = Integer.parseInt(args[2]);
- final int tcpBeginPort = Integer.parseInt(args[3]);
- final int tcpRangeCount = Integer.parseInt(args[4]);
- final int tcpTryCount = Integer.parseInt(args[5]);
+ static LocalSubmissionFromCS fromJobSubmissionParametersFile(final File localJobSubmissionParametersFile)
+ throws IOException {
+ try (final FileInputStream fileInputStream = new FileInputStream(localJobSubmissionParametersFile)) {
+ final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+ AvroLocalJobSubmissionParameters.getClassSchema(), fileInputStream);
+ final SpecificDatumReader<AvroLocalJobSubmissionParameters> reader =
+ new SpecificDatumReader<>(AvroLocalJobSubmissionParameters.class);
+ final AvroLocalJobSubmissionParameters localJobSubmissionParameters = reader.read(null, decoder);
- return new LocalSubmissionFromCS(driverFolder, jobId, numberOfEvaluators, tcpBeginPort, tcpRangeCount, tcpTryCount);
+ return new LocalSubmissionFromCS(localJobSubmissionParameters);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
index 7206a50..2c91b78 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnDriverConfigurationGenerator.java
@@ -125,10 +125,17 @@ final class YarnDriverConfigurationGenerator {
/**
* This main is executed from .NET to perform driver config generation.
* For arguments detail:
- * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromCommandLine(String[])
+ * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromJobSubmissionParametersFile(File)
*/
public static void main(final String[] args) throws InjectionException, IOException {
- final YarnSubmissionFromCS yarnSubmission = YarnSubmissionFromCS.fromCommandLine(args);
+ final File jobSubmissionParametersFile = new File(args[0]);
+ if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
+ throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
+ }
+
+ final YarnSubmissionFromCS yarnSubmission =
+ YarnSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+
LOG.log(Level.INFO, "YARN driver config generation received from C#: {0}", yarnSubmission);
final Configuration yarnConfiguration = yarnSubmission.getRuntimeConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index fbd7abc..b1d0c8b 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -229,10 +229,17 @@ public final class YarnJobSubmissionClient {
/**
* .NET client calls into this main method for job submission.
* For arguments detail:
- * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromCommandLine(String[])
+ * @see org.apache.reef.bridge.client.YarnSubmissionFromCS#fromJobSubmissionParametersFile(File)
*/
public static void main(final String[] args) throws InjectionException, IOException, YarnException {
- final YarnSubmissionFromCS yarnSubmission = YarnSubmissionFromCS.fromCommandLine(args);
+ final File jobSubmissionParametersFile = new File(args[0]);
+ if (!(jobSubmissionParametersFile.exists() && jobSubmissionParametersFile.canRead())) {
+ throw new IOException("Unable to open and read " + jobSubmissionParametersFile.getAbsolutePath());
+ }
+
+ final YarnSubmissionFromCS yarnSubmission =
+ YarnSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+
LOG.log(Level.INFO, "YARN job submission received from C#: {0}", yarnSubmission);
if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) {
// We have to write security token to user credential before YarnJobSubmissionClient is created
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ef5403a7/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
index 3b38786..e8c5674 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionFromCS.java
@@ -18,10 +18,16 @@
*/
package org.apache.reef.bridge.client;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.lang.Validate;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.driver.parameters.MaxApplicationSubmissions;
import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
@@ -34,6 +40,8 @@ import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -44,6 +52,9 @@ import java.util.List;
* `Org.Apache.REEF.Client.YARN.YARNClient`
*/
final class YarnSubmissionFromCS {
+ private static final int DEFAULT_PRIORITY = 1;
+ private static final String DEFAULT_QUEUE = "default";
+
private final File driverFolder;
private final String jobId;
private final int driverMemory;
@@ -52,6 +63,7 @@ final class YarnSubmissionFromCS {
private final int tcpTryCount;
private final int maxApplicationSubmissions;
private final int driverRecoveryTimeout;
+
// Static for now
private final int priority;
private final String queue;
@@ -59,19 +71,26 @@ final class YarnSubmissionFromCS {
private final String tokenService;
private final String jobSubmissionDirectoryPrefix;
- private YarnSubmissionFromCS(final File driverFolder,
- final String jobId,
- final int driverMemory,
- final int tcpBeginPort,
- final int tcpRangeCount,
- final int tcpTryCount,
- final int maxApplicationSubmissions,
- final int driverRecoveryTimeout,
- final int priority,
- final String queue,
- final String tokenKind,
- final String tokenService,
- final String jobSubmissionDirectoryPrefix) {
+ private YarnSubmissionFromCS(final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) {
+ final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters =
+ yarnClusterJobSubmissionParameters.getYarnJobSubmissionParameters();
+
+ final AvroJobSubmissionParameters jobSubmissionParameters =
+ yarnJobSubmissionParameters.getSharedJobSubmissionParameters();
+
+ this.driverFolder = new File(jobSubmissionParameters.getJobSubmissionFolder().toString());
+ this.jobId = jobSubmissionParameters.getJobId().toString();
+ this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
+ this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
+ this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
+ this.maxApplicationSubmissions = yarnClusterJobSubmissionParameters.getMaxApplicationSubmissions();
+ this.driverRecoveryTimeout = yarnJobSubmissionParameters.getDriverRecoveryTimeout();
+ this.driverMemory = yarnJobSubmissionParameters.getDriverMemory();
+ this.priority = DEFAULT_PRIORITY;
+ this.queue = DEFAULT_QUEUE;
+ this.tokenKind = yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString();
+ this.tokenService = yarnClusterJobSubmissionParameters.getSecurityTokenService().toString();
+ this.jobSubmissionDirectoryPrefix = yarnJobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString();
Validate.isTrue(driverFolder.exists(), "The driver folder given does not exist.");
Validate.notEmpty(jobId, "The job id is null or empty");
@@ -84,20 +103,6 @@ final class YarnSubmissionFromCS {
Validate.notEmpty(tokenKind, "Token kind should be either NULL or some custom non empty value");
Validate.notEmpty(tokenService, "Token service should be either NULL or some custom non empty value");
Validate.notEmpty(jobSubmissionDirectoryPrefix, "Job submission directory prefix should not be empty");
-
- this.driverFolder = driverFolder;
- this.jobId = jobId;
- this.driverMemory = driverMemory;
- this.tcpBeginPort = tcpBeginPort;
- this.tcpRangeCount = tcpRangeCount;
- this.tcpTryCount = tcpTryCount;
- this.maxApplicationSubmissions = maxApplicationSubmissions;
- this.driverRecoveryTimeout = driverRecoveryTimeout;
- this.priority = priority;
- this.queue = queue;
- this.tokenKind = tokenKind;
- this.tokenService = tokenService;
- this.jobSubmissionDirectoryPrefix = jobSubmissionDirectoryPrefix;
}
@Override
@@ -197,35 +202,18 @@ final class YarnSubmissionFromCS {
}
/**
- * Takes 9 parameters from the C# side:
- * [0]: String. Driver folder.
- * [1]: String. Driver identifier.
- * [2]: int. Driver memory.
- * [3~5]: int. TCP configurations.
- * [6]: int. Max application submissions.
- * [7]: int. Evaluator recovery timeout for driver restart. > 0 => restart is enabled.
- * [8]: string: Security token kind. "NULL" => No security token is used
- * [9]: string: Security token service. "NULL" => No security token is used
- * [10]: string: Job submission directory prefix.
+ * Takes the YARN cluster job submission configuration file, deserializes it, and creates a submission object.
*/
- static YarnSubmissionFromCS fromCommandLine(final String[] args) {
- final File driverFolder = new File(args[0]);
- final String jobId = args[1];
- final int driverMemory = Integer.parseInt(args[2]);
- final int tcpBeginPort = Integer.parseInt(args[3]);
- final int tcpRangeCount = Integer.parseInt(args[4]);
- final int tcpTryCount = Integer.parseInt(args[5]);
- final int maxApplicationSubmissions = Integer.parseInt(args[6]);
- final int driverRecoveryTimeout = Integer.parseInt(args[7]);
- final String securityTokenKind = args[8];
- final String securityTokenService = args[9];
- final String jobSubmissionDirectoryPrefix = args[10];
-
- // Static for now
- final int priority = 1;
- final String queue = "default";
- return new YarnSubmissionFromCS(driverFolder, jobId, driverMemory, tcpBeginPort, tcpRangeCount, tcpTryCount,
- maxApplicationSubmissions, driverRecoveryTimeout, priority, queue, securityTokenKind, securityTokenService,
- jobSubmissionDirectoryPrefix);
+ static YarnSubmissionFromCS fromJobSubmissionParametersFile(final File yarnClusterJobSubmissionParametersFile)
+ throws IOException {
+ try (final FileInputStream fileInputStream = new FileInputStream(yarnClusterJobSubmissionParametersFile)) {
+ final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+ AvroYarnClusterJobSubmissionParameters.getClassSchema(), fileInputStream);
+ final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> reader = new SpecificDatumReader<>(
+ AvroYarnClusterJobSubmissionParameters.class);
+ final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = reader.read(null, decoder);
+
+ return new YarnSubmissionFromCS(yarnClusterJobSubmissionParameters);
+ }
}
}