You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:06:08 UTC

[47/51] [partial] incubator-reef git commit: [REEF-131] Towards the new .Net project structure This is to change .Net project structure for Tang, Wake, REEF utilities, Common and Driver:

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs
new file mode 100644
index 0000000..3a0b474
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+
+namespace Org.Apache.REEF.Common.Context
+{
+    public class ContextClientCodeException : Exception // carries the id of a failed context and, optionally, the id of its parent
+    {
+        private readonly string _contextId;
+        private readonly Optional<string> _parentId;
+
+        /// <summary>
+        /// Constructs the exception; the base message is "Failure in context '{contextId}': {message}".
+        /// </summary>
+        /// <param name="contextId"> the id of the failed context.</param>
+        /// <param name="parentId"> the id of the failed context's parent, if any.</param>
+        /// <param name="message"> the error message, appended after the context-id prefix </param>
+        /// <param name="cause"> the exception that caused the error; passed to the base class as the inner exception</param>
+        public ContextClientCodeException(
+                string contextId,
+                Optional<string> parentId,
+                string message,
+                Exception cause)
+            : base("Failure in context '" + contextId + "': " + message, cause)
+        {
+            _contextId = contextId;
+            _parentId = parentId;
+        }
+
+        public string ContextId // id of the failed context, exactly as passed to the constructor
+        {
+            get { return _contextId; }
+        }
+
+        public Optional<string> ParentId // parent context id as passed to the constructor; may be an empty Optional
+        {
+            get { return _parentId; }
+        }
+
+        /// <summary>
+        /// Intended to extract a context id from the given configuration; currently a stub.
+        /// </summary>
+        /// <param name="c">the configuration to read the id from (currently ignored)</param>
+        /// <returns>currently always string.Empty, regardless of the configuration passed in.</returns>
+        public static string GetId(IConfiguration c)
+        {
+            // TODO: update after TANG is available
+            return string.Empty;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs
new file mode 100644
index 0000000..bcd7fb0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using Org.Apache.REEF.Tang.Types;
+
+namespace Org.Apache.REEF.Common.Evaluator.Context
+{
+    public class ContextConfiguration : IConfiguration // key/value settings read from the bindings of an AvroConfiguration; only Id and ContainerDirectory are functional
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextConfiguration));
+        
+        private Dictionary<string, string> _settings; // binding key -> binding value, filled in by the constructor
+
+        public ContextConfiguration(string configString) // parses configString and requires a ContextIdentifier binding to be present
+        {
+            using (LOGGER.LogFunction("ContextConfiguration::ContextConfigurationn")) // NOTE(review): logged name ends in a double n, likely a typo; runtime text, so left unchanged
+            {
+                ContainerDirectory = Directory.GetCurrentDirectory(); // working directory of the process at construction time
+
+                _settings = new Dictionary<string, string>();
+                AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString);
+                foreach (ConfigurationEntry config in avroConfiguration.Bindings)
+                {
+                    if (config.key.Contains(REEF.Evaluator.Constants.ContextIdentifier)) // substring match: any key containing the identifier is rewritten to the bare identifier
+                    {
+                        config.key = REEF.Evaluator.Constants.ContextIdentifier;
+                        LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "{0} detected for context id with value {1}", config.key, config.value));
+                    }
+                    _settings.Add(config.key, config.value); // Dictionary.Add throws ArgumentException if the (possibly rewritten) key is already present
+                }
+                if (!_settings.ContainsKey(REEF.Evaluator.Constants.ContextIdentifier))
+                {
+                    string msg = "Required parameter ContextIdentifier not provided.";
+                    LOGGER.Log(Level.Error, msg);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER);
+                }
+            }
+        }
+
+        public string Id // the value bound to Constants.ContextIdentifier
+        {
+            get { return _settings[REEF.Evaluator.Constants.ContextIdentifier]; }
+        }
+
+        public string ContainerDirectory { get; set; } // set to Directory.GetCurrentDirectory() by the constructor; publicly settable afterwards
+
+        public IConfigurationBuilder newBuilder() // not implemented; every method from here to the end of the class throws NotImplementedException
+        {
+            throw new NotImplementedException();
+        }
+
+        public string GetNamedParameter(INamedParameterNode np)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IClassHierarchy GetClassHierarchy()
+        {
+            throw new NotImplementedException();
+        }
+
+        public ISet<object> GetBoundSet(INamedParameterNode np)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IClassNode GetBoundConstructor(IClassNode cn)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IClassNode GetBoundImplementation(IClassNode cn)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IConstructorDef GetLegacyConstructor(IClassNode cn)
+        {
+            throw new NotImplementedException();
+        }
+
+        public ICollection<IClassNode> GetBoundImplementations()
+        {
+            throw new NotImplementedException();
+        }
+
+        public ICollection<IClassNode> GetBoundConstructors()
+        {
+            throw new NotImplementedException();
+        }
+
+        public ICollection<INamedParameterNode> GetNamedParameters()
+        {
+            throw new NotImplementedException();
+        }
+
+        public ICollection<IClassNode> GetLegacyConstructors()
+        {
+            throw new NotImplementedException();
+        }
+
+        public IList<object> GetBoundList(INamedParameterNode np)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IEnumerator<KeyValuePair<INamedParameterNode, object>> GetBoundSets()
+        {
+            throw new NotImplementedException();
+        }
+
+        public IDictionary<INamedParameterNode, IList<object>> GetBoundList()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs
new file mode 100644
index 0000000..97e65c0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Events;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Common.Context
+{
+    /// <summary>
+    /// This class is meant to trigger all the context life-cycle dependent events. NOTE: the dispatch code in Start, Close and HandleContextMessage is currently commented out, so no handler is actually invoked.
+    /// </summary>
+    class ContextLifeCycle
+    {
+        private HashSet<IObserver<IContextStart>> _contextStartHandlers;
+
+        private HashSet<IObserver<IContextStop>> _contextStopHandlers;
+
+        private HashSet<IContextMessageSource> _contextMessageSources;
+
+        // @Inject (NOTE(review): presumably a marker carried over from the Java version; no injection attribute is applied to this constructor)
+        public ContextLifeCycle(
+            string id,
+            HashSet<IObserver<IContextStart>> contextStartHandlers,
+            HashSet<IObserver<IContextStop>> contextStopHandlers,
+            HashSet<IContextMessageSource> contextMessageSources)
+        {
+            Id = id;
+            _contextStartHandlers = contextStartHandlers;
+            _contextStopHandlers = contextStopHandlers;
+            _contextMessageSources = contextMessageSources;
+        }
+
+        public ContextLifeCycle(string contextId) // creates a life cycle with empty handler sets and an empty message-source set
+        {
+            Id = contextId;
+            _contextStartHandlers = new HashSet<IObserver<IContextStart>>();
+            _contextStopHandlers = new HashSet<IObserver<IContextStop>>();
+            _contextMessageSources = new HashSet<IContextMessageSource>();
+        }
+
+        public string Id { get; private set; }
+
+        public HashSet<IContextMessageSource> ContextMessageSources // returns the internal set itself, not a copy (GetContextMessageSources() returns a copy)
+        {
+            get { return _contextMessageSources; }
+        }
+
+        /// <summary>
+        /// Intended to fire ContextStart to all registered event handlers; the dispatch loop is still commented out, so this only constructs the event object.
+        /// </summary>
+        public void Start()
+        {
+            IContextStart contextStart = new ContextStartImpl(Id);
+            
+            // TODO: enable
+            //foreach (IObserver<IContextStart> startHandler in _contextStartHandlers)
+            //{
+            //    startHandler.OnNext(contextStart);
+            //}
+        }
+
+        /// <summary>
+        /// Intended to fire ContextStop to all registered event handlers; currently does nothing because the body is commented out.
+        /// </summary>
+        public void Close()
+        {
+            //IContextStop contextStop = new ContextStopImpl(Id);
+            //foreach (IObserver<IContextStop> startHandler in _contextStopHandlers)
+            //{
+            //    startHandler.OnNext(contextStop);
+            //}
+        }
+
+        public void HandleContextMessage(byte[] message) // currently does nothing: the message is ignored
+        {
+            //contextMessageHandler.onNext(message);
+        }
+
+        /// <summary>
+        /// Gets the set of ContextMessageSources configured.
+        /// </summary>
+        /// <returns>(a shallow copy of) the set of ContextMessageSources configured.</returns>
+        public HashSet<IContextMessageSource> GetContextMessageSources()
+        {
+            return new HashSet<IContextMessageSource>(_contextMessageSources);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs
new file mode 100644
index 0000000..7c4d288
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Evaluator.Context;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Common.Task;
+using Org.Apache.REEF.Evaluator;
+using Org.Apache.REEF.Services;
+using Org.Apache.REEF.Tasks;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Globalization;
+using System.Linq;
+
+namespace Org.Apache.REEF.Common.Context
+{
+    public class ContextManager : IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager));
+        
+        private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>();
+
+        private readonly HeartBeatManager _heartBeatManager;
+
+        private RootContextLauncher _rootContextLauncher;
+
+        public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig) // only builds the RootContextLauncher; the root context itself is obtained later, in Start()
+        {
+            using (LOGGER.LogFunction("ContextManager::ContextManager"))
+            {
+                _heartBeatManager = heartBeatManager;
+                _rootContextLauncher = new RootContextLauncher(_heartBeatManager.EvaluatorSettings.RootContextConfig, rootServiceConfig, rootTaskConfig); // root context configuration is read from the EvaluatorSettings of the heartbeat manager
+            }
+        }
+
+        /// <summary>
+        /// Start the context manager. This obtains the root context, pushes it on the stack and, if a root task configuration is present, starts that task on it.
+        /// </summary>
+        public void Start()
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime rootContext = _rootContextLauncher.GetRootContext();
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id));
+                _contextStack.Push(rootContext);
+
+                if (_rootContextLauncher.RootTaskConfig.IsPresent())
+                {
+                    LOGGER.Log(Level.Info, "Launching the initial Task");
+                    try
+                    {
+                        _contextStack.Peek().StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
+                    }
+                    catch (TaskClientCodeException e) // a failed task start is handed to Exceptions.Caught and then reported through HandleTaskException
+                    {
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER);
+                        HandleTaskException(e);
+                    }
+                }
+            }
+        }
+
+        public bool ContextStackIsEmpty() // true when the context stack holds no entries; the check is made while holding the stack lock
+        {
+            lock (_contextStack)
+            {
+                return (_contextStack.Count == 0);
+            }
+        }
+
+        // TODO: the code here differs slightly from the Java version because protobuf-net does not generate HasXXX methods; we may want to switch to proto-port later.
+
+        /// <summary>
+        /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
+        /// This also triggers the HeartBeatManager to send a heartbeat with the result of this operation.
+        /// </summary>
+        /// <param name="controlMessage">the control message. Its fields are tested in this order and only the first non-null branch runs: add_context (optionally together with start_task), remove_context, start_task, stop_task, suspend_task, task_message, context_message. A message with both add_context and remove_context, or with none of these fields set, is passed to Exceptions.Throw as an InvalidOperationException.</param>
+        public void HandleTaskControl(ContextControlProto controlMessage)
+        {
+            try
+            {
+                byte[] message = controlMessage.task_message; // NOTE(review): this same task_message payload is what CloseTask, SuspendTask and DeliverTaskMessage receive below
+                if (controlMessage.add_context != null && controlMessage.remove_context != null)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER);
+                }
+                if (controlMessage.add_context != null)
+                {
+                    LOGGER.Log(Level.Info, "AddContext");
+                    AddContext(controlMessage.add_context);
+                    // support submitContextAndTask()
+                    if (controlMessage.start_task != null)
+                    {
+                        LOGGER.Log(Level.Info, "StartTask");
+                        StartTask(controlMessage.start_task);
+                    }
+                    else
+                    {
+                        // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime
+                        // Therefore this call can not go into addContext
+                        LOGGER.Log(Level.Info, "Trigger Heartbeat");
+                        _heartBeatManager.OnNext();
+                    }
+                }
+                else if (controlMessage.remove_context != null)
+                {
+                    LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id));
+                    RemoveContext(controlMessage.remove_context.context_id);
+                }
+                else if (controlMessage.start_task != null)
+                {
+                    LOGGER.Log(Level.Info, "StartTask only");
+                    StartTask(controlMessage.start_task);
+                }
+                else if (controlMessage.stop_task != null)
+                {
+                    LOGGER.Log(Level.Info, "CloseTask");
+                    _contextStack.Peek().CloseTask(message);
+                }
+                else if (controlMessage.suspend_task != null)
+                {
+                    LOGGER.Log(Level.Info, "SuspendTask");
+                    _contextStack.Peek().SuspendTask(message);
+                }
+                else if (controlMessage.task_message != null)
+                {
+                    LOGGER.Log(Level.Info, "DeliverTaskMessage");
+                    _contextStack.Peek().DeliverTaskMessage(message);
+                }
+                else if (controlMessage.context_message != null)
+                {
+                    LOGGER.Log(Level.Info, "Handle context contol message"); // NOTE(review): the word contol in this log text looks like a typo for control; runtime text, so left unchanged
+                    ContextMessageProto contextMessageProto = controlMessage.context_message;
+                    bool deliveredMessage = false;
+                    foreach (ContextRuntime context in _contextStack) // the message goes to the first context on the stack whose Id equals the target context_id
+                    {
+                        if (context.Id.Equals(contextMessageProto.context_id))
+                        {
+                            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message));
+                            context.HandleContextMessaage(controlMessage.context_message.message);
+                            deliveredMessage = true;
+                            break;
+                        }
+                    }
+                    if (!deliveredMessage)
+                    {
+                        InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id));
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                    }
+                }
+                else
+                {
+                    InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString()));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                } 
+            }
+            catch (Exception e)
+            {
+                if (e is TaskClientCodeException)
+                {
+                    HandleTaskException((TaskClientCodeException)e);
+                }
+                else if (e is ContextClientCodeException)
+                {
+                    HandlContextException((ContextClientCodeException)e);
+                }
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER); // runs for every exception type, including the two reported above
+            }  
+        }
+
+        /// <summary>
+        /// Get TaskStatusProto of the currently running task, if there is any
+        /// </summary>
+        /// <returns>the result of GetTaskStatus() on the top context, or an empty Optional when the context stack is empty</returns>
+        public Optional<TaskStatusProto> GetTaskStatus()
+        {
+            if (_contextStack.Count == 0) // NOTE(review): the stack is read here without lock (_contextStack), unlike Start, AddContext, RemoveContext and StartTask
+            {
+                return Optional<TaskStatusProto>.Empty();
+
+                //throw new InvalidOperationException("Asked for an Task status while there isn't even a context running.");
+            }
+            return _contextStack.Peek().GetTaskStatus();
+        }
+
+        /// <summary>
+        /// Gets the status of all contexts in the stack, in the enumeration order of Stack (most recently pushed context first).
+        /// </summary>
+        /// <returns>a new collection holding the ContextStatusProto of every context in the stack.</returns>
+        public ICollection<ContextStatusProto> GetContextStatusCollection()
+        {
+            ICollection<ContextStatusProto> result = new Collection<ContextStatusProto>();
+            foreach (ContextRuntime runtime in _contextStack) // NOTE(review): enumerated without lock (_contextStack)
+            {
+                ContextStatusProto contextStatusProto = runtime.GetContextStatus();
+                LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto));
+                result.Add(contextStatusProto);
+            }
+            return result;
+        }
+
+        /// <summary>
+        /// Shuts down. This forcefully kills the Task if there is one and then shuts down all Contexts on the stack,
+        /// starting at the top. NOTE(review): this method itself makes a single ContextRuntime.Dispose() call; the rest depends on that call - confirm.
+        /// </summary>
+        public void Dispose()
+        {
+            lock (_contextStack)
+            {
+                if (_contextStack != null && _contextStack.Any())
+                {
+                    LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime.");
+                    ContextRuntime runtime = _contextStack.Last(); // NOTE(review): Stack enumerates from the top, so Last() is the bottom-most (first pushed) context - confirm this is the intended target
+                    if (runtime != null)
+                    {
+                        runtime.Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Spawns a child of the current top context and pushes it onto the stack.
+        /// </summary>
+        /// <param name="addContextProto">carries the expected parent context id, the context configuration string and an optional service configuration</param>
+        private void AddContext(AddContextProto addContextProto)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentTopContext = _contextStack.Peek();
+                if (!currentTopContext.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase)) // the requested parent must be the current top context (ids compared case-insensitively)
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}",
+                        addContextProto.parent_context_id,
+                        currentTopContext.Id));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                string contextConfigString = addContextProto.context_configuration;
+                ContextConfiguration contextConfiguration = new ContextConfiguration(contextConfigString);
+                ContextRuntime newTopContext;
+                if (addContextProto.service_configuration != null) // the service configuration is optional; without it the child is spawned from the context configuration alone
+                {
+                    ServiceConfiguration serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration);
+                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig);
+                }
+                else
+                {
+                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration);
+                }
+                _contextStack.Push(newTopContext);
+            }
+        }
+
+        /// <summary>
+        /// Disposes the top context and pops it from the stack; the given ID must be the ID of that top context.
+        /// </summary>
+        /// <param name="contextId"> id of the context to remove; must match the current top context (compared case-insensitively)</param>
+        private void RemoveContext(string contextId)
+        {
+            lock (_contextStack)
+            {
+                string currentTopContextId = _contextStack.Peek().Id;
+                if (!contextId.Equals(_contextStack.Peek().Id, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, currentTopContextId));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                _contextStack.Peek().Dispose();
+                if (_contextStack.Count > 1) // Count is checked before the Pop below, so more than one entry means the disposed context is not the last one on the stack
+                {
+                    // We did not close the root context. Therefore, we need to inform the
+                    // driver explicitly that this context is closed. The root context notification
+                    // is implicit in the Evaluator close/done notification.
+                    _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state
+                }
+                _contextStack.Pop();
+            }
+            //  System.gc(); // TODO: garbage collect?
+        }
+
+        /// <summary>
+        /// Launches a Task on the current top context.
+        /// </summary>
+        /// <param name="startTaskProto">names the context the task expects to run in (context_id) and carries the task configuration</param>
+        private void StartTask(StartTaskProto startTaskProto)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentActiveContext = _contextStack.Peek();
+                string expectedContextId = startTaskProto.context_id;
+                if (!expectedContextId.Equals(currentActiveContext.Id, StringComparison.OrdinalIgnoreCase)) // the task may only be started on the current top context (ids compared case-insensitively)
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                TaskConfiguration taskConfiguration = new TaskConfiguration(startTaskProto.configuration);
+                currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager);
+            }
+        }
+
+        /// <summary>
+        ///  THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
+        /// </summary>
+        /// <param name="e">the task failure; it is converted into a TaskStatusProto in state FAILED and handed to the heartbeat manager</param>
+        private void HandleTaskException(TaskClientCodeException e)
+        {
+            LOGGER.Log(Level.Error, "TaskClientCodeException", e);
+            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString()); // the status result field carries the text of e.ToString() as bytes
+            TaskStatusProto taskStatus = new TaskStatusProto()
+            {
+                context_id = e.ContextId,
+                task_id = e.TaskId,
+                result = exception,
+                state = State.FAILED
+            };
+            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeatb for a failed task: {0}", taskStatus.ToString())); // NOTE(review): the word Heartbeatb in this log text looks like a typo; runtime text, so left unchanged
+            _heartBeatManager.OnNext(taskStatus);
+        }
+
+        /// <summary>
+        /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
+        /// </summary>
+        /// <param name="e">the context failure; it is converted into a ContextStatusProto in state FAIL and handed to the heartbeat manager</param>
+        private void HandlContextException(ContextClientCodeException e) // NOTE(review): the method name lacks an e after Handl; a rename would also need the call site in HandleTaskControl updated
+        {
+            LOGGER.Log(Level.Error, "ContextClientCodeException", e);
+            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString()); // the status error field carries the text of e.ToString() as bytes
+            ContextStatusProto contextStatusProto = new ContextStatusProto()
+            {
+                context_id = e.ContextId,
+                context_state = ContextStatusProto.State.FAIL,
+                error = exception
+            };
+            if (e.ParentId.IsPresent()) // parent_id is only set when the exception carries a parent id
+            {
+                contextStatusProto.parent_id = e.ParentId.Value;
+            }
+            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", contextStatusProto.ToString()));
+            _heartBeatManager.OnNext(contextStatusProto);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs
new file mode 100644
index 0000000..9ed7a5c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Evaluator.Context;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Common.Task;
+using Org.Apache.REEF.Tasks;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Common.Context
+{
+    public class ContextRuntime
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime));
+        // Context-local injector. This contains information that will not be available in child injectors.
+        // It is forked from the service injector in the constructor; task injectors are forked from it in StartTask.
+        private readonly IInjector _contextInjector;
+        // Service injector. State in this injector moves to child injectors.
+        // SpawnChildContext forks the child's service injector from this one.
+        private readonly IInjector _serviceInjector;
+
+        // Convenience class to hold all the event handlers for the context as well as the service instances.
+        // This instance is also the object every lock statement in this class synchronizes on.
+        private readonly ContextLifeCycle _contextLifeCycle;
+
+        // The child context, if any.
+        private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty();
+
+        // The parent context, if any.
+        private Optional<ContextRuntime> _parentContext = Optional<ContextRuntime>.Empty();
+
+        // The currently running task, if any.
+        private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty();
+
+        // State reported by GetContextStatus: starts as READY and is set to DONE by Dispose.
+        private ContextStatusProto.State _contextState = ContextStatusProto.State.READY;
+
+        /// <summary>
+        /// Create a new ContextRuntime.
+        /// Stores the given injector and parent, forks the context-local injector from the
+        /// service injector and finally starts the context life cycle.
+        /// </summary>
+        /// <param name="serviceInjector">the service injector of this context; the context injector is forked from it.</param>
+        /// <param name="contextConfiguration">the Configuration for this context; it has to be a ContextConfiguration.</param>
+        /// <param name="parentContext">the parent context, or empty if this context has none.</param>
+        public ContextRuntime(
+                IInjector serviceInjector,
+                IConfiguration contextConfiguration,
+                Optional<ContextRuntime> parentContext)
+        {
+            var typedConfiguration = contextConfiguration as ContextConfiguration;
+            if (typedConfiguration == null)
+            {
+                var wrongType = new ArgumentException("contextConfiguration is not of type ContextConfiguration");
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(wrongType, LOGGER);
+            }
+
+            _serviceInjector = serviceInjector;
+            _parentContext = parentContext;
+            _contextLifeCycle = new ContextLifeCycle(typedConfiguration.Id);
+
+            try
+            {
+                _contextInjector = serviceInjector.ForkInjector();
+            }
+            catch (Exception forkFailure)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(forkFailure, Level.Error, LOGGER);
+
+                // Report the parent's id along with the failure when there is a parent.
+                Optional<string> parentId;
+                if (ParentContext.IsPresent())
+                {
+                    parentId = Optional<string>.Of(ParentContext.Value.Id);
+                }
+                else
+                {
+                    parentId = Optional<string>.Empty();
+                }
+
+                var spawnFailure = new ContextClientCodeException(
+                    ContextClientCodeException.GetId(contextConfiguration),
+                    parentId,
+                    "Unable to spawn context",
+                    forkFailure);
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(spawnFailure, LOGGER);
+            }
+
+            // Trigger the context start events on contextInjector.
+            _contextLifeCycle.Start();
+        }
+
+        /// <summary>
+        ///  Create a new ContextRuntime for the root context.
+        ///  Delegates to the three-argument constructor with an empty parent context.
+        /// </summary>
+        /// <param name="serviceInjector">the serviceInjector to be used.</param>
+        /// <param name="contextConfiguration"> the Configuration for this context.</param>
+        public ContextRuntime(
+            IInjector serviceInjector,
+            IConfiguration contextConfiguration)
+            : this(serviceInjector, contextConfiguration, Optional<ContextRuntime>.Empty())
+        {
+            // Runs after the chained constructor has completed.
+            LOGGER.Log(Level.Info, "Instantiating root context");
+        }
+
+        /// <summary>
+        /// The id of this context, as held by its ContextLifeCycle.
+        /// </summary>
+        public string Id
+        {
+            get { return _contextLifeCycle.Id; }
+        }
+
+        /// <summary>
+        /// The parent context passed to the constructor; empty for a context created without a parent.
+        /// </summary>
+        public Optional<ContextRuntime> ParentContext
+        {
+            get { return _parentContext; }
+        }
+
+        /// <summary>
+        ///  Spawns a new context with services of its own.
+        ///  The child's serviceInjector is forked from this context's serviceInjector using the given
+        ///  serviceConfiguration; the child is then constructed with that injector, the given
+        ///  contextConfiguration and this context as its parent, and remembered as this context's child.
+        ///  Spawning is rejected while a task is present or when this context already has a child.
+        /// </summary>
+        /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+        /// <param name="serviceConfiguration">the new context's service Configuration.</param>
+        /// <returns>a child context.</returns>
+        public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
+        {
+            ContextRuntime spawned = null;
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    string taskPresentMessage = string.Format(
+                        CultureInfo.InvariantCulture,
+                        "Attempting to spawn a child context when an Task with id '{0}' is running",
+                        _task.Value.TaskId); // note: java code is putting thread id here
+                    var taskPresent = new InvalidOperationException(taskPresentMessage);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(taskPresent, LOGGER);
+                }
+                if (_childContext.IsPresent())
+                {
+                    var notTopmost = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(notTopmost, LOGGER);
+                }
+                try
+                {
+                    IConfiguration[] serviceConfigurations = new IConfiguration[] { serviceConfiguration };
+                    IInjector forkedServiceInjector = _serviceInjector.ForkInjector(serviceConfigurations);
+                    Optional<ContextRuntime> self = Optional<ContextRuntime>.Of(this);
+                    spawned = new ContextRuntime(forkedServiceInjector, contextConfiguration, self);
+                    _childContext = Optional<ContextRuntime>.Of(spawned);
+                    return spawned;
+                }
+                catch (Exception spawnError)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(spawnError, Level.Error, LOGGER);
+
+                    // The id reported here is the id of this context's own parent, if it has one.
+                    Optional<string> parentId;
+                    if (ParentContext.IsPresent())
+                    {
+                        parentId = Optional<string>.Of(ParentContext.Value.Id);
+                    }
+                    else
+                    {
+                        parentId = Optional<string>.Empty();
+                    }
+
+                    var wrapped = new ContextClientCodeException(
+                        ContextClientCodeException.GetId(contextConfiguration),
+                        parentId,
+                        "Unable to spawn context",
+                        spawnError);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(wrapped, LOGGER);
+                }
+            }
+            return spawned;
+        }
+
+        /// <summary>
+        /// Spawns a new context without services of its own.
+        /// The child's serviceInjector is a plain fork of this context's serviceInjector; the child is
+        /// constructed with that injector, the given contextConfiguration and this context as its parent,
+        /// and remembered as this context's child.
+        /// Spawning is rejected while a task is present or when this context already has a child.
+        /// </summary>
+        /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+        /// <returns> a child context.</returns>
+        public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    string taskPresentMessage = string.Format(
+                        CultureInfo.InvariantCulture,
+                        "Attempting to spawn a child context when an Task with id '{0}' is running",
+                        _task.Value.TaskId); // note: java code is putting thread id here
+                    var taskPresent = new InvalidOperationException(taskPresentMessage);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(taskPresent, LOGGER);
+                }
+                if (_childContext.IsPresent())
+                {
+                    var notTopmost = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(notTopmost, LOGGER);
+                }
+
+                IInjector forkedServiceInjector = _serviceInjector.ForkInjector();
+                Optional<ContextRuntime> self = Optional<ContextRuntime>.Of(this);
+                ContextRuntime spawned = new ContextRuntime(forkedServiceInjector, contextConfiguration, self);
+                _childContext = Optional<ContextRuntime>.Of(spawned);
+                return spawned;
+            }
+        }
+
+        /// <summary>
+        ///  Launches an Task on this context.
+        ///  A previously started task that has already ended is cleared first. Starting is rejected
+        ///  with an InvalidOperationException while another task is still present or while this
+        ///  context has a child context. Any exception raised while creating, initializing or
+        ///  scheduling the task is wrapped in a TaskClientCodeException.
+        /// </summary>
+        /// <param name="taskConfiguration">configuration of the task; its TangConfig is used to fork the task injector and its TaskId names the task.</param>
+        /// <param name="contextId">the context id handed to the new TaskRuntime.</param>
+        /// <param name="heartBeatManager">the heartbeat manager handed to the new TaskRuntime.</param>
+        public void StartTask(TaskConfiguration taskConfiguration, string contextId, HeartBeatManager heartBeatManager)
+        {
+            lock (_contextLifeCycle)
+            {
+                bool taskPresent = _task.IsPresent();
+                bool taskEnded = taskPresent && _task.Value.HasEnded();
+
+                LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration)" + "task is present: " + taskPresent + " task has ended: " + taskEnded);
+                if (taskPresent)
+                {
+                    LOGGER.Log(Level.Info, "Task state: " + _task.Value.GetTaskState());
+                }
+
+                if (taskEnded)
+                {
+                    // clean up state
+                    _task = Optional<TaskRuntime>.Empty();
+                    taskPresent = false;
+                }
+                if (taskPresent)
+                {
+                    // The message describes the operation actually attempted here (starting a task),
+                    // not the child-context spawn it used to be copied from.
+                    var e = new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Attempting to start a Task when a Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                if (_childContext.IsPresent())
+                {
+                    var e = new InvalidOperationException("Attempting to start a Task on a context that is not the topmost active context.");
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                try
+                {
+                    IInjector taskInjector = _contextInjector.ForkInjector(new IConfiguration[] { taskConfiguration.TangConfig });
+                    LOGGER.Log(Level.Info, "Trying to inject task with configuration" + taskConfiguration.ToString());
+                    TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); // taskInjector.getInstance(TaskRuntime.class);
+                    taskRuntime.Initialize();
+                    // Start is handed to Task.Run instead of being invoked inline, so this method
+                    // does not wait for it.
+                    System.Threading.Tasks.Task.Run(new Action(taskRuntime.Start));
+                    _task = Optional<TaskRuntime>.Of(taskRuntime);
+                }
+                catch (Exception e)
+                {
+                    var ex = new TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate the new task", e);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Close this context. If there is a child context, this recursively closes it before closing this context. If
+        /// there is an Task currently running, that will be closed.
+        /// The steps run in a fixed order under the context lock: mark the state DONE, close the task,
+        /// dispose the child, close the life cycle, then detach from the parent.
+        /// </summary>
+        public void Dispose()
+        {
+            lock (_contextLifeCycle)
+            {
+                // From here on GetContextStatus reports DONE.
+                _contextState = ContextStatusProto.State.DONE;
+                if (_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Shutting down an task because the underlying context is being closed.");
+                    // No close message is passed to the task.
+                    _task.Value.Close(null);
+                }
+                if (_childContext.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Closing a context because its parent context is being closed.");
+                    // The child's Dispose calls ResetChildContext on this instance, clearing _childContext.
+                    _childContext.Value.Dispose();
+                }
+                _contextLifeCycle.Close();
+                if (_parentContext.IsPresent())
+                {
+                    // Remove this context from its parent so the parent no longer has a child.
+                    ParentContext.Value.ResetChildContext();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Forwards a suspend request to the task held by this context.
+        /// If no task is present (due to races it might have already ended), the request is dropped
+        /// and a WARNING is written to the log.
+        /// </summary>
+        /// <param name="message"> the suspend message to deliver or null if there is none.</param>
+        public void SuspendTask(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    _task.Value.Suspend(message);
+                    return;
+                }
+
+                LOGGER.Log(Level.Warning, "Received a suspend task while there was no task running. Ignored");
+            }
+        }
+
+        /// <summary>
+        /// Forwards a close request to the task held by this context.
+        /// If no task is present (due to races it might have already ended), the request is dropped
+        /// and a WARNING is written to the log.
+        /// </summary>
+        /// <param name="message">the close  message to deliver or null if there is none.</param>
+        public void CloseTask(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    _task.Value.Close(message);
+                    return;
+                }
+
+                LOGGER.Log(Level.Warning, "Received a close task while there was no task running. Ignored");
+            }
+        }
+
+        /// <summary>
+        /// Hands a message to the task held by this context.
+        /// If no task is present (due to races it might have already ended), the message is dropped
+        /// and a WARNING is written to the log.
+        /// </summary>
+        /// <param name="message">the message to deliver or null if there is none.</param>
+        public void DeliverTaskMessage(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    _task.Value.Deliver(message);
+                    return;
+                }
+
+                LOGGER.Log(Level.Warning, "Received an task message while there was no task running. Ignored");
+            }
+        }
+
+        /// <summary>
+        /// Passes a context message on to the context life cycle.
+        /// Unlike the task-related methods of this class, this call is not made under the context lock.
+        /// </summary>
+        /// <param name="mesage">the message bytes handed to ContextLifeCycle.HandleContextMessage.</param>
+        public void HandleContextMessaage(byte[] mesage)
+        {
+            _contextLifeCycle.HandleContextMessage(mesage);
+        }
+
+        /// <summary>
+        /// Get the status of the task held by this context.
+        /// A task that has ended is forgotten and reported as empty. A task that has not ended
+        /// must report state RUNNING; any other state is raised as an InvalidOperationException.
+        /// </summary>
+        /// <returns> the status of the running Task, or empty if there is no task or it has ended.</returns>
+        public Optional<TaskStatusProto> GetTaskStatus()
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_task.IsPresent())
+                {
+                    return Optional<TaskStatusProto>.Empty();
+                }
+
+                if (_task.Value.HasEnded())
+                {
+                    _task = Optional<TaskRuntime>.Empty();
+                    return Optional<TaskStatusProto>.Empty();
+                }
+
+                TaskStatusProto currentStatus = _task.Value.GetStatusProto();
+                if (currentStatus.state != State.RUNNING)
+                {
+                    string wrongStateMessage = string.Format(
+                        CultureInfo.InvariantCulture,
+                        "Task state must be RUNNING, but instead is in {0} state",
+                        currentStatus.state);
+                    var wrongState = new InvalidOperationException(wrongStateMessage);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(wrongState, LOGGER);
+                    return Optional<TaskStatusProto>.Empty();
+                }
+
+                // only RUNNING status is allowed to return here, all other state pushed out to heartbeat
+                return Optional<TaskStatusProto>.Of(currentStatus);
+            }
+        }
+
+        /// <summary>
+        /// Forgets the child context of this context. A context calls this on its parent at the
+        /// end of its own Dispose. Calling it when no child context is set is an error.
+        /// </summary>
+        public void ResetChildContext()
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_childContext.IsPresent())
+                {
+                    var noChild = new InvalidOperationException("no child context set");
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(noChild, LOGGER);
+                }
+                else
+                {
+                    _childContext = Optional<ContextRuntime>.Empty();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds this context's status in protocol buffer form: id, current state, the parent's id
+        /// if there is a parent, and one context message per message source that currently has one.
+        /// </summary>
+        /// <returns>this context's status in protocol buffer form.</returns>
+        public ContextStatusProto GetContextStatus()
+        {
+            lock (_contextLifeCycle)
+            {
+                var status = new ContextStatusProto();
+                status.context_id = Id;
+                status.context_state = _contextState;
+
+                if (_parentContext.IsPresent())
+                {
+                    status.parent_id = _parentContext.Value.Id;
+                }
+
+                foreach (IContextMessageSource messageSource in _contextLifeCycle.ContextMessageSources)
+                {
+                    Optional<ContextMessage> pending = messageSource.Message;
+                    if (!pending.IsPresent())
+                    {
+                        // This source has nothing to report right now.
+                        continue;
+                    }
+
+                    var messageProto = new ContextStatusProto.ContextMessageProto();
+                    messageProto.source_id = pending.Value.MessageSourceId;
+                    messageProto.message = ByteUtilities.CopyBytesFrom(pending.Value.Bytes);
+                    status.context_message.Add(messageProto);
+                }
+
+                return status;
+            }
+        }
+    }
+}
+        ///// <summary>
+        ///// TODO: remove and use parameterless GetContextStatus above
+        ///// </summary>
+        ///// <returns>this context's status in protocol buffer form.</returns>
+        //public ContextStatusProto GetContextStatus(string contextId)
+        //{
+        //    ContextStatusProto contextStatusProto = new ContextStatusProto()
+        //    {
+        //        context_id = contextId,
+        //        context_state = _contextState,
+        //    };
+        //    return contextStatusProto;
+        //}
+
+        ////// TODO: remove and use injection
+        //public void StartTask(ITask task, HeartBeatManager heartBeatManager, string taskId, string contextId)
+        //{
+        //    lock (_contextLifeCycle)
+        //    {
+        //        if (_task.IsPresent() && _task.Value.HasEnded())
+        //        {
+        //            // clean up state
+        //            _task = Optional<TaskRuntime>.Empty();
+        //        }
+        //        if (_task.IsPresent())
+        //        {
+        //            throw new InvalidOperationException(
+        //                string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here
+        //        }
+        //        if (_childContext.IsPresent())
+        //        {
+        //            throw new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+        //        }
+        //        try
+        //        {
+        //            // final Injector taskInjector = contextInjector.forkInjector(taskConfiguration);
+        //            TaskRuntime taskRuntime  // taskInjector.getInstance(TaskRuntime.class);
+        //                = new TaskRuntime(task, heartBeatManager);
+        //            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Starting task '{0}'", taskId));
+        //            taskRuntime.Initialize(taskId, contextId);
+        //            taskRuntime.Start();
+        //            _task = Optional<TaskRuntime>.Of(taskRuntime);
+        //        }
+        //        catch (Exception e)
+        //        {
+        //            throw new InvalidOperationException("Unable to instantiate the new task");
+        //        }
+        //    }
+        //}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs
new file mode 100644
index 0000000..7c62a0b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Events;
+
+namespace Org.Apache.REEF.Common.Context
+{
+    /// <summary>
+    /// IContextStart implementation that only carries an id.
+    /// </summary>
+    class ContextStartImpl : IContextStart
+    {
+        /// <summary>
+        /// Creates the event with the given id.
+        /// </summary>
+        /// <param name="id">the value stored in Id.</param>
+        public ContextStartImpl(string id)
+        {
+            Id = id;
+        }
+
+        /// <summary>
+        /// The id given at construction.
+        /// </summary>
+        public string Id { get; set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs
new file mode 100644
index 0000000..4df40b6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Events;
+
+namespace Org.Apache.REEF.Common.Context
+{
+    /// <summary>
+    /// IContextStop implementation that only carries an id.
+    /// </summary>
+    class ContextStopImpl : IContextStop
+    {
+        /// <summary>
+        /// Creates the event with the given id.
+        /// </summary>
+        /// <param name="id">the value stored in Id.</param>
+        public ContextStopImpl(string id)
+        {
+            Id = id;
+        }
+        
+        /// <summary>
+        /// The id given at construction.
+        /// </summary>
+        public string Id { get; set; }   
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs
new file mode 100644
index 0000000..e7daecb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Evaluator.Context;
+using Org.Apache.REEF.Services;
+using Org.Apache.REEF.Tasks;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Globalization;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+
+namespace Org.Apache.REEF.Common.Context
+{
+    /// <summary>
+    ///  Helper class that encapsulates the root context configuration: With or without services and an initial task.
+    ///  The root service injector is built in the constructor; the root ContextRuntime itself is
+    ///  created on the first call to GetRootContext and cached afterwards.
+    /// </summary>
+    public sealed class RootContextLauncher
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RootContextLauncher));
+
+        // Injector built from the root service configuration, or a plain injector when there is none.
+        private readonly IInjector _rootServiceInjector = null;
+
+        // Created lazily by GetRootContext().
+        private ContextRuntime _rootContext = null;
+
+        private ContextConfiguration _rootContextConfiguration = null;
+
+        /// <summary>
+        /// Stores the root configurations and builds the root service injector.
+        /// </summary>
+        /// <param name="rootContextConfig">the configuration of the root context.</param>
+        /// <param name="rootServiceConfig">the optional service configuration; when present its services are instantiated here.</param>
+        /// <param name="rootTaskConfig">the optional configuration of an initial task; only stored.</param>
+        public RootContextLauncher(ContextConfiguration rootContextConfig, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
+        {
+            _rootContextConfiguration = rootContextConfig;
+            _rootServiceInjector = InjectServices(rootServiceConfig);
+            RootTaskConfig = rootTaskConfig;
+        }
+
+        /// <summary>
+        /// The optional configuration of the initial task.
+        /// </summary>
+        public Optional<TaskConfiguration> RootTaskConfig { get; set; }
+
+        /// <summary>
+        /// The configuration used to create the root context.
+        /// </summary>
+        public ContextConfiguration RootContextConfig
+        {
+            get { return _rootContextConfiguration; }
+            set { _rootContextConfiguration = value; }
+        }
+
+        /// <summary>
+        /// Returns the root context, creating it on the first call.
+        /// </summary>
+        /// <returns>the root ContextRuntime.</returns>
+        public ContextRuntime GetRootContext()
+        {
+            if (_rootContext == null)
+            {
+                _rootContext = GetRootContext(_rootServiceInjector, _rootContextConfiguration);
+            }
+            return _rootContext;
+        }
+
+        /// <summary>
+        /// Creates the root service injector. With a service configuration the injector is created
+        /// from its TangConfig and InjectedServices is instantiated from it; without one a plain
+        /// injector is returned.
+        /// </summary>
+        /// <param name="serviceConfig">the optional service configuration.</param>
+        /// <returns>the injector to use as the root service injector.</returns>
+        private IInjector InjectServices(Optional<ServiceConfiguration> serviceConfig)
+        {
+            IInjector rootServiceInjector;
+
+            if (serviceConfig.IsPresent())
+            {
+                rootServiceInjector = TangFactory.GetTang().NewInjector(serviceConfig.Value.TangConfig);
+                InjectedServices services = null;
+                try
+                {
+                    services = rootServiceInjector.GetInstance<InjectedServices>();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Failed to instantiate service.", LOGGER);
+                    // Placeholders are numbered in argument order so that the error, its message and
+                    // its stack trace each appear exactly once, in the slot the text names.
+                    InvalidOperationException ex = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Failed to inject service: encountered error {0} with message [{1}] and stack trace:[{2}]", e, e.Message, e.StackTrace));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
+                }
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "injected {0} service(s)", services.Services.Count));
+            }
+            else
+            {
+                rootServiceInjector = TangFactory.GetTang().NewInjector();
+                LOGGER.Log(Level.Info, "no service provided for injection.");
+            }
+
+            return rootServiceInjector;
+        }
+
+        /// <summary>
+        /// Creates a root ContextRuntime (one without a parent) from the given injector and configuration.
+        /// </summary>
+        /// <param name="rootServiceInjector">the service injector of the root context.</param>
+        /// <param name="rootContextConfiguration">the configuration of the root context.</param>
+        /// <returns>the newly created root context.</returns>
+        private ContextRuntime GetRootContext(
+            IInjector rootServiceInjector,
+            IConfiguration rootContextConfiguration)
+        {
+            return new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+        }
+    }
+//if (rootServiceInjector != null)
+//{
+//    try
+//    {
+//        rootServiceInjector = rootServiceInjector.ForkInjector(serviceConfigs);
+//    }
+//    catch (Exception e)
+//    {
+//        throw new ContextClientCodeException(ContextClientCodeException.GetId(rootContextConfiguration),
+//                                             Optional<String>.Empty(),
+//                                             "Unable to instatiate the root context", e);
+//    }
+//    result = new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+//}
+//else
+//{
+//    result = new ContextRuntime(rootServiceInjector.ForkInjector(), rootContextConfiguration);
+//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs
new file mode 100644
index 0000000..fc50c73
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common
+{
+    /// <summary>
+    /// Default implementation of <see cref="ICloseEvent"/>: holds the optional byte
+    /// payload that accompanies a close event.
+    /// </summary>
+    public class CloseEventImpl : ICloseEvent
+    {
+        /// <summary>
+        /// Creates a close event with an empty payload.
+        /// </summary>
+        public CloseEventImpl()
+        {
+            Value = Optional<byte[]>.Empty();
+        }
+
+        /// <summary>
+        /// Creates a close event for the given payload.
+        /// </summary>
+        /// <param name="bytes">payload bytes; may be null (wrapped via Optional.OfNullable).</param>
+        public CloseEventImpl(byte[] bytes)
+        {
+            Value = Optional<byte[]>.OfNullable(bytes);
+        }
+
+        /// <summary>
+        /// The payload of this event.
+        /// </summary>
+        /// <remarks>
+        /// Must stay an auto-implemented property (or use a real backing field). Accessors
+        /// written as "get { return Value; } set { value = Value; }" call the property from
+        /// inside itself: every read or write recurses until the stack overflows, and the
+        /// setter never stores the assigned value.
+        /// </remarks>
+        public Optional<byte[]> Value { get; set; }
+
+        /// <summary>
+        /// Returns "CloseEvent{value=...}" using the string form of <see cref="Value"/>.
+        /// </summary>
+        public override string ToString()
+        {
+            return "CloseEvent{value=" + Value.ToString() + "}";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs
new file mode 100644
index 0000000..2b00aa2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator
+{
+    /// <summary>
+    /// Default implementation of <see cref="IDriverMessage"/>: wraps the optional byte
+    /// payload of a driver message.
+    /// </summary>
+    public class DriverMessageImpl : IDriverMessage
+    {
+        // Assigned once in the constructors and never changed afterwards.
+        private readonly Optional<byte[]> _value;
+
+        /// <summary>
+        /// Creates a driver message with an empty payload.
+        /// </summary>
+        public DriverMessageImpl()
+        {
+            _value = Optional<byte[]>.Empty();
+        }
+
+        /// <summary>
+        /// Creates a driver message for the given payload.
+        /// </summary>
+        /// <param name="bytes">payload bytes; may be null (wrapped via Optional.OfNullable).</param>
+        public DriverMessageImpl(byte[] bytes)
+        {
+            _value = Optional<byte[]>.OfNullable(bytes);
+        }
+
+        /// <summary>
+        /// The payload of this message.
+        /// </summary>
+        public Optional<byte[]> Message
+        {
+            get { return _value; }
+        }
+
+        /// <summary>
+        /// Returns "DriverMessage [value=...]" with the payload rendered by
+        /// ByteUtilities.ByteArrarysToString, or with nothing between "=" and "]" when
+        /// no payload is present.
+        /// </summary>
+        public override string ToString()
+        {
+            // Decode only when a payload exists. A message built by the parameterless
+            // constructor holds an empty Optional, and its Value must not be handed to
+            // the byte-to-string helper (not shown here; presumably it rejects null).
+            string payload = _value.IsPresent()
+                ? ByteUtilities.ByteArrarysToString(_value.Value)
+                : string.Empty;
+            return "DriverMessage [value=" + payload + "]";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs
new file mode 100644
index 0000000..a6bb52f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common
+{
+    /// <summary>
+    /// Event object created when a task is asked to suspend; holds the optional byte
+    /// payload that accompanies the request.
+    /// </summary>
+    /// <remarks>
+    /// NOTE(review): this type implements ICloseEvent, not a suspend-specific interface.
+    /// That looks like a copy of CloseEventImpl; as a consequence, overloads taking
+    /// ICloseEvent are the ones selected for instances of this class. Left unchanged here
+    /// because it is part of the public type signature - confirm the intended interface.
+    /// </remarks>
+    public class SuspendEventImpl : ICloseEvent
+    {
+        /// <summary>
+        /// Creates a suspend event with an empty payload.
+        /// </summary>
+        public SuspendEventImpl()
+        {
+            Value = Optional<byte[]>.Empty();
+        }
+
+        /// <summary>
+        /// Creates a suspend event for the given payload.
+        /// </summary>
+        /// <param name="bytes">payload bytes; may be null (wrapped via Optional.OfNullable).</param>
+        public SuspendEventImpl(byte[] bytes)
+        {
+            Value = Optional<byte[]>.OfNullable(bytes);
+        }
+
+        /// <summary>
+        /// The payload of this event.
+        /// </summary>
+        /// <remarks>
+        /// Must stay an auto-implemented property (or use a real backing field). Accessors
+        /// written as "get { return Value; } set { value = Value; }" call the property from
+        /// inside itself: every read or write recurses until the stack overflows, and the
+        /// setter never stores the assigned value.
+        /// </remarks>
+        public Optional<byte[]> Value { get; set; }
+
+        /// <summary>
+        /// Returns "SuspendEvent{value=...}" using the string form of <see cref="Value"/>.
+        /// </summary>
+        public override string ToString()
+        {
+            return "SuspendEvent{value=" + Value.ToString() + "}";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs
new file mode 100644
index 0000000..22bdbd3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Common.Task
+{
+    /// <summary>
+    /// Exception that records a Task failure together with the id of the task and the
+    /// id of the context it was executing in.
+    /// </summary>
+    public class TaskClientCodeException : Exception
+    {
+        /// <summary>
+        /// construct the exception that caused the Task to fail
+        /// </summary>
+        /// <param name="taskId"> the id of the failed task.</param>
+        /// <param name="contextId"> the id of the context the failed Task was executing in.</param>
+        /// <param name="message"> the error message </param>
+        /// <param name="cause"> the exception that caused the Task to fail.</param>
+        public TaskClientCodeException(
+                string taskId,
+                string contextId,
+                string message,
+                Exception cause)
+            : base(message, cause)
+        {
+            TaskId = taskId;
+            ContextId = contextId;
+        }
+
+        /// <summary>
+        /// Id of the failed task, as given to the constructor.
+        /// </summary>
+        public string TaskId { get; private set; }
+
+        /// <summary>
+        /// Id of the context the failed task was executing in, as given to the constructor.
+        /// </summary>
+        public string ContextId { get; private set; }
+
+        /// <summary>
+        /// Placeholder for extracting the task identifier from a configuration.
+        /// The argument is currently ignored.
+        /// </summary>
+        /// <param name="c">task configuration (unused for now).</param>
+        /// <returns>always the empty string.</returns>
+        public static string GetTaskIdentifier(IConfiguration c)
+        {
+            // TODO: update after TANG is available
+            return string.Empty;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs
new file mode 100644
index 0000000..26e638d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Wake;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Common
+{
+    /// <summary>
+    /// Holds the task start and stop handlers and pushes the start / stop event to
+    /// each of them when <see cref="Start"/> or <see cref="Stop"/> is called.
+    /// </summary>
+    public class TaskLifeCycle
+    {
+        private readonly HashSet<IObserver<ITaskStart>> _taskStartHandlers;
+        private readonly ITaskStart _taskStart;
+
+        private readonly HashSet<IObserver<ITaskStop>> _taskStopHandlers;
+        private readonly ITaskStop _taskStop;
+
+        // INJECT
+        /// <summary>
+        /// Creates a life cycle with the given handlers and event objects.
+        /// </summary>
+        /// <param name="taskStopHandlers">observers notified by Stop().</param>
+        /// <param name="taskStartHandlers">observers notified by Start().</param>
+        /// <param name="taskStart">event handed to every start handler.</param>
+        /// <param name="taskStop">event handed to every stop handler.</param>
+        public TaskLifeCycle(
+            HashSet<IObserver<ITaskStop>> taskStopHandlers,
+            HashSet<IObserver<ITaskStart>> taskStartHandlers,
+            TaskStartImpl taskStart,
+            TaskStopImpl taskStop)
+        {
+            _taskStopHandlers = taskStopHandlers;
+            _taskStop = taskStop;
+            _taskStartHandlers = taskStartHandlers;
+            _taskStart = taskStart;
+        }
+
+        /// <summary>
+        /// Creates a life cycle with empty handler sets and no start / stop event objects.
+        /// </summary>
+        public TaskLifeCycle()
+            : this(new HashSet<IObserver<ITaskStop>>(), new HashSet<IObserver<ITaskStart>>(), null, null)
+        {
+        }
+
+        /// <summary>
+        /// Calls OnNext with the start event on every registered start handler.
+        /// </summary>
+        public void Start()
+        {
+            foreach (var observer in _taskStartHandlers)
+            {
+                observer.OnNext(_taskStart);
+            }
+        }
+
+        /// <summary>
+        /// Calls OnNext with the stop event on every registered stop handler.
+        /// </summary>
+        public void Stop()
+        {
+            foreach (var observer in _taskStopHandlers)
+            {
+                observer.OnNext(_taskStop);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs
new file mode 100644
index 0000000..d531df7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Common.Runtime.Evaluator;
+using Org.Apache.REEF.Common.Task;
+using Org.Apache.REEF.Tasks;
+using Org.Apache.REEF.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Common
+{
+    /// <summary>
+    /// Runtime wrapper around a single injected <see cref="ITask"/>. It runs the task,
+    /// tracks its state through a TaskStatus object, and receives close, suspend and
+    /// driver-message events for it.
+    /// </summary>
+    public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime));
+
+        private readonly ITask _task;
+
+        private readonly IInjector _injector;
+
+        // The memento given by the task configuration
+        private readonly Optional<byte[]> _memento;
+
+        private readonly HeartBeatManager _heartBeatManager;
+
+        private readonly TaskStatus _currentStatus;
+
+        private readonly INameClient _nameClient;
+
+        /// <summary>
+        /// Injects the task and its optional collaborators and creates the task status.
+        /// </summary>
+        /// <param name="taskInjector">injector used to obtain the ITask, and optionally an
+        /// ITaskMessageSource and an INameClient; also used later to obtain the driver message handler.</param>
+        /// <param name="contextId">id of the enclosing context, passed to the task status.</param>
+        /// <param name="taskId">id of the task, passed to the task status.</param>
+        /// <param name="heartBeatManager">heartbeat manager passed to the task status; its
+        /// EvaluatorSettings.NameClient is set when a name client can be injected.</param>
+        /// <param name="memento">optional memento string; when not null it is converted to bytes
+        /// and handed to the task when it is run.</param>
+        public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null)
+        {
+            _injector = taskInjector;
+            _heartBeatManager = heartBeatManager;
+
+            Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty();
+            try
+            {
+                _task = _injector.GetInstance<ITask>();
+            }
+            catch (Exception e)
+            {
+                // The task itself is mandatory: wrap the failure and hand it to the throwing helper.
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER);
+            }
+            try
+            {
+                ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>();
+                messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource });
+
+                // Report success only when the injection actually succeeded; logging this
+                // unconditionally after the warning below would contradict it.
+                LOGGER.Log(Level.Info, "task message source injected");
+            }
+            catch (Exception e)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER);
+                // do not rethrow since this is benign
+            }
+            try
+            {
+                _nameClient = _injector.GetInstance<INameClient>();
+                _heartBeatManager.EvaluatorSettings.NameClient = _nameClient;
+            }
+            catch (InjectionException)
+            {
+                LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration.");
+                // do not rethrow since user is not required to provide name client
+            }
+
+            _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources);
+            _memento = memento == null ?
+                Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento));
+        }
+
+        /// <summary>
+        /// Id of the task, as reported by the task status.
+        /// </summary>
+        public string TaskId
+        {
+            get { return _currentStatus.TaskId; }
+        }
+
+        /// <summary>
+        /// Id of the enclosing context, as reported by the task status.
+        /// </summary>
+        public string ContextId
+        {
+            get { return _currentStatus.ContextId; }
+        }
+
+        /// <summary>
+        /// Marks the task status as running. Start() expects this to have happened.
+        /// </summary>
+        public void Initialize()
+        {
+            _currentStatus.SetRunning();
+        }
+
+        /// <summary>
+        /// Run the task
+        /// </summary>
+        /// <remarks>
+        /// Blocks the caller until the task's Call returns. The result is recorded on the task
+        /// status; any exception raised along the way is recorded there instead and is not
+        /// propagated to the caller. The task is disposed in both cases.
+        /// </remarks>
+        public void Start()
+        {
+            try
+            {
+                LOGGER.Log(Level.Info, "Call Task");
+                if (_currentStatus.IsNotRunning())
+                {
+                    var e = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null;
+
+                // Run the user task on a separate System.Threading.Tasks.Task and wait for it.
+                System.Threading.Tasks.Task<byte[]> runTask = new System.Threading.Tasks.Task<byte[]>(() => RunTask(taskMemento));
+                try
+                {
+                    runTask.Start();
+                    runTask.Wait();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Exception thrown during task running.", LOGGER);
+                }
+                byte[] result = runTask.Result;
+
+                LOGGER.Log(Level.Info, "Task Call Finished");
+                if (_task != null)
+                {
+                    _task.Dispose();
+                }
+                _currentStatus.SetResult(result);
+                if (result != null && result.Length > 0)
+                {
+                    LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
+                }
+            }
+            catch (Exception e)
+            {
+                // Failure path: still release the task, then record the failure on the status.
+                if (_task != null)
+                {
+                    _task.Dispose();
+                }
+                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e));
+                _currentStatus.SetException(e);
+            }
+        }
+
+        /// <summary>
+        /// Returns the current state held by the task status.
+        /// </summary>
+        public TaskState GetTaskState()
+        {
+            return _currentStatus.State;
+        }
+
+        /// <summary>
+        /// Called by heartbeat manager
+        /// </summary>
+        /// <returns>  current TaskStatusProto </returns>
+        public TaskStatusProto GetStatusProto()
+        {
+            return _currentStatus.ToProto();
+        }
+
+        /// <summary>
+        /// Whether the task status reports that the task has ended.
+        /// </summary>
+        public bool HasEnded()
+        {
+            return _currentStatus.HasEnded();
+        }
+
+        /// <summary>
+        /// get ID of the task.
+        /// </summary>
+        /// <remarks>
+        /// Returns the same value as <see cref="TaskId"/>. The method name is misspelled
+        /// ("Acticity"); it is kept as is because it is public.
+        /// </remarks>
+        /// <returns>ID of the task.</returns>
+        public string GetActicityId()
+        {
+            return _currentStatus.TaskId;
+        }
+
+        /// <summary>
+        /// Raises a close event for the task and marks the status as close-requested.
+        /// Ignored (with a warning) when the task is not running; a failure is recorded on the
+        /// task status as a TaskClientCodeException instead of being thrown.
+        /// </summary>
+        /// <param name="message">payload for the close event; may be null.</param>
+        public void Close(byte[] message)
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
+            if (_currentStatus.IsNotRunning())
+            {
+                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
+            }
+            else
+            {
+                try
+                {
+                    OnNext(new CloseEventImpl(message));
+                    _currentStatus.SetCloseRequested();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER);
+
+                    _currentStatus.SetException(
+                        new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Raises a suspend event for the task and marks the status as suspend-requested.
+        /// Ignored (with a warning) when the task is not running; a failure is recorded on the
+        /// task status as a TaskClientCodeException instead of being thrown.
+        /// </summary>
+        /// <param name="message">payload for the suspend event; may be null.</param>
+        public void Suspend(byte[] message)
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));
+
+            if (_currentStatus.IsNotRunning())
+            {
+                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State));
+            }
+            else
+            {
+                try
+                {
+                    // NOTE(review): SuspendEventImpl is declared as implementing ICloseEvent, so
+                    // this call binds to OnNext(ICloseEvent) rather than OnNext(ISuspendEvent).
+                    // Confirm which interface SuspendEventImpl is meant to implement.
+                    OnNext(new SuspendEventImpl(message));
+                    _currentStatus.SetSuspendRequested();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER);
+                    _currentStatus.SetException(
+                        new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Delivers a driver message to the task's driver message handler.
+        /// Ignored (with a warning) when the task is not running; a failure is recorded on the
+        /// task status as a TaskClientCodeException instead of being thrown.
+        /// </summary>
+        /// <param name="message">payload of the driver message; may be null.</param>
+        public void Deliver(byte[] message)
+        {
+            if (_currentStatus.IsNotRunning())
+            {
+                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State));
+            }
+            else
+            {
+                try
+                {
+                    OnNext(new DriverMessageImpl(message));
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER);
+                    _currentStatus.SetException(
+                        new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Handles a close event. Currently only logs the call.
+        /// </summary>
+        public void OnNext(ICloseEvent value)
+        {
+            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
+            // TODO: send a heartbeat
+        }
+
+        // Not implemented for close events.
+        void IObserver<ICloseEvent>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        // Not implemented for close events.
+        void IObserver<ICloseEvent>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Handles a suspend event. Currently only logs the call.
+        /// </summary>
+        public void OnNext(ISuspendEvent value)
+        {
+            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
+            // TODO: send a heartbeat
+        }
+
+        // Not implemented for suspend events.
+        void IObserver<ISuspendEvent>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        // Not implemented for suspend events.
+        void IObserver<ISuspendEvent>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Handles a driver message: obtains an IDriverMessageHandler from the injector and lets
+        /// it handle the message. An exception thrown by the handler is logged and recorded on
+        /// the task status without being rethrown.
+        /// </summary>
+        /// <param name="value">the driver message to handle.</param>
+        public void OnNext(IDriverMessage value)
+        {
+            IDriverMessageHandler messageHandler = null;
+            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
+            try
+            {
+                messageHandler = _injector.GetInstance<IDriverMessageHandler>();
+            }
+            catch (Exception e)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER);
+            }
+            if (messageHandler != null)
+            {
+                try
+                {
+                    messageHandler.Handle(value);
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER);
+                    _currentStatus.RecordExecptionWithoutHeartbeat(e);
+                }
+            }
+        }
+
+        // Not implemented for driver messages.
+        void IObserver<IDriverMessage>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        // Not implemented for driver messages.
+        void IObserver<IDriverMessage>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Invokes the task's Call with the given memento and returns its result.
+        /// </summary>
+        private byte[] RunTask(byte[] memento)
+        {
+            return _task.Call(memento);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs
new file mode 100644
index 0000000..ad8002b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tasks.Events;
+
+namespace Org.Apache.REEF.Common
+{
+    public class TaskStartImpl : ITaskStart
+    {        
+        //INJECT
+        public TaskStartImpl(string id)
+        {
+            Id = id;
+        }
+
+        public string Id { get; set; } 
+    }
+}