You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:43:11 UTC

[27/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code base

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs
new file mode 100644
index 0000000..e495fda
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Context;
+using Org.Apache.Reef.Common.Evaluator;
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Common.Runtime;
+using Org.Apache.Reef.Evaluator;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Remote.Impl;
+using Org.Apache.Reef.Wake.Time;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.Linq;
+using System.Net;
+using System.Threading;
+
+namespace Org.Apache.Reef.Common
+{
+    public class HeartBeatManager : IObserver<Alarm>
+    {
+        // Logger shared by all HeartBeatManager instances.
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager));
+
+        // Shared machine status; its ToString() is called once in the constructor and again
+        // when logging the alarm-driven heartbeat.
+        private static readonly MachineStatus MachineStatus = new MachineStatus();
+
+        // Provides the remote observer used to reach the driver (see the constructor and Recover).
+        private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+        // Clock on which the next heartbeat alarm is scheduled.
+        private readonly IClock _clock;
+
+        // Delay handed to IClock.ScheduleAlarm between alarm-driven heartbeats.
+        private readonly int _heartBeatPeriodInMillSeconds;
+
+        // Number of send failures (counted since the last successful send) at which Send
+        // switches the evaluator to RECOVERY state.
+        private readonly int _maxHeartbeatRetries = 0;
+
+        // Evaluator id copied from the settings in the constructor; not read elsewhere in this class.
+        private readonly string _evaluatorId;
+
+        // Identifier of the driver endpoint; replaced in Recover when a restarted driver is found.
+        private IRemoteIdentifier _remoteId;
+
+        // Observer that delivers messages to _remoteId; rebuilt in Recover together with _remoteId.
+        private IObserver<REEFMessage> _observer;
+
+        // Send failures since the last successful send or the last switch to RECOVERY.
+        private int _heartbeatFailures = 0;
+
+        // Driver lookup used during recovery; stays null until Send enters RECOVERY for the first time.
+        private IDriverConnection _driverConnection;
+
+        // Backing field of the EvaluatorSettings property; also holds the current operation state.
+        private EvaluatorSettings _evaluatorSettings;
+
+        // The queue can only contain the following:
+        // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state
+        // 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeats)
+        // The queue object itself is also used as the lock that guards Send and the flush in Recover.
+        private Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>();
+
+        /// <summary>
+        /// Creates a heartbeat manager that reports to the driver identified by <paramref name="remoteId"/>.
+        /// </summary>
+        /// <param name="settings">Evaluator settings supplying the remote manager, clock, evaluator id,
+        /// heartbeat period and maximum number of heartbeat failures.</param>
+        /// <param name="remoteId">Remote identifier of the driver endpoint.</param>
+        public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId)
+        {
+            using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager"))
+            {
+                _remoteManager = settings.RemoteManager;
+                _remoteId = remoteId;
+                _evaluatorId = settings.EvalutorId;
+
+                // Observer through which every heartbeat is pushed to the driver.
+                RemoteEventEndPoint<REEFMessage> driverEndPoint = new RemoteEventEndPoint<REEFMessage>(_remoteId);
+                _observer = _remoteManager.GetRemoteObserver(driverEndPoint);
+
+                _clock = settings.RuntimeClock;
+                _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
+                _maxHeartbeatRetries = settings.MaxHeartbeatFailures;
+                _evaluatorSettings = settings;
+
+                // kick start the CPU perf counter
+                MachineStatus.ToString();
+            }
+        }
+
+        /// <summary>
+        /// Evaluator runtime whose status and state are read when heartbeats are built.
+        /// Must be assigned by the owner before any OnNext overload that reads it is called;
+        /// the constructor does not set it.
+        /// </summary>
+        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+        public EvaluatorRuntime _evaluatorRuntime { get; set; }
+
+        /// <summary>
+        /// Context manager that supplies the context status collection and task status for heartbeats.
+        /// Must be assigned by the owner before any OnNext overload that reads it is called;
+        /// the constructor does not set it.
+        /// </summary>
+        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+        public ContextManager _contextManager { get; set; }
+
+        /// <summary>
+        /// Settings this manager was constructed with. Readable by anyone, assignable only from within this class.
+        /// </summary>
+        public EvaluatorSettings EvaluatorSettings
+        {
+            get { return _evaluatorSettings; }
+            private set { _evaluatorSettings = value; }
+        }
+
+        /// <summary>
+        /// Sends a heartbeat to the driver through the current remote observer, or queues it when the
+        /// evaluator is in RECOVERY state or the send attempt throws.
+        /// </summary>
+        /// <remarks>
+        /// The whole operation runs under a lock on the heartbeat queue, the same lock Recover takes
+        /// while flushing the queue. After <c>_maxHeartbeatRetries</c> failed sends without a success
+        /// in between, the driver connection implementation is obtained from the injector and the
+        /// operation state is switched to RECOVERY.
+        /// </remarks>
+        /// <param name="evaluatorHeartbeatProto">The heartbeat to deliver.</param>
+        public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
+        {
+            lock (_queuedHeartbeats)
+            {
+                // While recovering nothing is sent; the heartbeat is kept for the flush in Recover.
+                if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
+                {
+                    LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. ", evaluatorHeartbeatProto));
+                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+                    return;
+                }
+
+                // NOT during recovery, try to send
+                REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto);
+                try
+                {
+                    _observer.OnNext(payload);
+                    _heartbeatFailures = 0; // reset failure counts if we are having intermittent (not continuous) failures
+                }
+                catch (Exception e)
+                {
+                    // Recovery is only attempted while a task is RUNNING; otherwise the failure is escalated.
+                    // NOTE(review): presumably Exceptions.Throw rethrows; if it returned, execution would
+                    // fall through and queue the heartbeat like any other failure - confirm.
+                    if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING)
+                    {
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER);
+                    }
+
+                    _heartbeatFailures++;
+
+                    // Keep the failed heartbeat so it can be replayed once the driver is reachable again.
+                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+                    LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
+
+                    if (_heartbeatFailures >= _maxHeartbeatRetries)
+                    {
+                        LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures));
+                        try
+                        {
+                            // The driver lookup used by OnNext(Alarm) while in RECOVERY is resolved here.
+                            _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>();
+                        }
+                        catch (Exception ex)
+                        {
+                            Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER);
+                        }
+                        LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
+                        _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
+
+                        // clean heartbeat failure
+                        _heartbeatFailures = 0;
+                    }
+                }
+            }     
+        }
+
+        /// <summary>
+        /// Builds a heartbeat from the current evaluator, context and task status and passes it to Send.
+        /// </summary>
+        public void OnNext()
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()");
+                EvaluatorHeartbeatProto fullHeartbeat = GetEvaluatorHeartbeatProto();
+                string description = string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", fullHeartbeat);
+                LOGGER.Log(Level.Info, description);
+                Send(fullHeartbeat);
+            }
+        }
+
+        /// <summary>
+        /// Sends a heartbeat that carries the given task status together with the current
+        /// evaluator status and context status collection.
+        /// </summary>
+        /// <param name="taskStatusProto">The task status that must reach the driver.</param>
+        public void OnNext(TaskStatusProto taskStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)");
+
+                // Gather the three parts in the same order the heartbeat builder takes them.
+                var evaluatorStatus = _evaluatorRuntime.GetEvaluatorStatus();
+                var contextStatuses = _contextManager.GetContextStatusCollection();
+                var wrappedTaskStatus = Optional<TaskStatusProto>.Of(taskStatusProto);
+                EvaluatorHeartbeatProto taskHeartbeat = GetEvaluatorHeartbeatProto(evaluatorStatus, contextStatuses, wrappedTaskStatus);
+
+                string description = string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", taskHeartbeat);
+                LOGGER.Log(Level.Info, description);
+                Send(taskHeartbeat);
+            }
+        }
+
+        /// <summary>
+        /// Sends a heartbeat whose context list starts with the given context status, followed by
+        /// the context manager's current status collection. No task status is attached.
+        /// </summary>
+        /// <param name="contextStatusProto">The context status that must reach the driver.</param>
+        public void OnNext(ContextStatusProto contextStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)");
+
+                // The explicitly supplied status goes first, the manager's statuses after it.
+                List<ContextStatusProto> allContextStatuses = new List<ContextStatusProto> { contextStatusProto };
+                allContextStatuses.AddRange(_contextManager.GetContextStatusCollection());
+
+                var evaluatorStatus = _evaluatorRuntime.GetEvaluatorStatus();
+                var noTaskStatus = Optional<TaskStatusProto>.Empty();
+                EvaluatorHeartbeatProto contextHeartbeat = GetEvaluatorHeartbeatProto(evaluatorStatus, allContextStatuses, noTaskStatus);
+
+                string description = string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", contextHeartbeat);
+                LOGGER.Log(Level.Info, description);
+                Send(contextHeartbeat);
+            }
+        }
+
+        /// <summary>
+        /// Sends a heartbeat that carries only a timestamp and the given evaluator status;
+        /// no context or task status is added.
+        /// </summary>
+        /// <param name="evaluatorStatusProto">The evaluator status that must reach the driver.</param>
+        public void OnNext(EvaluatorStatusProto evaluatorStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)");
+
+                EvaluatorHeartbeatProto statusHeartbeat = new EvaluatorHeartbeatProto();
+                statusHeartbeat.timestamp = CurrentTimeMilliSeconds();
+                statusHeartbeat.evaluator_status = evaluatorStatusProto;
+
+                string description = string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", statusHeartbeat);
+                LOGGER.Log(Level.Info, description);
+                Send(statusHeartbeat);
+            }
+        }
+
+        /// <summary>
+        /// Clock callback for the periodic heartbeat.
+        /// </summary>
+        /// <remarks>
+        /// When the evaluator is OPERATIONAL and its runtime is RUNNING, a regular heartbeat is built
+        /// and sent. Otherwise the regular heartbeat is skipped and, if a driver connection has been
+        /// instantiated (which Send does when it enters RECOVERY), the driver information is queried
+        /// and, when available, used to re-establish the connection. In both cases the next alarm is
+        /// scheduled after the heartbeat period, unless Send throws.
+        /// </remarks>
+        /// <param name="value">The alarm that fired; not inspected.</param>
+        public void OnNext(Alarm value)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)");
+                if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING)
+                {
+                    EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString()));
+                    Send(evaluatorHeartbeatProto);
+                }
+                else
+                {
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState,  _evaluatorRuntime.State));
+                    if (_driverConnection == null)
+                    {
+                        // The driver connection is only created when Send enters RECOVERY. Without this
+                        // guard every alarm of an evaluator that is OPERATIONAL but not RUNNING raised a
+                        // NullReferenceException that was then caught and logged as a warning.
+                        LOGGER.Log(Level.Verbose, "No driver connection has been instantiated, skipping the query for driver status.");
+                    }
+                    else
+                    {
+                        try
+                        {
+                            DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId);
+                            if (driverInformation == null)
+                            {
+                                LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
+                            }
+                            else
+                            {
+                                LOGGER.Log(
+                                    Level.Info,
+                                    string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
+                                Recover(driverInformation);
+                            }
+                        }
+                        catch (Exception e)
+                        {
+                            // we do not want any exception to stop the query for driver status
+                            Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER);
+                        }
+                    }
+                }
+
+                // Reached on both paths; an exception escaping Send skips it, exactly as before.
+                _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
+            }
+        }
+
+        /// <summary>
+        /// Not implemented: always throws <see cref="NotImplementedException"/>.
+        /// </summary>
+        /// <param name="error">Not used.</param>
+        /// <exception cref="NotImplementedException">Thrown on every call.</exception>
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Not implemented: always throws <see cref="NotImplementedException"/>.
+        /// </summary>
+        /// <exception cref="NotImplementedException">Thrown on every call.</exception>
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Returns the milliseconds elapsed since 1970-01-01 00:00:00 UTC, truncated to a long.
+        /// </summary>
+        private static long CurrentTimeMilliSeconds()
+        {
+            // Counted from Jan 1st, 1970 so the value is compatible with the Java implementation.
+            DateTime unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+            TimeSpan sinceEpoch = DateTime.UtcNow - unixEpoch;
+            return (long)sinceEpoch.TotalMilliseconds;
+        }
+
+        /// <summary>
+        /// Re-establishes communication with a restarted driver: points the remote observer at the
+        /// driver's new endpoint, restarts the name client (if any), replays every queued heartbeat
+        /// and finally switches the operation state back to OPERATIONAL.
+        /// </summary>
+        /// <remarks>
+        /// A failure while replaying a queued heartbeat is passed to Exceptions.CaughtAndThrow
+        /// ("giving up"); the heartbeat has already been dequeued at that point.
+        /// </remarks>
+        /// <param name="driverInformation">Endpoint and name server information of the restarted driver.</param>
+        private void Recover(DriverInformation driverInformation)
+        {
+            // Replace the stale driver endpoint and the observer bound to it.
+            IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier);
+            _remoteId = new SocketRemoteIdentifier(driverEndpoint);
+            _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+            lock (_evaluatorSettings)
+            {
+                if (_evaluatorSettings.NameClient != null)
+                {
+                    try
+                    {
+                        LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server" + driverInformation.NameServerId);
+                        _evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId));
+                        LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId);
+                    }
+                    catch (Exception e)
+                    {
+                        // A failed name client restart is only logged; heartbeat replay still proceeds.
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                    }
+                }
+            }
+
+            // Same lock as Send, so no heartbeat can be queued or sent while the queue is drained.
+            lock (_queuedHeartbeats)
+            {
+                bool firstHeartbeatInQueue = true;
+                while (_queuedHeartbeats.Any())
+                {
+                    LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId);
+                    try
+                    {
+                        if (firstHeartbeatInQueue)
+                        {
+                            // first heartbeat is specially constructed to include the recovery flag
+                            EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
+                            LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
+                            _observer.OnNext(new REEFMessage(recoveryHeartbeat));
+                            firstHeartbeatInQueue = false;
+                        }
+                        else
+                        {
+                            _observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue()));
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        // we do not handle failures during RECOVERY 
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(
+                            e,
+                            Level.Error,
+                            string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId),
+                            LOGGER);
+                    }
+
+                    // Pause between replayed heartbeats; note that the queue lock is held while sleeping.
+                    Thread.Sleep(500);
+                }
+            }        
+
+            // NOTE(review): the state is switched back after the queue lock is released, so a Send
+            // issued in between would still observe RECOVERY and queue its heartbeat - confirm this is acceptable.
+            _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
+            LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
+        }
+
+        /// <summary>
+        /// Marks a heartbeat as a recovery heartbeat by setting the recovery flag on the heartbeat
+        /// itself, on each of its context statuses and, when present, on its task status.
+        /// </summary>
+        /// <param name="heartbeat">The heartbeat to mark; it is modified in place.</param>
+        /// <returns>The same <paramref name="heartbeat"/> instance.</returns>
+        private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
+        {
+            heartbeat.recovery = true;
+            heartbeat.context_status.ForEach(c => c.recovery = true);
+
+            // Heartbeats queued for an evaluator or context status change carry no task status.
+            // Dereferencing it unconditionally threw a NullReferenceException and aborted recovery.
+            if (heartbeat.task_status != null)
+            {
+                heartbeat.task_status.recovery = true;
+            }
+
+            return heartbeat;
+        }
+
+        /// <summary>
+        /// Builds a heartbeat from the evaluator runtime's current status and the context manager's
+        /// current context status collection and task status.
+        /// </summary>
+        /// <returns>The newly assembled heartbeat.</returns>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto()
+        {
+            var evaluatorStatus = _evaluatorRuntime.GetEvaluatorStatus();
+            var contextStatuses = _contextManager.GetContextStatusCollection();
+            var taskStatus = _contextManager.GetTaskStatus();
+            return GetEvaluatorHeartbeatProto(evaluatorStatus, contextStatuses, taskStatus);
+        }
+
+        /// <summary>
+        /// Assembles a heartbeat stamped with the current time from the given parts.
+        /// </summary>
+        /// <param name="evaluatorStatusProto">Evaluator status to attach.</param>
+        /// <param name="contextStatusProtos">Context statuses, appended in enumeration order.</param>
+        /// <param name="taskStatusProto">Task status; attached only when present.</param>
+        /// <returns>The newly assembled heartbeat.</returns>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto(
+            EvaluatorStatusProto evaluatorStatusProto,
+            ICollection<ContextStatusProto> contextStatusProtos,
+            Optional<TaskStatusProto> taskStatusProto)
+        {
+            EvaluatorHeartbeatProto heartbeat = new EvaluatorHeartbeatProto();
+            heartbeat.timestamp = CurrentTimeMilliSeconds();
+            heartbeat.evaluator_status = evaluatorStatusProto;
+
+            foreach (ContextStatusProto status in contextStatusProtos)
+            {
+                heartbeat.context_status.Add(status);
+            }
+
+            // Without a task status the field keeps its default value.
+            if (!taskStatusProto.IsPresent())
+            {
+                return heartbeat;
+            }
+
+            heartbeat.task_status = taskStatusProto.Value;
+            return heartbeat;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs
new file mode 100644
index 0000000..8a7aa94
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Remote;
+using System;
+using System.Globalization;
+using System.Threading;
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// Observer of incoming REEF messages that logs a short description of any context control
+    /// request a message carries and then forwards the message to the single subscribed observer, if any.
+    /// </summary>
+    public class ReefMessageProtoObserver :
+        IObserver<IRemoteMessage<REEFMessage>>,
+        IObservable<IRemoteMessage<REEFMessage>>,
+        IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver));
+
+        // The one downstream observer; set by Subscribe and cleared by Dispose.
+        private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null;
+
+        // Number of messages received so far.
+        private long _count = 0;
+
+        // Start of the current batch of messages; reset after every full batch.
+        private DateTime _begin;
+
+        // Time at which the first message was received.
+        private DateTime _origBegin;
+
+        /// <summary>
+        /// Does nothing.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Does nothing; the error is ignored.
+        /// </summary>
+        /// <param name="error">Not used.</param>
+        public void OnError(Exception error)
+        {
+        }
+
+        /// <summary>
+        /// Logs the received message, updates the message counter and forwards the message to the
+        /// subscribed observer when there is one.
+        /// </summary>
+        /// <param name="value">The received remote message.</param>
+        public void OnNext(IRemoteMessage<REEFMessage> value)
+        {
+            REEFMessage remoteEvent = value.Message;
+            IRemoteIdentifier id = value.Identifier;
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id));
+
+            if (remoteEvent.evaluatorControl != null && remoteEvent.evaluatorControl.context_control != null)
+            {
+                var contextControl = remoteEvent.evaluatorControl.context_control;
+                string context_message = null;
+                string task_message = null;
+
+                if (contextControl.context_message != null)
+                {
+                    context_message = contextControl.context_message.ToString();
+                }
+                if (contextControl.task_message != null)
+                {
+                    task_message = ByteUtilities.ByteArrarysToString(contextControl.task_message);
+                }
+
+                // Exactly one description is logged: messages take precedence over the control requests.
+                if (!(string.IsNullOrEmpty(context_message) && string.IsNullOrEmpty(task_message)))
+                {
+                    LOGGER.Log(
+                        Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", context_message, task_message));
+                }
+                else if (contextControl.remove_context != null)
+                {
+                    LOGGER.Log(
+                        Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", contextControl.remove_context.context_id));
+                }
+                else if (contextControl.add_context != null)
+                {
+                    LOGGER.Log(
+                        Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", contextControl.add_context.parent_context_id));
+                }
+                else if (contextControl.start_task != null)
+                {
+                    LOGGER.Log(
+                        Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", contextControl.start_task.context_id));
+                }
+                else if (contextControl.stop_task != null)
+                {
+                    LOGGER.Log(Level.Info, "Control protobuf to stop task");
+                }
+                else if (contextControl.suspend_task != null)
+                {
+                    LOGGER.Log(Level.Info, "Control protobuf to suspend task");
+                }
+            }
+
+            if (_count == 0)
+            {
+                _begin = DateTime.Now;
+                _origBegin = _begin;
+            }
+            long count = Interlocked.Increment(ref _count);
+
+            const int printBatchSize = 100000;
+            if (count % printBatchSize == 0)
+            {
+                // Start a new batch. The events-per-second figure that used to be computed here was
+                // never used (and divided by a possibly zero duration), so it has been removed.
+                _begin = DateTime.Now;
+            }
+
+            // Copy to a local so that a concurrent Dispose cannot null the field between check and call.
+            var observer = _observer;
+            if (observer != null)
+            {
+                observer.OnNext(value);
+            }
+        }
+
+        /// <summary>
+        /// Registers the observer that receives forwarded messages. Only one observer is supported.
+        /// </summary>
+        /// <param name="observer">The observer to forward messages to.</param>
+        /// <returns>This instance (dispose it to unsubscribe), or null when an observer is already registered.</returns>
+        public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer)
+        {
+            if (_observer != null)
+            {
+                return null;
+            }
+            _observer = observer;
+            return this;
+        }
+
+        /// <summary>
+        /// Removes the registered observer; later messages are no longer forwarded.
+        /// </summary>
+        public void Dispose()
+        {
+            _observer = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
new file mode 100644
index 0000000..31194a7
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// Exception describing a failure in a context, carrying the id of that context and the
+    /// optional id of its parent.
+    /// </summary>
+    public class ContextClientCodeException : Exception
+    {
+        private readonly string _contextId;
+        private readonly Optional<string> _parentId;
+
+        /// <summary>
+        /// Constructs the exception for a failed context.
+        /// </summary>
+        /// <param name="contextId">the id of the failed context.</param>
+        /// <param name="parentId">the id of the failed context's parent, if any.</param>
+        /// <param name="message">the error message, appended to the "Failure in context" prefix.</param>
+        /// <param name="cause">the exception that caused the error; becomes the inner exception.</param>
+        public ContextClientCodeException(
+                string contextId,
+                Optional<string> parentId,
+                string message,
+                Exception cause)
+            : base(string.Concat("Failure in context '", contextId, "': ", message), cause)
+        {
+            _parentId = parentId;
+            _contextId = contextId;
+        }
+
+        /// <summary>
+        /// Id of the failed context.
+        /// </summary>
+        public string ContextId
+        {
+            get
+            {
+                return _contextId;
+            }
+        }
+
+        /// <summary>
+        /// Id of the failed context's parent, if any.
+        /// </summary>
+        public Optional<string> ParentId
+        {
+            get
+            {
+                return _parentId;
+            }
+        }
+
+        /// <summary>
+        /// Intended to extract a context id from the given configuration; currently a placeholder.
+        /// </summary>
+        /// <param name="c">the configuration; not inspected yet.</param>
+        /// <returns>always the empty string for now.</returns>
+        public static string GetId(IConfiguration c)
+        {
+            // TODO: update after TANG is available
+            return string.Empty;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
new file mode 100644
index 0000000..ca6b949
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using Org.Apache.Reef.Tang.Types;
+
+namespace Org.Apache.Reef.Common.Evaluator.Context
+{
+    /// <summary>
+    /// A configuration for a context that is built from a serialized configuration string.
+    /// The bindings of that string are kept as plain key/value pairs; only <see cref="Id"/>
+    /// and <see cref="ContainerDirectory"/> are backed by real data. All other methods of
+    /// this class throw <see cref="NotImplementedException"/>.
+    /// </summary>
+    public class ContextConfiguration : IConfiguration
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextConfiguration));
+
+        // All bindings read from the configuration string, keyed by binding key.
+        private Dictionary<string, string> _settings;
+
+        /// <summary>
+        /// Parses the given configuration string and stores every binding it contains.
+        /// A binding whose key contains the context identifier constant is stored under
+        /// exactly that constant so that <see cref="Id"/> can find it.
+        /// An ArgumentException is raised through Exceptions.Throw when no such binding exists.
+        /// </summary>
+        /// <param name="configString">The configuration string passed to AvroConfiguration.GetAvroConfigurationFromEmbeddedString.</param>
+        public ContextConfiguration(string configString)
+        {
+            using (LOGGER.LogFunction("ContextConfiguration::ContextConfigurationn"))
+            {
+                ContainerDirectory = Directory.GetCurrentDirectory();
+                _settings = new Dictionary<string, string>();
+
+                string contextIdKey = Reef.Evaluator.Constants.ContextIdentifier;
+                AvroConfiguration parsed = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString);
+                foreach (ConfigurationEntry binding in parsed.Bindings)
+                {
+                    bool holdsContextId = binding.key.Contains(contextIdKey);
+                    if (holdsContextId)
+                    {
+                        // Normalize the key to the bare identifier before storing the binding.
+                        binding.key = contextIdKey;
+                        LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "{0} detected for context id with value {1}", binding.key, binding.value));
+                    }
+
+                    // Dictionary.Add rejects duplicate keys, so each key may occur only once.
+                    _settings.Add(binding.key, binding.value);
+                }
+
+                if (_settings.ContainsKey(contextIdKey))
+                {
+                    return;
+                }
+
+                string msg = "Required parameter ContextIdentifier not provided.";
+                LOGGER.Log(Level.Error, msg);
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Gets the context id, i.e. the value stored under the context identifier key.
+        /// </summary>
+        public string Id
+        {
+            get
+            {
+                return _settings[Reef.Evaluator.Constants.ContextIdentifier];
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the container directory. The constructor initializes it to the
+        /// current directory of the process.
+        /// </summary>
+        public string ContainerDirectory { get; set; }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public IConfigurationBuilder newBuilder()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public string GetNamedParameter(INamedParameterNode np)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public IClassHierarchy GetClassHierarchy()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public ISet<object> GetBoundSet(INamedParameterNode np)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public IClassNode GetBoundConstructor(IClassNode cn)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public IClassNode GetBoundImplementation(IClassNode cn)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public IConstructorDef GetLegacyConstructor(IClassNode cn)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public ICollection<IClassNode> GetBoundImplementations()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public ICollection<IClassNode> GetBoundConstructors()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public ICollection<INamedParameterNode> GetNamedParameters()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public ICollection<IClassNode> GetLegacyConstructors()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public IList<object> GetBoundList(INamedParameterNode np)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public IEnumerator<KeyValuePair<INamedParameterNode, object>> GetBoundSets()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
+        public IDictionary<INamedParameterNode, IList<object>> GetBoundList()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
new file mode 100644
index 0000000..9967258
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Events;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// This class is used to trigger all the context life-cycle dependent events.
+    /// </summary>
+    /// <remarks>
+    /// Event dispatch is not enabled yet: the code that would notify the start handlers,
+    /// the stop handlers and the context message handler is still commented out, so
+    /// <see cref="Start"/>, <see cref="Close"/> and <see cref="HandleContextMessage"/>
+    /// currently do not call any handler.
+    /// </remarks>
+    internal class ContextLifeCycle
+    {
+        // Handlers to notify on context start; stored but not invoked yet (see Start).
+        private readonly HashSet<IObserver<IContextStart>> _contextStartHandlers;
+
+        // Handlers to notify on context stop; stored but not invoked yet (see Close).
+        private readonly HashSet<IObserver<IContextStop>> _contextStopHandlers;
+
+        // The message sources configured for this context.
+        private readonly HashSet<IContextMessageSource> _contextMessageSources;
+
+        /// <summary>
+        /// Creates a life cycle for the given context id using the supplied handler and
+        /// message source sets. The sets are stored as passed in; they are not copied.
+        /// </summary>
+        /// <param name="id">The id of the context.</param>
+        /// <param name="contextStartHandlers">The handlers for context start events.</param>
+        /// <param name="contextStopHandlers">The handlers for context stop events.</param>
+        /// <param name="contextMessageSources">The message sources of the context.</param>
+        // @Inject
+        public ContextLifeCycle(
+            string id,
+            HashSet<IObserver<IContextStart>> contextStartHandlers,
+            HashSet<IObserver<IContextStop>> contextStopHandlers,
+            HashSet<IContextMessageSource> contextMessageSources)
+        {
+            Id = id;
+            _contextStartHandlers = contextStartHandlers;
+            _contextStopHandlers = contextStopHandlers;
+            _contextMessageSources = contextMessageSources;
+        }
+
+        /// <summary>
+        /// Creates a life cycle for the given context id with empty handler and message source sets.
+        /// </summary>
+        /// <param name="contextId">The id of the context.</param>
+        public ContextLifeCycle(string contextId)
+        {
+            Id = contextId;
+            _contextStartHandlers = new HashSet<IObserver<IContextStart>>();
+            _contextStopHandlers = new HashSet<IObserver<IContextStop>>();
+            _contextMessageSources = new HashSet<IContextMessageSource>();
+        }
+
+        /// <summary>
+        /// Gets the id of the context this life cycle belongs to.
+        /// </summary>
+        public string Id { get; private set; }
+
+        /// <summary>
+        /// Gets the set of configured ContextMessageSources. This is the internal set itself,
+        /// not a copy; use <see cref="GetContextMessageSources"/> to obtain a copy.
+        /// </summary>
+        public HashSet<IContextMessageSource> ContextMessageSources
+        {
+            get { return _contextMessageSources; }
+        }
+
+        /// <summary>
+        /// Intended to fire ContextStart to all registered event handlers.
+        /// Currently only the event object is created; the dispatch loop is disabled.
+        /// </summary>
+        public void Start()
+        {
+            // Kept although unused so that the event is still constructed exactly as before.
+            IContextStart contextStart = new ContextStartImpl(Id);
+
+            // TODO: enable
+            //foreach (IObserver<IContextStart> startHandler in _contextStartHandlers)
+            //{
+            //    startHandler.OnNext(contextStart);
+            //}
+        }
+
+        /// <summary>
+        /// Intended to fire ContextStop to all registered event handlers.
+        /// Currently a no-op; the dispatch loop is disabled.
+        /// </summary>
+        public void Close()
+        {
+            //IContextStop contextStop = new ContextStopImpl(Id);
+            //foreach (IObserver<IContextStop> startHandler in _contextStopHandlers)
+            //{
+            //    startHandler.OnNext(contextStop);
+            //}
+        }
+
+        /// <summary>
+        /// Intended to deliver a message to the context message handler.
+        /// Currently a no-op; the given message is ignored.
+        /// </summary>
+        /// <param name="message">The message sent to the context.</param>
+        public void HandleContextMessage(byte[] message)
+        {
+            //contextMessageHandler.onNext(message);
+        }
+
+        /// <summary>
+        /// Gets the set of ContextMessageSources configured.
+        /// </summary>
+        /// <returns>(a shallow copy of) the set of ContextMessageSources configured.</returns>
+        public HashSet<IContextMessageSource> GetContextMessageSources()
+        {
+            return new HashSet<IContextMessageSource>(_contextMessageSources);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
new file mode 100644
index 0000000..15b09b9
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Evaluator.Context;
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Common.Task;
+using Org.Apache.Reef.Evaluator;
+using Org.Apache.Reef.Services;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Globalization;
+using System.Linq;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// Manages the stack of <see cref="ContextRuntime"/> instances: creates the root context,
+    /// adds and removes child contexts, starts tasks on the top context, and reports task and
+    /// context failures through the <see cref="HeartBeatManager"/>.
+    /// </summary>
+    public class ContextManager : IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager));
+
+        // The context stack; its top element is the currently active context.
+        // The stack object itself also serves as the lock for stack modifications.
+        private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>();
+
+        private readonly HeartBeatManager _heartBeatManager;
+
+        // Produces the root context and holds the optional root task configuration.
+        private readonly RootContextLauncher _rootContextLauncher;
+
+        /// <summary>
+        /// Creates the context manager and the launcher for the root context. The root context
+        /// configuration is taken from the evaluator settings of the heartbeat manager.
+        /// </summary>
+        /// <param name="heartBeatManager">The heartbeat manager used to report status.</param>
+        /// <param name="rootServiceConfig">The optional service configuration for the root context.</param>
+        /// <param name="rootTaskConfig">The optional configuration of a task to launch on the root context.</param>
+        public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
+        {
+            using (LOGGER.LogFunction("ContextManager::ContextManager"))
+            {
+                _heartBeatManager = heartBeatManager;
+                _rootContextLauncher = new RootContextLauncher(_heartBeatManager.EvaluatorSettings.RootContextConfig, rootServiceConfig, rootTaskConfig);
+            }
+        }
+
+        /// <summary>
+        /// Start the context manager. This initiates the root context and, if a root task
+        /// configuration is present, launches that task on it. A TaskClientCodeException
+        /// raised while starting the task is reported as a failed task and not rethrown.
+        /// </summary>
+        public void Start()
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime rootContext = _rootContextLauncher.GetRootContext();
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id));
+                _contextStack.Push(rootContext);
+
+                if (_rootContextLauncher.RootTaskConfig.IsPresent())
+                {
+                    LOGGER.Log(Level.Info, "Launching the initial Task");
+                    try
+                    {
+                        _contextStack.Peek().StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
+                    }
+                    catch (TaskClientCodeException e)
+                    {
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER);
+                        HandleTaskException(e);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tells whether the context stack currently holds no context.
+        /// </summary>
+        /// <returns>true if the context stack is empty, false otherwise.</returns>
+        public bool ContextStackIsEmpty()
+        {
+            lock (_contextStack)
+            {
+                return (_contextStack.Count == 0);
+            }
+        }
+
+        // TODO: codes here are slightly different from java since the protobuf.net does not generate the HasXXX method, may want to switch to proto-port later
+
+        /// <summary>
+        /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
+        /// The fields of the message are checked in a fixed order (add context, remove context,
+        /// start task, stop task, suspend task, task message, context message) and the first one
+        /// that is set decides the action. When a context is added without a task, a heartbeat
+        /// is triggered explicitly.
+        /// </summary>
+        /// <remarks>
+        /// Any exception is passed to Exceptions.CaughtAndThrow after task and context client code
+        /// exceptions have been reported as failures via the heartbeat manager.
+        /// </remarks>
+        /// <param name="controlMessage">The control message to process.</param>
+        public void HandleTaskControl(ContextControlProto controlMessage)
+        {
+            try
+            {
+                byte[] message = controlMessage.task_message;
+                if (controlMessage.add_context != null && controlMessage.remove_context != null)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER);
+                }
+                if (controlMessage.add_context != null)
+                {
+                    LOGGER.Log(Level.Info, "AddContext");
+                    AddContext(controlMessage.add_context);
+                    // support submitContextAndTask()
+                    if (controlMessage.start_task != null)
+                    {
+                        LOGGER.Log(Level.Info, "StartTask");
+                        StartTask(controlMessage.start_task);
+                    }
+                    else
+                    {
+                        // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime
+                        // Therefore this call can not go into addContext
+                        LOGGER.Log(Level.Info, "Trigger Heartbeat");
+                        _heartBeatManager.OnNext();
+                    }
+                }
+                else if (controlMessage.remove_context != null)
+                {
+                    LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id));
+                    RemoveContext(controlMessage.remove_context.context_id);
+                }
+                else if (controlMessage.start_task != null)
+                {
+                    LOGGER.Log(Level.Info, "StartTask only");
+                    StartTask(controlMessage.start_task);
+                }
+                else if (controlMessage.stop_task != null)
+                {
+                    LOGGER.Log(Level.Info, "CloseTask");
+                    _contextStack.Peek().CloseTask(message);
+                }
+                else if (controlMessage.suspend_task != null)
+                {
+                    LOGGER.Log(Level.Info, "SuspendTask");
+                    _contextStack.Peek().SuspendTask(message);
+                }
+                else if (controlMessage.task_message != null)
+                {
+                    LOGGER.Log(Level.Info, "DeliverTaskMessage");
+                    _contextStack.Peek().DeliverTaskMessage(message);
+                }
+                else if (controlMessage.context_message != null)
+                {
+                    LOGGER.Log(Level.Info, "Handle context contol message");
+                    ContextMessageProto contextMessageProto = controlMessage.context_message;
+                    bool deliveredMessage = false;
+                    // Deliver to the first context on the stack whose id matches the target id.
+                    foreach (ContextRuntime context in _contextStack)
+                    {
+                        if (context.Id.Equals(contextMessageProto.context_id))
+                        {
+                            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message));
+                            context.HandleContextMessaage(controlMessage.context_message.message);
+                            deliveredMessage = true;
+                            break;
+                        }
+                    }
+                    if (!deliveredMessage)
+                    {
+                        InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id));
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                    }
+                }
+                else
+                {
+                    InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString()));
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+            }
+            catch (Exception e)
+            {
+                // Report client code failures first; every exception is then handed on below.
+                TaskClientCodeException taskException = e as TaskClientCodeException;
+                ContextClientCodeException contextException = e as ContextClientCodeException;
+                if (taskException != null)
+                {
+                    HandleTaskException(taskException);
+                }
+                else if (contextException != null)
+                {
+                    HandleContextException(contextException);
+                }
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Get TaskStatusProto of the currently running task, if there is any
+        /// </summary>
+        /// <returns>
+        /// the task status reported by the top context, or an empty Optional when the
+        /// context stack is empty.
+        /// </returns>
+        public Optional<TaskStatusProto> GetTaskStatus()
+        {
+            if (_contextStack.Count == 0)
+            {
+                // An empty stack is answered with an empty Optional instead of:
+                //throw new InvalidOperationException("Asked for an Task status while there isn't even a context running.");
+                return Optional<TaskStatusProto>.Empty();
+            }
+            return _contextStack.Peek().GetTaskStatus();
+        }
+
+        /// <summary>
+        /// get status of all contexts in the stack.
+        /// </summary>
+        /// <returns>the status of all contexts in the stack, in the enumeration order of the stack.</returns>
+        public ICollection<ContextStatusProto> GetContextStatusCollection()
+        {
+            ICollection<ContextStatusProto> result = new Collection<ContextStatusProto>();
+            foreach (ContextRuntime runtime in _contextStack)
+            {
+                ContextStatusProto contextStatusProto = runtime.GetContextStatus();
+                LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto));
+                result.Add(contextStatusProto);
+            }
+            return result;
+        }
+
+        /// <summary>
+        /// Shuts down. If the context stack is not empty, this disposes the context runtime
+        /// returned by Last() on the stack; the stack itself is not modified.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): Stack&lt;T&gt; enumerates from the top, so Last() yields the bottom-most
+        /// (first pushed) context. Presumably disposing it also shuts down the task and the
+        /// contexts above it - confirm against ContextRuntime.Dispose.
+        /// </remarks>
+        public void Dispose()
+        {
+            lock (_contextStack)
+            {
+                if (_contextStack.Count > 0)
+                {
+                    LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime.");
+                    ContextRuntime runtime = _contextStack.Last();
+                    if (runtime != null)
+                    {
+                        runtime.Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Add a context to the stack. The new context is spawned as a child of the current
+        /// top context, which must be the parent named in the request.
+        /// </summary>
+        /// <param name="addContextProto">The request describing the parent id and the configuration of the new context.</param>
+        private void AddContext(AddContextProto addContextProto)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentTopContext = _contextStack.Peek();
+                if (!currentTopContext.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}",
+                        addContextProto.parent_context_id,
+                        currentTopContext.Id));
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                string contextConfigString = addContextProto.context_configuration;
+                ContextConfiguration contextConfiguration = new ContextConfiguration(contextConfigString);
+                ContextRuntime newTopContext;
+                if (addContextProto.service_configuration != null)
+                {
+                    ServiceConfiguration serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration);
+                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig);
+                }
+                else
+                {
+                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration);
+                }
+                _contextStack.Push(newTopContext);
+            }
+        }
+
+        /// <summary>
+        /// Remove the context with the given ID from the stack. Only the top context can be removed.
+        /// </summary>
+        /// <param name="contextId"> context id</param>
+        private void RemoveContext(string contextId)
+        {
+            lock (_contextStack)
+            {
+                string currentTopContextId = _contextStack.Peek().Id;
+                if (!contextId.Equals(currentTopContextId, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, currentTopContextId));
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                _contextStack.Peek().Dispose();
+                if (_contextStack.Count > 1)
+                {
+                    // We did not close the root context. Therefore, we need to inform the
+                    // driver explicitly that this context is closed. The root context notification
+                    // is implicit in the Evaluator close/done notification.
+                    // The heartbeat is sent before the Pop below, while the disposed context is still on the stack.
+                    _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state
+                }
+                _contextStack.Pop();
+            }
+            //  System.gc(); // TODO: garbage collect?
+        }
+
+        /// <summary>
+        /// Launch a Task on the current top context, which must be the context the task expects.
+        /// </summary>
+        /// <param name="startTaskProto">The request carrying the expected context id and the task configuration.</param>
+        private void StartTask(StartTaskProto startTaskProto)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentActiveContext = _contextStack.Peek();
+                string expectedContextId = startTaskProto.context_id;
+                if (!expectedContextId.Equals(currentActiveContext.Id, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id));
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                TaskConfiguration taskConfiguration = new TaskConfiguration(startTaskProto.configuration);
+                currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager);
+            }
+        }
+
+        /// <summary>
+        /// Reports a failed task: builds a TaskStatusProto in state FAILED that carries the
+        /// exception text as its result and sends it through the heartbeat manager.
+        /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
+        /// </summary>
+        /// <param name="e">The exception raised by the task's client code.</param>
+        private void HandleTaskException(TaskClientCodeException e)
+        {
+            LOGGER.Log(Level.Error, "TaskClientCodeException", e);
+            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
+            TaskStatusProto taskStatus = new TaskStatusProto()
+            {
+                context_id = e.ContextId,
+                task_id = e.TaskId,
+                result = exception,
+                state = State.FAILED
+            };
+            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeatb for a failed task: {0}", taskStatus.ToString()));
+            _heartBeatManager.OnNext(taskStatus);
+        }
+
+        /// <summary>
+        /// Reports a failed context: builds a ContextStatusProto in state FAIL that carries the
+        /// exception text as its error (and the parent id, if present) and sends it through
+        /// the heartbeat manager.
+        /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
+        /// </summary>
+        /// <param name="e">The exception raised by the context's client code.</param>
+        private void HandleContextException(ContextClientCodeException e)
+        {
+            LOGGER.Log(Level.Error, "ContextClientCodeException", e);
+            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
+            ContextStatusProto contextStatusProto = new ContextStatusProto()
+            {
+                context_id = e.ContextId,
+                context_state = ContextStatusProto.State.FAIL,
+                error = exception
+            };
+            if (e.ParentId.IsPresent())
+            {
+                contextStatusProto.parent_id = e.ParentId.Value;
+            }
+            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", contextStatusProto.ToString()));
+            _heartBeatManager.OnNext(contextStatusProto);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextRuntime.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextRuntime.cs
new file mode 100644
index 0000000..012e436
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextRuntime.cs
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Evaluator.Context;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Common.Task;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    public class ContextRuntime
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime));
+        // Context-local injector, forked from the service injector in the constructor. This contains information that will not be available in child injectors.
+        private readonly IInjector _contextInjector;
+        // Service injector. State in this injector moves to child injectors.
+        private readonly IInjector _serviceInjector;
+
+        // Convenience class to hold all the event handlers for the context as well as the service instances. Every lock in this class is taken on this object.
+        private readonly ContextLifeCycle _contextLifeCycle;
+
+        // The child context, if any.
+        private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty();
+
+        // The parent context, if any.
+        private Optional<ContextRuntime> _parentContext = Optional<ContextRuntime>.Empty();
+
+        // The currently running task, if any.
+        private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty();
+
+        private ContextStatusProto.State _contextState = ContextStatusProto.State.READY; // reported by GetContextStatus; set to DONE by Dispose
+
+        /// <summary>
+        /// Create a new ContextRuntime: records the service injector and the parent, forks the
+        /// context-local injector from the service injector and then starts the context life cycle.
+        /// </summary>
+        /// <param name="serviceInjector">the injector the context-local injector is forked from.</param>
+        /// <param name="contextConfiguration">the Configuration for this context; expected to be a ContextConfiguration.</param>
+        /// <param name="parentContext">the parent context, or empty when there is none.</param>
+        public ContextRuntime(
+                IInjector serviceInjector,
+                IConfiguration contextConfiguration,
+                Optional<ContextRuntime> parentContext)
+        {
+            ContextConfiguration typedConfiguration = contextConfiguration as ContextConfiguration;
+            if (typedConfiguration == null)
+            {
+                ArgumentException wrongType = new ArgumentException("contextConfiguration is not of type ContextConfiguration");
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(wrongType, LOGGER);
+            }
+
+            _contextLifeCycle = new ContextLifeCycle(typedConfiguration.Id);
+            _serviceInjector = serviceInjector;
+            _parentContext = parentContext;
+
+            try
+            {
+                _contextInjector = serviceInjector.ForkInjector();
+            }
+            catch (Exception forkFailure)
+            {
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(forkFailure, Level.Error, LOGGER);
+
+                // Report the id of this context's parent (if it has one) along with the failure.
+                Optional<string> idOfParent;
+                if (_parentContext.IsPresent())
+                {
+                    idOfParent = Optional<string>.Of(_parentContext.Value.Id);
+                }
+                else
+                {
+                    idOfParent = Optional<string>.Empty();
+                }
+
+                string failedContextId = ContextClientCodeException.GetId(contextConfiguration);
+                ContextClientCodeException spawnFailure = new ContextClientCodeException(failedContextId, idOfParent, "Unable to spawn context", forkFailure);
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(spawnFailure, LOGGER);
+            }
+
+            // Trigger the context start events.
+            _contextLifeCycle.Start();
+        }
+
+        /// <summary>
+        ///  Create a new ContextRuntime for the root context, i.e. one that is given no parent context.
+        /// </summary>
+        /// <param name="serviceInjector">the serviceInjector to be used.</param>
+        /// <param name="contextConfiguration">the Configuration for this context.</param>
+        public ContextRuntime(
+            IInjector serviceInjector,
+            IConfiguration contextConfiguration)
+            : this(serviceInjector, contextConfiguration, Optional<ContextRuntime>.Empty())
+        {
+            LOGGER.Log(Level.Info, "Instantiating root context");
+        }
+
+        /// <summary>
+        /// The identifier of this context, as held by its ContextLifeCycle.
+        /// </summary>
+        public string Id
+        {
+            get
+            {
+                return _contextLifeCycle.Id;
+            }
+        }
+
+        /// <summary>
+        /// The parent of this context; empty when the context was created without one.
+        /// </summary>
+        public Optional<ContextRuntime> ParentContext
+        {
+            get
+            {
+                return _parentContext;
+            }
+        }
+
+        /// <summary>
+        ///  Spawns a new child context with services of its own.
+        ///  The service injector of this context is forked with the given serviceConfiguration, and the child
+        ///  ContextRuntime is built from that forked injector and the contextConfiguration, with this context as parent.
+        ///  An error is raised through Exceptions.Throw when a Task is present or a child context already exists.
+        /// </summary>
+        /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+        /// <param name="serviceConfiguration">the new context's service Configuration.</param>
+        /// <returns>a child context.</returns>
+        public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
+        {
+            ContextRuntime spawned = null;
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    // note: java code is putting thread id here
+                    string taskRunningMessage = string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId);
+                    InvalidOperationException taskRunning = new InvalidOperationException(taskRunningMessage);
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(taskRunning, LOGGER);
+                }
+
+                if (_childContext.IsPresent())
+                {
+                    InvalidOperationException notTopmost = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(notTopmost, LOGGER);
+                }
+
+                try
+                {
+                    IConfiguration[] serviceConfigurations = new IConfiguration[] { serviceConfiguration };
+                    IInjector forkedServiceInjector = _serviceInjector.ForkInjector(serviceConfigurations);
+                    spawned = new ContextRuntime(forkedServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this));
+                    _childContext = Optional<ContextRuntime>.Of(spawned);
+                    return spawned;
+                }
+                catch (Exception spawnFailure)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(spawnFailure, Level.Error, LOGGER);
+
+                    // The id reported here is that of this context's own parent, when it has one.
+                    Optional<string> idOfParent;
+                    if (_parentContext.IsPresent())
+                    {
+                        idOfParent = Optional<string>.Of(_parentContext.Value.Id);
+                    }
+                    else
+                    {
+                        idOfParent = Optional<string>.Empty();
+                    }
+
+                    string failedContextId = ContextClientCodeException.GetId(contextConfiguration);
+                    ContextClientCodeException wrapped = new ContextClientCodeException(failedContextId, idOfParent, "Unable to spawn context", spawnFailure);
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(wrapped, LOGGER);
+                }
+            }
+
+            // Only reached if the Throw helper above returns; needed for definite return.
+            return spawned;
+        }
+
+        /// <summary>
+        /// Spawns a new child context without services of its own.
+        /// The service injector of this context is forked as-is, and the child ContextRuntime is built from that
+        /// forked injector and the contextConfiguration, with this context as parent.
+        /// An error is raised through Exceptions.Throw when a Task is present or a child context already exists.
+        /// </summary>
+        /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+        /// <returns> a child context.</returns>
+        public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    // note: java code is putting thread id here
+                    string taskRunningMessage = string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId);
+                    InvalidOperationException taskRunning = new InvalidOperationException(taskRunningMessage);
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(taskRunning, LOGGER);
+                }
+
+                if (_childContext.IsPresent())
+                {
+                    InvalidOperationException notTopmost = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(notTopmost, LOGGER);
+                }
+
+                IInjector forkedServiceInjector = _serviceInjector.ForkInjector();
+                ContextRuntime spawned = new ContextRuntime(forkedServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this));
+                _childContext = Optional<ContextRuntime>.Of(spawned);
+                return spawned;
+            }
+        }
+
+        /// <summary>
+        ///  Launches a Task on this context.
+        ///  A previously recorded task that has already ended is cleared first. Starting is refused (an
+        ///  InvalidOperationException is raised through Exceptions.Throw) while another task is still present
+        ///  or while this context has a child context. Otherwise a task injector is forked from the
+        ///  context-local injector, a TaskRuntime is created and initialized, and its Start method is
+        ///  scheduled via Task.Run. Any failure in that sequence is wrapped in a TaskClientCodeException.
+        /// </summary>
+        /// <param name="taskConfiguration">the configuration of the task to launch; supplies the Tang configuration and the task id.</param>
+        /// <param name="contextId">the context id handed to the new TaskRuntime.</param>
+        /// <param name="heartBeatManager">the heartbeat manager handed to the new TaskRuntime.</param>
+        public void StartTask(TaskConfiguration taskConfiguration, string contextId, HeartBeatManager heartBeatManager)
+        {
+            lock (_contextLifeCycle)
+            {
+                bool taskPresent = _task.IsPresent();
+                bool taskEnded = taskPresent && _task.Value.HasEnded();
+
+                LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration)" + "task is present: " + taskPresent + " task has ended: " + taskEnded);
+                if (taskPresent)
+                {
+                    LOGGER.Log(Level.Info, "Task state: " + _task.Value.GetTaskState());
+                }
+
+                if (taskEnded)
+                {
+                    // clean up state
+                    _task = Optional<TaskRuntime>.Empty();
+                    taskPresent = false;
+                }
+                if (taskPresent)
+                {
+                    // The message names the operation that was actually refused (starting a task),
+                    // not the child-context spawn it was originally copied from.
+                    var e = new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Attempting to start a Task when a Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                if (_childContext.IsPresent())
+                {
+                    var e = new InvalidOperationException("Attempting to start a Task on a context that is not the topmost active context.");
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                try
+                {
+                    IInjector taskInjector = _contextInjector.ForkInjector(new IConfiguration[] { taskConfiguration.TangConfig });
+                    LOGGER.Log(Level.Info, "Trying to inject task with configuration" + taskConfiguration.ToString());
+                    TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); // taskInjector.getInstance(TaskRuntime.class);
+                    taskRuntime.Initialize();
+
+                    // Start is handed to Task.Run, so this method does not wait for it to complete.
+                    System.Threading.Tasks.Task.Run(new Action(taskRuntime.Start));
+                    _task = Optional<TaskRuntime>.Of(taskRuntime);
+                }
+                catch (Exception e)
+                {
+                    var ex = new TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate the new task", e);
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Close this context: marks it DONE, closes the current Task (if any) with a null message, disposes the child
+        /// context (if any), closes the context life cycle and finally detaches this context from its parent (if any).
+        /// </summary>
+        public void Dispose()
+        {
+            lock (_contextLifeCycle)
+            {
+                _contextState = ContextStatusProto.State.DONE;
+                if (_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Shutting down an task because the underlying context is being closed.");
+                    _task.Value.Close(null); // no close message is supplied
+                }
+                if (_childContext.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Closing a context because its parent context is being closed.");
+                    _childContext.Value.Dispose(); // the child's Dispose calls ResetChildContext on this context
+                }
+                _contextLifeCycle.Close();
+                if (_parentContext.IsPresent())
+                {
+                    ParentContext.Value.ResetChildContext(); // clear the parent's reference to this context
+                }
+            }
+        }
+
+        /// <summary>
+        /// Forwards a suspend request to the current Task.
+        /// Because of races the task may already be gone; in that case the request is dropped and a
+        /// WARNING is written to the log.
+        /// </summary>
+        /// <param name="message"> the suspend message to deliver or null if there is none.</param>
+        public void SuspendTask(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    _task.Value.Suspend(message);
+                    return;
+                }
+
+                LOGGER.Log(Level.Warning, "Received a suspend task while there was no task running. Ignored");
+            }
+        }
+
+        /// <summary>
+        /// Forwards a close request to the current Task.
+        /// Because of races the task may already be gone; in that case the request is dropped and a
+        /// WARNING is written to the log.
+        /// </summary>
+        /// <param name="message">the close  message to deliver or null if there is none.</param>
+        public void CloseTask(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    _task.Value.Close(message);
+                    return;
+                }
+
+                LOGGER.Log(Level.Warning, "Received a close task while there was no task running. Ignored");
+            }
+        }
+
+        /// <summary>
+        /// Hands a message to the current Task.
+        /// Because of races the task may already be gone; in that case the message is dropped and a
+        /// WARNING is written to the log.
+        /// </summary>
+        /// <param name="message">the message to deliver or null if there is none.</param>
+        public void DeliverTaskMessage(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    _task.Value.Deliver(message);
+                    return;
+                }
+
+                LOGGER.Log(Level.Warning, "Received an task message while there was no task running. Ignored");
+            }
+        }
+
+        /// <summary>
+        /// Passes a context message on to the ContextLifeCycle of this context.
+        /// Unlike the task-related methods, this call is made without taking the lock.
+        /// </summary>
+        /// <param name="mesage">the message bytes to hand over.</param>
+        public void HandleContextMessaage(byte[] mesage)
+        {
+            _contextLifeCycle.HandleContextMessage(mesage);
+        }
+
+        /// <summary>
+        /// Get the status of the current Task.
+        /// A task that has already ended is forgotten here and reported as absent.
+        /// </summary>
+        /// <returns>
+        /// the status of the Task when it is in the RUNNING state; empty when there is no task or it has ended.
+        /// Any other state is reported as an InvalidOperationException through Exceptions.Throw.
+        /// </returns>
+        public Optional<TaskStatusProto> GetTaskStatus()
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_task.IsPresent())
+                {
+                    return Optional<TaskStatusProto>.Empty();
+                }
+
+                if (_task.Value.HasEnded())
+                {
+                    // Drop the finished task so later calls see no task at all.
+                    _task = Optional<TaskRuntime>.Empty();
+                    return Optional<TaskStatusProto>.Empty();
+                }
+
+                TaskStatusProto currentStatus = _task.Value.GetStatusProto();
+                if (currentStatus.state != State.RUNNING)
+                {
+                    // Only RUNNING may be returned from here; every other state is pushed out via heartbeat.
+                    string wrongStateMessage = string.Format(CultureInfo.InvariantCulture, "Task state must be RUNNING, but instead is in {0} state", currentStatus.state);
+                    InvalidOperationException wrongState = new InvalidOperationException(wrongStateMessage);
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(wrongState, LOGGER);
+                    return Optional<TaskStatusProto>.Empty();
+                }
+
+                return Optional<TaskStatusProto>.Of(currentStatus);
+            }
+        }
+
+        /// <summary>
+        /// Clears this context's reference to its child context.
+        /// Raises an InvalidOperationException through Exceptions.Throw when no child context is set.
+        /// </summary>
+        public void ResetChildContext()
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_childContext.IsPresent())
+                {
+                    InvalidOperationException noChild = new InvalidOperationException("no child context set");
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(noChild, LOGGER);
+                }
+                else
+                {
+                    _childContext = Optional<ContextRuntime>.Empty();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds this context's status as a protocol buffer: its id, its state, the id of its parent
+        /// (when it has one) and one ContextMessageProto for every message source that currently has a message.
+        /// </summary>
+        /// <returns>this context's status in protocol buffer form.</returns>
+        public ContextStatusProto GetContextStatus()
+        {
+            lock (_contextLifeCycle)
+            {
+                ContextStatusProto status = new ContextStatusProto();
+                status.context_id = Id;
+                status.context_state = _contextState;
+                if (_parentContext.IsPresent())
+                {
+                    status.parent_id = _parentContext.Value.Id;
+                }
+
+                foreach (IContextMessageSource messageSource in _contextLifeCycle.ContextMessageSources)
+                {
+                    // Read the message exactly once per source.
+                    Optional<ContextMessage> pending = messageSource.Message;
+                    if (!pending.IsPresent())
+                    {
+                        continue;
+                    }
+
+                    ContextStatusProto.ContextMessageProto messageProto = new ContextStatusProto.ContextMessageProto();
+                    messageProto.source_id = pending.Value.MessageSourceId;
+                    messageProto.message = ByteUtilities.CopyBytesFrom(pending.Value.Bytes);
+                    status.context_message.Add(messageProto);
+                }
+
+                return status;
+            }
+        }
+    }
+}
+        ///// <summary>
+        ///// TODO: remove and use parameterless GetContextStatus above
+        ///// </summary>
+        ///// <returns>this context's status in protocol buffer form.</returns>
+        //public ContextStatusProto GetContextStatus(string contextId)
+        //{
+        //    ContextStatusProto contextStatusProto = new ContextStatusProto()
+        //    {
+        //        context_id = contextId,
+        //        context_state = _contextState,
+        //    };
+        //    return contextStatusProto;
+        //}
+
+        ////// TODO: remove and use injection
+        //public void StartTask(ITask task, HeartBeatManager heartBeatManager, string taskId, string contextId)
+        //{
+        //    lock (_contextLifeCycle)
+        //    {
+        //        if (_task.IsPresent() && _task.Value.HasEnded())
+        //        {
+        //            // clean up state
+        //            _task = Optional<TaskRuntime>.Empty();
+        //        }
+        //        if (_task.IsPresent())
+        //        {
+        //            throw new InvalidOperationException(
+        //                string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here
+        //        }
+        //        if (_childContext.IsPresent())
+        //        {
+        //            throw new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+        //        }
+        //        try
+        //        {
+        //            // final Injector taskInjector = contextInjector.forkInjector(taskConfiguration);
+        //            TaskRuntime taskRuntime  // taskInjector.getInstance(TaskRuntime.class);
+        //                = new TaskRuntime(task, heartBeatManager);
+        //            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Starting task '{0}'", taskId));
+        //            taskRuntime.Initialize(taskId, contextId);
+        //            taskRuntime.Start();
+        //            _task = Optional<TaskRuntime>.Of(taskRuntime);
+        //        }
+        //        catch (Exception e)
+        //        {
+        //            throw new InvalidOperationException("Unable to instantiate the new task");
+        //        }
+        //    }
+        //}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStartImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStartImpl.cs
new file mode 100644
index 0000000..3cf31b5
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStartImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Events;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// Simple IContextStart implementation that only carries an id.
+    /// </summary>
+    internal class ContextStartImpl : IContextStart
+    {
+        /// <summary>
+        /// Creates the event for the given id.
+        /// </summary>
+        /// <param name="id">the id stored in <see cref="Id"/>.</param>
+        public ContextStartImpl(string id)
+        {
+            this.Id = id;
+        }
+
+        /// <summary>
+        /// The id passed to the constructor (publicly settable).
+        /// </summary>
+        public string Id
+        {
+            get;
+            set;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStopImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStopImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStopImpl.cs
new file mode 100644
index 0000000..5db45d1
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStopImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Events;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// Simple IContextStop implementation that only carries an id.
+    /// </summary>
+    internal class ContextStopImpl : IContextStop
+    {
+        /// <summary>
+        /// Creates the event for the given id.
+        /// </summary>
+        /// <param name="id">the id stored in <see cref="Id"/>.</param>
+        public ContextStopImpl(string id)
+        {
+            this.Id = id;
+        }
+
+        /// <summary>
+        /// The id passed to the constructor (publicly settable).
+        /// </summary>
+        public string Id
+        {
+            get;
+            set;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/RootContextLauncher.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/RootContextLauncher.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/RootContextLauncher.cs
new file mode 100644
index 0000000..d31aeed
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/RootContextLauncher.cs
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Evaluator.Context;
+using Org.Apache.Reef.Services;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Implementations;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    ///  Helper class that encapsulates the root context configuration: With or without services and an initial task.
+    ///  The root service injector is built once in the constructor; the root ContextRuntime is created on the
+    ///  first call to GetRootContext() and reused afterwards.
+    /// </summary>
+    public sealed class RootContextLauncher
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RootContextLauncher));
+
+        // Injector holding the root services; produced by InjectServices in the constructor.
+        private readonly IInjector _rootServiceInjector = null;
+
+        // Cached root context; stays null until GetRootContext() is first called.
+        private ContextRuntime _rootContext = null;
+
+        private ContextConfiguration _rootContextConfiguration = null;
+
+        /// <summary>
+        /// Stores the root context and task configurations and builds the root service injector.
+        /// </summary>
+        /// <param name="rootContextConfig">the configuration of the root context.</param>
+        /// <param name="rootServiceConfig">the optional service configuration the root service injector is built from.</param>
+        /// <param name="rootTaskConfig">the optional configuration of an initial task.</param>
+        public RootContextLauncher(ContextConfiguration rootContextConfig, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
+        {
+            _rootContextConfiguration = rootContextConfig;
+            _rootServiceInjector = InjectServices(rootServiceConfig);
+            RootTaskConfig = rootTaskConfig;
+        }
+
+        /// <summary>
+        /// The optional configuration of the initial task.
+        /// </summary>
+        public Optional<TaskConfiguration> RootTaskConfig { get; set; }
+
+        /// <summary>
+        /// The configuration used when the root context is created.
+        /// </summary>
+        public ContextConfiguration RootContextConfig
+        {
+            get { return _rootContextConfiguration; }
+            set { _rootContextConfiguration = value; }
+        }
+
+        /// <summary>
+        /// Returns the root context, creating it on the first call.
+        /// </summary>
+        /// <returns>the root ContextRuntime.</returns>
+        public ContextRuntime GetRootContext()
+        {
+            if (_rootContext == null)
+            {
+                _rootContext = GetRootContext(_rootServiceInjector, _rootContextConfiguration);
+            }
+            return _rootContext;
+        }
+
+        /// <summary>
+        /// Builds the root service injector. With a service configuration present, the injector is created from
+        /// its Tang configuration and InjectedServices is instantiated from it; otherwise an empty injector is created.
+        /// </summary>
+        /// <param name="serviceConfig">the optional service configuration.</param>
+        /// <returns>the root service injector.</returns>
+        private IInjector InjectServices(Optional<ServiceConfiguration> serviceConfig)
+        {
+            IInjector rootServiceInjector;
+
+            if (serviceConfig.IsPresent())
+            {
+                rootServiceInjector = TangFactory.GetTang().NewInjector(serviceConfig.Value.TangConfig);
+                InjectedServices services = null;
+                try
+                {
+                    services = rootServiceInjector.GetInstance<InjectedServices>();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Failed to instantiate service.", LOGGER);
+
+                    // Placeholders line up with the arguments: {0} = exception, {1} = message, {2} = stack trace.
+                    string failureMessage = string.Format(CultureInfo.InvariantCulture, "Failed to inject service: encountered error {0} with message [{1}] and stack trace:[{2}]", e, e.Message, e.StackTrace);
+
+                    // Keep the original exception reachable as InnerException.
+                    InvalidOperationException ex = new InvalidOperationException(failureMessage, e);
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
+                }
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "injected {0} service(s)", services.Services.Count));
+            }
+            else
+            {
+                rootServiceInjector = TangFactory.GetTang().NewInjector();
+                LOGGER.Log(Level.Info, "no service provided for injection.");
+            }
+
+            return rootServiceInjector;
+        }
+
+        /// <summary>
+        /// Creates the root ContextRuntime from the given injector and configuration.
+        /// </summary>
+        /// <param name="rootServiceInjector">the root service injector.</param>
+        /// <param name="rootContextConfiguration">the root context configuration.</param>
+        /// <returns>a new root ContextRuntime.</returns>
+        private ContextRuntime GetRootContext(
+            IInjector rootServiceInjector,
+            IConfiguration rootContextConfiguration)
+        {
+            ContextRuntime result;
+            result = new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+            return result;
+        }
+    }
+//if (rootServiceInjector != null)
+//{
+//    try
+//    {
+//        rootServiceInjector = rootServiceInjector.ForkInjector(serviceConfigs);
+//    }
+//    catch (Exception e)
+//    {
+//        throw new ContextClientCodeException(ContextClientCodeException.GetId(rootContextConfiguration),
+//                                             Optional<String>.Empty(),
+//                                             "Unable to instatiate the root context", e);
+//    }
+//    result = new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+//}
+//else
+//{
+//    result = new ContextRuntime(rootServiceInjector.ForkInjector(), rootContextConfiguration);
+//}
\ No newline at end of file