You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:06:08 UTC
[47/51] [partial] incubator-reef git commit: [REEF-131] Towards the
new .Net project structure This is to change .Net project structure for Tang,
Wake, REEF utilities, Common and Driver:
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs
new file mode 100644
index 0000000..3a0b474
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+
+namespace Org.Apache.REEF.Common.Context
+{
+ /// <summary>
+ /// Exception describing a failure in a context. It records the id of that context and,
+ /// when one was supplied, the id of its parent.
+ /// </summary>
+ public class ContextClientCodeException : Exception
+ {
+ /// <summary>
+ /// Creates the exception. The base message is "Failure in context '{contextId}': {message}".
+ /// </summary>
+ /// <param name="contextId">the id of the failed context.</param>
+ /// <param name="parentId">the id of the failed context's parent, if any.</param>
+ /// <param name="message">the error message.</param>
+ /// <param name="cause">the exception that caused the error; passed on as the inner exception.</param>
+ public ContextClientCodeException(
+ string contextId,
+ Optional<string> parentId,
+ string message,
+ Exception cause)
+ : base(string.Concat("Failure in context '", contextId, "': ", message), cause)
+ {
+ ContextId = contextId;
+ ParentId = parentId;
+ }
+
+ /// <summary>
+ /// Id of the failed context, exactly as passed to the constructor.
+ /// </summary>
+ public string ContextId { get; private set; }
+
+ /// <summary>
+ /// Id of the failed context's parent, exactly as passed to the constructor.
+ /// </summary>
+ public Optional<string> ParentId { get; private set; }
+
+ /// <summary>
+ /// Intended to extract a context id from the given configuration. The lookup is not
+ /// implemented yet (see the TODO below), so the argument is ignored.
+ /// </summary>
+ /// <param name="c">the configuration; currently unused.</param>
+ /// <returns>always the empty string for now.</returns>
+ public static string GetId(IConfiguration c)
+ {
+ // TODO: update after TANG is available
+ return string.Empty;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs
new file mode 100644
index 0000000..bcd7fb0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using Org.Apache.REEF.Tang.Types;
+
+namespace Org.Apache.REEF.Common.Evaluator.Context
+{
+ /// <summary>
+ /// Context configuration backed by the key/value bindings of an Avro configuration string.
+ /// Only <see cref="Id"/> and <see cref="ContainerDirectory"/> are functional; every other
+ /// IConfiguration member throws <see cref="NotImplementedException"/>.
+ /// </summary>
+ public class ContextConfiguration : IConfiguration
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextConfiguration));
+
+ // Binding key -> binding value, filled by the constructor.
+ private Dictionary<string, string> _settings;
+
+ /// <summary>
+ /// Reads the bindings out of <paramref name="configString"/> and records the current
+ /// working directory as <see cref="ContainerDirectory"/>. Reports an ArgumentException
+ /// through the Exceptions helper when no context identifier binding is found.
+ /// </summary>
+ /// <param name="configString">the string handed to AvroConfiguration.GetAvroConfigurationFromEmbeddedString.</param>
+ public ContextConfiguration(string configString)
+ {
+ using (LOGGER.LogFunction("ContextConfiguration::ContextConfigurationn"))
+ {
+ ContainerDirectory = Directory.GetCurrentDirectory();
+ _settings = new Dictionary<string, string>();
+
+ AvroConfiguration parsed = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString);
+ foreach (ConfigurationEntry binding in parsed.Bindings)
+ {
+ // A key that merely contains the context identifier name is collapsed to exactly that name.
+ if (binding.key.Contains(REEF.Evaluator.Constants.ContextIdentifier))
+ {
+ binding.key = REEF.Evaluator.Constants.ContextIdentifier;
+ LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "{0} detected for context id with value {1}", binding.key, binding.value));
+ }
+ _settings.Add(binding.key, binding.value);
+ }
+
+ if (_settings.ContainsKey(REEF.Evaluator.Constants.ContextIdentifier))
+ {
+ return;
+ }
+
+ const string MissingIdMessage = "Required parameter ContextIdentifier not provided.";
+ LOGGER.Log(Level.Error, MissingIdMessage);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(MissingIdMessage), LOGGER);
+ }
+ }
+
+ /// <summary>
+ /// The value bound to the context identifier key.
+ /// </summary>
+ public string Id
+ {
+ get { return _settings[REEF.Evaluator.Constants.ContextIdentifier]; }
+ }
+
+ /// <summary>
+ /// Initialised by the constructor to the process's current working directory; may be overwritten.
+ /// </summary>
+ public string ContainerDirectory { get; set; }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public IConfigurationBuilder newBuilder()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public string GetNamedParameter(INamedParameterNode np)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public IClassHierarchy GetClassHierarchy()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public ISet<object> GetBoundSet(INamedParameterNode np)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public IClassNode GetBoundConstructor(IClassNode cn)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public IClassNode GetBoundImplementation(IClassNode cn)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public IConstructorDef GetLegacyConstructor(IClassNode cn)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public ICollection<IClassNode> GetBoundImplementations()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public ICollection<IClassNode> GetBoundConstructors()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public ICollection<INamedParameterNode> GetNamedParameters()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public ICollection<IClassNode> GetLegacyConstructors()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public IList<object> GetBoundList(INamedParameterNode np)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public IEnumerator<KeyValuePair<INamedParameterNode, object>> GetBoundSets()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>Not implemented; always throws NotImplementedException.</summary>
+ public IDictionary<INamedParameterNode, IList<object>> GetBoundList()
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs
new file mode 100644
index 0000000..97e65c0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Events;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Common.Context
+{
+ /// <summary>
+ /// Holds the start/stop handlers and message sources of one context and is the intended
+ /// trigger point for its life-cycle events. The event dispatch itself is still commented
+ /// out, so <see cref="Start"/>, <see cref="Close"/> and <see cref="HandleContextMessage"/>
+ /// currently do nothing.
+ /// </summary>
+ class ContextLifeCycle
+ {
+ // Assigned once in the constructors; only the set contents may change afterwards.
+ private readonly HashSet<IObserver<IContextStart>> _contextStartHandlers;
+
+ private readonly HashSet<IObserver<IContextStop>> _contextStopHandlers;
+
+ private readonly HashSet<IContextMessageSource> _contextMessageSources;
+
+ /// <summary>
+ /// Creates a life cycle that keeps references to the supplied sets (no copies are made).
+ /// </summary>
+ /// <param name="id">the context id.</param>
+ /// <param name="contextStartHandlers">handlers meant to receive the context start event.</param>
+ /// <param name="contextStopHandlers">handlers meant to receive the context stop event.</param>
+ /// <param name="contextMessageSources">message sources configured for the context.</param>
+ // @Inject
+ public ContextLifeCycle(
+ string id,
+ HashSet<IObserver<IContextStart>> contextStartHandlers,
+ HashSet<IObserver<IContextStop>> contextStopHandlers,
+ HashSet<IContextMessageSource> contextMessageSources)
+ {
+ Id = id;
+ _contextStartHandlers = contextStartHandlers;
+ _contextStopHandlers = contextStopHandlers;
+ _contextMessageSources = contextMessageSources;
+ }
+
+ /// <summary>
+ /// Creates a life cycle with empty handler and message-source sets.
+ /// </summary>
+ /// <param name="contextId">the context id.</param>
+ public ContextLifeCycle(string contextId)
+ {
+ Id = contextId;
+ _contextStartHandlers = new HashSet<IObserver<IContextStart>>();
+ _contextStopHandlers = new HashSet<IObserver<IContextStop>>();
+ _contextMessageSources = new HashSet<IContextMessageSource>();
+ }
+
+ /// <summary>
+ /// The context id given at construction.
+ /// </summary>
+ public string Id { get; private set; }
+
+ /// <summary>
+ /// The internal set of message sources itself (not a copy); use
+ /// <see cref="GetContextMessageSources"/> for a copy.
+ /// </summary>
+ public HashSet<IContextMessageSource> ContextMessageSources
+ {
+ get { return _contextMessageSources; }
+ }
+
+ /// <summary>
+ /// Intended to fire ContextStart to all registered event handlers.
+ /// The dispatch is disabled for now, so this is a no-op.
+ /// </summary>
+ public void Start()
+ {
+ // TODO: enable
+ //IContextStart contextStart = new ContextStartImpl(Id);
+ //foreach (IObserver<IContextStart> startHandler in _contextStartHandlers)
+ //{
+ // startHandler.OnNext(contextStart);
+ //}
+ }
+
+ /// <summary>
+ /// Intended to fire ContextStop to all registered event handlers.
+ /// The dispatch is disabled for now, so this is a no-op.
+ /// </summary>
+ public void Close()
+ {
+ //IContextStop contextStop = new ContextStopImpl(Id);
+ //foreach (IObserver<IContextStop> startHandler in _contextStopHandlers)
+ //{
+ // startHandler.OnNext(contextStop);
+ //}
+ }
+
+ /// <summary>
+ /// Intended to hand a message to the context message handler.
+ /// The forwarding is disabled for now, so the message is dropped.
+ /// </summary>
+ /// <param name="message">the raw message bytes.</param>
+ public void HandleContextMessage(byte[] message)
+ {
+ //contextMessageHandler.onNext(message);
+ }
+
+ /// <summary>
+ /// Gets the set of ContextMessageSources configured.
+ /// </summary>
+ /// <returns>(a shallow copy of) the set of ContextMessageSources configured.</returns>
+ public HashSet<IContextMessageSource> GetContextMessageSources()
+ {
+ return new HashSet<IContextMessageSource>(_contextMessageSources);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs
new file mode 100644
index 0000000..7c4d288
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Evaluator.Context;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Common.Task;
+using Org.Apache.REEF.Evaluator;
+using Org.Apache.REEF.Services;
+using Org.Apache.REEF.Tasks;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Globalization;
+using System.Linq;
+
+namespace Org.Apache.REEF.Common.Context
+{
+ public class ContextManager : IDisposable
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager));
+
+ private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>();
+
+ private readonly HeartBeatManager _heartBeatManager;
+
+ private RootContextLauncher _rootContextLauncher;
+
+ /// <summary>
+ /// Creates the manager and prepares the launcher for the root context.
+ /// </summary>
+ /// <param name="heartBeatManager">the heartbeat manager; its EvaluatorSettings supply the root context configuration.</param>
+ /// <param name="rootServiceConfig">optional service configuration handed to the root context launcher.</param>
+ /// <param name="rootTaskConfig">optional task configuration handed to the root context launcher.</param>
+ public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
+ {
+ using (LOGGER.LogFunction("ContextManager::ContextManager"))
+ {
+ _heartBeatManager = heartBeatManager;
+ var rootContextConfig = heartBeatManager.EvaluatorSettings.RootContextConfig;
+ _rootContextLauncher = new RootContextLauncher(rootContextConfig, rootServiceConfig, rootTaskConfig);
+ }
+ }
+
+ /// <summary>
+ /// Starts the context manager: obtains the root context from the launcher, pushes it on
+ /// the stack and, when a root task configuration is present, starts that task on it.
+ /// A TaskClientCodeException from the task start is logged and reported via heartbeat.
+ /// </summary>
+ public void Start()
+ {
+ lock (_contextStack)
+ {
+ ContextRuntime root = _rootContextLauncher.GetRootContext();
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", root.Id));
+ _contextStack.Push(root);
+
+ if (!_rootContextLauncher.RootTaskConfig.IsPresent())
+ {
+ return;
+ }
+
+ LOGGER.Log(Level.Info, "Launching the initial Task");
+ try
+ {
+ // The root context was pushed just above under the same lock, so it is the top of the stack.
+ root.StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
+ }
+ catch (TaskClientCodeException taskFailure)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(taskFailure, Level.Error, "Exception when trying to start a task.", LOGGER);
+ HandleTaskException(taskFailure);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Tells whether the context stack currently holds no context.
+ /// </summary>
+ /// <returns>true when the stack is empty, false otherwise.</returns>
+ public bool ContextStackIsEmpty()
+ {
+ lock (_contextStack)
+ {
+ return !_contextStack.Any();
+ }
+ }
+
+ // TODO: the code here differs slightly from the Java version because protobuf-net does not generate HasXXX methods; we may want to switch to proto-port later
+
+ /// <summary>
+ /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
+ /// Exactly one action is taken, chosen in this order of precedence: add context (optionally
+ /// followed by start task), remove context, start task, stop task, suspend task, task
+ /// message, context message. A message with none of these is reported as an error.
+ /// Any exception is reported through the Exceptions helper; task and context client
+ /// exceptions are additionally sent to the HeartBeatManager first.
+ /// </summary>
+ /// <param name="controlMessage">the control message to act on.</param>
+ public void HandleTaskControl(ContextControlProto controlMessage)
+ {
+ try
+ {
+ byte[] taskMessage = controlMessage.task_message;
+ bool addsContext = controlMessage.add_context != null;
+ bool removesContext = controlMessage.remove_context != null;
+
+ if (addsContext && removesContext)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER);
+ }
+
+ if (addsContext)
+ {
+ LOGGER.Log(Level.Info, "AddContext");
+ AddContext(controlMessage.add_context);
+ if (controlMessage.start_task == null)
+ {
+ // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime
+ // Therefore this call can not go into addContext
+ LOGGER.Log(Level.Info, "Trigger Heartbeat");
+ _heartBeatManager.OnNext();
+ return;
+ }
+
+ // support submitContextAndTask()
+ LOGGER.Log(Level.Info, "StartTask");
+ StartTask(controlMessage.start_task);
+ return;
+ }
+
+ if (removesContext)
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id));
+ RemoveContext(controlMessage.remove_context.context_id);
+ return;
+ }
+
+ if (controlMessage.start_task != null)
+ {
+ LOGGER.Log(Level.Info, "StartTask only");
+ StartTask(controlMessage.start_task);
+ return;
+ }
+
+ if (controlMessage.stop_task != null)
+ {
+ LOGGER.Log(Level.Info, "CloseTask");
+ _contextStack.Peek().CloseTask(taskMessage);
+ return;
+ }
+
+ if (controlMessage.suspend_task != null)
+ {
+ LOGGER.Log(Level.Info, "SuspendTask");
+ _contextStack.Peek().SuspendTask(taskMessage);
+ return;
+ }
+
+ if (taskMessage != null)
+ {
+ LOGGER.Log(Level.Info, "DeliverTaskMessage");
+ _contextStack.Peek().DeliverTaskMessage(taskMessage);
+ return;
+ }
+
+ if (controlMessage.context_message != null)
+ {
+ LOGGER.Log(Level.Info, "Handle context contol message");
+ ContextMessageProto contextMessage = controlMessage.context_message;
+
+ // Find the first context on the stack whose id matches the addressee.
+ ContextRuntime addressee = null;
+ foreach (ContextRuntime candidate in _contextStack)
+ {
+ if (candidate.Id.Equals(contextMessage.context_id))
+ {
+ addressee = candidate;
+ break;
+ }
+ }
+
+ if (addressee != null)
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message));
+ addressee.HandleContextMessaage(controlMessage.context_message.message);
+ }
+ else
+ {
+ InvalidOperationException unknownContext = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessage.context_id));
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(unknownContext, LOGGER);
+ }
+ return;
+ }
+
+ InvalidOperationException unknownMessage = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString()));
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(unknownMessage, LOGGER);
+ }
+ catch (Exception e)
+ {
+ TaskClientCodeException taskFailure = e as TaskClientCodeException;
+ ContextClientCodeException contextFailure = e as ContextClientCodeException;
+ if (taskFailure != null)
+ {
+ HandleTaskException(taskFailure);
+ }
+ else if (contextFailure != null)
+ {
+ HandlContextException(contextFailure);
+ }
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER);
+ }
+ }
+
+ /// <summary>
+ /// Gets the task status reported by the context on top of the stack.
+ /// </summary>
+ /// <returns>whatever the top context reports as its task status, or an empty Optional when the stack holds no context</returns>
+ public Optional<TaskStatusProto> GetTaskStatus()
+ {
+ if (_contextStack.Count > 0)
+ {
+ return _contextStack.Peek().GetTaskStatus();
+ }
+
+ // An empty stack is answered with "no status" instead of an error:
+ //throw new InvalidOperationException("Asked for an Task status while there isn't even a context running.");
+ return Optional<TaskStatusProto>.Empty();
+ }
+
+ /// <summary>
+ /// Collects the status of every context on the stack, in the stack's enumeration order
+ /// (most recently pushed context first).
+ /// </summary>
+ /// <returns>a new collection with one status per context on the stack.</returns>
+ public ICollection<ContextStatusProto> GetContextStatusCollection()
+ {
+ var statuses = new Collection<ContextStatusProto>();
+ foreach (ContextRuntime context in _contextStack)
+ {
+ ContextStatusProto status = context.GetContextStatus();
+ LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", status));
+ statuses.Add(status);
+ }
+ return statuses;
+ }
+
+ /// <summary>
+ /// Shuts down. When the stack is not empty, this disposes the bottom-most context runtime
+ /// on it (the first one pushed); nothing is popped from the stack.
+ /// NOTE(review): presumably disposing that context also closes the task and the contexts
+ /// above it - confirm against ContextRuntime.Dispose.
+ /// </summary>
+ public void Dispose()
+ {
+ lock (_contextStack)
+ {
+ if (_contextStack.Count == 0)
+ {
+ return;
+ }
+
+ LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime.");
+
+ // Stack<T> enumerates from the most recently pushed element, so Last() is the bottom entry.
+ ContextRuntime bottom = _contextStack.Last();
+ if (bottom != null)
+ {
+ bottom.Dispose();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Spawns a child context on the current top context and pushes it on the stack.
+ /// The parent id named in the request must match the top context's id (ignoring case);
+ /// otherwise an InvalidOperationException is reported through the Exceptions helper.
+ /// </summary>
+ /// <param name="addContextProto">the request carrying the parent id, the context configuration and an optional service configuration.</param>
+ private void AddContext(AddContextProto addContextProto)
+ {
+ lock (_contextStack)
+ {
+ ContextRuntime parent = _contextStack.Peek();
+ bool parentMatches = parent.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase);
+ if (!parentMatches)
+ {
+ string error = string.Format(
+ CultureInfo.InvariantCulture,
+ "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}",
+ addContextProto.parent_context_id,
+ parent.Id);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(error), LOGGER);
+ }
+
+ ContextConfiguration childConfiguration = new ContextConfiguration(addContextProto.context_configuration);
+
+ // The service configuration is optional; without it the child is spawned from the context configuration alone.
+ ContextRuntime child = addContextProto.service_configuration == null
+ ? parent.SpawnChildContext(childConfiguration)
+ : parent.SpawnChildContext(childConfiguration, new ServiceConfiguration(addContextProto.service_configuration).TangConfig);
+
+ _contextStack.Push(child);
+ }
+ }
+
+ /// <summary>
+ /// Disposes the top context and pops it from the stack. The given id must match the top
+ /// context's id (ignoring case); otherwise an InvalidOperationException is reported
+ /// through the Exceptions helper.
+ /// </summary>
+ /// <param name="contextId">id of the context to remove; expected to be the top context.</param>
+ private void RemoveContext(string contextId)
+ {
+ lock (_contextStack)
+ {
+ string topId = _contextStack.Peek().Id;
+ bool isTop = contextId.Equals(topId, StringComparison.OrdinalIgnoreCase);
+ if (!isTop)
+ {
+ string error = string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, topId);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(error), LOGGER);
+ }
+
+ _contextStack.Peek().Dispose();
+
+ bool closedRoot = _contextStack.Count <= 1;
+ if (!closedRoot)
+ {
+ // We did not close the root context. Therefore, we need to inform the
+ // driver explicitly that this context is closed. The root context notification
+ // is implicit in the Evaluator close/done notification.
+ _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state
+ }
+
+ // The heartbeat above is sent while the disposed context is still on the stack.
+ _contextStack.Pop();
+ }
+ // System.gc(); // TODO: garbage collect?
+ }
+
+ /// <summary>
+ /// Launches a task on the top context. The context id named in the request must match
+ /// the top context's id (ignoring case); otherwise an InvalidOperationException is
+ /// reported through the Exceptions helper.
+ /// </summary>
+ /// <param name="startTaskProto">the request carrying the expected context id and the task configuration.</param>
+ private void StartTask(StartTaskProto startTaskProto)
+ {
+ lock (_contextStack)
+ {
+ ContextRuntime active = _contextStack.Peek();
+ string requestedContextId = startTaskProto.context_id;
+ bool contextMatches = requestedContextId.Equals(active.Id, StringComparison.OrdinalIgnoreCase);
+ if (!contextMatches)
+ {
+ string error = string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", requestedContextId, active.Id);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(error), LOGGER);
+ }
+
+ active.StartTask(new TaskConfiguration(startTaskProto.configuration), requestedContextId, _heartBeatManager);
+ }
+ }
+
+ /// <summary>
+ /// Reports a failed task: builds a FAILED TaskStatusProto whose result is the byte form
+ /// of the exception's ToString() text and passes it to the HeartBeatManager.
+ /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
+ /// </summary>
+ /// <param name="e">the task failure to report.</param>
+ private void HandleTaskException(TaskClientCodeException e)
+ {
+ LOGGER.Log(Level.Error, "TaskClientCodeException", e);
+
+ byte[] serializedFailure = ByteUtilities.StringToByteArrays(e.ToString());
+ TaskStatusProto failedStatus = new TaskStatusProto();
+ failedStatus.context_id = e.ContextId;
+ failedStatus.task_id = e.TaskId;
+ failedStatus.result = serializedFailure;
+ failedStatus.state = State.FAILED;
+
+ LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeatb for a failed task: {0}", failedStatus.ToString()));
+ _heartBeatManager.OnNext(failedStatus);
+ }
+
+ /// <summary>
+ /// Reports a failed context: builds a FAIL ContextStatusProto whose error is the byte form
+ /// of the exception's ToString() text, adds the parent id when the exception carries one,
+ /// and passes it to the HeartBeatManager.
+ /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
+ /// </summary>
+ /// <param name="e">the context failure to report.</param>
+ private void HandlContextException(ContextClientCodeException e)
+ {
+ LOGGER.Log(Level.Error, "ContextClientCodeException", e);
+
+ byte[] serializedFailure = ByteUtilities.StringToByteArrays(e.ToString());
+ ContextStatusProto failedStatus = new ContextStatusProto();
+ failedStatus.context_id = e.ContextId;
+ failedStatus.context_state = ContextStatusProto.State.FAIL;
+ failedStatus.error = serializedFailure;
+
+ // The parent id is only set when the exception actually carries one.
+ if (e.ParentId.IsPresent())
+ {
+ failedStatus.parent_id = e.ParentId.Value;
+ }
+
+ LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", failedStatus.ToString()));
+ _heartBeatManager.OnNext(failedStatus);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs
new file mode 100644
index 0000000..9ed7a5c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Evaluator.Context;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Common.Task;
+using Org.Apache.REEF.Tasks;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Common.Context
+{
+ public class ContextRuntime
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime));
+ // Context-local injector. This contains information that will not be available in child injectors.
+ private readonly IInjector _contextInjector;
+ // Service injector. State in this injector moves to child injectors.
+ private readonly IInjector _serviceInjector;
+
+ // Convenience class to hold all the event handlers for the context as well as the service instances.
+ // The methods of this class also use this object as the lock that guards the mutable fields below.
+ private readonly ContextLifeCycle _contextLifeCycle;
+
+ // The child context, if any.
+ private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty();
+
+ // The parent context, if any.
+ private Optional<ContextRuntime> _parentContext = Optional<ContextRuntime>.Empty();
+
+ // The currently running task, if any.
+ private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty();
+
+ // State reported by GetContextStatus(); starts as READY and is set to DONE by Dispose().
+ private ContextStatusProto.State _contextState = ContextStatusProto.State.READY;
+
+ /// <summary>
+ /// Builds a context runtime: records the service injector and the parent, forks the
+ /// context-local injector from the service injector and then starts the context life cycle.
+ /// </summary>
+ /// <param name="serviceInjector">the service injector; the context injector is forked from it</param>
+ /// <param name="contextConfiguration">the Configuration for this context; must be a ContextConfiguration</param>
+ /// <param name="parentContext">the parent context, or an empty Optional when there is none</param>
+ public ContextRuntime(
+ IInjector serviceInjector,
+ IConfiguration contextConfiguration,
+ Optional<ContextRuntime> parentContext)
+ {
+ // Only a ContextConfiguration exposes the Id needed by the life cycle object.
+ var typedConfiguration = contextConfiguration as ContextConfiguration;
+ if (typedConfiguration == null)
+ {
+ var wrongType = new ArgumentException("contextConfiguration is not of type ContextConfiguration");
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(wrongType, LOGGER);
+ }
+
+ _contextLifeCycle = new ContextLifeCycle(typedConfiguration.Id);
+ _serviceInjector = serviceInjector;
+ _parentContext = parentContext;
+
+ try
+ {
+ _contextInjector = serviceInjector.ForkInjector();
+ }
+ catch (Exception forkFailure)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(forkFailure, Level.Error, LOGGER);
+
+ // Report the parent's id along with the failure when a parent exists.
+ Optional<string> parentId;
+ if (_parentContext.IsPresent())
+ {
+ parentId = Optional<string>.Of(_parentContext.Value.Id);
+ }
+ else
+ {
+ parentId = Optional<string>.Empty();
+ }
+ var spawnFailure = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", forkFailure);
+
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(spawnFailure, LOGGER);
+ }
+
+ // Trigger the context start events on contextInjector.
+ _contextLifeCycle.Start();
+ }
+
+ /// <summary>
+ /// Create a new ContextRuntime for the root context, i.e. one without a parent context.
+ /// </summary>
+ /// <param name="serviceInjector">the serviceInjector to be used.</param>
+ /// <param name="contextConfiguration">the Configuration for this context.</param>
+ public ContextRuntime(
+ IInjector serviceInjector,
+ IConfiguration contextConfiguration)
+ : this(serviceInjector, contextConfiguration, Optional<ContextRuntime>.Empty())
+ {
+ LOGGER.Log(Level.Info, "Instantiating root context");
+ }
+
+ /// <summary>
+ /// The id of this context, as held by its life cycle object.
+ /// </summary>
+ public string Id
+ {
+ get { return _contextLifeCycle.Id; }
+ }
+
+ /// <summary>
+ /// The parent context given at construction; empty for the root context.
+ /// </summary>
+ public Optional<ContextRuntime> ParentContext
+ {
+ get { return _parentContext; }
+ }
+
+ /// <summary>
+ /// Spawns a new context with services of its own.
+ /// The child's serviceInjector is forked from this context's serviceInjector using the given
+ /// serviceConfiguration; the child ContextRuntime is then constructed on top of that injector.
+ /// Spawning is refused while a task is present on this context or while it already has a child context.
+ /// </summary>
+ /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+ /// <param name="serviceConfiguration">the new context's service Configuration.</param>
+ /// <returns>the newly created child context.</returns>
+ public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
+ {
+ ContextRuntime spawned = null;
+ lock (_contextLifeCycle)
+ {
+ if (_task.IsPresent())
+ {
+ // note: java code is putting thread id here
+ string taskRunning = string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId);
+ var taskRunningError = new InvalidOperationException(taskRunning);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(taskRunningError, LOGGER);
+ }
+ if (_childContext.IsPresent())
+ {
+ var notTopmostError = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(notTopmostError, LOGGER);
+ }
+ try
+ {
+ IConfiguration[] serviceConfigurations = new IConfiguration[] { serviceConfiguration };
+ IInjector forkedServiceInjector = _serviceInjector.ForkInjector(serviceConfigurations);
+ spawned = new ContextRuntime(forkedServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this));
+ _childContext = Optional<ContextRuntime>.Of(spawned);
+ return spawned;
+ }
+ catch (Exception spawnFailure)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(spawnFailure, Level.Error, LOGGER);
+
+ // NOTE(review): the id reported as "parent" is the id of THIS context's parent, not of this
+ // context (which would be the parent of the child that failed to spawn) - confirm intent.
+ Optional<string> parentId;
+ if (_parentContext.IsPresent())
+ {
+ parentId = Optional<string>.Of(_parentContext.Value.Id);
+ }
+ else
+ {
+ parentId = Optional<string>.Empty();
+ }
+ var wrapped = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", spawnFailure);
+
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(wrapped, LOGGER);
+ }
+ }
+
+ // Only needed to satisfy the compiler's definite-return analysis after the catch block.
+ return spawned;
+ }
+
+ /// <summary>
+ /// Spawns a new context without services of its own.
+ /// The child's serviceInjector is a plain fork of this context's serviceInjector; the child
+ /// ContextRuntime is then constructed on top of that injector.
+ /// Spawning is refused while a task is present on this context or while it already has a child context.
+ /// </summary>
+ /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+ /// <returns>the newly created child context.</returns>
+ public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration)
+ {
+ lock (_contextLifeCycle)
+ {
+ if (_task.IsPresent())
+ {
+ // note: java code is putting thread id here
+ string taskRunning = string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId);
+ var taskRunningError = new InvalidOperationException(taskRunning);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(taskRunningError, LOGGER);
+ }
+ if (_childContext.IsPresent())
+ {
+ var notTopmostError = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(notTopmostError, LOGGER);
+ }
+
+ IInjector forkedServiceInjector = _serviceInjector.ForkInjector();
+ var spawned = new ContextRuntime(forkedServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this));
+ _childContext = Optional<ContextRuntime>.Of(spawned);
+ return spawned;
+ }
+ }
+
+ /// <summary>
+ /// Launches an Task on this context.
+ /// A previously started task that has already ended is cleared first. The call is rejected
+ /// (an InvalidOperationException is handed to Exceptions.Throw) when a task is still present
+ /// or when this context has a child context.
+ /// Failures while forking the task injector or creating/initializing/starting the task runtime
+ /// are wrapped in a TaskClientCodeException and handed to Exceptions.CaughtAndThrow.
+ /// </summary>
+ /// <param name="taskConfiguration">supplies the task id and the Tang configuration used to fork the task injector</param>
+ /// <param name="contextId">the context id passed to the new TaskRuntime</param>
+ /// <param name="heartBeatManager">the heartbeat manager passed to the new TaskRuntime</param>
+ public void StartTask(TaskConfiguration taskConfiguration, string contextId, HeartBeatManager heartBeatManager)
+ {
+ lock (_contextLifeCycle)
+ {
+ bool taskPresent = _task.IsPresent();
+ bool taskEnded = taskPresent && _task.Value.HasEnded();
+
+ LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration)" + "task is present: " + taskPresent + " task has ended: " + taskEnded);
+ if (taskPresent)
+ {
+ LOGGER.Log(Level.Info, "Task state: " + _task.Value.GetTaskState());
+ }
+
+ if (taskEnded)
+ {
+ // clean up state
+ _task = Optional<TaskRuntime>.Empty();
+ taskPresent = false;
+ }
+ if (taskPresent)
+ {
+ // The message names the operation that was actually attempted (starting a task);
+ // it used to be a copy of the SpawnChildContext message.
+ var e = new InvalidOperationException(
+ string.Format(CultureInfo.InvariantCulture, "Attempting to start a task when a Task with id '{0}' is already running", _task.Value.TaskId)); // note: java code is putting thread id here
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+ }
+ if (_childContext.IsPresent())
+ {
+ var e = new InvalidOperationException("Attempting to start a task on a context that is not the topmost active context.");
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+ }
+ try
+ {
+ IInjector taskInjector = _contextInjector.ForkInjector(new IConfiguration[] { taskConfiguration.TangConfig });
+ LOGGER.Log(Level.Info, "Trying to inject task with configuration" + taskConfiguration.ToString());
+ TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); // taskInjector.getInstance(TaskRuntime.class);
+ taskRuntime.Initialize();
+
+ // Start is queued via Task.Run instead of being called inline, so this method
+ // does not wait for it while holding the lock.
+ System.Threading.Tasks.Task.Run(new Action(taskRuntime.Start));
+ _task = Optional<TaskRuntime>.Of(taskRuntime);
+ }
+ catch (Exception e)
+ {
+ var ex = new TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate the new task", e);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Close this context. If there is a child context, this recursively closes it before closing this context. If
+ /// there is an Task currently running, that will be closed.
+ /// The steps run in this order while holding the context lock: mark the state DONE, close the task (if any),
+ /// dispose the child context (if any), close the life cycle, and finally ask the parent (if any) to drop its
+ /// reference to this context.
+ /// </summary>
+ public void Dispose()
+ {
+ lock (_contextLifeCycle)
+ {
+ _contextState = ContextStatusProto.State.DONE;
+ if (_task.IsPresent())
+ {
+ LOGGER.Log(Level.Warning, "Shutting down an task because the underlying context is being closed.");
+ // No close message is available here, so null is passed.
+ _task.Value.Close(null);
+ }
+ if (_childContext.IsPresent())
+ {
+ LOGGER.Log(Level.Warning, "Closing a context because its parent context is being closed.");
+ // The child's Dispose() calls ResetChildContext() on this object, clearing _childContext.
+ _childContext.Value.Dispose();
+ }
+ _contextLifeCycle.Close();
+ if (_parentContext.IsPresent())
+ {
+ ParentContext.Value.ResetChildContext();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Forwards a suspend request to the current Task.
+ /// Because of races the task may already be gone; in that case the request is dropped and a
+ /// WARNING is written to the log.
+ /// </summary>
+ /// <param name="message"> the suspend message to deliver or null if there is none.</param>
+ public void SuspendTask(byte[] message)
+ {
+ lock (_contextLifeCycle)
+ {
+ if (_task.IsPresent())
+ {
+ _task.Value.Suspend(message);
+ return;
+ }
+
+ LOGGER.Log(Level.Warning, "Received a suspend task while there was no task running. Ignored");
+ }
+ }
+
+ /// <summary>
+ /// Forwards a close request to the current Task.
+ /// Because of races the task may already be gone; in that case the request is dropped and a
+ /// WARNING is written to the log.
+ /// </summary>
+ /// <param name="message">the close message to deliver or null if there is none.</param>
+ public void CloseTask(byte[] message)
+ {
+ lock (_contextLifeCycle)
+ {
+ if (_task.IsPresent())
+ {
+ _task.Value.Close(message);
+ return;
+ }
+
+ LOGGER.Log(Level.Warning, "Received a close task while there was no task running. Ignored");
+ }
+ }
+
+ /// <summary>
+ /// Forwards a message to the current Task.
+ /// Because of races the task may already be gone; in that case the message is dropped and a
+ /// WARNING is written to the log.
+ /// </summary>
+ /// <param name="message">the message to deliver or null if there is none.</param>
+ public void DeliverTaskMessage(byte[] message)
+ {
+ lock (_contextLifeCycle)
+ {
+ if (_task.IsPresent())
+ {
+ _task.Value.Deliver(message);
+ return;
+ }
+
+ LOGGER.Log(Level.Warning, "Received an task message while there was no task running. Ignored");
+ }
+ }
+
+ /// <summary>
+ /// Forwards a context message to this context's life cycle object.
+ /// Unlike the task-related methods of this class, this call does not take the context lock.
+ /// </summary>
+ /// <param name="mesage">the message bytes to forward</param>
+ public void HandleContextMessaage(byte[] mesage)
+ {
+ _contextLifeCycle.HandleContextMessage(mesage);
+ }
+
+ /// <summary>
+ /// Get the status of the current Task.
+ /// A task that has already ended is forgotten as a side effect and reported as absent.
+ /// A task that is present but not in RUNNING state is reported as an InvalidOperationException
+ /// through Exceptions.Throw.
+ /// </summary>
+ /// <returns>the status of the running Task, or an empty Optional when there is no task or it has ended.</returns>
+ public Optional<TaskStatusProto> GetTaskStatus()
+ {
+ lock (_contextLifeCycle)
+ {
+ if (!_task.IsPresent())
+ {
+ return Optional<TaskStatusProto>.Empty();
+ }
+
+ if (_task.Value.HasEnded())
+ {
+ // Drop the finished task so later calls see no task at all.
+ _task = Optional<TaskRuntime>.Empty();
+ return Optional<TaskStatusProto>.Empty();
+ }
+
+ TaskStatusProto currentStatus = _task.Value.GetStatusProto();
+ if (currentStatus.state != State.RUNNING)
+ {
+ // only RUNNING status is allowed to return here, all other states are pushed out via heartbeat
+ string wrongState = string.Format(CultureInfo.InvariantCulture, "Task state must be RUNNING, but instead is in {0} state", currentStatus.state);
+ var wrongStateError = new InvalidOperationException(wrongState);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(wrongStateError, LOGGER);
+ return Optional<TaskStatusProto>.Empty();
+ }
+
+ return Optional<TaskStatusProto>.Of(currentStatus);
+ }
+ }
+
+ /// <summary>
+ /// Clears this context's reference to its child context.
+ /// Dispose() of a context calls this on its parent. Calling it when no child context is set is
+ /// reported as an InvalidOperationException through Exceptions.Throw.
+ /// </summary>
+ public void ResetChildContext()
+ {
+ lock (_contextLifeCycle)
+ {
+ if (!_childContext.IsPresent())
+ {
+ var noChildError = new InvalidOperationException("no child context set");
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(noChildError, LOGGER);
+ }
+ else
+ {
+ _childContext = Optional<ContextRuntime>.Empty();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Builds this context's status in protocol buffer form: id, state, parent id (when there is a
+ /// parent) and one entry for every context message source that currently has a message.
+ /// </summary>
+ /// <returns>this context's status in protocol buffer form.</returns>
+ public ContextStatusProto GetContextStatus()
+ {
+ lock (_contextLifeCycle)
+ {
+ var status = new ContextStatusProto();
+ status.context_id = Id;
+ status.context_state = _contextState;
+
+ if (_parentContext.IsPresent())
+ {
+ status.parent_id = _parentContext.Value.Id;
+ }
+
+ foreach (IContextMessageSource messageSource in _contextLifeCycle.ContextMessageSources)
+ {
+ Optional<ContextMessage> pending = messageSource.Message;
+ if (!pending.IsPresent())
+ {
+ // This source has nothing to report right now.
+ continue;
+ }
+
+ var messageProto = new ContextStatusProto.ContextMessageProto();
+ messageProto.source_id = pending.Value.MessageSourceId;
+ messageProto.message = ByteUtilities.CopyBytesFrom(pending.Value.Bytes);
+ status.context_message.Add(messageProto);
+ }
+
+ return status;
+ }
+ }
+ }
+}
+ ///// <summary>
+ ///// TODO: remove and use parameterless GetContextStatus above
+ ///// </summary>
+ ///// <returns>this context's status in protocol buffer form.</returns>
+ //public ContextStatusProto GetContextStatus(string contextId)
+ //{
+ // ContextStatusProto contextStatusProto = new ContextStatusProto()
+ // {
+ // context_id = contextId,
+ // context_state = _contextState,
+ // };
+ // return contextStatusProto;
+ //}
+
+ ////// TODO: remove and use injection
+ //public void StartTask(ITask task, HeartBeatManager heartBeatManager, string taskId, string contextId)
+ //{
+ // lock (_contextLifeCycle)
+ // {
+ // if (_task.IsPresent() && _task.Value.HasEnded())
+ // {
+ // // clean up state
+ // _task = Optional<TaskRuntime>.Empty();
+ // }
+ // if (_task.IsPresent())
+ // {
+ // throw new InvalidOperationException(
+ // string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here
+ // }
+ // if (_childContext.IsPresent())
+ // {
+ // throw new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+ // }
+ // try
+ // {
+ // // final Injector taskInjector = contextInjector.forkInjector(taskConfiguration);
+ // TaskRuntime taskRuntime // taskInjector.getInstance(TaskRuntime.class);
+ // = new TaskRuntime(task, heartBeatManager);
+ // LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Starting task '{0}'", taskId));
+ // taskRuntime.Initialize(taskId, contextId);
+ // taskRuntime.Start();
+ // _task = Optional<TaskRuntime>.Of(taskRuntime);
+ // }
+ // catch (Exception e)
+ // {
+ // throw new InvalidOperationException("Unable to instantiate the new task");
+ // }
+ // }
+ //}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs
new file mode 100644
index 0000000..7c62a0b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Events;
+
+namespace Org.Apache.REEF.Common.Context
+{
+ /// <summary>
+ /// IContextStart implementation that only carries an id.
+ /// </summary>
+ class ContextStartImpl : IContextStart
+ {
+ /// <summary>
+ /// Creates the event object with the given id.
+ /// </summary>
+ /// <param name="id">the id to expose through the Id property</param>
+ public ContextStartImpl(string id)
+ {
+ Id = id;
+ }
+
+ /// <summary>
+ /// The id passed to the constructor (the setter is public, so it can be replaced later).
+ /// </summary>
+ public string Id { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs
new file mode 100644
index 0000000..4df40b6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Events;
+
+namespace Org.Apache.REEF.Common.Context
+{
+ /// <summary>
+ /// IContextStop implementation that only carries an id.
+ /// </summary>
+ class ContextStopImpl : IContextStop
+ {
+ /// <summary>
+ /// Creates the event object with the given id.
+ /// </summary>
+ /// <param name="id">the id to expose through the Id property</param>
+ public ContextStopImpl(string id)
+ {
+ Id = id;
+ }
+
+ /// <summary>
+ /// The id passed to the constructor (the setter is public, so it can be replaced later).
+ /// </summary>
+ public string Id { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs
new file mode 100644
index 0000000..e7daecb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Evaluator.Context;
+using Org.Apache.REEF.Services;
+using Org.Apache.REEF.Tasks;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Globalization;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Common.Context
+{
+ /// <summary>
+ /// Helper class that encapsulates the root context configuration: With or without services and an initial task.
+ /// The service injector is built in the constructor; the root ContextRuntime is created on the first
+ /// call to GetRootContext() and cached afterwards.
+ /// </summary>
+ public sealed class RootContextLauncher
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(RootContextLauncher));
+
+ // Injector built by InjectServices() from the optional root service configuration.
+ private readonly IInjector _rootServiceInjector = null;
+
+ // Cached root context; null until GetRootContext() has been called once.
+ private ContextRuntime _rootContext = null;
+
+ private ContextConfiguration _rootContextConfiguration = null;
+
+ /// <summary>
+ /// Stores the configurations and builds the root service injector.
+ /// </summary>
+ /// <param name="rootContextConfig">configuration of the root context</param>
+ /// <param name="rootServiceConfig">optional service configuration; when present its services are injected immediately</param>
+ /// <param name="rootTaskConfig">optional configuration of an initial task</param>
+ public RootContextLauncher(ContextConfiguration rootContextConfig, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
+ {
+ _rootContextConfiguration = rootContextConfig;
+ _rootServiceInjector = InjectServices(rootServiceConfig);
+ RootTaskConfig = rootTaskConfig;
+ }
+
+ /// <summary>
+ /// The optional configuration of the initial task.
+ /// </summary>
+ public Optional<TaskConfiguration> RootTaskConfig { get; set; }
+
+ /// <summary>
+ /// The configuration used when the root context is created.
+ /// </summary>
+ public ContextConfiguration RootContextConfig
+ {
+ get { return _rootContextConfiguration; }
+ set { _rootContextConfiguration = value; }
+ }
+
+ /// <summary>
+ /// Returns the root context, creating it on first use.
+ /// </summary>
+ /// <returns>the root ContextRuntime</returns>
+ public ContextRuntime GetRootContext()
+ {
+ if (_rootContext == null)
+ {
+ _rootContext = GetRootContext(_rootServiceInjector, _rootContextConfiguration);
+ }
+ return _rootContext;
+ }
+
+ /// <summary>
+ /// Builds the root service injector. With a service configuration, the injector is created from its
+ /// Tang configuration and InjectedServices is instantiated from it; without one, an empty injector is used.
+ /// </summary>
+ /// <param name="serviceConfig">the optional service configuration</param>
+ /// <returns>the injector to use for the root context</returns>
+ private IInjector InjectServices(Optional<ServiceConfiguration> serviceConfig)
+ {
+ IInjector rootServiceInjector;
+
+ if (serviceConfig.IsPresent())
+ {
+ rootServiceInjector = TangFactory.GetTang().NewInjector(serviceConfig.Value.TangConfig);
+ InjectedServices services = null;
+ try
+ {
+ services = rootServiceInjector.GetInstance<InjectedServices>();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Failed to instantiate service.", LOGGER);
+
+ // Each argument has its own placeholder ({0} error, {1} message, {2} stack trace), and the
+ // original exception is kept as the inner exception so the cause is not lost.
+ InvalidOperationException ex = new InvalidOperationException(
+ string.Format(CultureInfo.InvariantCulture, "Failed to inject service: encountered error {0} with message [{1}] and stack trace:[{2}]", e, e.Message, e.StackTrace),
+ e);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
+ }
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "injected {0} service(s)", services.Services.Count));
+ }
+ else
+ {
+ rootServiceInjector = TangFactory.GetTang().NewInjector();
+ LOGGER.Log(Level.Info, "no service provided for injection.");
+ }
+
+ return rootServiceInjector;
+ }
+
+ /// <summary>
+ /// Creates a root ContextRuntime from the given injector and configuration.
+ /// </summary>
+ /// <param name="rootServiceInjector">the service injector for the root context</param>
+ /// <param name="rootContextConfiguration">the configuration of the root context</param>
+ /// <returns>the new root context</returns>
+ private ContextRuntime GetRootContext(
+ IInjector rootServiceInjector,
+ IConfiguration rootContextConfiguration)
+ {
+ ContextRuntime result;
+ result = new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+ return result;
+ }
+ }
+}
+//if (rootServiceInjector != null)
+//{
+// try
+// {
+// rootServiceInjector = rootServiceInjector.ForkInjector(serviceConfigs);
+// }
+// catch (Exception e)
+// {
+// throw new ContextClientCodeException(ContextClientCodeException.GetId(rootContextConfiguration),
+// Optional<String>.Empty(),
+// "Unable to instatiate the root context", e);
+// }
+// result = new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+//}
+//else
+//{
+// result = new ContextRuntime(rootServiceInjector.ForkInjector(), rootContextConfiguration);
+//}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs
new file mode 100644
index 0000000..fc50c73
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common
+{
+ /// <summary>
+ /// ICloseEvent implementation that carries an optional byte[] payload.
+ /// </summary>
+ public class CloseEventImpl : ICloseEvent
+ {
+ /// <summary>
+ /// Creates a close event without a payload.
+ /// </summary>
+ public CloseEventImpl()
+ {
+ Value = Optional<byte[]>.Empty();
+ }
+
+ /// <summary>
+ /// Creates a close event with the given payload.
+ /// </summary>
+ /// <param name="bytes">the payload; null results in an empty Optional</param>
+ public CloseEventImpl(byte[] bytes)
+ {
+ Value = Optional<byte[]>.OfNullable(bytes);
+ }
+
+ /// <summary>
+ /// The optional payload of this event.
+ /// Auto-implemented: the previous hand-written accessors referred to the property itself
+ /// (the getter returned Value, the setter read Value), so every access recursed without end.
+ /// </summary>
+ public Optional<byte[]> Value { get; set; }
+
+ public override string ToString()
+ {
+ return "CloseEvent{value=" + Value.ToString() + "}";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs
new file mode 100644
index 0000000..2b00aa2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator
+{
+ /// <summary>
+ /// IDriverMessage implementation that carries an optional byte[] payload.
+ /// </summary>
+ public class DriverMessageImpl : IDriverMessage
+ {
+ private Optional<byte[]> _value;
+
+ /// <summary>
+ /// Creates a driver message without a payload.
+ /// </summary>
+ public DriverMessageImpl()
+ {
+ _value = Optional<byte[]>.Empty();
+ }
+
+ /// <summary>
+ /// Creates a driver message with the given payload.
+ /// </summary>
+ /// <param name="bytes">the payload; null results in an empty Optional</param>
+ public DriverMessageImpl(byte[] bytes)
+ {
+ _value = Optional<byte[]>.OfNullable(bytes);
+ }
+
+ /// <summary>
+ /// The optional payload of this message.
+ /// </summary>
+ public Optional<byte[]> Message
+ {
+ get
+ {
+ return _value;
+ }
+ }
+
+ /// <summary>
+ /// Renders the payload as text; a message without a payload is rendered with an empty value.
+ /// </summary>
+ public override string ToString()
+ {
+ // Only convert when a payload exists, so an empty message never hands a missing
+ // value to the byte-to-string helper.
+ string text = _value.IsPresent() ? ByteUtilities.ByteArrarysToString(_value.Value) : string.Empty;
+ return "DriverMessage [value=" + text + "]";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs
new file mode 100644
index 0000000..a6bb52f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common
+{
+ /// <summary>
+ /// Suspend event that carries an optional byte[] payload.
+ /// NOTE(review): this class implements ICloseEvent, which looks like a copy-paste from CloseEventImpl;
+ /// confirm whether a dedicated suspend-event interface was intended.
+ /// </summary>
+ public class SuspendEventImpl : ICloseEvent
+ {
+ /// <summary>
+ /// Creates a suspend event without a payload.
+ /// </summary>
+ public SuspendEventImpl()
+ {
+ Value = Optional<byte[]>.Empty();
+ }
+
+ /// <summary>
+ /// Creates a suspend event with the given payload.
+ /// </summary>
+ /// <param name="bytes">the payload; null results in an empty Optional</param>
+ public SuspendEventImpl(byte[] bytes)
+ {
+ Value = Optional<byte[]>.OfNullable(bytes);
+ }
+
+ /// <summary>
+ /// The optional payload of this event.
+ /// Auto-implemented: the previous hand-written accessors referred to the property itself
+ /// (the getter returned Value, the setter read Value), so every access recursed without end.
+ /// </summary>
+ public Optional<byte[]> Value { get; set; }
+
+ public override string ToString()
+ {
+ return "SuspendEvent{value=" + Value.ToString() + "}";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs
new file mode 100644
index 0000000..22bdbd3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Common.Task
+{
+ /// <summary>
+ /// Exception that records a Task failure together with the ids of the task and of
+ /// the context it was running in.
+ /// </summary>
+ public class TaskClientCodeException : Exception
+ {
+     /// <summary>
+     /// Builds the exception describing why a Task failed.
+     /// </summary>
+     /// <param name="taskId">id of the task that failed.</param>
+     /// <param name="contextId">id of the context in which the failed task was executing.</param>
+     /// <param name="message">the error message.</param>
+     /// <param name="cause">the underlying exception; stored as the inner exception.</param>
+     public TaskClientCodeException(string taskId, string contextId, string message, Exception cause)
+         : base(message, cause)
+     {
+         ContextId = contextId;
+         TaskId = taskId;
+     }
+
+     /// <summary>
+     /// Id of the failed task, as given at construction.
+     /// </summary>
+     public string TaskId { get; private set; }
+
+     /// <summary>
+     /// Id of the context the failed task ran in, as given at construction.
+     /// </summary>
+     public string ContextId { get; private set; }
+
+     /// <summary>
+     /// Placeholder for extracting a task identifier from a configuration.
+     /// </summary>
+     /// <param name="c">the configuration; currently not inspected.</param>
+     /// <returns>always the empty string for now.</returns>
+     public static string GetTaskIdentifier(IConfiguration c)
+     {
+         // TODO: update after TANG is available
+         return string.Empty;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs
new file mode 100644
index 0000000..26e638d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Wake;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Common
+{
+ /// <summary>
+ /// Holds the task start/stop events and the observers registered for them, and
+ /// pushes the matching event to every observer on Start() and Stop().
+ /// </summary>
+ public class TaskLifeCycle
+ {
+     private readonly ITaskStart _taskStart;
+     private readonly ITaskStop _taskStop;
+     private readonly HashSet<IObserver<ITaskStart>> _taskStartHandlers;
+     private readonly HashSet<IObserver<ITaskStop>> _taskStopHandlers;
+
+     /// <summary>
+     /// Creates a life cycle with empty handler sets and no start/stop event objects.
+     /// </summary>
+     public TaskLifeCycle()
+     {
+         _taskStopHandlers = new HashSet<IObserver<ITaskStop>>();
+         _taskStartHandlers = new HashSet<IObserver<ITaskStart>>();
+     }
+
+     /// <summary>
+     /// Creates a life cycle from the given handler sets and event objects.
+     /// </summary>
+     /// <param name="taskStopHandlers">observers notified by Stop().</param>
+     /// <param name="taskStartHandlers">observers notified by Start().</param>
+     /// <param name="taskStart">event handed to the start observers.</param>
+     /// <param name="taskStop">event handed to the stop observers.</param>
+     // INJECT
+     public TaskLifeCycle(
+         HashSet<IObserver<ITaskStop>> taskStopHandlers,
+         HashSet<IObserver<ITaskStart>> taskStartHandlers,
+         TaskStartImpl taskStart,
+         TaskStopImpl taskStop)
+     {
+         _taskStart = taskStart;
+         _taskStop = taskStop;
+         _taskStopHandlers = taskStopHandlers;
+         _taskStartHandlers = taskStartHandlers;
+     }
+
+     /// <summary>
+     /// Hands the start event to every registered start observer.
+     /// </summary>
+     public void Start()
+     {
+         NotifyAll(_taskStartHandlers, _taskStart);
+     }
+
+     /// <summary>
+     /// Hands the stop event to every registered stop observer.
+     /// </summary>
+     public void Stop()
+     {
+         NotifyAll(_taskStopHandlers, _taskStop);
+     }
+
+     // Calls OnNext(payload) on each observer in the set, in the set's enumeration order.
+     private static void NotifyAll<TEvent>(HashSet<IObserver<TEvent>> observers, TEvent payload)
+     {
+         foreach (var observer in observers)
+         {
+             observer.OnNext(payload);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs
new file mode 100644
index 0000000..d531df7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Common.Runtime.Evaluator;
+using Org.Apache.REEF.Common.Task;
+using Org.Apache.REEF.Tasks;
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Common
+{
+ /// <summary>
+ /// Runs a single ITask obtained from an injector, records its progress in a TaskStatus,
+ /// and accepts close, suspend and driver-message requests for that task.
+ /// </summary>
+ public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage>
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime));
+
+     private readonly ITask _task;
+
+     private readonly IInjector _injector;
+
+     // The memento given by the task configuration
+     private readonly Optional<byte[]> _memento;
+
+     private readonly HeartBeatManager _heartBeatManager;
+
+     private readonly TaskStatus _currentStatus;
+
+     private readonly INameClient _nameClient;
+
+     /// <summary>
+     /// Injects the task (required), a task message source (optional) and a name client
+     /// (optional) from the given injector and creates the status object for the task.
+     /// </summary>
+     /// <param name="taskInjector">injector used to obtain the task and its optional collaborators.</param>
+     /// <param name="contextId">id of the context the task runs in.</param>
+     /// <param name="taskId">id of the task.</param>
+     /// <param name="heartBeatManager">heartbeat manager handed to the task status; also receives the name client.</param>
+     /// <param name="memento">optional memento string; converted to bytes and passed to the task on Start(). Null means no memento.</param>
+     public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null)
+     {
+         _injector = taskInjector;
+         _heartBeatManager = heartBeatManager;
+
+         Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty();
+         try
+         {
+             _task = _injector.GetInstance<ITask>();
+         }
+         catch (Exception e)
+         {
+             Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER);
+         }
+         try
+         {
+             ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>();
+             messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource });
+             // Logged only on success so the log never claims an injection that actually failed.
+             LOGGER.Log(Level.Info, "task message source injected");
+         }
+         catch (Exception e)
+         {
+             Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER);
+             // do not rethrow since this is benign
+         }
+         try
+         {
+             _nameClient = _injector.GetInstance<INameClient>();
+             _heartBeatManager.EvaluatorSettings.NameClient = _nameClient;
+         }
+         catch (InjectionException)
+         {
+             LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration.");
+             // do not rethrow since user is not required to provide name client
+         }
+
+         _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources);
+         _memento = memento == null ?
+             Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento));
+     }
+
+     /// <summary>
+     /// Id of the task, as held by the task status.
+     /// </summary>
+     public string TaskId
+     {
+         get { return _currentStatus.TaskId; }
+     }
+
+     /// <summary>
+     /// Id of the context the task runs in, as held by the task status.
+     /// </summary>
+     public string ContextId
+     {
+         get { return _currentStatus.ContextId; }
+     }
+
+     /// <summary>
+     /// Marks the task status as running; Start() refuses to run otherwise.
+     /// </summary>
+     public void Initialize()
+     {
+         _currentStatus.SetRunning();
+     }
+
+     /// <summary>
+     /// Run the task: invokes ITask.Call with the memento, waits for it to finish, disposes
+     /// the task and records either the result or the failure on the task status.
+     /// </summary>
+     public void Start()
+     {
+         // Set once Dispose() has been attempted on the success path, so the failure path
+         // below never disposes the same task a second time.
+         bool disposeAttempted = false;
+         try
+         {
+             LOGGER.Log(Level.Info, "Call Task");
+             if (_currentStatus.IsNotRunning())
+             {
+                 var e = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State);
+                 Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+             }
+             byte[] result;
+             byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null;
+             System.Threading.Tasks.Task<byte[]> runTask = new System.Threading.Tasks.Task<byte[]>(() => RunTask(taskMemento));
+             try
+             {
+                 runTask.Start();
+                 runTask.Wait();
+             }
+             catch (Exception e)
+             {
+                 Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Exception thrown during task running.", LOGGER);
+             }
+             result = runTask.Result;
+
+             LOGGER.Log(Level.Info, "Task Call Finished");
+             disposeAttempted = true;
+             if (_task != null)
+             {
+                 _task.Dispose();
+             }
+             _currentStatus.SetResult(result);
+             if (result != null && result.Length > 0)
+             {
+                 LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
+             }
+         }
+         catch (Exception e)
+         {
+             if (!disposeAttempted)
+             {
+                 DisposeTaskAfterFailure();
+             }
+             LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e));
+             _currentStatus.SetException(e);
+         }
+     }
+
+     /// <summary>
+     /// Current state of the task, as held by the task status.
+     /// </summary>
+     /// <returns>the current TaskState.</returns>
+     public TaskState GetTaskState()
+     {
+         return _currentStatus.State;
+     }
+
+     /// <summary>
+     /// Called by heartbeat manager
+     /// </summary>
+     /// <returns> current TaskStatusProto </returns>
+     public TaskStatusProto GetStatusProto()
+     {
+         return _currentStatus.ToProto();
+     }
+
+     /// <summary>
+     /// Whether the task status reports that the task has ended.
+     /// </summary>
+     /// <returns>the value of TaskStatus.HasEnded().</returns>
+     public bool HasEnded()
+     {
+         return _currentStatus.HasEnded();
+     }
+
+     /// <summary>
+     /// get ID of the task.
+     /// </summary>
+     /// <returns>ID of the task.</returns>
+     public string GetActicityId()
+     {
+         return _currentStatus.TaskId;
+     }
+
+     /// <summary>
+     /// Requests the task to close. Ignored with a warning when the task is not running;
+     /// otherwise a close event is raised and the status is marked close-requested.
+     /// Any exception on that path is recorded on the status as a TaskClientCodeException.
+     /// </summary>
+     /// <param name="message">payload passed to the close event.</param>
+     public void Close(byte[] message)
+     {
+         LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
+         if (_currentStatus.IsNotRunning())
+         {
+             LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
+         }
+         else
+         {
+             try
+             {
+                 OnNext(new CloseEventImpl(message));
+                 _currentStatus.SetCloseRequested();
+             }
+             catch (Exception e)
+             {
+                 Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER);
+
+                 _currentStatus.SetException(
+                     new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e));
+             }
+         }
+     }
+
+     /// <summary>
+     /// Requests the task to suspend. Ignored with a warning when the task is not running;
+     /// otherwise a suspend event object is raised and the status is marked suspend-requested.
+     /// Any exception on that path is recorded on the status as a TaskClientCodeException.
+     /// </summary>
+     /// <param name="message">payload passed to the suspend event.</param>
+     public void Suspend(byte[] message)
+     {
+         LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));
+
+         if (_currentStatus.IsNotRunning())
+         {
+             LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State));
+         }
+         else
+         {
+             try
+             {
+                 OnNext(new SuspendEventImpl(message));
+                 _currentStatus.SetSuspendRequested();
+             }
+             catch (Exception e)
+             {
+                 Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER);
+                 _currentStatus.SetException(
+                     new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e));
+             }
+         }
+     }
+
+     /// <summary>
+     /// Delivers a driver message to the task. Ignored with a warning when the task is not
+     /// running; an exception during delivery is recorded on the status as a
+     /// TaskClientCodeException.
+     /// </summary>
+     /// <param name="message">payload of the driver message.</param>
+     public void Deliver(byte[] message)
+     {
+         if (_currentStatus.IsNotRunning())
+         {
+             LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State));
+         }
+         else
+         {
+             try
+             {
+                 OnNext(new DriverMessageImpl(message));
+             }
+             catch (Exception e)
+             {
+                 Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER);
+                 _currentStatus.SetException(
+                     new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e));
+             }
+         }
+     }
+
+     /// <summary>
+     /// Receives a close event; currently only logs it.
+     /// </summary>
+     /// <param name="value">the close event.</param>
+     public void OnNext(ICloseEvent value)
+     {
+         LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
+         // TODO: send a heartbeat
+     }
+
+     // The OnError/OnCompleted members of the three observer interfaces are not implemented.
+     void IObserver<ICloseEvent>.OnError(Exception error)
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<IDriverMessage>.OnCompleted()
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<IDriverMessage>.OnError(Exception error)
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<ISuspendEvent>.OnCompleted()
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<ISuspendEvent>.OnError(Exception error)
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<ICloseEvent>.OnCompleted()
+     {
+         throw new NotImplementedException();
+     }
+
+     /// <summary>
+     /// Receives a suspend event; currently only logs it.
+     /// </summary>
+     /// <param name="value">the suspend event.</param>
+     public void OnNext(ISuspendEvent value)
+     {
+         LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
+         // TODO: send a heartbeat
+     }
+
+     /// <summary>
+     /// Receives a driver message: injects an IDriverMessageHandler and lets it handle the
+     /// message. A handler failure is recorded on the status without triggering a heartbeat.
+     /// </summary>
+     /// <param name="value">the driver message.</param>
+     public void OnNext(IDriverMessage value)
+     {
+         IDriverMessageHandler messageHandler = null;
+         LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
+         try
+         {
+             messageHandler = _injector.GetInstance<IDriverMessageHandler>();
+         }
+         catch (Exception e)
+         {
+             Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER);
+         }
+         if (messageHandler != null)
+         {
+             try
+             {
+                 messageHandler.Handle(value);
+             }
+             catch (Exception e)
+             {
+                 Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER);
+                 _currentStatus.RecordExecptionWithoutHeartbeat(e);
+             }
+         }
+     }
+
+     // Invokes the user task with the given memento; executed on the worker created in Start().
+     private byte[] RunTask(byte[] memento)
+     {
+         return _task.Call(memento);
+     }
+
+     // Disposes the task on the failure path of Start(). A failure of Dispose() itself is
+     // only logged so that the original task failure is still recorded on the status.
+     private void DisposeTaskAfterFailure()
+     {
+         if (_task == null)
+         {
+             return;
+         }
+         try
+         {
+             _task.Dispose();
+         }
+         catch (Exception disposeError)
+         {
+             Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(disposeError, Level.Warning, "Exception thrown while disposing task after failure.", LOGGER);
+         }
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs
new file mode 100644
index 0000000..ad8002b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+
+namespace Org.Apache.REEF.Common
+{
+ /// <summary>
+ /// ITaskStart implementation that simply carries the id it was constructed with.
+ /// </summary>
+ public class TaskStartImpl : ITaskStart
+ {
+     // Backing storage for Id.
+     private string _id;
+
+     /// <summary>
+     /// Creates the start event for the given id.
+     /// </summary>
+     /// <param name="id">the id exposed through the Id property.</param>
+     //INJECT
+     public TaskStartImpl(string id)
+     {
+         _id = id;
+     }
+
+     /// <summary>
+     /// The id carried by this event; readable and writable.
+     /// </summary>
+     public string Id
+     {
+         get { return _id; }
+         set { _id = value; }
+     }
+ }
+}