You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2017/10/17 22:39:35 UTC
reef git commit: [REEF-1895] REEF Bridge performance improvement for
allocated evaluators
Repository: reef
Updated Branches:
refs/heads/master 35dc55eb6 -> e18b5d36a
[REEF-1895] REEF Bridge performance improvement for allocated evaluators
* Remove synchronize in JobDriver for allocated evaluator/context handlers
* Remove match code in AllocatedEvaluator in bridge as it is not used
* Bridge code change to improve code runtime reuse
* Reduce the cross bridge calls in AllocatedEvaluatorClr2Java
* Adding performance test
JIRA:
[REEF-1895](https://issues.apache.org/jira/browse/REEF-1895)
Pull Request:
This closes #1385
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/e18b5d36
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/e18b5d36
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/e18b5d36
Branch: refs/heads/master
Commit: e18b5d36ad714a877e2806f1c48f56401d6132da
Parents: 35dc55e
Author: Julia Wang <jw...@yahoo.com>
Authored: Mon Oct 9 20:19:31 2017 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Oct 17 15:34:42 2017 -0700
----------------------------------------------------------------------
.../AllocatedEvaluatorClr2Java.cpp | 29 +--
lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 6 +-
lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp | 13 +-
.../cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp | 6 +-
.../Context/ContextConfiguration.cs | 37 +++-
.../Tasks/TaskConfiguration.cs | 41 +++--
.../Bridge/ClrSystemHandlerWrapper.cs | 4 +-
.../Bridge/DriverBridge.cs | 23 ++-
.../Bridge/Events/AllocatedEvaluator.cs | 43 +----
.../Org.Apache.REEF.Tests.csproj | 3 +
.../TestHelloREEF/TestHelloDriver.cs | 158 ++++++++++++++++
.../TestHelloREEF/TestHelloREEFClient.cs | 181 +++++++++++++++++++
.../Performance/TestHelloREEF/TestHelloTask.cs | 45 +++++
.../Org.Apache.REEF.Utilities/Logging/Logger.cs | 4 +-
.../javabridge/AllocatedEvaluatorBridge.java | 8 +
.../apache/reef/javabridge/NativeInterop.java | 4 +-
.../reef/javabridge/generic/JobDriver.java | 91 +++++-----
.../yarn/client/YarnSubmissionHelper.java | 3 +-
18 files changed, 550 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp
index 387888c..af89bea 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp
@@ -30,7 +30,7 @@ namespace Org {
static BridgeLogger^ LOGGER = BridgeLogger::GetLogger("<C++>");
};
- AllocatedEvaluatorClr2Java::AllocatedEvaluatorClr2Java(JNIEnv *env, jobject jallocatedEvaluator) {
+ AllocatedEvaluatorClr2Java::AllocatedEvaluatorClr2Java(JNIEnv *env, jobject jallocatedEvaluator, jstring nameServerInfo, jstring evaluatorId) {
ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::AllocatedEvaluatorClr2Java");
@@ -40,11 +40,8 @@ namespace Org {
}
_jobjectAllocatedEvaluator = reinterpret_cast<jobject>(env->NewGlobalRef(jallocatedEvaluator));
- jclass jclassAllocatedEvaluator = env->GetObjectClass(_jobjectAllocatedEvaluator);
- _jstringId = CommonUtilities::GetJObjectId(env, _jobjectAllocatedEvaluator, jclassAllocatedEvaluator);
-
- jmethodID jmidGetNameServerInfo = env->GetMethodID(jclassAllocatedEvaluator, "getNameServerInfo", "()Ljava/lang/String;");
- _jstringNameServerInfo = CommonUtilities::CallGetMethodNewGlobalRef<jstring>(env, _jobjectAllocatedEvaluator, jmidGetNameServerInfo);
+ _evaluatorId = ManagedStringFromJavaString(env, evaluatorId);
+ _nameServerInfo = ManagedStringFromJavaString(env, nameServerInfo);
ManagedLog::LOGGER->LogStop("AllocatedEvaluatorClr2Java::AllocatedEvaluatorClr2Java");
}
@@ -58,14 +55,6 @@ namespace Org {
if (_jobjectAllocatedEvaluator != NULL) {
env->DeleteGlobalRef(_jobjectAllocatedEvaluator);
}
-
- if (_jstringId != NULL) {
- env->DeleteGlobalRef(_jstringId);
- }
-
- if (_jstringNameServerInfo != NULL) {
- env->DeleteGlobalRef(_jstringNameServerInfo);
- }
}
void AllocatedEvaluatorClr2Java::SubmitContext(String^ evaluatorConfigStr, String^ contextConfigStr) {
@@ -87,7 +76,7 @@ namespace Org {
}
void AllocatedEvaluatorClr2Java::SubmitContextAndTask(String^ evaluatorConfigStr, String^ contextConfigStr, String^ taskConfigStr) {
- ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::SubmitContextAndTask");
+ ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::SubmitContextAndTask" + taskConfigStr);
JNIEnv *env = RetrieveEnv(_jvm);
jclass jclassAllocatedEvaluator = env->GetObjectClass(_jobjectAllocatedEvaluator);
jmethodID jmidSubmitContextAndTask = env->GetMethodID(jclassAllocatedEvaluator, "submitContextAndTaskString", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V");
@@ -102,7 +91,7 @@ namespace Org {
JavaStringFromManagedString(env, evaluatorConfigStr),
JavaStringFromManagedString(env, contextConfigStr),
JavaStringFromManagedString(env, taskConfigStr));
- ManagedLog::LOGGER->LogStop("AllocatedEvaluatorClr2Java::SubmitContextAndTask");
+ ManagedLog::LOGGER->LogStop("AllocatedEvaluatorClr2Java::SubmitContextAndTask" + taskConfigStr);
}
void AllocatedEvaluatorClr2Java::SubmitContextAndService(String^ evaluatorConfigStr, String^ contextConfigStr, String^ serviceConfigStr) {
@@ -166,15 +155,11 @@ namespace Org {
}
String^ AllocatedEvaluatorClr2Java::GetId() {
- ManagedLog::LOGGER->Log("AllocatedEvaluatorClr2Java::GetId");
- JNIEnv *env = RetrieveEnv(_jvm);
- return ManagedStringFromJavaString(env, _jstringId);
+ return _evaluatorId;
}
String^ AllocatedEvaluatorClr2Java::GetNameServerInfo() {
- ManagedLog::LOGGER->Log("AllocatedEvaluatorClr2Java::GetNameServerInfo");
- JNIEnv *env = RetrieveEnv(_jvm);
- return ManagedStringFromJavaString(env, _jstringNameServerInfo);
+ return _nameServerInfo;
}
IEvaluatorDescriptor^ AllocatedEvaluatorClr2Java::GetEvaluatorDescriptor() {
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index cf4b947..5fa16e7 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -56,10 +56,10 @@ namespace Org {
public ref class AllocatedEvaluatorClr2Java : public IAllocatedEvaluatorClr2Java {
jobject _jobjectAllocatedEvaluator = NULL;
JavaVM* _jvm;
- jstring _jstringId = NULL;
- jstring _jstringNameServerInfo = NULL;
+ String^ _evaluatorId;
+ String^ _nameServerInfo;
public:
- AllocatedEvaluatorClr2Java(JNIEnv *env, jobject jallocatedEvaluator);
+ AllocatedEvaluatorClr2Java(JNIEnv *env, jobject jallocatedEvaluator, jstring nameServerInfo, jstring evaluatorId);
~AllocatedEvaluatorClr2Java();
!AllocatedEvaluatorClr2Java();
virtual void SubmitContextAndTask(String^ evaluatorConfigStr, String^ contextConfigStr, String^ taskConfigStr);
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
index 4402c44..9618485 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
@@ -135,13 +135,16 @@ jbyteArray JavaByteArrayFromManagedByteArray(
return NULL;
}
+thread_local JNIEnv *t_env = NULL;
JNIEnv* RetrieveEnv(JavaVM* jvm) {
- JNIEnv *env;
- if (jvm->AttachCurrentThread((void **) &env, NULL) != 0) {
- ManagedLog::LOGGER->Log("cannot attach jni env to current jvm thread.");
- throw;
+ if (NULL == t_env)
+ {
+ if (jvm->AttachCurrentThread((void **)&t_env, NULL) != 0) {
+ ManagedLog::LOGGER->Log("cannot attach jni env to current jvm thread.");
+ throw;
+ }
}
- return env;
+ return t_env;
}
String^ FormatJavaExceptionMessage(String^ errorMessage, Exception^ exception, int recursionDepth) {
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
index 25a4c77..40611f7 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
@@ -155,9 +155,9 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSyst
* Signature: (JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V
*/
JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemAllocatedEvaluatorHandlerOnNext
-(JNIEnv *env, jclass cls, jlong handle, jobject jallocatedEvaluatorBridge, jobject jlogger) {
+(JNIEnv *env, jclass cls, jlong handle, jobject jallocatedEvaluatorBridge, jobject jlogger, jstring nameServerInfo, jstring evaluatorId) {
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemAllocatedEvaluatorHandlerOnNext:");
- AllocatedEvaluatorClr2Java^ allocatedEval = gcnew AllocatedEvaluatorClr2Java(env, jallocatedEvaluatorBridge);
+ AllocatedEvaluatorClr2Java^ allocatedEval = gcnew AllocatedEvaluatorClr2Java(env, jallocatedEvaluatorBridge, nameServerInfo, evaluatorId);
try {
ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext(handle, allocatedEval);
}
@@ -600,7 +600,7 @@ static JNINativeMethod methods[] = {
{ "callClrSystemOnStartHandler", "()V",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler },
- { "clrSystemAllocatedEvaluatorHandlerOnNext", "(JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V",
+ { "clrSystemAllocatedEvaluatorHandlerOnNext", "(JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;Ljava/lang/String;Ljava/lang/String;)V",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemAllocatedEvaluatorHandlerOnNext },
{ "clrSystemActiveContextHandlerOnNext", "(JLorg/apache/reef/javabridge/ActiveContextBridge;Lorg/apache/reef/javabridge/InteropLogger;)V",
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs
index 115dc4a..721f287 100644
--- a/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs
@@ -17,6 +17,7 @@
using System;
using System.Diagnostics.CodeAnalysis;
+using System.Net.NetworkInformation;
using Org.Apache.REEF.Common.Events;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
@@ -73,19 +74,37 @@ namespace Org.Apache.REEF.Common.Context
[SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
public static readonly OptionalImpl<IContextMessageHandler> OnMessage = new OptionalImpl<IContextMessageHandler>();
+ private static ConfigurationModule contextConfig;
+
+ private static readonly object ConfigLock = new object();
+
public static ConfigurationModule ConfigurationModule
{
get
{
- return new ContextConfiguration()
- .BindNamedParameter(GenericType<ContextConfigurationOptions.ContextIdentifier>.Class, Identifier)
- .BindSetEntry(GenericType<ContextConfigurationOptions.StartHandlers>.Class, OnContextStart)
- .BindSetEntry(GenericType<ContextConfigurationOptions.StopHandlers>.Class, OnContextStop)
- .BindSetEntry(GenericType<ContextConfigurationOptions.ContextMessageSources>.Class, OnSendMessage)
- .BindSetEntry(GenericType<ContextConfigurationOptions.ContextMessageHandlers>.Class, OnMessage)
- .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart)
- .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop)
- .Build();
+ if (contextConfig == null)
+ {
+ lock (ConfigLock)
+ {
+ if (contextConfig == null)
+ {
+ contextConfig = new ContextConfiguration()
+ .BindNamedParameter(GenericType<ContextConfigurationOptions.ContextIdentifier>.Class,
+ Identifier)
+ .BindSetEntry(GenericType<ContextConfigurationOptions.StartHandlers>.Class,
+ OnContextStart)
+ .BindSetEntry(GenericType<ContextConfigurationOptions.StopHandlers>.Class, OnContextStop)
+ .BindSetEntry(GenericType<ContextConfigurationOptions.ContextMessageSources>.Class,
+ OnSendMessage)
+ .BindSetEntry(GenericType<ContextConfigurationOptions.ContextMessageHandlers>.Class,
+ OnMessage)
+ .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart)
+ .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop)
+ .Build();
+ }
+ }
+ }
+ return contextConfig;
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs
index 34c683b..8f9ae2d 100644
--- a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs
@@ -87,22 +87,39 @@ namespace Org.Apache.REEF.Common.Tasks
[SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
public static readonly OptionalParameter<string> Memento = new OptionalParameter<string>();
+ private static ConfigurationModule taskConfig;
+
+ private static readonly object ConfigLock = new object();
+
public static ConfigurationModule ConfigurationModule
{
get
{
- return new TaskConfiguration()
- .BindImplementation(GenericType<ITask>.Class, Task)
- .BindSetEntry(GenericType<TaskConfigurationOptions.TaskMessageSources>.Class, OnSendMessage)
- .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage)
- .BindImplementation(GenericType<IDriverConnectionMessageHandler>.Class, OnDriverConnectionChanged)
- .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier)
- .BindNamedParameter(GenericType<TaskConfigurationOptions.Memento>.Class, Memento)
- .BindNamedParameter(GenericType<TaskConfigurationOptions.CloseHandler>.Class, OnClose)
- .BindNamedParameter(GenericType<TaskConfigurationOptions.SuspendHandler>.Class, OnSuspend)
- .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart)
- .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop)
- .Build();
+ if (taskConfig == null)
+ {
+ lock (ConfigLock)
+ {
+ if (taskConfig == null)
+ {
+ taskConfig = new TaskConfiguration()
+ .BindImplementation(GenericType<ITask>.Class, Task)
+ .BindSetEntry(GenericType<TaskConfigurationOptions.TaskMessageSources>.Class,
+ OnSendMessage)
+ .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage)
+ .BindImplementation(GenericType<IDriverConnectionMessageHandler>.Class,
+ OnDriverConnectionChanged)
+ .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier)
+ .BindNamedParameter(GenericType<TaskConfigurationOptions.Memento>.Class, Memento)
+ .BindNamedParameter(GenericType<TaskConfigurationOptions.CloseHandler>.Class, OnClose)
+ .BindNamedParameter(GenericType<TaskConfigurationOptions.SuspendHandler>.Class,
+ OnSuspend)
+ .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart)
+ .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop)
+ .Build();
+ }
+ }
+ }
+ return taskConfig;
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
index 0895aa0..b203b8a 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
@@ -40,11 +40,11 @@ namespace Org.Apache.REEF.Driver.Bridge
public static void Call_ClrSystemAllocatedEvaluatorHandler_OnNext(ulong handle, IAllocatedEvaluatorClr2Java clr2Java)
{
- using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext"))
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext", clr2Java.GetId()))
{
GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
ClrSystemHandler<IAllocatedEvaluator> obj = (ClrSystemHandler<IAllocatedEvaluator>)gc.Target;
- obj.OnNext(new AllocatedEvaluator(clr2Java, _driverBridge.ConfigurationProviders));
+ obj.OnNext(new AllocatedEvaluator(clr2Java, _driverBridge.ConfigurationStringForProviders));
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
index dd30462..8d2bf29 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
@@ -19,6 +19,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
+using System.Linq;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Common.Evaluator.DriverConnectionConfigurationProviders;
@@ -32,6 +33,8 @@ using Org.Apache.REEF.Common.Evaluator.Parameters;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Bridge.Events;
using Org.Apache.REEF.Driver.Defaults;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -117,7 +120,7 @@ namespace Org.Apache.REEF.Driver.Bridge
private readonly HttpServerHandler _httpServerHandler;
- private readonly ISet<IConfigurationProvider> _configurationProviders;
+ private readonly string _configurationProviderString;
private readonly IProgressProvider _progressProvider;
@@ -148,7 +151,8 @@ namespace Org.Apache.REEF.Driver.Bridge
IDriverReconnConfigProvider driverReconnConfigProvider,
IDriverConnection driverConnection,
HttpServerHandler httpServerHandler,
- IProgressProvider progressProvider)
+ IProgressProvider progressProvider,
+ AvroConfigurationSerializer serializer)
{
foreach (TraceListener listener in traceListeners)
{
@@ -188,10 +192,10 @@ namespace Org.Apache.REEF.Driver.Bridge
_driverRestartFailedEvaluatorHandlers = driverRestartFailedEvaluatorHandlers;
_httpServerHandler = httpServerHandler;
- _configurationProviders = new HashSet<IConfigurationProvider>(configurationProviders) { driverReconnConfigProvider };
+ var configurationProviderSet = new HashSet<IConfigurationProvider>(configurationProviders) { driverReconnConfigProvider };
+ _configurationProviderString = serializer.ToString(Configurations.Merge(configurationProviderSet.Select(x => x.GetConfiguration()).ToArray()));
+ _progressProvider = progressProvider;
- _progressProvider = progressProvider;
-
_allocatedEvaluatorSubscriber = new ClrSystemHandler<IAllocatedEvaluator>();
_completedEvaluatorSubscriber = new ClrSystemHandler<ICompletedEvaluator>();
_taskMessageSubscriber = new ClrSystemHandler<ITaskMessage>();
@@ -384,9 +388,12 @@ namespace Org.Apache.REEF.Driver.Bridge
}
}
- internal ISet<IConfigurationProvider> ConfigurationProviders
- {
- get { return _configurationProviders; }
+ /// <summary>
+ /// Serialized configuration string for configurations from configuration providers.
+ /// </summary>
+ internal string ConfigurationStringForProviders
+ {
+ get { return _configurationProviderString; }
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
index 174ec72..3e70d67 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
@@ -16,14 +16,12 @@
// under the License.
using System;
-using System.Collections.Generic;
using System.Runtime.Serialization;
using Org.Apache.REEF.Common.Catalog;
using Org.Apache.REEF.Common.Evaluator;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Logging;
@@ -31,7 +29,7 @@ using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.Driver.Bridge.Events
{
[DataContract]
- internal class AllocatedEvaluator : IAllocatedEvaluator
+ internal sealed class AllocatedEvaluator : IAllocatedEvaluator
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof(AllocatedEvaluator));
@@ -41,23 +39,19 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
private readonly string _evaluatorConfigStr;
- public AllocatedEvaluator(IAllocatedEvaluatorClr2Java clr2Java, ISet<IConfigurationProvider> configurationProviders)
+ public AllocatedEvaluator(IAllocatedEvaluatorClr2Java clr2Java, string configuration)
{
- _serializer = new AvroConfigurationSerializer();
-
- var evaluatorConfig = TangFactory.GetTang().NewConfigurationBuilder().Build();
- foreach (var configurationProvider in configurationProviders)
+ using (LOGGER.LogFunction("AllocatedEvaluator::AllocatedEvaluator:", clr2Java.GetId()))
{
- evaluatorConfig = Configurations.Merge(evaluatorConfig, configurationProvider.GetConfiguration());
- }
-
- _evaluatorConfigStr = _serializer.ToString(evaluatorConfig);
+ _serializer = TangFactory.GetTang().NewInjector().GetInstance<AvroConfigurationSerializer>();
+ _evaluatorConfigStr = configuration;
- Clr2Java = clr2Java;
- Id = Clr2Java.GetId();
- ProcessNewEvaluator();
+ Clr2Java = clr2Java;
+ Id = Clr2Java.GetId();
+ ProcessNewEvaluator();
- NameServerInfo = Clr2Java.GetNameServerInfo();
+ NameServerInfo = Clr2Java.GetNameServerInfo();
+ }
}
public string Id { get; private set; }
@@ -134,23 +128,6 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
private void ProcessNewEvaluator()
{
_evaluatorDescriptor = Clr2Java.GetEvaluatorDescriptor();
- lock (EvaluatorRequestor.Evaluators)
- {
- foreach (KeyValuePair<string, IEvaluatorDescriptor> pair in EvaluatorRequestor.Evaluators)
- {
- if (pair.Value.Equals(_evaluatorDescriptor))
- {
- var key = pair.Key;
- EvaluatorRequestor.Evaluators.Remove(key);
- var assignedId = key.Substring(0, key.LastIndexOf(EvaluatorRequestor.BatchIdxSeparator));
-
- LOGGER.Log(Level.Verbose, "Received evaluator [{0}] of memory {1}MB that matches request of {2}MB with batch id [{3}].",
- Id, _evaluatorDescriptor.Memory, pair.Value.Memory, assignedId);
- EvaluatorBatchId = assignedId;
- break;
- }
- }
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 252bf75..b83b705 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -165,6 +165,9 @@ under the License.
<Compile Include="Functional\Telemetry\MetricsDriver.cs" />
<Compile Include="Functional\Telemetry\MetricsTask.cs" />
<Compile Include="Functional\Telemetry\TestMetricsMessage.cs" />
+ <Compile Include="Performance\TestHelloREEF\TestHelloDriver.cs" />
+ <Compile Include="Performance\TestHelloREEF\TestHelloREEFClient.cs" />
+ <Compile Include="Performance\TestHelloREEF\TestHelloTask.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Utility\TestDriverConfigGenerator.cs" />
<Compile Include="Utility\TestExceptions.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs
new file mode 100644
index 0000000..2609857
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Performance.TestHelloREEF
+{
+ /// <summary>
+ /// The Driver for HelloREEF: It requests a single Evaluator and then submits the HelloTask to it.
+ /// </summary>
+ public sealed class TestHelloDriver : IObserver<IAllocatedEvaluator>,
+ IObserver<IDriverStarted>,
+ IObserver<IFailedEvaluator>,
+ IObserver<IFailedTask>,
+ IObserver<ICompletedTask>,
+ IObserver<IRunningTask>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(TestHelloDriver));
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
+
+ /// <summary>
+ /// Specify if the desired node names is relaxed
+ /// </summary>
+ private readonly bool _relaxLocality;
+
+ private readonly int _numberOfContainers;
+
+ /// <summary>
+ /// Constructor of the driver
+ /// </summary>
+ /// <param name="evaluatorRequestor">Evaluator Requestor</param>
+ /// <param name="relaxLocality">Relax indicator of evaluator node request</param>
+ /// <param name="numberOfContainers">Relax indicator of evaluator node request</param>
+ [Inject]
+ private TestHelloDriver(IEvaluatorRequestor evaluatorRequestor,
+ [Parameter(typeof(RelaxLocality))] bool relaxLocality,
+ [Parameter(typeof(NumberOfContainers))] int numberOfContainers)
+ {
+ Logger.Log(Level.Info, "HelloDriverYarn Driver: numberOfContainers: {0}.", numberOfContainers);
+ _evaluatorRequestor = evaluatorRequestor;
+ _relaxLocality = relaxLocality;
+ _numberOfContainers = numberOfContainers;
+ }
+
+ /// <summary>
+ /// Submits the HelloTask to the Evaluator.
+ /// </summary>
+ /// <param name="allocatedEvaluator"></param>
+ public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+ {
+ var msg = string.Format("Received allocatedEvaluator-HostName: {0}, id {1}",
+ allocatedEvaluator.GetEvaluatorDescriptor().NodeDescriptor.HostName,
+ allocatedEvaluator.Id);
+ using (Logger.LogFunction("IAllocatedEvaluator handler:", msg))
+ {
+ var taskConfiguration = TaskConfiguration.ConfigurationModule
+ .Set(TaskConfiguration.Identifier, "HelloTask-" + allocatedEvaluator.Id)
+ .Set(TaskConfiguration.Task, GenericType<TestHelloTask>.Class)
+ .Build();
+ allocatedEvaluator.SubmitTask(taskConfiguration);
+ }
+ }
+
+ /// <summary>
+ /// Called to start the user mode driver
+ /// </summary>
+ /// <param name="driverStarted"></param>
+ public void OnNext(IDriverStarted driverStarted)
+ {
+ Logger.Log(Level.Info, "Received IDriverStarted, numberOfContainers: {0}", _numberOfContainers);
+
+ _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder()
+ .SetMegabytes(64)
+ .SetNumber(_numberOfContainers)
+ .SetRelaxLocality(_relaxLocality)
+ .SetCores(1)
+ .Build());
+ }
+
+ /// <summary>
+ /// A simple ICompletedTask handler.
+ /// </summary>
+ /// <param name="value"></param>
+ void IObserver<ICompletedTask>.OnNext(ICompletedTask value)
+ {
+ Logger.Log(Level.Info, "Received ICompletedTask: {0} with evaluator id: {1}.", value.Id, value.ActiveContext.EvaluatorId);
+ value.ActiveContext.Dispose();
+ }
+
+ /// <summary>
+ /// A simple IFailedTask handler.
+ /// </summary>
+ /// <param name="value"></param>
+ void IObserver<IFailedTask>.OnNext(IFailedTask value)
+ {
+ Logger.Log(Level.Info, "Received IFailedTask: {0} with evaluator id: {1}.", value.Id, value.GetActiveContext().Value.EvaluatorId);
+ value.GetActiveContext().Value.Dispose();
+ }
+
+ /// <summary>
+ /// A simple IFailedEvaluator handler.
+ /// </summary>
+ /// <param name="value"></param>
+ void IObserver<IFailedEvaluator>.OnNext(IFailedEvaluator value)
+ {
+ Logger.Log(Level.Info, "Received IFailedEvaluator: {0}.", value.Id);
+ }
+
+ /// <summary>
+ /// A simple IRunningTask handler.
+ /// </summary>
+ /// <param name="value"></param>
+ void IObserver<IRunningTask>.OnNext(IRunningTask value)
+ {
+ Logger.Log(Level.Info, "Received IRunningTask: {0} with evaluator id: {1}", value.Id, value.ActiveContext.EvaluatorId);
+ }
+
+ public void OnError(Exception error)
+ {
+ throw error;
+ }
+
+ public void OnCompleted()
+ {
+ }
+ }
+
+ [NamedParameter(documentation: "RelaxLocality for specifying evaluator node names", shortName: "RelaxLocality", defaultValue: "true")]
+ internal class RelaxLocality : Name<bool>
+ {
+ }
+
+ [NamedParameter(documentation: "NumberOfContainers", defaultValue: "1")]
+ internal class NumberOfContainers : Name<int>
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs
new file mode 100644
index 0000000..454cd32
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Text;
+using System.Threading;
+using Newtonsoft.Json;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Avro.YARN;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.Local;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tests.Functional;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Performance.TestHelloREEF
+{
+ /// <summary>
+ /// Test Hello REEF for scalability
+ /// </summary>
+ [Collection("PerformanceTests")]
+ public class TestHelloREEFClient : ReefFunctionalTest
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(TestHelloREEFClient));
+
+ private const int ReTryCounts = 300;
+ private const int SleepTime = 2000;
+ private const string DefaultPortRangeStart = "2000";
+ private const string DefaultPortRangeCount = "20";
+ private const string TrustedApplicationTokenIdentifier = "TrustedApplicationTokenIdentifier";
+
+ /// <summary>
+ /// Test HelloREEF on local runtime.
+ /// </summary>
+ [Fact]
+ [Trait("Priority", "1")]
+ [Trait("Category", "FunctionalGated")]
+ [Trait("Description", "Test Hello Handler on local runtime")]
+ public void TestHelloREEFOnLocal()
+ {
+ int numberOfContainers = 5;
+ int driverMemory = 1024;
+ string testFolder = DefaultRuntimeFolder + TestId;
+ TestRun(GetRuntimeConfigurationForLocal(numberOfContainers, testFolder), driverMemory);
+ CleanUp(testFolder);
+ }
+
+ /// <summary>
+ /// Test HelloREEF on YARN.
+ /// The parameter are provided on command line arguments: token password numberOfContainers
+ /// e.g. TestDriver.exe TrustedApplication001 none 2000
+ /// </summary>
+ /// <param name="args"></param>
+ [Fact]
+ [Trait("Environment", "Yarn")]
+ [Trait("Priority", "1")]
+ [Trait("Description", "Run CLR Test on Yarn")]
+ public void TestHelloREEFOnYarn(string[] args)
+ {
+ TestRun(GetRuntimeConfigurationForYarn(args), 10240);
+ }
+
+ /// <summary>
+ /// Test run for the runtime in the given injector.
+ /// </summary>
+ /// <param name="config">runtime configuration.</param>
+ /// <param name="driverMemory">driver memory in MB.</param>
+ private void TestRun(IConfiguration config, int driverMemory)
+ {
+ IInjector injector = TangFactory.GetTang().NewInjector(config);
+ var jobRequestBuilder = injector.GetInstance<JobRequestBuilder>();
+ var reefClient = injector.GetInstance<IREEFClient>();
+ var numberOfContainers = injector.GetNamedInstance<NumberOfContainers, int>(GenericType<NumberOfContainers>.Class);
+
+ //// The driver configuration contains all the needed handler bindings
+ var helloDriverConfiguration = DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TestHelloDriver>.Class)
+ .Set(DriverConfiguration.OnDriverStarted, GenericType<TestHelloDriver>.Class)
+ .Set(DriverConfiguration.OnTaskCompleted, GenericType<TestHelloDriver>.Class)
+ .Set(DriverConfiguration.OnTaskFailed, GenericType<TestHelloDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<TestHelloDriver>.Class)
+ .Set(DriverConfiguration.OnTaskRunning, GenericType<TestHelloDriver>.Class)
+ .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
+ .Build();
+
+ var driverConfig = TangFactory.GetTang()
+ .NewConfigurationBuilder(helloDriverConfiguration)
+ .BindIntNamedParam<NumberOfContainers>(numberOfContainers.ToString());
+
+ // The JobSubmission contains the Driver configuration as well as the files needed on the Driver.
+ var helloJobRequest = jobRequestBuilder
+ .AddDriverConfiguration(driverConfig.Build())
+ .AddGlobalAssemblyForType(typeof(TestHelloDriver))
+ .SetJobIdentifier("TestHelloREEF")
+ .SetDriverMemory(driverMemory)
+ .Build();
+
+ var result = reefClient.SubmitAndGetJobStatus(helloJobRequest);
+ var state = PullFinalJobStatus(result);
+ Logger.Log(Level.Info, "Application final state : {0}.", state);
+ Assert.Equal(FinalState.SUCCEEDED, state);
+ }
+
+ /// <summary>
+ /// Get runtime configuration
+ /// </summary>
+ private static IConfiguration GetRuntimeConfigurationForYarn(string[] args)
+ {
+ var token = new SecurityToken(
+ TrustedApplicationTokenIdentifier,
+ TrustedApplicationTokenIdentifier,
+ ByteUtilities.StringToByteArrays(args[0]),
+ Encoding.ASCII.GetBytes(args[1]));
+
+ var clientConfig = YARNClientConfiguration.ConfigurationModule
+ .Set(YARNClientConfiguration.SecurityTokenStr, JsonConvert.SerializeObject(token))
+ .Build();
+
+ var tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule
+ .Set(TcpPortConfigurationModule.PortRangeStart, args.Length > 3 ? args[3] : DefaultPortRangeStart)
+ .Set(TcpPortConfigurationModule.PortRangeCount, args.Length > 4 ? args[4] : DefaultPortRangeCount)
+ .Build();
+
+ var c = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindIntNamedParam<NumberOfContainers>(args[2])
+ .Build();
+
+ return Configurations.Merge(clientConfig, tcpPortConfig, c);
+ }
+
+ private static IConfiguration GetRuntimeConfigurationForLocal(int numberOfContainers, string testFolder)
+ {
+ var runtimeConfig = LocalRuntimeClientConfiguration.ConfigurationModule
+ .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, numberOfContainers.ToString())
+ .Set(LocalRuntimeClientConfiguration.RuntimeFolder, testFolder)
+ .Build();
+
+ return TangFactory.GetTang().NewConfigurationBuilder(runtimeConfig)
+ .BindIntNamedParam<NumberOfContainers>(numberOfContainers.ToString())
+ .Build();
+ }
+
+ /// <summary>
+ /// Sample code to pull job final status until the Job is done
+ /// </summary>
+ /// <param name="jobSubmitionResult"></param>
+ /// <returns></returns>
+ private FinalState PullFinalJobStatus(IJobSubmissionResult jobSubmitionResult)
+ {
+ int n = 0;
+ var state = jobSubmitionResult.FinalState;
+ while (state.Equals(FinalState.UNDEFINED) && n++ < ReTryCounts)
+ {
+ Thread.Sleep(SleepTime);
+ state = jobSubmitionResult.FinalState;
+ }
+ return state;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloTask.cs b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloTask.cs
new file mode 100644
index 0000000..b9aac5d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloTask.cs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Tests.Performance.TestHelloREEF
+{
+ /// <summary>
+ /// A Task that merely prints a greeting and exits.
+ /// </summary>
+ public sealed class TestHelloTask : ITask
+ {
+ [Inject]
+ private TestHelloTask()
+ {
+ }
+
+ public void Dispose()
+ {
+ Console.WriteLine("Disposed.");
+ }
+
+ public byte[] Call(byte[] memento)
+ {
+ Console.WriteLine("Hello, REEF!");
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
index 1733639..8ee9691 100644
--- a/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
+++ b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs
@@ -145,7 +145,7 @@ namespace Org.Apache.REEF.Utilities.Logging
/// <summary>
/// Log the message with the specified Log Level.
///
- /// If addtional arguments are passed, the message will be treated as
+ /// If additional arguments are passed, the message will be treated as
/// a format string. The format string and the additional arguments
/// will be formatted according to string.Format()
/// </summary>
@@ -161,7 +161,7 @@ namespace Org.Apache.REEF.Utilities.Logging
DateTime.Now.ToString("o", CultureInfo.InvariantCulture)
+ " "
+ System.Threading.Thread.CurrentThread.ManagedThreadId.ToString("D4", CultureInfo.InvariantCulture)
- + Environment.NewLine + LogLevel[(int)level] + ": "
+ + " : " + LogLevel[(int)level] + ": "
+ msg;
_traceSource.TraceEvent(
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
index 9f2a57a..568a3d7 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
@@ -24,6 +24,9 @@ import org.apache.reef.io.naming.Identifiable;
import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -61,6 +64,11 @@ public final class AllocatedEvaluatorBridge extends NativeBridge implements Iden
public void submitContextAndTaskString(final String evaluatorConfigurationString,
final String contextConfigurationString,
final String taskConfigurationString) {
+
+ final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ LOG.log(Level.FINE, "AllocatedEvaluatorBridge:submitContextAndTaskString for evaluator id: {0}, time: {1}.",
+ new Object[] {evaluatorId, dateFormat.format(new Date())});
+
if (evaluatorConfigurationString.isEmpty()) {
throw new RuntimeException("empty evaluatorConfigurationString provided.");
}
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
index 83f783f..de9f95b 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
@@ -40,7 +40,9 @@ public final class NativeInterop {
public static native void clrSystemAllocatedEvaluatorHandlerOnNext(
final long handle,
final AllocatedEvaluatorBridge javaEvaluatorBridge,
- final InteropLogger interopLogger
+ final InteropLogger interopLogger,
+ final String nameServerInfo,
+ final String evaluatorId
);
public static native void clrSystemActiveContextHandlerOnNext(
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index 8a0fb86..ddc3385 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -53,7 +53,10 @@ import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -109,7 +112,7 @@ public final class JobDriver {
/**
* Map from context ID to running evaluator context.
*/
- private final Map<String, ActiveContext> contexts = new HashMap<>();
+ private final ConcurrentHashMap<String, ActiveContext> contexts = new ConcurrentHashMap<>();
private final REEFFileNames reefFileNames;
private final LocalAddressProvider localAddressProvider;
@@ -118,13 +121,14 @@ public final class JobDriver {
*/
private final LoggingScopeFactory loggingScopeFactory;
private final Set<String> definedRuntimes;
+ private final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
private BridgeHandlerManager handlerManager = null;
private boolean isRestarted = false;
// We are holding on to following on bridge side.
// Need to add references here so that GC does not collect them.
- private final HashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges =
- new HashMap<>();
+ private final ConcurrentHashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges =
+ new ConcurrentHashMap<>();
private EvaluatorRequestorBridge evaluatorRequestorBridge;
@@ -230,41 +234,40 @@ public final class JobDriver {
}
private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) {
- synchronized (JobDriver.this) {
- eval.setProcess(process);
- LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
- new Object[]{eval.getId(), JobDriver.this.contexts.size()});
- if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) {
- throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
- }
- final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
- this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
- allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
- NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(
- JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger);
+ eval.setProcess(process);
+ LOG.log(Level.FINE, "Allocated Evaluator: {0}, total running running {1}.",
+ new Object[]{eval.getId(), JobDriver.this.contexts.size()});
+ final long handler = JobDriver.this.handlerManager.getAllocatedEvaluatorHandler();
+ if (0 == handler) {
+ throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
}
+ final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
+ this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
+ allocatedEvaluatorBridges.putIfAbsent(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
+ NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(
+ handler, allocatedEvaluatorBridge, this.interopLogger, this.nameServerInfo, eval.getId());
+ LOG.log(Level.FINE, "End of JobDriver.Allocated Evaluator: {0}, time: {1}",
+ new Object[] {eval.getId(), dateFormat.format(new Date())});
}
private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) {
try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
- synchronized (JobDriver.this) {
- LOG.log(Level.SEVERE, "FailedEvaluator", eval);
- for (final FailedContext failedContext : eval.getFailedContextList()) {
- final String failedContextId = failedContext.getId();
- LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
- JobDriver.this.contexts.remove(failedContextId);
- }
- String message = "Evaluator " + eval.getId() + " failed with message: "
- + eval.getEvaluatorException().getMessage();
- JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
+ LOG.log(Level.SEVERE, "FailedEvaluator", eval);
+ for (final FailedContext failedContext : eval.getFailedContextList()) {
+ final String failedContextId = failedContext.getId();
+ LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
+ JobDriver.this.contexts.remove(failedContextId);
+ }
+ final String message = "Evaluator " + eval.getId() + " failed with message: "
+ + eval.getEvaluatorException().getMessage();
+ JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
- if (isRestartFailed) {
- evaluatorFailedHandlerWaitForCLRBridgeSetup(
- JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed);
- } else {
- evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
- eval, isRestartFailed);
- }
+ if (isRestartFailed) {
+ evaluatorFailedHandlerWaitForCLRBridgeSetup(
+ JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed);
+ } else {
+ evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
+ eval, isRestartFailed);
}
}
}
@@ -347,10 +350,8 @@ public final class JobDriver {
@Override
public void onNext(final AllocatedEvaluator allocatedEvaluator) {
try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
- synchronized (JobDriver.this) {
- LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
- JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess());
- }
+ LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
+ JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess());
}
}
}
@@ -362,12 +363,10 @@ public final class JobDriver {
@Override
public void onNext(final ActiveContext context) {
try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
- synchronized (JobDriver.this) {
- LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
- new Object[]{context.getId()});
- JobDriver.this.contexts.put(context.getId(), context);
- JobDriver.this.submit(context);
- }
+ LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
+ new Object[]{context.getId()});
+ JobDriver.this.contexts.put(context.getId(), context);
+ JobDriver.this.submit(context);
}
}
}
@@ -733,9 +732,7 @@ public final class JobDriver {
NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(),
closedContextBridge);
}
- synchronized (JobDriver.this) {
- JobDriver.this.contexts.remove(context.getId());
- }
+ JobDriver.this.contexts.remove(context.getId());
}
}
}
@@ -757,9 +754,7 @@ public final class JobDriver {
NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(),
failedContextBridge);
}
- synchronized (JobDriver.this) {
- JobDriver.this.contexts.remove(context.getId());
- }
+ JobDriver.this.contexts.remove(context.getId());
final Optional<byte[]> err = context.getData();
if (err.isPresent()) {
JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index 72aa640..5df89e7 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -281,7 +281,8 @@ public final class YarnSubmissionHelper implements AutoCloseable {
launchCommand, this.resources, tokenProvider.getTokens());
this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);
- LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", this.applicationId);
+ LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}, driver core: {1}",
+ new Object[] {this.applicationId, this.applicationSubmissionContext.getResource().getVirtualCores()});
if (LOG.isLoggable(Level.INFO)) {
LOG.log(Level.INFO, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));