You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/03/03 20:08:56 UTC

reef git commit: [REEF-1232] Fix binding in TaskConfiguration for ITaskMessageSource

Repository: reef
Updated Branches:
  refs/heads/master 07874cbb6 -> 00ac1d06a


[REEF-1232] Fix binding in TaskConfiguration for ITaskMessageSource

 * Replace the binding in TaskConfiguration for ITaskMessageSource from an
   interface to a set of interfaces
 * Update TestTaskMessage classes to verify the messages sent back and forth
   between driver and task
 * Update test base class for evaluator log validation as well.

JIRA:
  [REEF-1232](https://issues.apache.org/jira/browse/REEF-1232)

Pull Request
  This closes #871


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/00ac1d06
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/00ac1d06
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/00ac1d06

Branch: refs/heads/master
Commit: 00ac1d06a144f92f519ae53cf960e88827034a4a
Parents: 07874cb
Author: Julia Wang <ju...@apache.org>
Authored: Wed Mar 2 16:01:54 2016 -0800
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Mar 3 11:06:37 2016 -0800

----------------------------------------------------------------------
 .../Tasks/TaskConfiguration.cs                  |  2 +-
 .../Functional/Bridge/TestContextStack.cs       |  4 +-
 .../Bridge/TestFailedEvaluatorEventHandler.cs   |  2 +-
 .../Bridge/TestFailedTaskEventHandler.cs        |  2 +-
 .../Functional/Bridge/TestSimpleContext.cs      |  2 +-
 .../Bridge/TestSimpleEventHandlers.cs           |  2 +-
 .../Functional/Messaging/MessageTask.cs         |  6 +--
 .../Functional/Messaging/TestTaskMessage.cs     | 17 ++++++-
 .../Functional/ReefFunctionalTest.cs            | 51 ++++++++++++++------
 .../Functional/RuntimeName/RuntimeNameTest.cs   |  2 +-
 10 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs
index 409a0de..a0f606e 100644
--- a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs
@@ -94,7 +94,7 @@ namespace Org.Apache.REEF.Common.Tasks
             {
                 return new TaskConfiguration()
                     .BindImplementation(GenericType<ITask>.Class, Task)
-                    .BindImplementation(GenericType<ITaskMessageSource>.Class, OnSendMessage)
+                    .BindSetEntry(GenericType<TaskConfigurationOptions.TaskMessageSources>.Class, OnSendMessage)
                     .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage)
                     .BindImplementation(GenericType<IDriverConnectionMessageHandler>.Class, OnDriverConnectionChanged)
                     .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier)

http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
index 2c2daa6..f756a36 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
@@ -57,8 +57,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             CleanUp(testFolder);
             TestRun(DriverConfigurations(), typeof(ContextStackHandlers), 1, "testContextStack", "local", testFolder);
             ValidateSuccessForLocalRuntime(2, testFolder: testFolder);
-            ValidateMessageSuccessfullyLogged(TaskValidationMessage, testFolder);
-            ValidateMessageSuccessfullyLogged(ClosedContextValidationMessage, testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(TaskValidationMessage, testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(ClosedContextValidationMessage, testFolder);
             CleanUp(testFolder);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedEvaluatorEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedEvaluatorEventHandler.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedEvaluatorEventHandler.cs
index 755c92c..c7a01c0 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedEvaluatorEventHandler.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedEvaluatorEventHandler.cs
@@ -50,7 +50,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             CleanUp(testFolder);
             TestRun(DriverConfigurations(), typeof(FailedEvaluatorDriver), 1, "failedEvaluatorTest", "local", testFolder);
             ValidateSuccessForLocalRuntime(0, numberOfEvaluatorsToFail: 1, testFolder: testFolder);
-            ValidateMessageSuccessfullyLogged(FailedEvaluatorMessage, testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(FailedEvaluatorMessage, testFolder);
             CleanUp(testFolder);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs
index 82e796a..db1001b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs
@@ -51,7 +51,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             CleanUp(testFolder);
             TestRun(DriverConfigurations(), typeof(FailedTaskDriver), 1, "failedTaskTest", "local", testFolder);
             ValidateSuccessForLocalRuntime(numberOfContextsToClose: 1, numberOfTasksToFail: 1, testFolder: testFolder);
-            ValidateMessageSuccessfullyLogged(FailedTaskMessage, testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(FailedTaskMessage, testFolder);
             CleanUp(testFolder);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleContext.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleContext.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleContext.cs
index 44c5fe0..d0da1ba 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleContext.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleContext.cs
@@ -64,7 +64,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             CleanUp(testFolder);
             TestRun(configuration, typeof(TestContextHandlers), 1, "testSimpleContext", "local", testFolder);
             ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
-            ValidateMessageSuccessfullyLogged(ValidationMessage, testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(ValidationMessage, testFolder);
             CleanUp(testFolder);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs
index b652065..f8b5dbe 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestSimpleEventHandlers.cs
@@ -50,7 +50,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             CleanUp(testFolder);
             TestRun(DriverConfigurations(), typeof(HelloSimpleEventHandlers), 2, "simpleHandler", "local", testFolder);
             ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
-            ValidateMessageSuccessfullyLogged("Evaluator is assigned with 3072 MB of memory and 1 cores.", testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver("Evaluator is assigned with 3072 MB of memory and 1 cores.", testFolder);
             CleanUp(testFolder);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
index 5ed25a9..6f2785d 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageTask.cs
@@ -44,10 +44,11 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
         public Optional<TaskMessage> Message
         {
             get
-            {
+            {              
                 TaskMessage defaultTaskMessage = TaskMessage.From(
                     "messagingSourceId",
                     ByteUtilities.StringToByteArrays(MessageSend + " generated at " + DateTime.Now.ToString(CultureInfo.InvariantCulture)));
+                LOGGER.Log(Level.Info, "Message is sent back from task to driver:" + defaultTaskMessage.Message);
                 return Optional<TaskMessage>.Of(defaultTaskMessage);
             }
 
@@ -70,7 +71,6 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
 
         private void DriverMessage(string message)
         {
-            LOGGER.Log(Level.Info, "Received DriverMessage in TaskMsg: " + message);
             if (!message.Equals(MessageDriver.Message))
             {
                 Exceptions.Throw(new Exception("Unexpected driver message: " + message), "Unexpected driver message received: " + message, LOGGER);
@@ -90,10 +90,10 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
             public void Handle(IDriverMessage value)
             {
                 string message = string.Empty;
-                LOGGER.Log(Level.Verbose, "Received a message from driver, handling it with MessagingDriverMessageHandler");
                 if (value.Message.IsPresent())
                 {
                     message = ByteUtilities.ByteArraysToString(value.Message.Value);
+                    LOGGER.Log(Level.Info, "Received a message from driver, handling it with MessagingDriverMessageHandler:" + message);
                 }
                 _parentTask.DriverMessage(message);
             }

http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
index 65e91fe..9152eb8 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Defaults;
@@ -48,10 +51,22 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
         //// TODO[JIRA REEF-1184]: add timeout 180 sec
         public void TestSendTaskMessage()
         {
-            string testFolder = DefaultRuntimeFolder + TestNumber++;
+            string testFolder = DefaultRuntimeFolder + Guid.NewGuid();
             CleanUp(testFolder);
             TestRun(DriverConfigurations(), typeof(MessageDriver), 1, "simpleHandler", "local", testFolder);
             ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+
+            var messages = new List<string>();
+            messages.Add("TaskMessagingTaskMessageHandler received following message from Task:");
+            messages.Add("Message: MESSAGE:TASK generated");
+            messages.Add("is to send message MESSAGE::DRIVER");      
+            ValidateMessageSuccessfullyLogged(messages, "driver", DriverStdout, testFolder, 0);
+
+            var messages2 = new List<string>();
+            messages.Add("Received a message from driver, handling it with MessagingDriverMessageHandler:MESSAGE::DRIVER");
+            messages.Add("Message is sent back from task to driver:");
+            ValidateMessageSuccessfullyLogged(messages2, "Node-*", EvaluatorStdout, testFolder, 0);
+
             CleanUp(testFolder);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
index e547988..9250e67 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
 using System.Globalization;
 using System.IO;
 using System.Linq;
@@ -39,10 +40,11 @@ namespace Org.Apache.REEF.Tests.Functional
 {
     public class ReefFunctionalTest : IDisposable
     {
-        protected const string _stdout = "driver.stdout";
-        protected const string _stderr = "driver.stderr";
-        protected const string _cmdFile = "run.cmd";
-        protected const string _binFolder = ".";
+        protected const string DriverStdout = "driver.stdout";
+        protected const string DriverStderr = "driver.stderr";
+        protected const string EvaluatorStdout = "evaluator.stdout";
+        protected const string CmdFile = "run.cmd";
+        protected const string BinFolder = ".";
 
         protected static int TestNumber = 1;
         protected const string DefaultRuntimeFolder = "REEF_LOCAL_RUNTIME";
@@ -99,9 +101,9 @@ namespace Org.Apache.REEF.Tests.Functional
             
             ValidationUtilities.ValidateEnvVariable("JAVA_HOME");
 
-            if (!Directory.Exists(_binFolder))
+            if (!Directory.Exists(BinFolder))
             {
-                throw new InvalidOperationException(_binFolder + " not found in current directory, cannot init test");
+                throw new InvalidOperationException(BinFolder + " not found in current directory, cannot init test");
             }
         }
 
@@ -148,7 +150,7 @@ namespace Org.Apache.REEF.Tests.Functional
             {
                 try
                 {
-                    lines = File.ReadAllLines(GetLogFile(_stdout, testFolder));
+                    lines = File.ReadAllLines(GetLogFile(DriverStdout, "driver", testFolder));
                     break;
                 }
                 catch (Exception)
@@ -174,14 +176,21 @@ namespace Org.Apache.REEF.Tests.Functional
             }
         }
 
-        protected void ValidateMessageSuccessfullyLogged(string message, string testFolder, int numberOfoccurances = 1)
+        protected void ValidateMessageSuccessfullyLoggedForDriver(string message, string testFolder, int numberOfoccurances = 1)
+        {
+            var msgs = new List<string>();
+            msgs.Add(message);
+            ValidateMessageSuccessfullyLogged(msgs, "driver", DriverStdout, testFolder, numberOfoccurances);
+        }
+
+        protected void ValidateMessageSuccessfullyLogged(IList<string> messages, string subfolder, string fileName, string testFolder, int numberOfoccurances = 1)
         {
             string[] lines = null;
             for (int i = 0; i < 60; i++)
             {
                 try
                 {
-                    lines = File.ReadAllLines(GetLogFile(_stdout, testFolder));
+                    lines = File.ReadAllLines(GetLogFile(fileName, subfolder, testFolder));
                     break;
                 }
                 catch (Exception)
@@ -192,8 +201,18 @@ namespace Org.Apache.REEF.Tests.Functional
 
             if (lines != null)
             {
-                string[] successIndicators = lines.Where(s => s.Contains(message)).ToArray();
-                Assert.Equal(numberOfoccurances, successIndicators.Count());
+                foreach (string message in messages)
+                {
+                    string[] successIndicators = lines.Where(s => s.Contains(message)).ToArray();
+                    if (numberOfoccurances > 0)
+                    {
+                        Assert.Equal(numberOfoccurances, successIndicators.Count());
+                    }
+                    else
+                    {
+                        Assert.NotEqual(0, successIndicators.Count());
+                    }
+                }
             }
             else
             {
@@ -214,9 +233,9 @@ namespace Org.Apache.REEF.Tests.Functional
             }
         }
 
-        protected string GetLogFile(string logFileName, string testFolder = DefaultRuntimeFolder)
+        protected string GetLogFile(string logFileName, string subfolder = "driver", string testFolder = DefaultRuntimeFolder)
         {
-            string driverContainerDirectory = Directory.GetDirectories(Path.Combine(Directory.GetCurrentDirectory(), testFolder), "driver", SearchOption.AllDirectories).SingleOrDefault();
+            string driverContainerDirectory = Directory.GetDirectories(Path.Combine(Directory.GetCurrentDirectory(), testFolder), subfolder, SearchOption.AllDirectories).SingleOrDefault();
             Console.WriteLine("GetLogFile, driverContainerDirectory:" + driverContainerDirectory);
 
             if (string.IsNullOrWhiteSpace(driverContainerDirectory))
@@ -226,15 +245,15 @@ namespace Org.Apache.REEF.Tests.Functional
             string logFile = Path.Combine(driverContainerDirectory, logFileName);
             if (!File.Exists(logFile))
             {
-                throw new InvalidOperationException("Driver stdout file not found: " + logFile);
+                throw new InvalidOperationException("Log file not found: " + logFile);
             }
             return logFile;
         }
 
         private void UploadDriverLog()
         {
-            string driverStdout = GetLogFile(_stdout);
-            string driverStderr = GetLogFile(_stderr);
+            string driverStdout = GetLogFile(DriverStdout);
+            string driverStderr = GetLogFile(DriverStderr);
             CloudStorageAccount storageAccount = CloudStorageAccount.Parse(GetStorageConnectionString());
             CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
             CloudBlobContainer container = blobClient.GetContainerReference(DateTime.Now.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture));   

http://git-wip-us.apache.org/repos/asf/reef/blob/00ac1d06/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs
index 7e65596..fa08054 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/RuntimeName/RuntimeNameTest.cs
@@ -50,7 +50,7 @@ namespace Org.Apache.REEF.Tests.Functional.Driver
             string testFolder = DefaultRuntimeFolder + TestNumber++;
             CleanUp(testFolder);
             TestRun(DriverConfigurationsWithEvaluatorRequest(), typeof(EvaluatorRequestingDriver), 1, "EvaluatorRequestingDriver", "local", testFolder);
-            ValidateMessageSuccessfullyLogged("Runtime Name: Local", testFolder, 2);
+            ValidateMessageSuccessfullyLoggedForDriver("Runtime Name: Local", testFolder, 2);
             CleanUp(testFolder);
         }