You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/03/24 01:23:44 UTC

reef git commit: [REEF-1260] Adding a sample and test for Context Start handler

Repository: reef
Updated Branches:
  refs/heads/master c27ea4220 -> 928fe3aac


[REEF-1260] Adding a sample and test for Context Start handler

JIRA:
  [REEF-1260](https://issues.apache.org/jira/browse/REEF-1260)

Pull Request:
  This closes #891


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/928fe3aa
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/928fe3aa
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/928fe3aa

Branch: refs/heads/master
Commit: 928fe3aacd957f1e45446199bdd75577fd43b5f5
Parents: c27ea42
Author: Julia Wang <ju...@apache.org>
Authored: Wed Mar 16 18:08:21 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed Mar 23 17:05:05 2016 -0700

----------------------------------------------------------------------
 .../FaultTolerant/TestContextStart.cs           | 244 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   1 +
 2 files changed, 245 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/928fe3aa/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
new file mode 100644
index 0000000..6a421aa
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
+{
+    /// <summary>
+    /// This test case servers as an example to put data downloading at part of the ContextStartHandler
+    /// </summary>
+    public class TestContextStart : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(TestContextStart));
+        private const string StartedHandlerMessage = "Start Handler is called.";
+        private const string StartedMessage = "Do something started.";
+        private const string CompletedMessage = "Do something completed.";
+
+        public TestContextStart()
+        {
+            Init();
+        }
+
+        /// <summary>
+        /// This test case submit a context with a Context start handler and do something in the handler
+        /// </summary>
+        [Fact]
+        public void TestDosomethingOnContextStartOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfigurations(), typeof(ContextStartDriver), 1, "ContextStartDriver", "local", testFolder);
+            ValidateSuccessForLocalRuntime(2, testFolder: testFolder);
+
+            var messages = new List<string>();
+            messages.Add(StartedMessage);
+            messages.Add(StartedHandlerMessage);
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 2);
+            CleanUp(testFolder);
+        }
+
+        public IConfiguration DriverConfigurations()
+        {
+            return DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, GenericType<ContextStartDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ContextStartDriver>.Class)
+                .Set(DriverConfiguration.OnContextActive, GenericType<ContextStartDriver>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, GenericType<ContextStartDriver>.Class)
+                .Set(DriverConfiguration.OnContextClosed, GenericType<ContextStartDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorCompleted, GenericType<ContextStartDriver>.Class)
+                .Build();
+        }
+
+        private sealed class ContextStartDriver :
+             IObserver<IDriverStarted>,
+             IObserver<IAllocatedEvaluator>,
+             IObserver<IActiveContext>,
+             IObserver<ICompletedTask>,
+             IObserver<IClosedContext>,
+             IObserver<ICompletedEvaluator>
+        {
+            private readonly IEvaluatorRequestor _requestor;
+            private const string ContextId1 = "ContextID1";
+            private const string ContextId2 = "ContextID2";
+            private const string TaskId = "TaskID";
+            private bool _first = true;
+
+            [Inject]
+            private ContextStartDriver(IEvaluatorRequestor evaluatorRequestor)
+            {
+                _requestor = evaluatorRequestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IActiveContext value)
+            {               
+                Logger.Log(Level.Info, "IActiveContext: " + value.Id);
+
+                if (_first)
+                {
+                    Assert.Equal(value.Id, ContextId1);
+                    _first = false;
+                    value.SubmitContext(
+                        ContextConfiguration.ConfigurationModule
+                            .Set(ContextConfiguration.Identifier, ContextId2)
+                            .Set(ContextConfiguration.OnContextStart, GenericType<ContextStartHandler>.Class)
+                            .Build());
+                }
+                else
+                {
+                    Assert.Equal(value.Id, ContextId2);
+                    var c = TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, TaskId)
+                        .Set(TaskConfiguration.Task, GenericType<TestTask>.Class)
+                        .Build();
+                    value.SubmitTask(c);
+                }
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                value.SubmitContext(
+                    ContextConfiguration.ConfigurationModule
+                        .Set(ContextConfiguration.Identifier, ContextId1)
+                        .Set(ContextConfiguration.OnContextStart, GenericType<ContextStartHandler>.Class)
+                        .Build());
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                Logger.Log(Level.Info, "Task is completed:" + value.Id);
+                Assert.Equal(value.Id, TaskId);
+                value.ActiveContext.Dispose();
+            }
+
+            public void OnNext(IClosedContext value)
+            {
+                Logger.Log(Level.Info, "Second context is closed: " + value.Id);
+                Assert.Equal(value.Id, ContextId2);
+                Assert.Equal(value.ParentContext.Id, ContextId1);
+                value.ParentContext.Dispose();
+            }
+
+            public void OnNext(ICompletedEvaluator value)
+            {
+                Logger.Log(Level.Info, "In CompletedEvaluator " + value.Id);
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class ContextStartHandler : IObserver<IContextStart>
+        {
+            private readonly DoSomething _doSomething;
+
+            [Inject]
+            private ContextStartHandler(DoSomething dataDownLoader)
+            {
+                _doSomething = dataDownLoader;
+            }
+
+            public void OnNext(IContextStart value)
+            {
+                Logger.Log(Level.Info, StartedHandlerMessage);
+                _doSomething.DoIt();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class DoSomething
+        {
+            private bool _done;
+
+            [Inject]
+            private DoSomething()
+            {
+                _done = false;
+            }
+
+            public void DoIt()
+            {
+                Logger.Log(Level.Info, StartedMessage);
+                _done = true;
+            }
+
+            public bool Done
+            {
+                get { return _done; }
+            }
+        }
+
+        private sealed class TestTask : ITask
+        {
+            private readonly DoSomething _dataDownLoader;
+
+            [Inject]
+            private TestTask(DoSomething dataDownLoader)
+            {
+                _dataDownLoader = dataDownLoader;
+            }
+
+            public void Dispose()
+            {
+            }
+
+            public byte[] Call(byte[] memento)
+            {
+                Logger.Log(Level.Info, "Hello in TestTask");
+                if (_dataDownLoader.Done == true)
+                {
+                    Logger.Log(Level.Info, CompletedMessage);
+                    return null;
+                }
+                return null;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/928fe3aa/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 0bc15e6..94c6e23 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -83,6 +83,7 @@ under the License.
     <Compile Include="Functional\Bridge\TestSimpleEventHandlers.cs" />
     <Compile Include="Functional\Bridge\TestSuspendTask.cs" />
     <Compile Include="Functional\Driver\DriverTestStartHandler.cs" />
+    <Compile Include="Functional\FaultTolerant\TestContextStart.cs" />
     <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" />
     <Compile Include="Functional\Driver\TestDriver.cs" />
     <Compile Include="Functional\Messaging\MessageDriver.cs" />