You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2016/03/25 02:02:57 UTC
reef git commit: [REEF-1267] Implement IActiveContext.SubmitContextAndService
Repository: reef
Updated Branches:
refs/heads/master 1680fe31d -> 9e0181653
[REEF-1267] Implement IActiveContext.SubmitContextAndService
This addresses the issue by:
* Implementing SubmitContextAndService on ActiveContext.
* Adding a LocalRuntime test to make sure that the SubmitContextAndService call works.
JIRA:
[REEF-1267](https://issues.apache.org/jira/browse/REEF-1267)
This closes #899
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/9e018165
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/9e018165
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/9e018165
Branch: refs/heads/master
Commit: 9e01816530b3d6aef5453da6f7838138d071e5d5
Parents: 1680fe3
Author: Andrew Chung <af...@gmail.com>
Authored: Wed Mar 23 13:45:08 2016 -0700
Committer: Julia Wang <ju...@apache.org>
Committed: Thu Mar 24 13:21:46 2016 -0700
----------------------------------------------------------------------
.../ActiveContextClr2Java.cpp | 18 +-
lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 1 +
.../Bridge/Clr2java/IActiveContextClr2Java.cs | 2 +
.../Bridge/Events/ActiveContext.cs | 5 +-
.../Functional/Bridge/TestCloseTask.cs | 1 +
.../Functional/Bridge/TestContextStack.cs | 278 +++++++++++++++----
.../reef/javabridge/ActiveContextBridge.java | 5 +-
.../common/driver/context/EvaluatorContext.java | 43 ++-
8 files changed, 270 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/9e018165/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp
index 51ff432..b3e8685 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/ActiveContextClr2Java.cpp
@@ -84,20 +84,28 @@ namespace Org {
}
void ActiveContextClr2Java::SubmitContext(String^ contextConfigStr) {
- ManagedLog::LOGGER->LogStart("ActiveContextClr2Java::SubmitContext");
+ SubmitContextAndService(contextConfigStr, nullptr);
+ }
+
+ void ActiveContextClr2Java::SubmitContextAndService(String^ contextConfigStr, String^ serviceConfigStr) {
JNIEnv *env = RetrieveEnv(_jvm);
jclass jclassActiveContext = env->GetObjectClass(_jobjectActiveContext);
- jmethodID jmidSubmitContext = env->GetMethodID(jclassActiveContext, "submitContextString", "(Ljava/lang/String;)V");
+ jmethodID jmidSubmitContext =
+ env->GetMethodID(jclassActiveContext, "submitContextStringAndServiceString", "(Ljava/lang/String;Ljava/lang/String;)V");
if (jmidSubmitContext == NULL) {
- ManagedLog::LOGGER->Log("jmidSubmitContext is NULL");
+ ManagedLog::LOGGER->Log("jmidSubmitContextStringAndServiceString is NULL");
return;
}
+
+ const jstring serviceConfigJavaStr =
+ serviceConfigStr == nullptr ? NULL : JavaStringFromManagedString(env, serviceConfigStr);
+
env->CallObjectMethod(
_jobjectActiveContext,
jmidSubmitContext,
- JavaStringFromManagedString(env, contextConfigStr));
- ManagedLog::LOGGER->LogStop("ActiveContextClr2Java::SubmitContext");
+ JavaStringFromManagedString(env, contextConfigStr),
+ serviceConfigJavaStr);
}
void ActiveContextClr2Java::OnError(String^ message) {
http://git-wip-us.apache.org/repos/asf/reef/blob/9e018165/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index 381df15..3254640 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -85,6 +85,7 @@ namespace Org {
!ActiveContextClr2Java();
virtual void SubmitTask(String^ taskConfigStr);
virtual void SubmitContext(String^ contextConfigStr);
+ virtual void SubmitContextAndService(String^ contextConfigStr, String^ serviceConfigStr);
virtual void Close();
virtual void OnError(String^ message);
virtual String^ GetParentId();
http://git-wip-us.apache.org/repos/asf/reef/blob/9e018165/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs
index 119b110..a3ebafe 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IActiveContextClr2Java.cs
@@ -26,6 +26,8 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java
void SubmitTask(string taskConfigStr);
void SubmitContext(string contextConfigStr);
+
+ void SubmitContextAndService(string contextConfigStr, string serviceConfigStr);
void Close();
http://git-wip-us.apache.org/repos/asf/reef/blob/9e018165/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
index 52c62e9..1d94004 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-using System;
using System.Runtime.Serialization;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Context;
@@ -87,7 +86,9 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
{
- throw new NotImplementedException();
+ var contextConfigString = _serializer.ToString(contextConfiguration);
+ var serviceConfigString = _serializer.ToString(serviceConfiguration);
+ Clr2Java.SubmitContextAndService(contextConfigString, serviceConfigString);
}
public void SendMessage(byte[] message)
http://git-wip-us.apache.org/repos/asf/reef/blob/9e018165/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
index 3bf553b..aeb0917 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
@@ -42,6 +42,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
/// <summary>
/// This test is to close a running task from driver
/// </summary>
+ [Collection("FunctionalTests")]
public sealed class TestCloseTask : ReefFunctionalTest
{
private static readonly Logger Logger = Logger.GetLogger(typeof(TestCloseTask));
http://git-wip-us.apache.org/repos/asf/reef/blob/9e018165/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
index f756a36..e21065c 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
@@ -16,6 +16,8 @@
// under the License.
using System;
+using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Common.Services;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
@@ -55,55 +57,140 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
{
string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
CleanUp(testFolder);
- TestRun(DriverConfigurations(), typeof(ContextStackHandlers), 1, "testContextStack", "local", testFolder);
+ TestRun(DriverConfigurations(GenericType<ActiveContextSubmitContextHandler>.Class),
+ typeof(ContextStackHandlers), 1, "testContextStack", "local", testFolder);
ValidateSuccessForLocalRuntime(2, testFolder: testFolder);
ValidateMessageSuccessfullyLoggedForDriver(TaskValidationMessage, testFolder);
ValidateMessageSuccessfullyLoggedForDriver(ClosedContextValidationMessage, testFolder);
CleanUp(testFolder);
}
- public IConfiguration DriverConfigurations()
+ /// <summary>
+ /// Does a simple test of whether a context can be submitted on top of another context
+ /// using SubmitContextAndService.
+ /// </summary>
+ [Fact]
+ public void TestContextStackingWithServiceOnLocalRuntime()
{
- var helloDriverConfiguration = DriverConfiguration.ConfigurationModule
- .Set(DriverConfiguration.OnDriverStarted, GenericType<ContextStackHandlers>.Class)
- .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ContextStackHandlers>.Class)
- .Set(DriverConfiguration.OnContextActive, GenericType<ContextStackHandlers>.Class)
- .Set(DriverConfiguration.OnTaskMessage, GenericType<HelloTaskMessageHandler>.Class)
- .Set(DriverConfiguration.OnTaskCompleted, GenericType<ContextStackHandlers>.Class)
- .Set(DriverConfiguration.OnContextClosed, GenericType<ContextStackHandlers>.Class)
- .Build();
-
- return TangFactory.GetTang().NewConfigurationBuilder(helloDriverConfiguration).Build();
+ string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+ CleanUp(testFolder);
+ TestRun(DriverConfigurations(GenericType<ActiveContextSubmitContextAndServiceHandler>.Class),
+ typeof(ContextStackHandlers), 1, "testContextAndServiceStack", "local", testFolder);
+ ValidateSuccessForLocalRuntime(2, testFolder: testFolder);
+ ValidateMessageSuccessfullyLoggedForDriver(TaskValidationMessage, testFolder);
+ ValidateMessageSuccessfullyLoggedForDriver(ClosedContextValidationMessage, testFolder);
+ CleanUp(testFolder);
}
- private sealed class ContextStackHandlers :
- IObserver<IDriverStarted>,
- IObserver<IAllocatedEvaluator>,
- IObserver<IActiveContext>,
- IObserver<ICompletedTask>,
- IObserver<IClosedContext>
+ public IConfiguration DriverConfigurations<T>(GenericType<T> activeContextHandlerType) where T : IObserver<IActiveContext>
{
- private readonly IEvaluatorRequestor _requestor;
- private IAllocatedEvaluator _evaluator;
- private bool _contextTwoClosed = false;
+ return TangFactory.GetTang().NewConfigurationBuilder(
+ DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted, GenericType<ContextStackHandlers>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ContextStackHandlers>.Class)
+ .Set(DriverConfiguration.OnContextActive, activeContextHandlerType)
+ .Set(DriverConfiguration.OnTaskMessage, GenericType<HelloTaskMessageHandler>.Class)
+ .Set(DriverConfiguration.OnTaskCompleted, GenericType<ContextStackHandlers>.Class)
+ .Set(DriverConfiguration.OnContextClosed, GenericType<ContextStackHandlers>.Class)
+ .Build())
+ .Build();
+ }
+ /// <summary>
+ /// ActiveContext Handler that stacks 2 contexts and submits a Task on the second context.
+ /// </summary>
+ private sealed class ActiveContextSubmitContextHandler : IObserver<IActiveContext>
+ {
[Inject]
- private ContextStackHandlers(IEvaluatorRequestor evaluatorRequestor)
+ private ActiveContextSubmitContextHandler()
{
- _requestor = evaluatorRequestor;
}
- public void OnNext(IDriverStarted value)
+ public void OnNext(IActiveContext value)
{
- _requestor.Submit(_requestor.NewBuilder().Build());
+ Logger.Log(Level.Verbose, "ContextId: " + value.Id);
+ switch (value.Id)
+ {
+ case ContextOneId:
+ var contextConfig =
+ Common.Context.ContextConfiguration.ConfigurationModule.Set(
+ Common.Context.ContextConfiguration.Identifier, ContextTwoId)
+ .Build();
+ var stackingContextConfig =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindImplementation(GenericType<IInjectableInterface>.Class,
+ GenericType<InjectableInterfaceImpl>.Class)
+ .Build();
+
+ Assert.False(value.ParentId.IsPresent());
+
+ value.SubmitContext(Configurations.Merge(stackingContextConfig, contextConfig));
+ break;
+ case ContextTwoId:
+ Assert.True(value.ParentId.IsPresent());
+ Assert.Equal(value.ParentId.Value, ContextOneId);
+
+ value.SubmitTask(
+ TaskConfiguration.ConfigurationModule.Set(TaskConfiguration.Identifier, "contextStackTestTask")
+ .Set(TaskConfiguration.Task, GenericType<TestContextStackTask>.Class)
+ .Build());
+ break;
+ default:
+ throw new Exception("Unexpected ContextId: " + value.Id);
+ }
}
- public void OnNext(IAllocatedEvaluator value)
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ /// <summary>
+ /// Context Start Handler that invokes Start on the injected TestService.
+ /// </summary>
+ private sealed class TestContextStackContextStartHandler : IObserver<IContextStart>
+ {
+ private readonly TestService _service;
+
+ [Inject]
+ private TestContextStackContextStartHandler(TestService service)
+ {
+ _service = service;
+ }
+
+ public void OnNext(IContextStart value)
+ {
+ _service.Start();
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ /// <summary>
+ /// ActiveContext Handler that stacks 2 contexts.
+ /// Primarily used to test out the functionality of SubmitContextAndService.
+ /// The ActiveContext Handler starts the TestService with a ContextStartHandler on the second Context.
+ /// </summary>
+ private sealed class ActiveContextSubmitContextAndServiceHandler : IObserver<IActiveContext>
+ {
+ [Inject]
+ private ActiveContextSubmitContextAndServiceHandler()
{
- value.SubmitContext(Common.Context.ContextConfiguration.ConfigurationModule
- .Set(Common.Context.ContextConfiguration.Identifier, ContextOneId)
- .Build());
- _evaluator = value;
}
public void OnNext(IActiveContext value)
@@ -113,9 +200,11 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
{
case ContextOneId:
var contextConfig =
- Common.Context.ContextConfiguration.ConfigurationModule.Set(
- Common.Context.ContextConfiguration.Identifier, ContextTwoId)
+ Common.Context.ContextConfiguration.ConfigurationModule
+ .Set(Common.Context.ContextConfiguration.Identifier, ContextTwoId)
+ .Set(Common.Context.ContextConfiguration.OnContextStart, GenericType<TestContextStackContextStartHandler>.Class)
.Build();
+
var stackingContextConfig =
TangFactory.GetTang()
.NewConfigurationBuilder()
@@ -125,15 +214,22 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
Assert.False(value.ParentId.IsPresent());
- value.SubmitContext(Configurations.Merge(stackingContextConfig, contextConfig));
+ var stackingContextServiceConfig =
+ ServiceConfiguration.ConfigurationModule
+ .Set(ServiceConfiguration.Services, GenericType<TestService>.Class)
+ .Build();
+
+ value.SubmitContextAndService(
+ Configurations.Merge(stackingContextConfig, contextConfig), stackingContextServiceConfig);
+
break;
case ContextTwoId:
Assert.True(value.ParentId.IsPresent());
Assert.Equal(value.ParentId.Value, ContextOneId);
value.SubmitTask(
- TaskConfiguration.ConfigurationModule.Set(TaskConfiguration.Identifier, "contextStackTestTask")
- .Set(TaskConfiguration.Task, GenericType<TestContextStackTask>.Class)
+ TaskConfiguration.ConfigurationModule.Set(TaskConfiguration.Identifier, "contextServiceStackTestTask")
+ .Set(TaskConfiguration.Task, GenericType<TestContextAndServiceStackTask>.Class)
.Build());
break;
default:
@@ -141,6 +237,68 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
}
}
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ /// <summary>
+ /// A simple Service class.
+ /// </summary>
+ private sealed class TestService
+ {
+ [Inject]
+ private TestService()
+ {
+ Started = false;
+ }
+
+ public void Start()
+ {
+ Started = true;
+ }
+
+ /// <summary>
+ /// Returns whether the Start function has been called or not.
+ /// </summary>
+ public bool Started { get; private set; }
+ }
+
+ /// <summary>
+ /// Basic handlers used to verify that Contexts are indeed stacked.
+ /// </summary>
+ private sealed class ContextStackHandlers :
+ IObserver<IDriverStarted>,
+ IObserver<IAllocatedEvaluator>,
+ IObserver<ICompletedTask>,
+ IObserver<IClosedContext>
+ {
+ private readonly IEvaluatorRequestor _requestor;
+
+ [Inject]
+ private ContextStackHandlers(IEvaluatorRequestor evaluatorRequestor)
+ {
+ _requestor = evaluatorRequestor;
+ }
+
+ public void OnNext(IDriverStarted value)
+ {
+ _requestor.Submit(_requestor.NewBuilder().Build());
+ }
+
+ public void OnNext(IAllocatedEvaluator value)
+ {
+ value.SubmitContext(Common.Context.ContextConfiguration.ConfigurationModule
+ .Set(Common.Context.ContextConfiguration.Identifier, ContextOneId)
+ .Build());
+ }
+
public void OnNext(ICompletedTask value)
{
Logger.Log(Level.Info, TaskValidationMessage);
@@ -151,20 +309,10 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
{
Logger.Log(Level.Info, ClosedContextValidationMessage);
- if (_contextTwoClosed == false)
- {
- Assert.Equal(value.Id, ContextTwoId);
- Assert.True(value.ParentId.IsPresent());
- Assert.Equal(value.ParentId.Value, ContextOneId);
- Assert.Equal(value.ParentContext.Id, ContextOneId);
- _contextTwoClosed = true;
- }
- else
- {
- Assert.Equal(value.Id, ContextOneId);
- Assert.False(value.ParentId.IsPresent());
- Assert.Equal(value.ParentContext, null);
- }
+ Assert.Equal(value.Id, ContextTwoId);
+ Assert.True(value.ParentId.IsPresent());
+ Assert.Equal(value.ParentId.Value, ContextOneId);
+ Assert.Equal(value.ParentContext.Id, ContextOneId);
value.ParentContext.Dispose();
}
@@ -203,10 +351,42 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
}
}
+ /// <summary>
+ /// A Task to ensure that an object configured in the second context configuration
+ /// is properly injected.
+ /// </summary>
+ private sealed class TestContextAndServiceStackTask : ITask
+ {
+ [Inject]
+ private TestContextAndServiceStackTask(IInjectableInterface injectableInterface, TestService service)
+ {
+ Assert.NotNull(injectableInterface);
+ Assert.True(injectableInterface is InjectableInterfaceImpl);
+ Assert.NotNull(service);
+ Assert.True(service.Started);
+ }
+
+ public void Dispose()
+ {
+ }
+
+ public byte[] Call(byte[] memento)
+ {
+ return null;
+ }
+ }
+
+ /// <summary>
+ /// Empty interface to check whether Context configurations are
+ /// set correctly or not on context stacking.
+ /// </summary>
private interface IInjectableInterface
{
}
+ /// <summary>
+ /// An implementation of <see cref="IInjectableInterface"/>.
+ /// </summary>
private sealed class InjectableInterfaceImpl : IInjectableInterface
{
[Inject]
http://git-wip-us.apache.org/repos/asf/reef/blob/9e018165/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
index 3fb82f2..7298a7a 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
@@ -79,12 +79,13 @@ public final class ActiveContextBridge extends NativeBridge implements Identifia
((EvaluatorContext)jactiveContext).submitTask(taskConfigurationString);
}
- public void submitContextString(final String contextConfigurationString) {
+ public void submitContextStringAndServiceString(final String contextConfigurationString,
+ final String serviceConfigurationString) {
if (StringUtils.isEmpty(contextConfigurationString)) {
throw new RuntimeException("empty contextConfigurationString provided.");
}
- ((EvaluatorContext)jactiveContext).submitContext(contextConfigurationString);
+ ((EvaluatorContext)jactiveContext).submitContextAndService(contextConfigurationString, serviceConfigurationString);
}
public String getEvaluatorDescriptorString() {
http://git-wip-us.apache.org/repos/asf/reef/blob/9e018165/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
index dbca918..5525b0c 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
@@ -154,44 +154,37 @@ public final class EvaluatorContext implements ActiveContext {
}
public synchronized void submitContext(final String contextConf) {
- if (this.isClosed) {
- throw new RuntimeException("Active context already closed");
- }
-
- LOG.log(Level.FINEST, "Submit context: RunningEvaluator id[{0}] for context id[{1}]",
- new Object[]{getEvaluatorId(), getId()});
-
- final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
- EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
- .setAddContext(
- EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
- .setParentContextId(getId())
- .setContextConfiguration(contextConf)
- .build())
- .build();
-
- this.contextControlHandler.send(contextControlProto);
+ submitContextAndService(contextConf, Optional.<String>empty());
}
@Override
public synchronized void submitContextAndService(
final Configuration contextConfiguration, final Configuration serviceConfiguration) {
+ submitContextAndService(
+ this.configurationSerializer.toString(contextConfiguration),
+ this.configurationSerializer.toString(serviceConfiguration));
+ }
+
+ public synchronized void submitContextAndService(final String contextConf, final String serviceConf) {
+ submitContextAndService(contextConf, Optional.ofNullable(serviceConf));
+ }
+ public synchronized void submitContextAndService(final String contextConf, final Optional<String> serviceConf) {
if (this.isClosed) {
throw new RuntimeException("Active context already closed");
}
- LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]",
- new Object[]{getEvaluatorId(), getId()});
+ EvaluatorRuntimeProtocol.AddContextProto.Builder contextBuilder =
+ EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
+ .setParentContextId(getId()).setContextConfiguration(contextConf);
+
+ if (serviceConf.isPresent()) {
+ contextBuilder = contextBuilder.setServiceConfiguration(serviceConf.get());
+ }
final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto =
EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
- .setAddContext(
- EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
- .setParentContextId(getId())
- .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
- .setServiceConfiguration(this.configurationSerializer.toString(serviceConfiguration))
- .build())
+ .setAddContext(contextBuilder.build())
.build();
this.contextControlHandler.send(contextControlProto);