You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/03/16 00:15:05 UTC
reef git commit: [REEF-1256] Implement bridge for
IRunningTask.Dispose()
Repository: reef
Updated Branches:
refs/heads/master 061f66277 -> 4c9c36852
[REEF-1256] Implement bridge for IRunningTask.Dispose()
* Implement bridge method for closing a task
* Add test case
JIRA:
[REEF-1256](https://issues.apache.org/jira/browse/REEF-1256)
Pull Request:
This closes #888
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4c9c3685
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4c9c3685
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4c9c3685
Branch: refs/heads/master
Commit: 4c9c368520fdec66f6c3c25359699f230cc9f23c
Parents: 061f662
Author: Julia Wang <ju...@microsoft.com>
Authored: Tue Mar 15 10:54:11 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Mar 15 15:55:05 2016 -0700
----------------------------------------------------------------------
lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 1 +
.../RunningTaskClr2Java.cpp | 26 +++
.../Bridge/Clr2java/IRunningTaskClr2Java.cs | 2 +
.../Bridge/Events/RunningTask.cs | 2 +
.../Functional/Bridge/TestCloseTask.cs | 209 +++++++++++++++++++
.../Org.Apache.REEF.Tests.csproj | 1 +
.../reef/javabridge/RunningTaskBridge.java | 8 +
7 files changed, 249 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index 3a2c60c..381df15 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -142,6 +142,7 @@ namespace Org {
virtual String^ GetId();
virtual void Send(array<byte>^ message);
virtual void Suspend(array<byte>^ message);
+ virtual void Close(array<byte>^ message);
};
public ref class FailedEvaluatorClr2Java : public IFailedEvaluatorClr2Java {
http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
index 71d6451..d9e264d 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
@@ -109,6 +109,32 @@ namespace Org {
ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Suspend");
}
+ void RunningTaskClr2Java::Close(array<byte>^ message) {
+ ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::Close");
+ JNIEnv *env = RetrieveEnv(_jvm);
+ jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask);
+ jmethodID jmidClose = env->GetMethodID(jclassRunningTask, "close", "([B)V");
+
+ if (jmidClose == NULL) {
+ ManagedLog::LOGGER->Log("jmidClose is NULL");
+ return;
+ }
+
+ if (message == nullptr) {
+ env->CallObjectMethod(
+ _jobjectRunningTask,
+ jmidClose,
+ NULL);
+ }
+ else {
+ env->CallObjectMethod(
+ _jobjectRunningTask,
+ jmidClose,
+ JavaByteArrayFromManagedByteArray(env, message));
+ }
+ ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Close");
+ }
+
void RunningTaskClr2Java::OnError(String^ message) {
ManagedLog::LOGGER->Log("RunningTaskClr2Java::OnError");
JNIEnv *env = RetrieveEnv(_jvm);
http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
index fde12db..656b57a 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
@@ -29,5 +29,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java
void Send(byte[] message);
void Suspend(byte[] message);
+
+ void Close(byte[] message);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
index e993b1c..dbf2d58 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
@@ -70,10 +70,12 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
public void Dispose(byte[] message)
{
+ _runningTaskClr2Java.Close(message);
}
public void Dispose()
{
+ Dispose(null);
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
new file mode 100644
index 0000000..9cc3cd3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Bridge
+{
+ /// <summary>
+ /// This test is to close a running task from driver
+ /// </summary>
+ public sealed class TestCloseTask : ReefFunctionalTest
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(TestCloseTask));
+
+ private const string DisposeMessageFromDriver = "DisposeMessageFromDriver";
+ private const string NoMessage = "NO_MESSAGE";
+ private const string CompletedValidationMessage = "CompletedValidationmessage";
+
+ public TestCloseTask()
+ {
+ Init();
+ }
+
+ /// <summary>
+ /// This test is to close a running task over the bridge
+ /// </summary>
+ [Fact]
+ public void TestStopTaskOnLocalRuntime()
+ {
+ string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+ TestRun(DriverConfigurations(DisposeMessageFromDriver), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder);
+ ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+ ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1);
+ var messages = new List<string>();
+ messages.Add(DisposeMessageFromDriver);
+ ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1);
+ CleanUp(testFolder);
+ }
+
+ /// <summary>
+ /// This test is to close a running task over the bridge
+ /// </summary>
+ [Fact]
+ public void TestStopTaskOnLocalRuntimeWithNullMessage()
+ {
+ string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+ TestRun(DriverConfigurations(NoMessage), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder);
+ ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+ ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1);
+ var messages = new List<string>();
+ messages.Add("Control protobuf to stop task");
+ ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1);
+ CleanUp(testFolder);
+ }
+
+ /// <summary>
+ /// Driver configuration for the test driver
+ /// </summary>
+ /// <returns></returns>
+ public IConfiguration DriverConfigurations(string taskCloseMessage)
+ {
+ var handlerConfig = DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted, GenericType<CloseTaskTestDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<CloseTaskTestDriver>.Class)
+ .Set(DriverConfiguration.OnContextActive, GenericType<CloseTaskTestDriver>.Class)
+ .Set(DriverConfiguration.OnTaskRunning, GenericType<CloseTaskTestDriver>.Class)
+ .Set(DriverConfiguration.OnTaskCompleted, GenericType<CloseTaskTestDriver>.Class)
+ .Build();
+
+ var messageConfig = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindStringNamedParam<DisposeMessage>(taskCloseMessage)
+ .Build();
+
+ return Configurations.Merge(handlerConfig, messageConfig);
+ }
+
+ [NamedParameter("Message send with task close", "TaskDisposeMessage", NoMessage)]
+ private class DisposeMessage : Name<string>
+ {
+ }
+
+ private sealed class CloseTaskTestDriver :
+ IObserver<IDriverStarted>,
+ IObserver<IAllocatedEvaluator>,
+ IObserver<IActiveContext>,
+ IObserver<ICompletedTask>,
+ IObserver<IRunningTask>
+ {
+ private readonly IEvaluatorRequestor _requestor;
+ private int _contextNumber = 0;
+ private int _taskNumber = 0;
+ private string _disposeMessage;
+
+ [Inject]
+ private CloseTaskTestDriver(IEvaluatorRequestor evaluatorRequestor,
+ [Parameter(typeof(DisposeMessage))] string disposeMessage)
+ {
+ _requestor = evaluatorRequestor;
+ _disposeMessage = disposeMessage;
+ }
+
+ public void OnNext(IDriverStarted value)
+ {
+ _requestor.Submit(_requestor.NewBuilder().SetNumber(1).Build());
+ }
+
+ public void OnNext(IAllocatedEvaluator value)
+ {
+ value.SubmitContext(
+ ContextConfiguration.ConfigurationModule
+ .Set(ContextConfiguration.Identifier, "ContextID" + _contextNumber++)
+ .Build());
+ }
+
+ public void OnNext(IActiveContext value)
+ {
+ value.SubmitTask(GetTaskConfiguration());
+ }
+
+ public void OnNext(ICompletedTask value)
+ {
+ // Log on task completion to signal a passed test.
+ Logger.Log(Level.Info, CompletedValidationMessage + "Task completed: " + value.Id);
+ value.ActiveContext.Dispose();
+ }
+
+ public void OnNext(IRunningTask value)
+ {
+ Logger.Log(Level.Info, "Task running: " + value.Id);
+ if (_disposeMessage.Equals(NoMessage))
+ {
+ value.Dispose();
+ }
+ else
+ {
+ value.Dispose(Encoding.UTF8.GetBytes(_disposeMessage));
+ }
+ }
+
+ private IConfiguration GetTaskConfiguration()
+ {
+ return TaskConfiguration.ConfigurationModule
+ .Set(TaskConfiguration.Identifier, "TaskID" + _taskNumber++)
+ .Set(TaskConfiguration.Task, GenericType<TestCloseTask.StopTestTask>.Class)
+ .Build();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ private sealed class StopTestTask : ITask
+ {
+ [Inject]
+ private StopTestTask()
+ {
+ }
+
+ public byte[] Call(byte[] memento)
+ {
+ // TODO[REEF-1257]
+ Thread.Sleep(5 * 1000);
+ return null;
+ }
+
+ public void Dispose()
+ {
+ Logger.Log(Level.Info, "Task is disposed.");
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 921d862..0bc15e6 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -75,6 +75,7 @@ under the License.
<ItemGroup>
<Compile Include="Functional\Bridge\HelloSimpleEventHandlers.cs" />
<Compile Include="Functional\Bridge\TestBridgeClient.cs" />
+ <Compile Include="Functional\Bridge\TestCloseTask.cs" />
<Compile Include="Functional\Bridge\TestContextStack.cs" />
<Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" />
<Compile Include="Functional\Bridge\TestFailedTaskEventHandler.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
index d307be7..e46b26a 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
@@ -55,6 +55,14 @@ public final class RunningTaskBridge extends NativeBridge implements Identifiabl
}
}
+ public void close(final byte[] message) {
+ if (message != null) {
+ jrunningTask.close(message);
+ } else {
+ jrunningTask.close();
+ }
+ }
+
public ActiveContextBridge getActiveContext() {
return jactiveContext;
}