You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/03/24 01:23:44 UTC
reef git commit: [REEF-1260] Adding a sample and test for Context
Start handler
Repository: reef
Updated Branches:
refs/heads/master c27ea4220 -> 928fe3aac
[REEF-1260] Adding a sample and test for Context Start handler
JIRA:
[REEF-1260](https://issues.apache.org/jira/browse/REEF-1260)
Pull Request:
This closes #891
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/928fe3aa
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/928fe3aa
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/928fe3aa
Branch: refs/heads/master
Commit: 928fe3aacd957f1e45446199bdd75577fd43b5f5
Parents: c27ea42
Author: Julia Wang <ju...@apache.org>
Authored: Wed Mar 16 18:08:21 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed Mar 23 17:05:05 2016 -0700
----------------------------------------------------------------------
.../FaultTolerant/TestContextStart.cs | 244 +++++++++++++++++++
.../Org.Apache.REEF.Tests.csproj | 1 +
2 files changed, 245 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/928fe3aa/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
new file mode 100644
index 0000000..6a421aa
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
+{
+ /// <summary>
+ /// This test case serves as an example of performing data downloading as part of the ContextStartHandler.
+ /// </summary>
+ public class TestContextStart : ReefFunctionalTest
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(TestContextStart)); // used by the nested driver, handler, worker and task classes
+ private const string StartedHandlerMessage = "Start Handler is called."; // logged by ContextStartHandler.OnNext
+ private const string StartedMessage = "Do something started."; // logged by DoSomething.DoIt
+ private const string CompletedMessage = "Do something completed."; // logged by TestTask.Call when DoSomething.Done is true
+
+ public TestContextStart()
+ {
+ Init(); // not defined in this file; presumably the per-test setup from ReefFunctionalTest - TODO confirm
+ }
+
+ /// <summary>
+ /// Submits ContextStartDriver through TestRun with the "local" runtime, then hands the run folder to the
+ /// validation helpers (defined outside this file) together with the two start messages to look for.
+ /// </summary>
+ [Fact]
+ public void TestDosomethingOnContextStartOnLocalRuntime()
+ {
+     // The first four hex characters of a fresh GUID distinguish this run's folder from other runs.
+     string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+     TestRun(DriverConfigurations(), typeof(ContextStartDriver), 1, "ContextStartDriver", "local", testFolder);
+     ValidateSuccessForLocalRuntime(2, testFolder: testFolder);
+
+     var expectedMessages = new List<string> { StartedMessage, StartedHandlerMessage };
+     ValidateMessageSuccessfullyLogged(expectedMessages, "Node-*", EvaluatorStdout, testFolder, 2);
+     CleanUp(testFolder);
+ }
+
+ public IConfiguration DriverConfigurations()
+ {
+     var driver = GenericType<ContextStartDriver>.Class; // the single class bound to every driver event below
+     return DriverConfiguration.ConfigurationModule
+         .Set(DriverConfiguration.OnDriverStarted, driver)
+         .Set(DriverConfiguration.OnEvaluatorAllocated, driver)
+         .Set(DriverConfiguration.OnContextActive, driver)
+         .Set(DriverConfiguration.OnTaskCompleted, driver)
+         .Set(DriverConfiguration.OnContextClosed, driver)
+         .Set(DriverConfiguration.OnEvaluatorCompleted, driver).Build();
+ }
+
+ /// <summary>
+ /// Driver under test: submits ContextID1 to the allocated evaluator, stacks ContextID2 on it,
+ /// runs TestTask in ContextID2, then disposes ContextID2 followed by its parent ContextID1.
+ /// </summary>
+ private sealed class ContextStartDriver :
+     IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>,
+     IObserver<ICompletedTask>, IObserver<IClosedContext>, IObserver<ICompletedEvaluator>
+ {
+     private readonly IEvaluatorRequestor _requestor;
+     private const string ContextId1 = "ContextID1";
+     private const string ContextId2 = "ContextID2";
+     private const string TaskId = "TaskID";
+     private bool _first = true; // true until the first IActiveContext has been handled
+
+     [Inject]
+     private ContextStartDriver(IEvaluatorRequestor evaluatorRequestor)
+     {
+         _requestor = evaluatorRequestor;
+     }
+
+     public void OnNext(IDriverStarted value)
+     {
+         _requestor.Submit(_requestor.NewBuilder().Build()); // request built without setting any builder options
+     }
+
+     public void OnNext(IActiveContext value)
+     {
+         Logger.Log(Level.Info, "IActiveContext: " + value.Id);
+
+         if (_first) // first activation, expected to be ContextID1: stack ContextID2 on top of it
+         {
+             Assert.Equal(ContextId1, value.Id); // xUnit argument order is (expected, actual)
+             _first = false;
+             value.SubmitContext(
+                 ContextConfiguration.ConfigurationModule
+                     .Set(ContextConfiguration.Identifier, ContextId2)
+                     .Set(ContextConfiguration.OnContextStart, GenericType<ContextStartHandler>.Class)
+                     .Build());
+         }
+         else // later activation, expected to be ContextID2: submit the task to it
+         {
+             Assert.Equal(ContextId2, value.Id);
+             var taskConfiguration = TaskConfiguration.ConfigurationModule
+                 .Set(TaskConfiguration.Identifier, TaskId)
+                 .Set(TaskConfiguration.Task, GenericType<TestTask>.Class)
+                 .Build();
+             value.SubmitTask(taskConfiguration);
+         }
+     }
+
+     public void OnNext(IAllocatedEvaluator value)
+     {
+         value.SubmitContext(
+             ContextConfiguration.ConfigurationModule
+                 .Set(ContextConfiguration.Identifier, ContextId1)
+                 .Set(ContextConfiguration.OnContextStart, GenericType<ContextStartHandler>.Class)
+                 .Build());
+     }
+
+     public void OnNext(ICompletedTask value)
+     {
+         Logger.Log(Level.Info, "Task is completed:" + value.Id);
+         Assert.Equal(TaskId, value.Id);
+         value.ActiveContext.Dispose(); // dispose the completed task's active context
+     }
+
+     public void OnNext(IClosedContext value)
+     {
+         Logger.Log(Level.Info, "Second context is closed: " + value.Id);
+         Assert.Equal(ContextId2, value.Id);
+         Assert.Equal(ContextId1, value.ParentContext.Id);
+         value.ParentContext.Dispose(); // then dispose the parent context as well
+     }
+
+     public void OnNext(ICompletedEvaluator value)
+     {
+         Logger.Log(Level.Info, "In CompletedEvaluator " + value.Id);
+     }
+
+     public void OnError(Exception error)
+     {
+         throw new NotImplementedException();
+     }
+
+     public void OnCompleted()
+     {
+         throw new NotImplementedException();
+     }
+ }
+
+ private sealed class ContextStartHandler : IObserver<IContextStart> // start handler bound to both contexts the driver submits
+ {
+     private readonly DoSomething _startupWork;
+
+     [Inject]
+     private ContextStartHandler(DoSomething dataDownLoader)
+     {
+         _startupWork = dataDownLoader;
+     }
+
+     public void OnNext(IContextStart value)
+     {
+         Logger.Log(Level.Info, StartedHandlerMessage); // one of the two messages the test method validates
+         _startupWork.DoIt();
+     }
+
+     public void OnError(Exception error)
+     {
+         throw new NotImplementedException();
+     }
+
+     public void OnCompleted()
+     {
+         throw new NotImplementedException();
+     }
+ }
+
+ /// <summary>
+ /// Placeholder for work performed at context start: DoIt() logs StartedMessage and sets Done.
+ /// ContextStartHandler calls DoIt(); TestTask.Call reads Done.
+ /// </summary>
+ private sealed class DoSomething
+ {
+     [Inject]
+     private DoSomething()
+     {
+         Done = false;
+     }
+
+     /// <summary>True once DoIt() has been called on this instance.</summary>
+     public bool Done { get; private set; }
+
+     public void DoIt()
+     {
+         Logger.Log(Level.Info, StartedMessage);
+         Done = true;
+     }
+ }
+
+ private sealed class TestTask : ITask
+ {
+     private readonly DoSomething _work;
+
+     [Inject]
+     private TestTask(DoSomething dataDownLoader)
+     {
+         _work = dataDownLoader;
+     }
+
+     public void Dispose()
+     {
+     }
+
+     /// <summary>Logs a greeting, logs CompletedMessage if the injected DoSomething is Done, and returns null either way.</summary>
+     public byte[] Call(byte[] memento)
+     {
+         Logger.Log(Level.Info, "Hello in TestTask");
+         if (_work.Done)
+         {
+             Logger.Log(Level.Info, CompletedMessage);
+         }
+         return null;
+     }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/928fe3aa/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 0bc15e6..94c6e23 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -83,6 +83,7 @@ under the License.
<Compile Include="Functional\Bridge\TestSimpleEventHandlers.cs" />
<Compile Include="Functional\Bridge\TestSuspendTask.cs" />
<Compile Include="Functional\Driver\DriverTestStartHandler.cs" />
+ <Compile Include="Functional\FaultTolerant\TestContextStart.cs" />
<Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" />
<Compile Include="Functional\Driver\TestDriver.cs" />
<Compile Include="Functional\Messaging\MessageDriver.cs" />