You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by se...@apache.org on 2015/10/30 20:15:19 UTC
incubator-reef git commit: [REEF-891] Remove the usage of lookup
tables in favor of passing C# handles through explicit classes
Repository: incubator-reef
Updated Branches:
refs/heads/master 8efa4d8e6 -> f5c9f48da
[REEF-891] Remove the usage of lookup tables in favor of passing C# handles through explicit classes
This addressed the issue by
* Adding the classes BridgeHandlerManager both in Java and .NET.
* Removing maps in Java and .NET.
* Modifying InterOp code to be more explicit with the two new classes.
JIRA:
[REEF-891](https://issues.apache.org/jira/browse/REEF-891)
Pull Request:
Closes #601
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/f5c9f48d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/f5c9f48d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/f5c9f48d
Branch: refs/heads/master
Commit: f5c9f48dad8cdb011c2d64a00164d38e094a49a5
Parents: 8efa4d8
Author: Andrew Chung <af...@gmail.com>
Authored: Thu Oct 29 13:33:10 2015 -0700
Committer: Beysim Sezgin <be...@hotmail.com>
Committed: Fri Oct 30 12:12:35 2015 -0700
----------------------------------------------------------------------
.../cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp | 69 +++++--
.../Bridge/BridgeHandlerManager.cs | 61 ++++++
.../Bridge/ClrSystemHandlerWrapper.cs | 19 +-
.../Bridge/DriverBridge.cs | 40 ++--
lang/cs/Org.Apache.REEF.Driver/Constants.cs | 128 -------------
.../Org.Apache.REEF.Driver.csproj | 1 +
.../reef/javabridge/BridgeHandlerManager.java | 186 +++++++++++++++++++
.../apache/reef/javabridge/NativeInterop.java | 49 +----
.../generic/ClrHandlersInitializer.java | 3 +-
.../DriverRestartClrHandlersInitializer.java | 12 +-
.../DriverStartClrHandlersInitializer.java | 10 +-
.../reef/javabridge/generic/JobDriver.java | 154 ++++++---------
12 files changed, 403 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
index f57dab2..06bf5a3 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
@@ -57,6 +57,43 @@ static void MarshalErrorToJava (
env->SetObjectField(jerrorInfo, fieldID, jexceptionString);
}
+void populateJavaBridgeHandlerManager(JNIEnv * env, jobject jbridgeHandlerManager, BridgeHandlerManager^ bridgeHandlerManager) {
+ jclass cls = env->GetObjectClass(jbridgeHandlerManager);
+ jmethodID jsetAllocatedEvaluatorHandlerMid = env->GetMethodID(cls, "setAllocatedEvaluatorHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetAllocatedEvaluatorHandlerMid, bridgeHandlerManager->AllocatedEvaluatorHandler);
+ jmethodID jsetActiveContextHandlerMid = env->GetMethodID(cls, "setActiveContextHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetActiveContextHandlerMid, bridgeHandlerManager->ActiveContextHandler);
+ jmethodID jsetTaskMessageHandlerMid = env->GetMethodID(cls, "setTaskMessageHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetTaskMessageHandlerMid, bridgeHandlerManager->TaskMessageHandler);
+ jmethodID jsetFailedTaskHandlerMid = env->GetMethodID(cls, "setFailedTaskHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetFailedTaskHandlerMid, bridgeHandlerManager->FailedTaskHandler);
+ jmethodID jsetFailedEvaluatorHandlerMid = env->GetMethodID(cls, "setFailedEvaluatorHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetFailedEvaluatorHandlerMid, bridgeHandlerManager->FailedEvaluatorHandler);
+ jmethodID jsetHttpServerEventHandlerMid = env->GetMethodID(cls, "setHttpServerEventHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetHttpServerEventHandlerMid, bridgeHandlerManager->HttpServerHandler);
+ jmethodID jsetCompletedTaskHandlerMid = env->GetMethodID(cls, "setCompletedTaskHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetCompletedTaskHandlerMid, bridgeHandlerManager->CompletedTaskHandler);
+ jmethodID jsetRunningTaskHandlerMid = env->GetMethodID(cls, "setRunningTaskHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetRunningTaskHandlerMid, bridgeHandlerManager->RunningTaskHandler);
+ jmethodID jsetSuspendedTaskHandlerMid = env->GetMethodID(cls, "setSuspendedTaskHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetSuspendedTaskHandlerMid, bridgeHandlerManager->SuspendedTaskHandler);
+ jmethodID jsetCompletedEvaluatorHandlerMid = env->GetMethodID(cls, "setCompletedEvaluatorHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetCompletedEvaluatorHandlerMid, bridgeHandlerManager->CompletedEvaluatorHandler);
+ jmethodID jsetClosedContextHandlerMid = env->GetMethodID(cls, "setClosedContextHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetClosedContextHandlerMid, bridgeHandlerManager->ClosedContextHandler);
+ jmethodID jsetFailedContextHandlerMid = env->GetMethodID(cls, "setFailedContextHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetFailedContextHandlerMid, bridgeHandlerManager->FailedContextHandler);
+ jmethodID jsetContextMessageHandlerMid = env->GetMethodID(cls, "setContextMessageHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetContextMessageHandlerMid, bridgeHandlerManager->ContextMessageHandler);
+ jmethodID jsetDriverRestartActiveContextHandlerMid = env->GetMethodID(cls, "setDriverRestartActiveContextHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartActiveContextHandlerMid, bridgeHandlerManager->DriverRestartActiveContextHandler);
+ jmethodID jsetDriverRestartRunningTaskHandlerMid = env->GetMethodID(cls, "setDriverRestartRunningTaskHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartRunningTaskHandlerMid, bridgeHandlerManager->DriverRestartRunningTaskHandler);
+ jmethodID jsetDriverRestartCompletedHandlerMid = env->GetMethodID(cls, "setDriverRestartCompletedHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartCompletedHandlerMid, bridgeHandlerManager->DriverRestartCompletedHandler);
+ jmethodID jsetDriverRestartFailedEvaluatorHandlerMid = env->GetMethodID(cls, "setDriverRestartFailedEvaluatorHandler", "(J)V");
+ env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartFailedEvaluatorHandlerMid, bridgeHandlerManager->DriverRestartFailedEvaluatorHandler);
+}
// Loading Clr Assembly. Note that we do not use ManagerLogger in this method since the
// logger assembly needs to be loaded by this method before it can be used.
@@ -97,10 +134,10 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_loadClrAsse
/*
* Class: org_apache_reef_javabridge_NativeInterop
* Method: callClrSystemOnStartHandler
- * Signature: (Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)[J
+ * Signature: (Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V
*/
-JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler
-(JNIEnv * env, jclass jclassx, jstring dateTimeString, jstring httpServerPort, jobject jevaluatorRequestorBridge) {
+JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler
+(JNIEnv * env, jclass jclassx, jstring dateTimeString, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge) {
try {
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler");
DateTime dt = DateTime::Now;
@@ -108,13 +145,12 @@ JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_callC
String^ strPort = ManagedStringFromJavaString(env, httpServerPort);
EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge);
- array<unsigned long long>^ handlers = ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt, strPort, evaluatorRequestorBridge);
- return JavaLongArrayFromManagedLongArray(env, handlers);
+ BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt, strPort, evaluatorRequestorBridge);
+ populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager);
}
catch (System::Exception^ ex) {
// we cannot get error back to java here since we don't have an object to call back (although we ideally should...)
ManagedLog::LOGGER->LogError("Exceptions in Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler", ex);
- return NULL;
}
}
@@ -432,25 +468,24 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemCo
}
/*
-* Class: org_apache_reef_javabridge_NativeInterop
-* Method: callClrSystemOnRestartHandler
-* Signature: (Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)[J
-*/
-JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler
-(JNIEnv * env, jclass jclassx, jstring httpServerPort, jobject jevaluatorRequestorBridge, jobject jdriverRestartedBridge) {
+ * Class: org_apache_reef_javabridge_NativeInterop
+ * Method: callClrSystemOnRestartHandler
+ * Signature: (Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler
+(JNIEnv * env, jclass jclassx, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge, jobject jdriverRestartedBridge) {
try {
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler");
String^ strPort = ManagedStringFromJavaString(env, httpServerPort);
EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge);
DriverRestartedClr2Java^ driverRestartedBridge = gcnew DriverRestartedClr2Java(env, jdriverRestartedBridge);
- array<unsigned long long>^ handlers = ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart(strPort, evaluatorRequestorBridge, driverRestartedBridge);
- return JavaLongArrayFromManagedLongArray(env, handlers);
+ BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart(strPort, evaluatorRequestorBridge, driverRestartedBridge);
+ populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager);
}
catch (System::Exception^ ex) {
// we cannot get error back to java here since we don't have an object to call back (although we ideally should...)
ManagedLog::LOGGER->LogError("Exceptions in Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler", ex);
- return NULL;
}
}
@@ -533,7 +568,7 @@ static JNINativeMethod methods[] = {
{ "clrBufferedLog", "(ILjava/lang/String;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrBufferedLog },
- { "callClrSystemOnStartHandler", "(Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)[J",
+ { "callClrSystemOnStartHandler", "(Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler },
{ "clrSystemAllocatedEvaluatorHandlerOnNext", "(JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V",
@@ -575,7 +610,7 @@ static JNINativeMethod methods[] = {
{ "clrSystemContextMessageHandlerOnNext", "(JLorg/apache/reef/javabridge/ContextMessageBridge;)V",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemContextMessageHandlerOnNext },
- { "callClrSystemOnRestartHandler", "(Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)[J",
+ { "callClrSystemOnRestartHandler", "(Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)V",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler },
{ "clrSystemDriverRestartActiveContextHandlerOnNext", "(JLorg/apache/reef/javabridge/ActiveContextBridge;)V",
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs
new file mode 100644
index 0000000..bf0b04c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Driver.Bridge
+{
+ /// <summary>
+ /// A class that holds all .NET handles for Java InterOp calls.
+ /// </summary>
+ public sealed class BridgeHandlerManager
+ {
+ public ulong AllocatedEvaluatorHandler { get; internal set; }
+
+ public ulong TaskMessageHandler { get; internal set; }
+
+ public ulong ActiveContextHandler { get; internal set; }
+
+ public ulong FailedTaskHandler { get; internal set; }
+
+ public ulong RunningTaskHandler { get; internal set; }
+
+ public ulong CompletedTaskHandler { get; internal set; }
+
+ public ulong SuspendedTaskHandler { get; internal set; }
+
+ public ulong FailedEvaluatorHandler { get; internal set; }
+
+ public ulong CompletedEvaluatorHandler { get; internal set; }
+
+ public ulong ClosedContextHandler { get; internal set; }
+
+ public ulong FailedContextHandler { get; internal set; }
+
+ public ulong ContextMessageHandler { get; internal set; }
+
+ public ulong DriverRestartActiveContextHandler { get; internal set; }
+
+ public ulong DriverRestartRunningTaskHandler { get; internal set; }
+
+ public ulong DriverRestartCompletedHandler { get; internal set; }
+
+ public ulong DriverRestartFailedEvaluatorHandler { get; internal set; }
+
+ public ulong HttpServerHandler { get; internal set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
index 5f7b61b..381aeba 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
@@ -230,20 +230,7 @@ namespace Org.Apache.REEF.Driver.Bridge
}
}
- //Deprecate, remove after both Java and C# code gets checked in
- public static ulong[] Call_ClrSystemStartHandler_OnStart(
- DateTime startTime,
- IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java)
- {
- IEvaluatorRequestor evaluatorRequestor = new EvaluatorRequestor(evaluatorRequestorClr2Java);
- using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart"))
- {
- LOGGER.Log(Level.Info, "*** Start time is " + startTime);
- return GetHandlers(null, evaluatorRequestor);
- }
- }
-
- public static ulong[] Call_ClrSystemStartHandler_OnStart(
+ public static BridgeHandlerManager Call_ClrSystemStartHandler_OnStart(
DateTime startTime,
string httpServerPort,
IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java)
@@ -260,7 +247,7 @@ namespace Org.Apache.REEF.Driver.Bridge
}
}
- public static ulong[] Call_ClrSystemRestartHandler_OnRestart(
+ public static BridgeHandlerManager Call_ClrSystemRestartHandler_OnRestart(
string httpServerPort,
IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java,
IDriverRestartedClr2Java driverRestartedClr2Java)
@@ -277,7 +264,7 @@ namespace Org.Apache.REEF.Driver.Bridge
}
}
- private static ulong[] GetHandlers(string httpServerPortNumber, IEvaluatorRequestor evaluatorRequestor)
+ private static BridgeHandlerManager GetHandlers(string httpServerPortNumber, IEvaluatorRequestor evaluatorRequestor)
{
var injector = BridgeConfigurationProvider.GetBridgeInjector(evaluatorRequestor);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
index 6e02bff..d770d3a 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
@@ -203,9 +203,9 @@ namespace Org.Apache.REEF.Driver.Bridge
_driverRestartFailedEvaluatorSubscriber = new ClrSystemHandler<IFailedEvaluator>();
}
- public ulong[] Subscribe()
+ public BridgeHandlerManager Subscribe()
{
- ulong[] handlers = Enumerable.Repeat(Constants.NullHandler, Constants.HandlersNumber).ToArray();
+ var bridgeHandlerManager = new BridgeHandlerManager();
// subscribe to Allocated Evaluator
foreach (var handler in _allocatedEvaluatorHandlers)
@@ -213,7 +213,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_allocatedEvaluatorSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to IAllocatedEvaluator handler: " + handler);
}
- handlers[Constants.Handlers[Constants.AllocatedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_allocatedEvaluatorSubscriber);
+ bridgeHandlerManager.AllocatedEvaluatorHandler = ClrHandlerHelper.CreateHandler(_allocatedEvaluatorSubscriber);
// subscribe to TaskMessage
foreach (var handler in _taskMessageHandlers)
@@ -221,7 +221,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_taskMessageSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to ITaskMessage handler: " + handler);
}
- handlers[Constants.Handlers[Constants.TaskMessageHandler]] = ClrHandlerHelper.CreateHandler(_taskMessageSubscriber);
+ bridgeHandlerManager.TaskMessageHandler = ClrHandlerHelper.CreateHandler(_taskMessageSubscriber);
// subscribe to Active Context
foreach (var handler in _activeContextHandlers)
@@ -229,7 +229,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_activeContextSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to IActiveContext handler: " + handler);
}
- handlers[Constants.Handlers[Constants.ActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_activeContextSubscriber);
+ bridgeHandlerManager.ActiveContextHandler = ClrHandlerHelper.CreateHandler(_activeContextSubscriber);
// subscribe to Failed Task
foreach (var handler in _failedTaskHandlers)
@@ -237,7 +237,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_failedTaskSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to IFailedTask handler: " + handler);
}
- handlers[Constants.Handlers[Constants.FailedTaskHandler]] = ClrHandlerHelper.CreateHandler(_failedTaskSubscriber);
+ bridgeHandlerManager.FailedTaskHandler = ClrHandlerHelper.CreateHandler(_failedTaskSubscriber);
// subscribe to Running Task
foreach (var handler in _runningTaskHandlers)
@@ -245,7 +245,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_runningTaskSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to IRunningTask handler: " + handler);
}
- handlers[Constants.Handlers[Constants.RunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_runningTaskSubscriber);
+ bridgeHandlerManager.RunningTaskHandler = ClrHandlerHelper.CreateHandler(_runningTaskSubscriber);
// subscribe to Completed Task
foreach (var handler in _completedTaskHandlers)
@@ -253,7 +253,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_completedTaskSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to ICompletedTask handler: " + handler);
}
- handlers[Constants.Handlers[Constants.CompletedTaskHandler]] = ClrHandlerHelper.CreateHandler(_completedTaskSubscriber);
+ bridgeHandlerManager.CompletedTaskHandler = ClrHandlerHelper.CreateHandler(_completedTaskSubscriber);
// subscribe to Suspended Task
foreach (var handler in _suspendedTaskHandlers)
@@ -261,7 +261,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_suspendedTaskSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to ISuspendedTask handler: " + handler);
}
- handlers[Constants.Handlers[Constants.SuspendedTaskHandler]] = ClrHandlerHelper.CreateHandler(_suspendedTaskSubscriber);
+ bridgeHandlerManager.SuspendedTaskHandler = ClrHandlerHelper.CreateHandler(_suspendedTaskSubscriber);
// subscribe to Failed Evaluator
foreach (var handler in _failedEvaluatorHandlers)
@@ -269,7 +269,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_failedEvaluatorSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to IFailedEvaluator handler: " + handler);
}
- handlers[Constants.Handlers[Constants.FailedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_failedEvaluatorSubscriber);
+ bridgeHandlerManager.FailedEvaluatorHandler = ClrHandlerHelper.CreateHandler(_failedEvaluatorSubscriber);
// subscribe to Completed Evaluator
foreach (var handler in _completedEvaluatorHandlers)
@@ -277,7 +277,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_completedEvaluatorSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to ICompletedEvaluator handler: " + handler);
}
- handlers[Constants.Handlers[Constants.CompletedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_completedEvaluatorSubscriber);
+ bridgeHandlerManager.CompletedEvaluatorHandler = ClrHandlerHelper.CreateHandler(_completedEvaluatorSubscriber);
// subscribe to Closed Context
foreach (var handler in _closedContextHandlers)
@@ -285,7 +285,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_closedContextSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to IClosedContext handler: " + handler);
}
- handlers[Constants.Handlers[Constants.ClosedContextHandler]] = ClrHandlerHelper.CreateHandler(_closedContextSubscriber);
+ bridgeHandlerManager.ClosedContextHandler = ClrHandlerHelper.CreateHandler(_closedContextSubscriber);
// subscribe to Failed Context
foreach (var handler in _failedContextHandlers)
@@ -293,7 +293,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_failedContextSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to IFailedContext handler: " + handler);
}
- handlers[Constants.Handlers[Constants.FailedContextHandler]] = ClrHandlerHelper.CreateHandler(_failedContextSubscriber);
+ bridgeHandlerManager.FailedContextHandler = ClrHandlerHelper.CreateHandler(_failedContextSubscriber);
// subscribe to Context Message
foreach (var handler in _contextMessageHandlers)
@@ -301,7 +301,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_contextMessageSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to IContextMesage handler: " + handler);
}
- handlers[Constants.Handlers[Constants.ContextMessageHandler]] = ClrHandlerHelper.CreateHandler(_contextMessageSubscriber);
+ bridgeHandlerManager.ContextMessageHandler = ClrHandlerHelper.CreateHandler(_contextMessageSubscriber);
// subscribe to Active Context received during driver restart
foreach (var handler in _driverRestartActiveContextHandlers)
@@ -309,7 +309,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_driverRestartActiveContextSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to handler for IActiveContext received during driver restart: " + handler);
}
- handlers[Constants.Handlers[Constants.DriverRestartActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartActiveContextSubscriber);
+ bridgeHandlerManager.DriverRestartActiveContextHandler = ClrHandlerHelper.CreateHandler(_driverRestartActiveContextSubscriber);
// subscribe to Running Task received during driver restart
foreach (var handler in _driverRestartRunningTaskHandlers)
@@ -317,7 +317,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_driverRestartRunningTaskSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to handler for IRunningTask received during driver restart: " + handler);
}
- handlers[Constants.Handlers[Constants.DriverRestartRunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartRunningTaskSubscriber);
+ bridgeHandlerManager.DriverRestartRunningTaskHandler = ClrHandlerHelper.CreateHandler(_driverRestartRunningTaskSubscriber);
// subscribe to Restart Completed received during driver restart
foreach (var handler in _driverRestartCompletedHandlers)
@@ -325,7 +325,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_driverRestartCompletedSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to handler for IRestartCompleted received during driver restart: " + handler);
}
- handlers[Constants.Handlers[Constants.DriverRestartCompletedHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartCompletedSubscriber);
+ bridgeHandlerManager.DriverRestartCompletedHandler = ClrHandlerHelper.CreateHandler(_driverRestartCompletedSubscriber);
// subscribe to Failed Evaluator received during driver restart
foreach (var handler in _driverRestartFailedEvaluatorHandlers)
@@ -333,14 +333,14 @@ namespace Org.Apache.REEF.Driver.Bridge
_driverRestartFailedEvaluatorSubscriber.Subscribe(handler);
_logger.Log(Level.Verbose, "subscribed to handler for IFailedEvaluator received during driver restart: " + handler);
}
- handlers[Constants.Handlers[Constants.DriverRestartFailedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartFailedEvaluatorSubscriber);
+ bridgeHandlerManager.DriverRestartFailedEvaluatorHandler = ClrHandlerHelper.CreateHandler(_driverRestartFailedEvaluatorSubscriber);
// subscribe to Http message
_httpServerEventSubscriber.Subscribe(_httpServerHandler);
_logger.Log(Level.Verbose, "subscribed to IHttpMessage handler :" + _httpServerHandler);
- handlers[Constants.Handlers[Constants.HttpServerHandler]] = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber);
+ bridgeHandlerManager.HttpServerHandler = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber);
- return handlers;
+ return bridgeHandlerManager;
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/cs/Org.Apache.REEF.Driver/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Constants.cs b/lang/cs/Org.Apache.REEF.Driver/Constants.cs
index ef8b87d..aea8164 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Constants.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Constants.cs
@@ -18,7 +18,6 @@
*/
using System;
-using System.Collections.Generic;
namespace Org.Apache.REEF.Driver
{
@@ -51,101 +50,6 @@ namespace Org.Apache.REEF.Driver
/// </summary>
public const int DefaultMemoryGranularity = 1024;
- /// <summary>
- /// The number of handlers total. Tightly coupled with Java.
- /// </summary>
- public const int HandlersNumber = 18;
-
- /// <summary>
- /// The name for EvaluatorRequestorHandler. Tightly coupled with Java.
- /// </summary>
- public const string EvaluatorRequestorHandler = "EvaluatorRequestor";
-
- /// <summary>
- /// The name for AllocatedEvaluatorHandler. Tightly coupled with Java.
- /// </summary>
- public const string AllocatedEvaluatorHandler = "AllocatedEvaluator";
-
- /// <summary>
- /// The name for CompletedEvaluatorHandler. Tightly coupled with Java.
- /// </summary>
- public const string CompletedEvaluatorHandler = "CompletedEvaluator";
-
- /// <summary>
- /// The name for ActiveContextHandler. Tightly coupled with Java.
- /// </summary>
- public const string ActiveContextHandler = "ActiveContext";
-
- /// <summary>
- /// The name for ClosedContextHandler. Tightly coupled with Java.
- /// </summary>
- public const string ClosedContextHandler = "ClosedContext";
-
- /// <summary>
- /// The name for FailedContextHandler. Tightly coupled with Java.
- /// </summary>
- public const string FailedContextHandler = "FailedContext";
-
- /// <summary>
- /// The name for ContextMessageHandler. Tightly coupled with Java.
- /// </summary>
- public const string ContextMessageHandler = "ContextMessage";
-
- /// <summary>
- /// The name for TaskMessageHandler. Tightly coupled with Java.
- /// </summary>
- public const string TaskMessageHandler = "TaskMessage";
-
- /// <summary>
- /// The name for FailedTaskHandler. Tightly coupled with Java.
- /// </summary>
- public const string FailedTaskHandler = "FailedTask";
-
- /// <summary>
- /// The name for RunningTaskHandler. Tightly coupled with Java.
- /// </summary>
- public const string RunningTaskHandler = "RunningTask";
-
- /// <summary>
- /// The name for FailedEvaluatorHandler. Tightly coupled with Java.
- /// </summary>
- public const string FailedEvaluatorHandler = "FailedEvaluator";
-
- /// <summary>
- /// The name for CompletedTaskHandler. Tightly coupled with Java.
- /// </summary>
- public const string CompletedTaskHandler = "CompletedTask";
-
- /// <summary>
- /// The name for SuspendedTaskHandler. Tightly coupled with Java.
- /// </summary>
- public const string SuspendedTaskHandler = "SuspendedTask";
-
- /// <summary>
- /// The name for HttpServerHandler. Tightly coupled with Java.
- /// </summary>
- public const string HttpServerHandler = "HttpServerHandler";
-
- /// <summary>
- /// The name for DriverRestartActiveContextHandler. Tightly coupled with Java.
- /// </summary>
- public const string DriverRestartActiveContextHandler = "DriverRestartActiveContext";
-
- /// <summary>
- /// The name for DriverRestartRunningTaskHandler. Tightly coupled with Java.
- /// </summary>
- public const string DriverRestartRunningTaskHandler = "DriverRestartRunningTask";
-
- /// <summary>
- /// The name for DriverRestartCompletedHandler. Tightly coupled with Java.
- /// </summary>
- public const string DriverRestartCompletedHandler = "DriverRestartCompleted";
-
- /// <summary>
- /// The name for DriverRestartFailedEvaluatorHandler. Tightly coupled with Java
- /// </summary>
- public const string DriverRestartFailedEvaluatorHandler = "DriverRestartFailedEvaluator";
-
[Obsolete(message:"Use REEFFileNames instead.")]
public const string DriverBridgeConfiguration = Common.Constants.ClrBridgeRuntimeConfiguration;
@@ -184,37 +88,5 @@ namespace Org.Apache.REEF.Driver
/// Configuration for Java verbose logging.
/// </summary>
public const string JavaVerboseLoggingConfig = "-Djava.util.logging.config.class=org.apache.reef.util.logging.Config";
-
- /// <summary>
- /// A dictionary of handler constants to handler descriptors.
- /// </summary>
- public static Dictionary<string, int> Handlers
- {
- get
- {
- return
- new Dictionary<string, int>()
- {
- { EvaluatorRequestorHandler, 0 },
- { AllocatedEvaluatorHandler, 1 },
- { ActiveContextHandler, 2 },
- { TaskMessageHandler, 3 },
- { FailedTaskHandler, 4 },
- { FailedEvaluatorHandler, 5 },
- { HttpServerHandler, 6 },
- { CompletedTaskHandler, 7 },
- { RunningTaskHandler, 8 },
- { SuspendedTaskHandler, 9 },
- { CompletedEvaluatorHandler, 10 },
- { ClosedContextHandler, 11 },
- { FailedContextHandler, 12 },
- { ContextMessageHandler, 13 },
- { DriverRestartActiveContextHandler, 14 },
- { DriverRestartRunningTaskHandler, 15 },
- { DriverRestartCompletedHandler, 16 },
- { DriverRestartFailedEvaluatorHandler, 17 }
- };
- }
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
index 9fb3613..1316cb6 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -39,6 +39,7 @@ under the License.
</ItemGroup>
<ItemGroup>
<Compile Include="Bridge\BridgeConfigurationProvider.cs" />
+ <Compile Include="Bridge\BridgeHandlerManager.cs" />
<Compile Include="Bridge\BridgeLogger.cs" />
<Compile Include="Bridge\Clr2java\IActiveContextClr2Java.cs" />
<Compile Include="Bridge\Clr2java\IAllocatedEvaluaotrClr2Java.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java
new file mode 100644
index 0000000..ce04454
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * A class that holds all handles to the .NET side.
+ * USED BY UNMANAGED CODE! PLEASE DO NOT CHANGE ANY FUNCTION SIGNATURES
+ * UNLESS YOU KNOW WHAT YOU ARE DOING!
+ */
+@Private
+public final class BridgeHandlerManager {
+ private long allocatedEvaluatorHandler = 0;
+ private long activeContextHandler = 0;
+ private long taskMessageHandler = 0;
+ private long failedTaskHandler = 0;
+ private long failedEvaluatorHandler = 0;
+ private long httpServerEventHandler = 0;
+ private long completedTaskHandler = 0;
+ private long runningTaskHandler = 0;
+ private long suspendedTaskHandler = 0;
+ private long completedEvaluatorHandler = 0;
+ private long closedContextHandler = 0;
+ private long failedContextHandler = 0;
+ private long contextMessageHandler = 0;
+ private long driverRestartActiveContextHandler = 0;
+ private long driverRestartRunningTaskHandler = 0;
+ private long driverRestartCompletedHandler = 0;
+ private long driverRestartFailedEvaluatorHandler = 0;
+
+ public BridgeHandlerManager() {
+ }
+
+ public long getAllocatedEvaluatorHandler() {
+ return allocatedEvaluatorHandler;
+ }
+
+ public void setAllocatedEvaluatorHandler(final long allocatedEvaluatorHandler) {
+ this.allocatedEvaluatorHandler = allocatedEvaluatorHandler;
+ }
+
+ public long getActiveContextHandler() {
+ return activeContextHandler;
+ }
+
+ public void setActiveContextHandler(final long activeContextHandler) {
+ this.activeContextHandler = activeContextHandler;
+ }
+
+ public long getTaskMessageHandler() {
+ return taskMessageHandler;
+ }
+
+ public void setTaskMessageHandler(final long taskMessageHandler) {
+ this.taskMessageHandler = taskMessageHandler;
+ }
+
+ public long getFailedTaskHandler() {
+ return failedTaskHandler;
+ }
+
+ public void setFailedTaskHandler(final long failedTaskHandler) {
+ this.failedTaskHandler = failedTaskHandler;
+ }
+
+ public long getFailedEvaluatorHandler() {
+ return failedEvaluatorHandler;
+ }
+
+ public void setFailedEvaluatorHandler(final long failedEvaluatorHandler) {
+ this.failedEvaluatorHandler = failedEvaluatorHandler;
+ }
+
+ public long getHttpServerEventHandler() {
+ return httpServerEventHandler;
+ }
+
+ public void setHttpServerEventHandler(final long httpServerEventHandler) {
+ this.httpServerEventHandler = httpServerEventHandler;
+ }
+
+ public long getCompletedTaskHandler() {
+ return completedTaskHandler;
+ }
+
+ public void setCompletedTaskHandler(final long completedTaskHandler) {
+ this.completedTaskHandler = completedTaskHandler;
+ }
+
+ public long getRunningTaskHandler() {
+ return runningTaskHandler;
+ }
+
+ public void setRunningTaskHandler(final long runningTaskHandler) {
+ this.runningTaskHandler = runningTaskHandler;
+ }
+
+ public long getSuspendedTaskHandler() {
+ return suspendedTaskHandler;
+ }
+
+ public void setSuspendedTaskHandler(final long suspendedTaskHandler) {
+ this.suspendedTaskHandler = suspendedTaskHandler;
+ }
+
+ public long getCompletedEvaluatorHandler() {
+ return completedEvaluatorHandler;
+ }
+
+ public void setCompletedEvaluatorHandler(final long completedEvaluatorHandler) {
+ this.completedEvaluatorHandler = completedEvaluatorHandler;
+ }
+
+ public long getClosedContextHandler() {
+ return closedContextHandler;
+ }
+
+ public void setClosedContextHandler(final long closedContextHandler) {
+ this.closedContextHandler = closedContextHandler;
+ }
+
+ public long getFailedContextHandler() {
+ return failedContextHandler;
+ }
+
+ public void setFailedContextHandler(final long failedContextHandler) {
+ this.failedContextHandler = failedContextHandler;
+ }
+
+ public long getContextMessageHandler() {
+ return contextMessageHandler;
+ }
+
+ public void setContextMessageHandler(final long contextMessageHandler) {
+ this.contextMessageHandler = contextMessageHandler;
+ }
+
+ public long getDriverRestartActiveContextHandler() {
+ return driverRestartActiveContextHandler;
+ }
+
+ public void setDriverRestartActiveContextHandler(final long driverRestartActiveContextHandler) {
+ this.driverRestartActiveContextHandler = driverRestartActiveContextHandler;
+ }
+
+ public long getDriverRestartRunningTaskHandler() {
+ return driverRestartRunningTaskHandler;
+ }
+
+ public void setDriverRestartRunningTaskHandler(final long driverRestartRunningTaskHandler) {
+ this.driverRestartRunningTaskHandler = driverRestartRunningTaskHandler;
+ }
+
+ public long getDriverRestartCompletedHandler() {
+ return driverRestartCompletedHandler;
+ }
+
+ public void setDriverRestartCompletedHandler(final long driverRestartCompletedHandler) {
+ this.driverRestartCompletedHandler = driverRestartCompletedHandler;
+ }
+
+ public long getDriverRestartFailedEvaluatorHandler() {
+ return driverRestartFailedEvaluatorHandler;
+ }
+
+ public void setDriverRestartFailedEvaluatorHandler(final long driverRestartFailedEvaluatorHandler) {
+ this.driverRestartFailedEvaluatorHandler = driverRestartFailedEvaluatorHandler;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
index 59a8d85..28a7faf 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
@@ -20,63 +20,21 @@ package org.apache.reef.javabridge;
import org.apache.reef.javabridge.generic.DriverRestartCompletedBridge;
-import java.util.HashMap;
-
/**
* Java interfaces of CLR/Java bridge.
* Implementations of the methods can be found at lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp.
*/
public final class NativeInterop {
- public static final String CLASS_HIERARCHY_FILENAME = "clrClassHierarchy.bin";
public static final String GLOBAL_LIBRARIES_FILENAME = "userSuppliedGlobalLibraries.txt";
- public static final String ALLOCATED_EVALUATOR_KEY = "AllocatedEvaluator";
- public static final String ACTIVE_CONTEXT_KEY = "ActiveContext";
- public static final String TASK_MESSAGE_KEY = "TaskMessage";
- public static final String FAILED_TASK_KEY = "FailedTask";
- public static final String FAILED_EVALUATOR_KEY = "FailedEvaluator";
- public static final String HTTP_SERVER_KEY = "HttpServerKey";
- public static final String COMPLETED_TASK_KEY = "CompletedTask";
- public static final String RUNNING_TASK_KEY = "RunningTask";
- public static final String SUSPENDED_TASK_KEY = "SuspendedTask";
- public static final String COMPLETED_EVALUATOR_KEY = "CompletedEvaluator";
- public static final String CLOSED_CONTEXT_KEY = "ClosedContext";
- public static final String FAILED_CONTEXT_KEY = "FailedContext";
- public static final String CONTEXT_MESSAGE_KEY = "ContextMessage";
- public static final String DRIVER_RESTART_ACTIVE_CONTEXT_KEY = "DriverRestartActiveContext";
- public static final String DRIVER_RESTART_RUNNING_TASK_KEY = "DriverRestartRunningTask";
- public static final String DRIVER_RESTART_COMPLETED_KEY = "DriverRestartCompleted";
- public static final String DRIVER_RESTART_FAILED_EVALUATOR_KEY = "DriverRestartFailedEvaluator";
- public static final HashMap<String, Integer> HANDLERS = new HashMap<String, Integer>() {
- {
- put(ALLOCATED_EVALUATOR_KEY, 1);
- put(ACTIVE_CONTEXT_KEY, 2);
- put(TASK_MESSAGE_KEY, 3);
- put(FAILED_TASK_KEY, 4);
- put(FAILED_EVALUATOR_KEY, 5);
- put(HTTP_SERVER_KEY, 6);
- put(COMPLETED_TASK_KEY, 7);
- put(RUNNING_TASK_KEY, 8);
- put(SUSPENDED_TASK_KEY, 9);
- put(COMPLETED_EVALUATOR_KEY, 10);
- put(CLOSED_CONTEXT_KEY, 11);
- put(FAILED_CONTEXT_KEY, 12);
- put(CONTEXT_MESSAGE_KEY, 13);
- put(DRIVER_RESTART_ACTIVE_CONTEXT_KEY, 14);
- put(DRIVER_RESTART_RUNNING_TASK_KEY, 15);
- put(DRIVER_RESTART_COMPLETED_KEY, 16);
- put(DRIVER_RESTART_FAILED_EVALUATOR_KEY, 17);
- }
- };
-
- public static final int N_HANDLERS = 18;
public static native void loadClrAssembly(final String filePath);
public static native void clrBufferedLog(final int level, final String message);
- public static native long[] callClrSystemOnStartHandler(
+ public static native void callClrSystemOnStartHandler(
final String dateTime,
final String httpServerPortNumber,
+ final BridgeHandlerManager bridgeHandlerManager,
final EvaluatorRequestorBridge javaEvaluatorRequestorBridge);
public static native void clrSystemAllocatedEvaluatorHandlerOnNext(
@@ -153,8 +111,9 @@ public final class NativeInterop {
final ContextMessageBridge contextMessageBridge
);
- public static native long[] callClrSystemOnRestartHandler(
+ public static native void callClrSystemOnRestartHandler(
final String httpServerPortNumber,
+ final BridgeHandlerManager bridgeHandlerManager,
final EvaluatorRequestorBridge javaEvaluatorRequestorBridge,
final DriverRestartedBridge driverRestartedBridge
);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
index 34968f9..a151fd1 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java
@@ -21,6 +21,7 @@ package org.apache.reef.javabridge.generic;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.javabridge.BridgeHandlerManager;
import org.apache.reef.javabridge.EvaluatorRequestorBridge;
/**
@@ -34,5 +35,5 @@ interface ClrHandlersInitializer {
/**
* Returns the set of CLR handles.
*/
- long[] getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge);
+ BridgeHandlerManager getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
index d97a1cd..1da5347 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.javabridge.BridgeHandlerManager;
import org.apache.reef.javabridge.DriverRestartedBridge;
import org.apache.reef.javabridge.EvaluatorRequestorBridge;
import org.apache.reef.javabridge.NativeInterop;
@@ -41,9 +42,12 @@ final class DriverRestartClrHandlersInitializer implements ClrHandlersInitialize
}
@Override
- public long[] getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge) {
- return NativeInterop.callClrSystemOnRestartHandler(
- portNumber,
- evaluatorRequestorBridge, new DriverRestartedBridge(driverRestarted));
+ public BridgeHandlerManager getClrHandlers(final String portNumber,
+ final EvaluatorRequestorBridge evaluatorRequestorBridge) {
+ final BridgeHandlerManager bridgeHandlerManager = new BridgeHandlerManager();
+ NativeInterop.callClrSystemOnRestartHandler(portNumber, bridgeHandlerManager, evaluatorRequestorBridge,
+ new DriverRestartedBridge(driverRestarted));
+
+ return bridgeHandlerManager;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
index 117bfbd..765869c 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java
@@ -21,6 +21,7 @@ package org.apache.reef.javabridge.generic;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.javabridge.BridgeHandlerManager;
import org.apache.reef.javabridge.EvaluatorRequestorBridge;
import org.apache.reef.javabridge.NativeInterop;
import org.apache.reef.wake.time.event.StartTime;
@@ -40,7 +41,12 @@ final class DriverStartClrHandlersInitializer implements ClrHandlersInitializer
}
@Override
- public long[] getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge) {
- return NativeInterop.callClrSystemOnStartHandler(startTime.toString(), portNumber, evaluatorRequestorBridge);
+ public BridgeHandlerManager getClrHandlers(final String portNumber,
+ final EvaluatorRequestorBridge evaluatorRequestorBridge) {
+ BridgeHandlerManager bridgeHandlerManager = new BridgeHandlerManager();
+ NativeInterop.callClrSystemOnStartHandler(startTime.toString(), portNumber, bridgeHandlerManager,
+ evaluatorRequestorBridge);
+
+ return bridgeHandlerManager;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/f5c9f48d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index 36f576c..422cb62 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -120,24 +120,7 @@ public final class JobDriver {
*/
private final LoggingScopeFactory loggingScopeFactory;
- private long allocatedEvaluatorHandler = 0;
- private long activeContextHandler = 0;
- private long taskMessageHandler = 0;
- private long failedTaskHandler = 0;
- private long failedEvaluatorHandler = 0;
- private long httpServerEventHandler = 0;
- private long completedTaskHandler = 0;
- private long runningTaskHandler = 0;
- private long suspendedTaskHandler = 0;
- private long completedEvaluatorHandler = 0;
- private long closedContextHandler = 0;
- private long failedContextHandler = 0;
- private long contextMessageHandler = 0;
- private long driverRestartActiveContextHandler = 0;
- private long driverRestartRunningTaskHandler = 0;
- private long driverRestartCompletedHandler = 0;
- private long driverRestartFailedEvaluatorHandler = 0;
- private boolean clrBridgeSetup = false;
+ private BridgeHandlerManager handlerManager = null;
private boolean isRestarted = false;
// We are holding on to following on bridge side.
// Need to add references here so that GC does not collect them.
@@ -211,42 +194,13 @@ public final class JobDriver {
this.evaluatorRequestorBridge =
new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory);
- final long[] handlers = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge);
- if (handlers != null) {
- if (handlers.length != NativeInterop.N_HANDLERS) {
- throw new RuntimeException(
- String.format("%s handlers initialized in CLR while native bridge is expecting %s handlers",
- String.valueOf(handlers.length),
- String.valueOf(NativeInterop.N_HANDLERS)));
- }
- this.allocatedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.ALLOCATED_EVALUATOR_KEY)];
- this.activeContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.ACTIVE_CONTEXT_KEY)];
- this.taskMessageHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.TASK_MESSAGE_KEY)];
- this.failedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_TASK_KEY)];
- this.failedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_EVALUATOR_KEY)];
- this.httpServerEventHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.HTTP_SERVER_KEY)];
- this.completedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.COMPLETED_TASK_KEY)];
- this.runningTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.RUNNING_TASK_KEY)];
- this.suspendedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.SUSPENDED_TASK_KEY)];
- this.completedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.COMPLETED_EVALUATOR_KEY)];
- this.closedContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.CLOSED_CONTEXT_KEY)];
- this.failedContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_CONTEXT_KEY)];
- this.contextMessageHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.CONTEXT_MESSAGE_KEY)];
- this.driverRestartActiveContextHandler =
- handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_ACTIVE_CONTEXT_KEY)];
- this.driverRestartRunningTaskHandler =
- handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_RUNNING_TASK_KEY)];
- this.driverRestartCompletedHandler =
- handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_COMPLETED_KEY)];
- this.driverRestartFailedEvaluatorHandler =
- handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_FAILED_EVALUATOR_KEY)];
- }
+ JobDriver.this.handlerManager = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge);
try (final LoggingScope lp =
this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) {
final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
- NativeInterop.clrSystemHttpServerHandlerOnNext(this.httpServerEventHandler, httpServerEventBridge,
- this.interopLogger);
+ NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
+ httpServerEventBridge, this.interopLogger);
final String specList = httpServerEventBridge.getUriSpecification();
LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
if (specList != null) {
@@ -258,7 +212,6 @@ public final class JobDriver {
}
}
}
- this.clrBridgeSetup = true;
}
LOG.log(Level.INFO, "CLR Bridge setup.");
}
@@ -277,14 +230,14 @@ public final class JobDriver {
eval.setProcess(process);
LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
new Object[]{eval.getId(), JobDriver.this.contexts.size()});
- if (JobDriver.this.allocatedEvaluatorHandler == 0) {
+ if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) {
throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
}
final AllocatedEvaluatorBridge allocatedEvaluatorBridge =
this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo);
allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge);
- NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(JobDriver.this.allocatedEvaluatorHandler,
- allocatedEvaluatorBridge, this.interopLogger);
+ NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(
+ JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger);
}
}
@@ -302,9 +255,11 @@ public final class JobDriver {
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
if (isRestartFailed) {
- evaluatorFailedHandlerWaitForCLRBridgeSetup(driverRestartFailedEvaluatorHandler, eval, isRestartFailed);
+ evaluatorFailedHandlerWaitForCLRBridgeSetup(
+ JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed);
} else {
- evaluatorFailedHandlerWaitForCLRBridgeSetup(failedEvaluatorHandler, eval, isRestartFailed);
+ evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(),
+ eval, isRestartFailed);
}
}
}
@@ -314,7 +269,7 @@ public final class JobDriver {
final FailedEvaluator eval,
final boolean isRestartFailed) {
if (handle == 0) {
- if (JobDriver.this.clrBridgeSetup) {
+ if (JobDriver.this.handlerManager != null) {
final String message = "No CLR FailedEvaluator handler was set, exiting now";
LOG.log(Level.WARNING, message);
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8));
@@ -322,7 +277,7 @@ public final class JobDriver {
clock.scheduleAlarm(0, new EventHandler<Alarm>() {
@Override
public void onNext(final Alarm time) {
- if (JobDriver.this.clrBridgeSetup) {
+ if (JobDriver.this.handlerManager != null) {
handleFailedEvaluatorInCLR(eval, isRestartFailed);
} else {
LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
@@ -344,9 +299,12 @@ public final class JobDriver {
JobDriver.this.isRestarted, loggingScopeFactory);
if (isRestartFailed) {
NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
- JobDriver.this.driverRestartFailedEvaluatorHandler, failedEvaluatorBridge, JobDriver.this.interopLogger);
+ JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(),
+ failedEvaluatorBridge, JobDriver.this.interopLogger);
} else {
- NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler, failedEvaluatorBridge,
+ NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(
+ JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(),
+ failedEvaluatorBridge,
JobDriver.this.interopLogger);
}
@@ -365,12 +323,12 @@ public final class JobDriver {
private void submit(final ActiveContext context) {
try {
LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
- if (JobDriver.this.activeContextHandler == 0) {
+ if (JobDriver.this.handlerManager.getActiveContextHandler() == 0) {
throw new RuntimeException("Active Context Handler not initialized by CLR.");
}
final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
- NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.activeContextHandler, activeContextBridge,
- JobDriver.this.interopLogger);
+ NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.handlerManager.getActiveContextHandler(),
+ activeContextBridge, JobDriver.this.interopLogger);
} catch (final Exception ex) {
LOG.log(Level.SEVERE, "Fail to submit task to active context");
context.close();
@@ -427,13 +385,13 @@ public final class JobDriver {
}
LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
- if (JobDriver.this.completedTaskHandler == 0) {
+ if (JobDriver.this.handlerManager.getCompletedTaskHandler() == 0) {
LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
} else {
LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
- NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge,
- JobDriver.this.interopLogger);
+ NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.handlerManager.getCompletedTaskHandler(),
+ completedTaskBridge, JobDriver.this.interopLogger);
}
}
}
@@ -488,12 +446,11 @@ public final class JobDriver {
final String requestString = httpSerializer.toString(avroHttpRequest);
final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET));
- //final byte[] requestBytes = httpSerializer.toBytes(avroHttpRequest);
try {
final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
- NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.httpServerEventHandler, httpServerEventBridge,
- JobDriver.this.interopLogger);
+ NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(),
+ httpServerEventBridge, JobDriver.this.interopLogger);
final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8");
response.getWriter().println(responseBody);
LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
@@ -512,14 +469,14 @@ public final class JobDriver {
@Override
public void onNext(final FailedTask task) throws RuntimeException {
LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
- if (JobDriver.this.failedTaskHandler == 0) {
+ if (JobDriver.this.handlerManager.getFailedTaskHandler() == 0) {
LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
throw new RuntimeException("Failed Task Handler not initialized by CLR.");
}
try {
final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
- NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.failedTaskHandler, failedTaskBridge,
- JobDriver.this.interopLogger);
+ NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.handlerManager.getFailedTaskHandler(),
+ failedTaskBridge, JobDriver.this.interopLogger);
} catch (final Exception ex) {
LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler");
throw new RuntimeException(ex);
@@ -534,14 +491,14 @@ public final class JobDriver {
@Override
public void onNext(final RunningTask task) {
try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
- if (JobDriver.this.runningTaskHandler == 0) {
+ if (JobDriver.this.handlerManager.getRunningTaskHandler() == 0) {
LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
} else {
LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
try {
final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
- NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge,
- JobDriver.this.interopLogger);
+ NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.handlerManager.getRunningTaskHandler(),
+ runningTaskBridge, JobDriver.this.interopLogger);
} catch (final Exception ex) {
LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
throw new RuntimeException(ex);
@@ -561,11 +518,11 @@ public final class JobDriver {
clock.scheduleAlarm(0, new EventHandler<Alarm>() {
@Override
public void onNext(final Alarm time) {
- if (JobDriver.this.clrBridgeSetup) {
- if (JobDriver.this.driverRestartRunningTaskHandler != 0) {
+ if (JobDriver.this.handlerManager != null) {
+ if (JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler() != 0) {
LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext(
- JobDriver.this.driverRestartRunningTaskHandler,
+ JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler(),
new RunningTaskBridge(task, activeContextBridgeFactory));
} else {
LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " +
@@ -594,11 +551,11 @@ public final class JobDriver {
clock.scheduleAlarm(0, new EventHandler<Alarm>() {
@Override
public void onNext(final Alarm time) {
- if (JobDriver.this.clrBridgeSetup) {
- if (JobDriver.this.driverRestartActiveContextHandler != 0) {
+ if (JobDriver.this.handlerManager != null) {
+ if (JobDriver.this.handlerManager.getDriverRestartActiveContextHandler() != 0) {
LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext(
- JobDriver.this.driverRestartActiveContextHandler,
+ JobDriver.this.handlerManager.getDriverRestartActiveContextHandler(),
activeContextBridgeFactory.getActiveContextBridge(context));
} else {
LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " +
@@ -660,11 +617,12 @@ public final class JobDriver {
driverRestartCompleted.getCompletedTime());
try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
driverRestartCompleted.getCompletedTime().getTimeStamp())) {
- if (JobDriver.this.driverRestartCompletedHandler != 0) {
+ if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) {
LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(
- JobDriver.this.driverRestartCompletedHandler, new DriverRestartCompletedBridge(driverRestartCompleted));
+ JobDriver.this.handlerManager.getDriverRestartCompletedHandler(),
+ new DriverRestartCompletedBridge(driverRestartCompleted));
} else {
LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
}
@@ -696,11 +654,11 @@ public final class JobDriver {
final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8);
LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
//try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) {
- if (JobDriver.this.taskMessageHandler != 0) {
+ if (JobDriver.this.handlerManager.getTaskMessageHandler() != 0) {
final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
// if CLR implements the task message handler, handle the bytes in CLR handler
- NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.taskMessageHandler, taskMessage.get(),
- taskMessageBridge, JobDriver.this.interopLogger);
+ NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.handlerManager.getTaskMessageHandler(),
+ taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger);
}
//}
}
@@ -715,11 +673,12 @@ public final class JobDriver {
final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
LOG.log(Level.INFO, message);
try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
- if (JobDriver.this.suspendedTaskHandler != 0) {
+ if (JobDriver.this.handlerManager.getSuspendedTaskHandler() != 0) {
final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
// if CLR implements the suspended task handler, handle it in CLR
LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
- NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge);
+ NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.handlerManager.getSuspendedTaskHandler(),
+ suspendedTaskBridge);
}
JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
}
@@ -734,11 +693,12 @@ public final class JobDriver {
public void onNext(final CompletedEvaluator evaluator) {
LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
- if (JobDriver.this.completedEvaluatorHandler != 0) {
+ if (JobDriver.this.handlerManager.getCompletedEvaluatorHandler() != 0) {
final CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
// if CLR implements the completed evaluator handler, handle it in CLR
LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
- NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(completedEvaluatorHandler, completedEvaluatorBridge);
+ NativeInterop.clrSystemCompletedEvaluatorHandlerOnNext(
+ JobDriver.this.handlerManager.getCompletedEvaluatorHandler(), completedEvaluatorBridge);
allocatedEvaluatorBridges.remove(completedEvaluatorBridge.getId());
}
}
@@ -755,11 +715,12 @@ public final class JobDriver {
public void onNext(final ClosedContext context) {
LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
- if (JobDriver.this.closedContextHandler != 0) {
+ if (JobDriver.this.handlerManager.getClosedContextHandler() != 0) {
final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
// if CLR implements the closed context handler, handle it in CLR
LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
- NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge);
+ NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(),
+ closedContextBridge);
}
synchronized (JobDriver.this) {
JobDriver.this.contexts.remove(context.getId());
@@ -778,11 +739,12 @@ public final class JobDriver {
public void onNext(final FailedContext context) {
LOG.log(Level.SEVERE, "FailedContext", context);
try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
- if (JobDriver.this.failedContextHandler != 0) {
+ if (JobDriver.this.handlerManager.getFailedContextHandler() != 0) {
final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
// if CLR implements the failed context handler, handle it in CLR
LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
- NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge);
+ NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(),
+ failedContextBridge);
}
synchronized (JobDriver.this) {
JobDriver.this.contexts.remove(context.getId());
@@ -804,11 +766,11 @@ public final class JobDriver {
LOG.log(Level.SEVERE, "Received ContextMessage:", message.get());
try (final LoggingScope ls =
loggingScopeFactory.contextMessageReceived(new String(message.get(), StandardCharsets.UTF_8))) {
- if (JobDriver.this.contextMessageHandler != 0) {
+ if (JobDriver.this.handlerManager.getContextMessageHandler() != 0) {
final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
// if CLR implements the context message handler, handle it in CLR
LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
- NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.contextMessageHandler,
+ NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.handlerManager.getContextMessageHandler(),
contextMessageBridge);
}
}