You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/03/16 00:15:05 UTC

reef git commit: [REEF-1256] Implement bridge for IRunningTask.Dispose()

Repository: reef
Updated Branches:
  refs/heads/master 061f66277 -> 4c9c36852


[REEF-1256] Implement bridge for IRunningTask.Dispose()

  * Implement bridge method for closing a task
  * Add test case

JIRA:
  [REEF-1256](https://issues.apache.org/jira/browse/REEF-1256)

Pull Request:
  This closes #888


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4c9c3685
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4c9c3685
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4c9c3685

Branch: refs/heads/master
Commit: 4c9c368520fdec66f6c3c25359699f230cc9f23c
Parents: 061f662
Author: Julia Wang <ju...@microsoft.com>
Authored: Tue Mar 15 10:54:11 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Mar 15 15:55:05 2016 -0700

----------------------------------------------------------------------
 lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h   |   1 +
 .../RunningTaskClr2Java.cpp                     |  26 +++
 .../Bridge/Clr2java/IRunningTaskClr2Java.cs     |   2 +
 .../Bridge/Events/RunningTask.cs                |   2 +
 .../Functional/Bridge/TestCloseTask.cs          | 209 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   1 +
 .../reef/javabridge/RunningTaskBridge.java      |   8 +
 7 files changed, 249 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index 3a2c60c..381df15 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -142,6 +142,7 @@ namespace Org {
                             virtual String^ GetId();
                             virtual void Send(array<byte>^ message);
                             virtual void Suspend(array<byte>^ message);
+                            virtual void Close(array<byte>^ message);
                         };
 
                         public ref class FailedEvaluatorClr2Java : public IFailedEvaluatorClr2Java {

http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
index 71d6451..d9e264d 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/RunningTaskClr2Java.cpp
@@ -109,6 +109,32 @@ namespace Org {
 						  ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Suspend");
 					  }
 
+					  void RunningTaskClr2Java::Close(array<byte>^ message) {
+						  ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::Close");
+						  JNIEnv *env = RetrieveEnv(_jvm);
+						  jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask);
+						  jmethodID jmidClose = env->GetMethodID(jclassRunningTask, "close", "([B)V");
+
+						  if (jmidClose == NULL) {
+							  ManagedLog::LOGGER->Log("jmidClose is NULL");
+							  return;
+						  }
+
+						  if (message == nullptr) {
+							  env->CallObjectMethod(
+								  _jobjectRunningTask,
+								  jmidClose,
+								  NULL);
+						  }
+						  else {
+							  env->CallObjectMethod(
+								  _jobjectRunningTask,
+								  jmidClose,
+								  JavaByteArrayFromManagedByteArray(env, message));
+						  }
+						  ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Close");
+					  }
+
 					  void RunningTaskClr2Java::OnError(String^ message) {
 						  ManagedLog::LOGGER->Log("RunningTaskClr2Java::OnError");
 						  JNIEnv *env = RetrieveEnv(_jvm);

http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
index fde12db..656b57a 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IRunningTaskClr2Java.cs
@@ -29,5 +29,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java
         void Send(byte[] message);
 
         void Suspend(byte[] message);
+
+        void Close(byte[] message);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
index e993b1c..dbf2d58 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/RunningTask.cs
@@ -70,10 +70,12 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
 
         public void Dispose(byte[] message)
         {
+            _runningTaskClr2Java.Close(message);
         }
 
         public void Dispose()
         {
+            Dispose(null);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
new file mode 100644
index 0000000..9cc3cd3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Bridge
+{
+    /// <summary>
+    /// This test is to close a running task from driver
+    /// </summary>
+    public sealed class TestCloseTask : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(TestCloseTask));
+
+        private const string DisposeMessageFromDriver = "DisposeMessageFromDriver";
+        private const string NoMessage = "NO_MESSAGE";
+        private const string CompletedValidationMessage = "CompletedValidationmessage";
+
+        public TestCloseTask()
+        {
+            Init();
+        }
+
+        /// <summary>
+        /// This test is to close a running task over the bridge
+        /// </summary>
+        [Fact]
+        public void TestStopTaskOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfigurations(DisposeMessageFromDriver), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1);
+            var messages = new List<string>();
+            messages.Add(DisposeMessageFromDriver);
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// This test is to close a running task over the bridge
+        /// </summary>
+        [Fact]
+        public void TestStopTaskOnLocalRuntimeWithNullMessage()
+        {
+            string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfigurations(NoMessage), typeof(CloseTaskTestDriver), 1, "testStopTask", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1);
+            var messages = new List<string>();
+            messages.Add("Control protobuf to stop task");
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Driver configuration for the test driver
+        /// </summary>
+        /// <returns></returns>
+        public IConfiguration DriverConfigurations(string taskCloseMessage)
+        {
+            var handlerConfig = DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, GenericType<CloseTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<CloseTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnContextActive, GenericType<CloseTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskRunning, GenericType<CloseTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, GenericType<CloseTaskTestDriver>.Class)
+                .Build();
+
+            var messageConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindStringNamedParam<DisposeMessage>(taskCloseMessage)
+                .Build();
+
+            return Configurations.Merge(handlerConfig, messageConfig);
+        }
+
+        [NamedParameter("Message send with task close", "TaskDisposeMessage", NoMessage)]
+        private class DisposeMessage : Name<string> 
+        {
+        }
+
+        private sealed class CloseTaskTestDriver :
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<IActiveContext>,
+            IObserver<ICompletedTask>,
+            IObserver<IRunningTask>           
+        {
+            private readonly IEvaluatorRequestor _requestor;
+            private int _contextNumber = 0;
+            private int _taskNumber = 0;
+            private string _disposeMessage;
+
+            [Inject]
+            private CloseTaskTestDriver(IEvaluatorRequestor evaluatorRequestor,
+                [Parameter(typeof(DisposeMessage))] string disposeMessage)
+            {
+                _requestor = evaluatorRequestor;
+                _disposeMessage = disposeMessage;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().SetNumber(1).Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                value.SubmitContext(
+                    ContextConfiguration.ConfigurationModule
+                        .Set(ContextConfiguration.Identifier, "ContextID" + _contextNumber++)
+                        .Build());
+            }
+
+            public void OnNext(IActiveContext value)
+            {
+                value.SubmitTask(GetTaskConfiguration());
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                // Log on task completion to signal a passed test.
+                Logger.Log(Level.Info, CompletedValidationMessage + "Task completed: " + value.Id);
+                value.ActiveContext.Dispose();
+            }
+
+            public void OnNext(IRunningTask value)
+            {
+                Logger.Log(Level.Info, "Task running: " + value.Id);
+                if (_disposeMessage.Equals(NoMessage))
+                {
+                    value.Dispose();
+                }
+                else
+                {
+                    value.Dispose(Encoding.UTF8.GetBytes(_disposeMessage));
+                }
+            }
+
+            private IConfiguration GetTaskConfiguration()
+            {
+                return TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, "TaskID" + _taskNumber++)
+                    .Set(TaskConfiguration.Task, GenericType<TestCloseTask.StopTestTask>.Class)
+                    .Build();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class StopTestTask : ITask
+        {
+            [Inject]
+            private StopTestTask()
+            {
+            }
+
+            public byte[] Call(byte[] memento)
+            {
+                // TODO[REEF-1257]
+                Thread.Sleep(5 * 1000);
+                return null;
+            }
+
+            public void Dispose()
+            {
+                Logger.Log(Level.Info, "Task is disposed.");
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 921d862..0bc15e6 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -75,6 +75,7 @@ under the License.
   <ItemGroup>
     <Compile Include="Functional\Bridge\HelloSimpleEventHandlers.cs" />
     <Compile Include="Functional\Bridge\TestBridgeClient.cs" />
+    <Compile Include="Functional\Bridge\TestCloseTask.cs" />
     <Compile Include="Functional\Bridge\TestContextStack.cs" />
     <Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" />
     <Compile Include="Functional\Bridge\TestFailedTaskEventHandler.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/4c9c3685/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
index d307be7..e46b26a 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
@@ -55,6 +55,14 @@ public final class RunningTaskBridge extends NativeBridge implements Identifiabl
     }
   }
 
+  public void close(final byte[] message) {
+    if (message != null) {
+      jrunningTask.close(message);
+    } else {
+      jrunningTask.close();
+    }
+  }
+
   public ActiveContextBridge getActiveContext() {
     return jactiveContext;
   }