You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:05:43 UTC

[22/51] [partial] incubator-reef git commit: [REEF-131] Towards the new .NET project structure. This changes the .NET project structure for Tang, Wake, REEF utilities, Common and Driver:

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs
deleted file mode 100644
index 052764d..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Context;
-using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
-using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
-using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
-using Org.Apache.Reef.Evaluator;
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Tang.Annotations;
-using Org.Apache.Reef.Tang.Interface;
-using Org.Apache.Reef.Wake.Remote;
-using Org.Apache.Reef.Wake.Time;
-using Org.Apache.Reef.Wake.Time.Runtime.Event;
-using System;
-using System.Globalization;
-
-namespace Org.Apache.Reef.Common
-{
-    public class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage>
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime));
-        
-        private readonly string _evaluatorId;
-
-        private readonly ContextManager _contextManager;
-
-        private readonly HeartBeatManager _heartBeatManager;
-
-        private readonly IRemoteManager<REEFMessage> _remoteManager;
-
-        private readonly IClock _clock;
-
-        private State _state = State.INIT;
-
-        private IDisposable _evaluatorControlChannel; 
-
-        [Inject]
-        public EvaluatorRuntime(
-            ContextManager contextManager,
-            HeartBeatManager heartBeatManager)
-        {
-            using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime"))
-            {
-                _clock = heartBeatManager.EvaluatorSettings.RuntimeClock;
-                _heartBeatManager = heartBeatManager;
-                _contextManager = contextManager;
-                _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId;
-                _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager;
-
-                ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver();
-
-                // subscribe to driver proto message
-                driverObserver.Subscribe(o => OnNext(o.Message));
-
-                // register the driver observer
-                _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver);
-
-                // start the heartbeat
-                _clock.ScheduleAlarm(0, heartBeatManager);
-            }
-        }
-
-        public State State
-        {
-            get
-            {
-                return _state;
-            }
-        }
-
-        public void Handle(EvaluatorControlProto message)
-        {
-            lock (_heartBeatManager)
-            {
-                LOGGER.Log(Level.Info, "Handle Evaluator control message");
-                if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase))
-                {
-                    Handle(new InvalidOperationException(
-                        string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId)));
-                }
-                else if (_state != State.RUNNING)
-                {
-                    Handle(new InvalidOperationException(
-                        string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state)));
-                }
-                else
-                {
-                    if (message.context_control != null)
-                    {
-                        LOGGER.Log(Level.Info, "Send task control message to ContextManager");
-                        try
-                        {
-                            _contextManager.HandleTaskControl(message.context_control);
-                            if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING)
-                            {
-                                LOGGER.Log(Level.Info, "Context stack is empty, done");
-                                _state = State.DONE;
-                                _heartBeatManager.OnNext(GetEvaluatorStatus());
-                                _clock.Dispose();
-                            }
-                        }
-                        catch (Exception e)
-                        {
-                            Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
-                            Handle(e);
-                            Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER);
-                        }
-                    }
-                    if (message.kill_evaluator != null)
-                    {
-                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId));
-                        _state = State.KILLED;
-                        _clock.Dispose();
-                    }
-                }
-            }
-        }
-
-        public EvaluatorStatusProto GetEvaluatorStatus()
-        {
-            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state));
-            EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
-            {
-                evaluator_id = _evaluatorId,
-                state = _state
-            };
-            return evaluatorStatusProto;
-        }
-
-        public void OnNext(RuntimeStart runtimeStart)
-        {
-            lock (_evaluatorId)
-            {
-                try
-                {
-                    LOGGER.Log(Level.Info, "Runtime start");
-                    if (_state != State.INIT)
-                    {
-                        var e = new InvalidOperationException("State should be init.");
-                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
-                    }
-                    _state = State.RUNNING;
-                    _contextManager.Start();
-                    _heartBeatManager.OnNext();
-                }
-                catch (Exception e)
-                {
-                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
-                    Handle(e);
-                }
-            }
-        }
-
-        void IObserver<RuntimeStart>.OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<REEFMessage>.OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<REEFMessage>.OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<RuntimeStop>.OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<RuntimeStop>.OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<RuntimeStart>.OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
-        public void OnNext(RuntimeStop runtimeStop)
-        {
-            LOGGER.Log(Level.Info, "Runtime stop");
-            _contextManager.Dispose();
-
-            if (_state == State.RUNNING)
-            {
-                _state = State.DONE;
-                _heartBeatManager.OnNext();
-            }
-            try
-            {
-                _evaluatorControlChannel.Dispose();
-            }
-            catch (Exception e)
-            {
-                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER);
-            }
-            LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete");        
-        }
-
-        public void OnNext(REEFMessage value)
-        {
-            if (value != null && value.evaluatorControl != null)
-            {
-                LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl");
-                Handle(value.evaluatorControl);
-            }
-        }
-
-        private void Handle(Exception e)
-        {
-            lock (_heartBeatManager)
-            {
-                LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e);
-                _state = State.FAILED;
-                string errorMessage = string.Format(
-                        CultureInfo.InvariantCulture,
-                        "failed with error [{0}] with mesage [{1}] and stack trace [{2}]",
-                        e,
-                        e.Message,
-                        e.StackTrace);
-                EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
-                {
-                    evaluator_id = _evaluatorId,
-                    error = ByteUtilities.StringToByteArrays(errorMessage),
-                    state = _state
-                };
-                _heartBeatManager.OnNext(evaluatorStatusProto);
-                _contextManager.Dispose();
-            }       
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs
deleted file mode 100644
index 067a0a0..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Evaluator;
-using Org.Apache.Reef.Common.Evaluator.Context;
-using Org.Apache.Reef.Common.io;
-using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
-using Org.Apache.Reef.Tang.Interface;
-using Org.Apache.Reef.Wake.Remote;
-using Org.Apache.Reef.Wake.Time;
-using System;
-
-namespace Org.Apache.Reef.Evaluator
-{
-    // TODO: merge with EvaluatorConfigurations class
-    public class EvaluatorSettings
-    {
-        private string _applicationId;
-
-        private string _evaluatorId;
-
-        private int _heartBeatPeriodInMs;
-
-        private int _maxHeartbeatRetries;
-
-        private ContextConfiguration _rootContextConfig;
-
-        private IClock _clock;
-
-        private IRemoteManager<REEFMessage> _remoteManager;
-
-        private IInjector _injector;
-
-        private EvaluatorOperationState _operationState;
-
-        private INameClient _nameClient;
-
-        public EvaluatorSettings(
-            string applicationId,
-            string evaluatorId,
-            int heartbeatPeriodInMs,
-            int maxHeartbeatRetries,
-            ContextConfiguration rootContextConfig,
-            IClock clock,
-            IRemoteManager<REEFMessage> remoteManager,
-            IInjector injecor)
-        {
-            if (string.IsNullOrWhiteSpace(evaluatorId))
-            {
-                throw new ArgumentNullException("evaluatorId");
-            }
-            if (rootContextConfig == null)
-            {
-                throw new ArgumentNullException("rootContextConfig");
-            }
-            if (clock == null)
-            {
-                throw new ArgumentNullException("clock");
-            }
-            if (remoteManager == null)
-            {
-                throw new ArgumentNullException("remoteManager");
-            }
-            if (injecor == null)
-            {
-                throw new ArgumentNullException("injecor");
-            }
-            _applicationId = applicationId;
-            _evaluatorId = evaluatorId;
-            _heartBeatPeriodInMs = heartbeatPeriodInMs;
-            _maxHeartbeatRetries = maxHeartbeatRetries;
-            _rootContextConfig = rootContextConfig;
-            _clock = clock;
-            _remoteManager = remoteManager;
-            _injector = injecor;
-            _operationState = EvaluatorOperationState.OPERATIONAL;
-        }
-
-        public EvaluatorOperationState OperationState
-        {
-            get
-            {
-                return _operationState;
-            }
-
-            set
-            {
-                _operationState = value;
-            }
-        }
-
-        public string EvalutorId
-        {
-            get
-            {
-                return _evaluatorId;
-            }
-        }
-
-        public int HeartBeatPeriodInMs
-        {
-            get
-            {
-                return _heartBeatPeriodInMs;
-            }
-        }
-
-        public string ApplicationId
-        {
-            get
-            {
-                return _applicationId;
-            }
-        }
-
-        public int MaxHeartbeatFailures
-        {
-            get
-            {
-                return _maxHeartbeatRetries;
-            }
-        }
-
-        public ContextConfiguration RootContextConfig
-        {
-            get
-            {
-                return _rootContextConfig;
-            }
-        }
-
-        public IClock RuntimeClock
-        {
-            get
-            {
-                return _clock;
-            }
-        }
-
-        public INameClient NameClient
-        {
-            get
-            {
-                return _nameClient;
-            }
-
-            set
-            {
-                _nameClient = value;
-            }
-        }
-
-        public IRemoteManager<REEFMessage> RemoteManager
-        {
-            get
-            {
-                return _remoteManager;
-            }
-        }
-
-        public IInjector Injector
-        {
-            get
-            {
-                return _injector;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs
deleted file mode 100644
index e495fda..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Context;
-using Org.Apache.Reef.Common.Evaluator;
-using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
-using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
-using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
-using Org.Apache.Reef.Common.Runtime;
-using Org.Apache.Reef.Evaluator;
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Wake.Remote;
-using Org.Apache.Reef.Wake.Remote.Impl;
-using Org.Apache.Reef.Wake.Time;
-using System;
-using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-using System.Globalization;
-using System.Linq;
-using System.Net;
-using System.Threading;
-
-namespace Org.Apache.Reef.Common
-{
-    public class HeartBeatManager : IObserver<Alarm>
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager));
-
-        private static readonly MachineStatus MachineStatus = new MachineStatus();
-
-        private readonly IRemoteManager<REEFMessage> _remoteManager;
-
-        private readonly IClock _clock;
-
-        private readonly int _heartBeatPeriodInMillSeconds;
-
-        private readonly int _maxHeartbeatRetries = 0;
-
-        private readonly string _evaluatorId;
-
-        private IRemoteIdentifier _remoteId;
-
-        private IObserver<REEFMessage> _observer;
-
-        private int _heartbeatFailures = 0;
-
-        private IDriverConnection _driverConnection;
-
-        private EvaluatorSettings _evaluatorSettings;
-
-        // the queue can only contain the following:
-        // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state
-        // 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeats)
-        private Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>();
-
-        public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId)
-        {
-            using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager"))
-            {
-                _remoteManager = settings.RemoteManager;
-                _remoteId = remoteId;
-                _evaluatorId = settings.EvalutorId;
-                _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
-                _clock = settings.RuntimeClock;
-                _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
-                _maxHeartbeatRetries = settings.MaxHeartbeatFailures;
-                EvaluatorSettings = settings;
-                MachineStatus.ToString(); // kick start the CPU perf counter
-            }
-        }
-
-        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
-        public EvaluatorRuntime _evaluatorRuntime { get; set; }
-
-        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
-        public ContextManager _contextManager { get; set; }
-
-        public EvaluatorSettings EvaluatorSettings
-        {
-            get
-            {
-                return _evaluatorSettings;
-            }
-
-            private set
-            {
-                _evaluatorSettings = value;
-            }
-        }
-
-        public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
-        {
-            lock (_queuedHeartbeats)
-            {
-                if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
-                {
-                    LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. ", evaluatorHeartbeatProto));
-                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
-                    return;
-                }
-
-                // NOT during recovery, try to send
-                REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto);
-                try
-                {
-                    _observer.OnNext(payload);
-                    _heartbeatFailures = 0; // reset failure counts if we are having intermidtten (not continuous) failures
-                }
-                catch (Exception e)
-                {
-                    if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING)
-                    {
-                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER);
-                    }
-
-                    _heartbeatFailures++;
-
-                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
-                    LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
-
-                    if (_heartbeatFailures >= _maxHeartbeatRetries)
-                    {
-                        LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures));
-                        try
-                        {
-                            _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>();
-                        }
-                        catch (Exception ex)
-                        {
-                            Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER);
-                        }
-                        LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
-                        _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
-
-                        // clean heartbeat failure
-                        _heartbeatFailures = 0;
-                    }
-                }
-            }     
-        }
-
-        /// <summary>
-        /// Assemble a complete new heartbeat and send it out.
-        /// </summary>
-        public void OnNext()
-        {
-            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()");
-            lock (this)
-            {
-                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()");
-                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto();
-                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
-                Send(heartbeatProto);
-            }
-        }
-
-        /// <summary>
-        /// Called with a specific TaskStatus that must be delivered to the driver
-        /// </summary>
-        /// <param name="taskStatusProto"></param>
-        public void OnNext(TaskStatusProto taskStatusProto)
-        {
-            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)");
-            lock (this)
-            {
-                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)");
-                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
-                    _evaluatorRuntime.GetEvaluatorStatus(),
-                    _contextManager.GetContextStatusCollection(),
-                     Optional<TaskStatusProto>.Of(taskStatusProto));
-                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
-                Send(heartbeatProto);
-            }
-        }
-
-        /// <summary>
-        ///  Called with a specific ContextStatusProto that must be delivered to the driver
-        /// </summary>
-        /// <param name="contextStatusProto"></param>
-        public void OnNext(ContextStatusProto contextStatusProto)
-        {
-            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)");
-            lock (this)
-            {
-                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)");
-                List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto>();
-                contextStatusProtos.Add(contextStatusProto);
-                contextStatusProtos.AddRange(_contextManager.GetContextStatusCollection());
-                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
-                    _evaluatorRuntime.GetEvaluatorStatus(),
-                    contextStatusProtos,
-                    Optional<TaskStatusProto>.Empty());
-                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
-                Send(heartbeatProto);
-            }
-        }
-
-        /// <summary>
-        /// Called with a specific EvaluatorStatus that must be delivered to the driver
-        /// </summary>
-        /// <param name="evaluatorStatusProto"></param>
-        public void OnNext(EvaluatorStatusProto evaluatorStatusProto)
-        {
-            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)");
-            lock (this)
-            {
-                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)");
-                EvaluatorHeartbeatProto heartbeatProto = new EvaluatorHeartbeatProto()
-                {
-                    timestamp = CurrentTimeMilliSeconds(),
-                    evaluator_status = evaluatorStatusProto
-                };
-                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
-                Send(heartbeatProto);
-            }
-        }
-
-        public void OnNext(Alarm value)
-        {
-            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)");
-            lock (this)
-            {
-                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)");
-                if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING)
-                {
-                    EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
-                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString()));
-                    Send(evaluatorHeartbeatProto);
-                    _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
-                }
-                else
-                {
-                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState,  _evaluatorRuntime.State));
-                    try
-                    {
-                        DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId);
-                        if (driverInformation == null)
-                        {
-                            LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
-                        }
-                        else
-                        {
-                            LOGGER.Log(
-                                Level.Info, 
-                                string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
-                            Recover(driverInformation);
-                        }
-                    }
-                    catch (Exception e)
-                    {
-                        // we do not want any exception to stop the query for driver status
-                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER);
-                    }
-                    _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
-                }
-            }
-        }
-
-        public void OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        public void OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
-        private static long CurrentTimeMilliSeconds()
-        {
-            // this implementation returns the current time in milliseconds counted from Jan 1st, 1970 (UTC)
-            // it is chosen as such to be compatible with the Java implementation
-            DateTime jan1St1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
-            return (long)(DateTime.UtcNow - jan1St1970).TotalMilliseconds;
-        }
-
-        /// <summary>
-        /// Reconnects this evaluator to a restarted driver: re-targets the heartbeat observer at the
-        /// driver's new remote identifier, restarts the name client (when one is configured) against
-        /// the new name server, replays every queued heartbeat, and finally marks the evaluator as
-        /// OPERATIONAL again.
-        /// </summary>
-        /// <param name="driverInformation">Endpoint and name-server details of the restarted driver.</param>
-        /// <remarks>
-        /// A failure while restarting the name client is only logged; recovery continues.
-        /// A failure while replaying a heartbeat is handed to Exceptions.CaughtAndThrow
-        /// (expected to log and rethrow), so recovery is abandoned at that point.
-        /// </remarks>
-        private void Recover(DriverInformation driverInformation)
-        {
-            // Point the outgoing heartbeat channel at the driver's new endpoint.
-            IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier);
-            _remoteId = new SocketRemoteIdentifier(driverEndpoint);
-            _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
-            lock (_evaluatorSettings)
-            {
-                if (_evaluatorSettings.NameClient != null)
-                {
-                    try
-                    {
-                        // Separator added so the id no longer runs into the preceding words in the log.
-                        LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server: " + driverInformation.NameServerId);
-                        _evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId));
-                        LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId);
-                    }
-                    catch (Exception e)
-                    {
-                        // Best effort: a name-server reconnect failure must not stop heartbeat replay.
-                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
-                    }
-                }
-            }
-
-            lock (_queuedHeartbeats)
-            {
-                bool firstHeartbeatInQueue = true;
-                while (_queuedHeartbeats.Any())
-                {
-                    LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId);
-                    try
-                    {
-                        if (firstHeartbeatInQueue)
-                        {
-                            // The first replayed heartbeat is specially constructed to carry the recovery flag.
-                            EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
-                            LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
-                            _observer.OnNext(new REEFMessage(recoveryHeartbeat));
-                            firstHeartbeatInQueue = false;
-                        }
-                        else
-                        {
-                            _observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue()));
-                        }
-                    }
-                    catch (Exception e)
-                    {
-                        // Failures during RECOVERY are not handled: log and give up.
-                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(
-                            e,
-                            Level.Error,
-                            string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId),
-                            LOGGER);
-                    }
-
-                    // Pause between replayed heartbeats. NOTE(review): this sleeps while holding the
-                    // _queuedHeartbeats lock, so anything else locking the queue waits as well.
-                    Thread.Sleep(500);
-                }
-            }
-            _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
-            LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
-        }
-
-        /// <summary>
-        /// Marks the given heartbeat, each of its context statuses and (when present) its task status
-        /// as recovery messages. The heartbeat is modified in place.
-        /// </summary>
-        /// <param name="heartbeat">The queued heartbeat to flag.</param>
-        /// <returns>The same <paramref name="heartbeat"/> instance, now flagged for recovery.</returns>
-        private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
-        {
-            heartbeat.recovery = true;
-            heartbeat.context_status.ForEach(c => c.recovery = true);
-
-            // task_status is only assigned when a task status is present when the heartbeat is built,
-            // so it can be null here; dereferencing it unconditionally would fail recovery
-            // for an evaluator that has no task.
-            if (heartbeat.task_status != null)
-            {
-                heartbeat.task_status.recovery = true;
-            }
-            return heartbeat;
-        }
-
-        /// <summary>
-        /// Builds a heartbeat from the current evaluator status, the context status collection
-        /// and the task status reported by the context manager.
-        /// </summary>
-        /// <returns>A newly built heartbeat message.</returns>
-        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto()
-        {
-            // Queried in the same order as the arguments of the overload below.
-            var evaluatorStatus = _evaluatorRuntime.GetEvaluatorStatus();
-            var contextStatuses = _contextManager.GetContextStatusCollection();
-            var taskStatus = _contextManager.GetTaskStatus();
-
-            return GetEvaluatorHeartbeatProto(evaluatorStatus, contextStatuses, taskStatus);
-        }
-
-        /// <summary>
-        /// Assembles a heartbeat message stamped with the current time.
-        /// </summary>
-        /// <param name="evaluatorStatusProto">Status of the evaluator.</param>
-        /// <param name="contextStatusProtos">Statuses to append to the heartbeat's context_status list.</param>
-        /// <param name="taskStatusProto">Task status; copied into the heartbeat only when present.</param>
-        /// <returns>The assembled heartbeat.</returns>
-        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto(
-            EvaluatorStatusProto evaluatorStatusProto,
-            ICollection<ContextStatusProto> contextStatusProtos,
-            Optional<TaskStatusProto> taskStatusProto)
-        {
-            var heartbeat = new EvaluatorHeartbeatProto();
-            heartbeat.timestamp = CurrentTimeMilliSeconds();
-            heartbeat.evaluator_status = evaluatorStatusProto;
-
-            foreach (var contextStatus in contextStatusProtos)
-            {
-                heartbeat.context_status.Add(contextStatus);
-            }
-
-            // Without a task, task_status is left unassigned.
-            if (!taskStatusProto.IsPresent())
-            {
-                return heartbeat;
-            }
-
-            heartbeat.task_status = taskStatusProto.Value;
-            return heartbeat;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs
deleted file mode 100644
index 8a7aa94..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Wake.Remote;
-using System;
-using System.Globalization;
-using System.Threading;
-
-namespace Org.Apache.Reef.Common
-{
-    /// <summary>
-    /// Pass-through observer for incoming REEF messages: logs a short description of each
-    /// message, keeps a running message count, and forwards the message to the single
-    /// downstream observer registered through <see cref="Subscribe"/> (if any).
-    /// </summary>
-    public class ReefMessageProtoObserver :
-        IObserver<IRemoteMessage<REEFMessage>>,
-        IObservable<IRemoteMessage<REEFMessage>>,
-        IDisposable
-    {
-        /// <summary>Number of messages between two throughput log entries.</summary>
-        private const int PrintBatchSize = 100000;
-
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver));
-        private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null;
-        private long _count = 0;
-        private DateTime _begin;
-        private DateTime _origBegin;
-
-        /// <summary>Completion notifications are ignored.</summary>
-        public void OnCompleted()
-        {
-        }
-
-        /// <summary>Error notifications are ignored.</summary>
-        /// <param name="error">The reported error; not used.</param>
-        public void OnError(Exception error)
-        {
-        }
-
-        /// <summary>
-        /// Logs the received message, updates the message counter and forwards the message
-        /// to the subscribed observer, if there is one.
-        /// </summary>
-        /// <param name="value">The remote message together with the identifier of its sender.</param>
-        public void OnNext(IRemoteMessage<REEFMessage> value)
-        {
-            REEFMessage remoteEvent = value.Message;
-            IRemoteIdentifier id = value.Identifier;
-            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id));
-
-            if (remoteEvent.evaluatorControl != null)
-            {
-                var control = remoteEvent.evaluatorControl.context_control;
-                if (control != null)
-                {
-                    string context_message = null;
-                    string task_message = null;
-
-                    if (control.context_message != null)
-                    {
-                        context_message = control.context_message.ToString();
-                    }
-                    if (control.task_message != null)
-                    {
-                        task_message = ByteUtilities.ByteArrarysToString(control.task_message);
-                    }
-
-                    // Only the first matching description is logged, in this order of precedence.
-                    if (!(string.IsNullOrEmpty(context_message) && string.IsNullOrEmpty(task_message)))
-                    {
-                        LOGGER.Log(Level.Info,
-                            string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", context_message, task_message));
-                    }
-                    else if (control.remove_context != null)
-                    {
-                        LOGGER.Log(Level.Info,
-                            string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", control.remove_context.context_id));
-                    }
-                    else if (control.add_context != null)
-                    {
-                        LOGGER.Log(Level.Info,
-                            string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", control.add_context.parent_context_id));
-                    }
-                    else if (control.start_task != null)
-                    {
-                        LOGGER.Log(Level.Info,
-                            string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", control.start_task.context_id));
-                    }
-                    else if (control.stop_task != null)
-                    {
-                        LOGGER.Log(Level.Info, "Control protobuf to stop task");
-                    }
-                    else if (control.suspend_task != null)
-                    {
-                        LOGGER.Log(Level.Info, "Control protobuf to suspend task");
-                    }
-                }
-            }
-
-            // UTC is used so that a local clock shift (e.g. daylight saving) cannot distort
-            // the elapsed time between two batches.
-            if (_count == 0)
-            {
-                _begin = DateTime.UtcNow;
-                _origBegin = _begin;
-            }
-            var count = Interlocked.Increment(ref _count);
-
-            if (count % PrintBatchSize == 0)
-            {
-                DateTime end = DateTime.UtcNow;
-                double seconds = (end - _begin).TotalSeconds;
-
-                // The rate used to be computed and then thrown away; report it instead.
-                // A non-positive elapsed time would make the division meaningless, so skip it.
-                if (seconds > 0)
-                {
-                    long eventsPerSecond = (long)(PrintBatchSize / seconds);
-                    LOGGER.Log(Level.Verbose,
-                        string.Format(CultureInfo.InvariantCulture, "Received {0} messages so far; last {1} at {2} messages per second", count, PrintBatchSize, eventsPerSecond));
-                }
-                _begin = end;
-            }
-
-            // Copy the volatile field once so the null check and the call see the same observer.
-            var observer = _observer;
-            if (observer != null)
-            {
-                observer.OnNext(value);
-            }
-        }
-
-        /// <summary>
-        /// Registers the single downstream observer.
-        /// </summary>
-        /// <param name="observer">The observer that should receive forwarded messages.</param>
-        /// <returns>
-        /// This instance (dispose it to unsubscribe), or null when an observer is already
-        /// registered, in which case <paramref name="observer"/> is not registered.
-        /// </returns>
-        public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer)
-        {
-            if (_observer != null)
-            {
-                return null;
-            }
-            _observer = observer;
-            return this;
-        }
-
-        /// <summary>Removes the registered observer; later messages are no longer forwarded.</summary>
-        public void Dispose()
-        {
-            _observer = null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
deleted file mode 100644
index 31194a7..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Tang.Interface;
-using System;
-
-namespace Org.Apache.Reef.Common.Context
-{
-    /// <summary>
-    /// Exception describing a failure in a context; it records the id of the failed context
-    /// and the id of that context's parent, if any.
-    /// </summary>
-    public class ContextClientCodeException : Exception
-    {
-        /// <summary>
-        /// Creates the exception. The exception message is the given message prefixed with the context id.
-        /// </summary>
-        /// <param name="contextId">Id of the failed context.</param>
-        /// <param name="parentId">Id of the failed context's parent, if any.</param>
-        /// <param name="message">The error message.</param>
-        /// <param name="cause">The exception that caused the error.</param>
-        public ContextClientCodeException(
-                string contextId,
-                Optional<string> parentId,
-                string message,
-                Exception cause)
-            : base(string.Concat("Failure in context '", contextId, "': ", message), cause)
-        {
-            ContextId = contextId;
-            ParentId = parentId;
-        }
-
-        /// <summary>Id of the failed context.</summary>
-        public string ContextId { get; private set; }
-
-        /// <summary>Id of the failed context's parent, if any.</summary>
-        public Optional<string> ParentId { get; private set; }
-
-        /// <summary>
-        /// Intended to extract a context id from the given configuration.
-        /// Currently a placeholder that ignores its argument.
-        /// </summary>
-        /// <param name="c">The configuration; not inspected yet.</param>
-        /// <returns>Always the empty string for now.</returns>
-        public static string GetId(IConfiguration c)
-        {
-            // TODO: update after TANG is available
-            return string.Empty;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
deleted file mode 100644
index ca6b949..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Tang.Formats;
-using Org.Apache.Reef.Tang.Interface;
-using System;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using Org.Apache.Reef.Tang.Types;
-
-namespace Org.Apache.Reef.Common.Evaluator.Context
-{
-    /// <summary>
-    /// Context configuration read from an embedded Avro configuration string. Only the context
-    /// id and the container directory are exposed; every other IConfiguration member
-    /// throws NotImplementedException.
-    /// </summary>
-    public class ContextConfiguration : IConfiguration
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextConfiguration));
-
-        // Key/value bindings read from the configuration string.
-        private readonly Dictionary<string, string> _settings = new Dictionary<string, string>();
-
-        /// <summary>
-        /// Parses <paramref name="configString"/> and stores its bindings. Any binding whose key
-        /// contains the context-identifier constant is stored under exactly that constant.
-        /// </summary>
-        /// <param name="configString">The embedded Avro configuration string.</param>
-        /// <remarks>
-        /// When no context identifier is found, an error is logged and an ArgumentException is
-        /// handed to Exceptions.Throw.
-        /// </remarks>
-        public ContextConfiguration(string configString)
-        {
-            using (LOGGER.LogFunction("ContextConfiguration::ContextConfigurationn"))
-            {
-                ContainerDirectory = Directory.GetCurrentDirectory();
-
-                string contextIdKey = Reef.Evaluator.Constants.ContextIdentifier;
-                AvroConfiguration parsed = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString);
-                foreach (ConfigurationEntry binding in parsed.Bindings)
-                {
-                    if (binding.key.Contains(contextIdKey))
-                    {
-                        // Normalize the key so that the Id property can look it up directly.
-                        binding.key = contextIdKey;
-                        LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "{0} detected for context id with value {1}", binding.key, binding.value));
-                    }
-                    _settings.Add(binding.key, binding.value);
-                }
-
-                bool hasContextId = _settings.ContainsKey(contextIdKey);
-                if (!hasContextId)
-                {
-                    string msg = "Required parameter ContextIdentifier not provided.";
-                    LOGGER.Log(Level.Error, msg);
-                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER);
-                }
-            }
-        }
-
-        /// <summary>The context id read from the configuration.</summary>
-        public string Id
-        {
-            get
-            {
-                return _settings[Reef.Evaluator.Constants.ContextIdentifier];
-            }
-        }
-
-        /// <summary>Initialized to the process's current directory by the constructor.</summary>
-        public string ContainerDirectory { get; set; }
-
-        // The members below are not supported by this configuration: each one throws.
-
-        /// <summary>Not implemented.</summary>
-        public IConfigurationBuilder newBuilder()
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public string GetNamedParameter(INamedParameterNode np)
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public IClassHierarchy GetClassHierarchy()
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public ISet<object> GetBoundSet(INamedParameterNode np)
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public IClassNode GetBoundConstructor(IClassNode cn)
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public IClassNode GetBoundImplementation(IClassNode cn)
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public IConstructorDef GetLegacyConstructor(IClassNode cn)
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public ICollection<IClassNode> GetBoundImplementations()
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public ICollection<IClassNode> GetBoundConstructors()
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public ICollection<INamedParameterNode> GetNamedParameters()
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public ICollection<IClassNode> GetLegacyConstructors()
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public IList<object> GetBoundList(INamedParameterNode np)
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public IEnumerator<KeyValuePair<INamedParameterNode, object>> GetBoundSets()
-        {
-            throw new NotImplementedException();
-        }
-
-        /// <summary>Not implemented.</summary>
-        public IDictionary<INamedParameterNode, IList<object>> GetBoundList()
-        {
-            throw new NotImplementedException();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
deleted file mode 100644
index 9967258..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Events;
-using System;
-using System.Collections.Generic;
-
-namespace Org.Apache.Reef.Common.Context
-{
-    /// <summary>
-    /// Holds the start handlers, stop handlers and message sources of one context.
-    /// It is meant to fire the context life-cycle events, but the dispatch code is still disabled.
-    /// </summary>
-    class ContextLifeCycle
-    {
-        private readonly HashSet<IObserver<IContextStart>> _contextStartHandlers;
-        private readonly HashSet<IObserver<IContextStop>> _contextStopHandlers;
-        private readonly HashSet<IContextMessageSource> _contextMessageSources;
-
-        // @Inject
-        /// <summary>
-        /// Creates a life cycle for the given context id using the supplied handler and source sets.
-        /// </summary>
-        public ContextLifeCycle(
-            string id,
-            HashSet<IObserver<IContextStart>> contextStartHandlers,
-            HashSet<IObserver<IContextStop>> contextStopHandlers,
-            HashSet<IContextMessageSource> contextMessageSources)
-        {
-            Id = id;
-            _contextStartHandlers = contextStartHandlers;
-            _contextStopHandlers = contextStopHandlers;
-            _contextMessageSources = contextMessageSources;
-        }
-
-        /// <summary>
-        /// Creates a life cycle for the given context id with empty handler and source sets.
-        /// </summary>
-        public ContextLifeCycle(string contextId)
-            : this(
-                contextId,
-                new HashSet<IObserver<IContextStart>>(),
-                new HashSet<IObserver<IContextStop>>(),
-                new HashSet<IContextMessageSource>())
-        {
-        }
-
-        /// <summary>Id of the context this life cycle belongs to.</summary>
-        public string Id { get; private set; }
-
-        /// <summary>The configured message sources (the underlying set itself, not a copy).</summary>
-        public HashSet<IContextMessageSource> ContextMessageSources
-        {
-            get
-            {
-                return _contextMessageSources;
-            }
-        }
-
-        /// <summary>
-        /// Intended to fire ContextStart to all registered handlers. For now it only creates
-        /// the event object; nothing is dispatched.
-        /// </summary>
-        public void Start()
-        {
-            IContextStart contextStart = new ContextStartImpl(Id);
-
-            // TODO: enable dispatch to the start handlers:
-            //foreach (IObserver<IContextStart> startHandler in _contextStartHandlers)
-            //{
-            //    startHandler.OnNext(contextStart);
-            //}
-        }
-
-        /// <summary>
-        /// Intended to fire ContextStop to all registered handlers. Currently does nothing.
-        /// </summary>
-        public void Close()
-        {
-            // TODO: enable dispatch to the stop handlers:
-            //IContextStop contextStop = new ContextStopImpl(Id);
-            //foreach (IObserver<IContextStop> startHandler in _contextStopHandlers)
-            //{
-            //    startHandler.OnNext(contextStop);
-            //}
-        }
-
-        /// <summary>
-        /// Intended to hand a message to the context message handler. Currently does nothing.
-        /// </summary>
-        /// <param name="message">The message payload; ignored for now.</param>
-        public void HandleContextMessage(byte[] message)
-        {
-            //contextMessageHandler.onNext(message);
-        }
-
-        /// <summary>
-        /// Gets the configured context message sources.
-        /// </summary>
-        /// <returns>A shallow copy of the set of configured ContextMessageSources.</returns>
-        public HashSet<IContextMessageSource> GetContextMessageSources()
-        {
-            var copy = new HashSet<IContextMessageSource>(_contextMessageSources);
-            return copy;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
deleted file mode 100644
index 15b09b9..0000000
--- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
+++ /dev/null
@@ -1,362 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Common.Evaluator.Context;
-using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
-using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
-using Org.Apache.Reef.Common.Task;
-using Org.Apache.Reef.Evaluator;
-using Org.Apache.Reef.Services;
-using Org.Apache.Reef.Tasks;
-using Org.Apache.Reef.Utilities;
-using Org.Apache.Reef.Utilities.Logging;
-using System;
-using System.Collections.Generic;
-using System.Collections.ObjectModel;
-using System.Globalization;
-using System.Linq;
-
-namespace Org.Apache.Reef.Common.Context
-{
-    public class ContextManager : IDisposable
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager));
-        
-        private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>();
-
-        private readonly HeartBeatManager _heartBeatManager;
-
-        private RootContextLauncher _rootContextLauncher;
-
-        /// <summary>
-        /// Creates the context manager and prepares the launcher for the root context.
-        /// </summary>
-        /// <param name="heartBeatManager">Heartbeat manager; its evaluator settings supply the root context configuration.</param>
-        /// <param name="rootServiceConfig">Optional service configuration for the root context.</param>
-        /// <param name="rootTaskConfig">Optional configuration of a task to start together with the root context.</param>
-        public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
-        {
-            using (LOGGER.LogFunction("ContextManager::ContextManager"))
-            {
-                _heartBeatManager = heartBeatManager;
-
-                var rootContextConfig = heartBeatManager.EvaluatorSettings.RootContextConfig;
-                _rootContextLauncher = new RootContextLauncher(rootContextConfig, rootServiceConfig, rootTaskConfig);
-            }
-        }
-
-        /// <summary>
-        /// Starts the context manager: pushes the root context onto the context stack and,
-        /// when a root task configuration is present, starts that task on it.
-        /// </summary>
-        public void Start()
-        {
-            lock (_contextStack)
-            {
-                ContextRuntime rootContext = _rootContextLauncher.GetRootContext();
-                string instantiating = string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id);
-                LOGGER.Log(Level.Info, instantiating);
-                _contextStack.Push(rootContext);
-
-                // Nothing more to do when no initial task was configured.
-                if (!_rootContextLauncher.RootTaskConfig.IsPresent())
-                {
-                    return;
-                }
-
-                LOGGER.Log(Level.Info, "Launching the initial Task");
-                try
-                {
-                    _contextStack.Peek().StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
-                }
-                catch (TaskClientCodeException taskFailure)
-                {
-                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(taskFailure, Level.Error, "Exception when trying to start a task.", LOGGER);
-                    HandleTaskException(taskFailure);
-                }
-            }
-        }
-
-        /// <summary>
-        /// Tells whether the context stack currently holds no context.
-        /// </summary>
-        /// <returns>true when the stack is empty; otherwise false.</returns>
-        public bool ContextStackIsEmpty()
-        {
-            lock (_contextStack)
-            {
-                bool noContexts = _contextStack.Count == 0;
-                return noContexts;
-            }
-        }
-
-        // TODO: the code here differs slightly from the Java version because protobuf-net does not generate the HasXXX methods; we may want to switch to proto-port later
-
-        /// <summary>
-        /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
-        /// Exactly one action is taken per message, chosen by the first field found set, in this order:
-        /// add_context (optionally followed by start_task), remove_context, start_task, stop_task,
-        /// suspend_task, task_message, context_message.
-        /// A heartbeat is triggered directly only when a context is added without a task; per the
-        /// inline note below, the other cases rely on the TaskRuntime to trigger it.
-        /// </summary>
-        /// <param name="controlMessage">The control message to act on.</param>
-        /// <remarks>
-        /// Any exception raised while processing is caught, reported through HandleTaskException or
-        /// HandlContextException when it is of the matching type, and then passed to
-        /// Exceptions.CaughtAndThrow (presumably logged and rethrown - confirm against that helper).
-        /// NOTE(review): the context stack is read here without taking the lock that Start and
-        /// ContextStackIsEmpty take - confirm whether callers serialize access.
-        /// </remarks>
-        public void HandleTaskControl(ContextControlProto controlMessage)
-        {
-            try
-            {
-                // Payload handed to the close / suspend / deliver calls below; may be null.
-                byte[] message = controlMessage.task_message;
-                // Adding and removing a context in the same message is rejected up front.
-                if (controlMessage.add_context != null && controlMessage.remove_context != null)
-                {
-                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER);
-                }
-                if (controlMessage.add_context != null)
-                {
-                    LOGGER.Log(Level.Info, "AddContext");
-                    AddContext(controlMessage.add_context);
-                    // A start_task in the same message supports submitContextAndTask()
-                    if (controlMessage.start_task != null)
-                    {
-                        LOGGER.Log(Level.Info, "StartTask");
-                        StartTask(controlMessage.start_task);
-                    }
-                    else
-                    {
-                        // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime
-                        // Therefore this call can not go into addContext
-                        LOGGER.Log(Level.Info, "Trigger Heartbeat");
-                        _heartBeatManager.OnNext();
-                    }
-                }
-                else if (controlMessage.remove_context != null)
-                {
-                    LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id));
-                    RemoveContext(controlMessage.remove_context.context_id);
-                }
-                else if (controlMessage.start_task != null)
-                {
-                    LOGGER.Log(Level.Info, "StartTask only");
-                    StartTask(controlMessage.start_task);
-                }
-                else if (controlMessage.stop_task != null)
-                {
-                    // Task operations always target the context on top of the stack.
-                    LOGGER.Log(Level.Info, "CloseTask");
-                    _contextStack.Peek().CloseTask(message);
-                }
-                else if (controlMessage.suspend_task != null)
-                {
-                    LOGGER.Log(Level.Info, "SuspendTask");
-                    _contextStack.Peek().SuspendTask(message);
-                }
-                else if (controlMessage.task_message != null)
-                {
-                    LOGGER.Log(Level.Info, "DeliverTaskMessage");
-                    _contextStack.Peek().DeliverTaskMessage(message);
-                }
-                else if (controlMessage.context_message != null)
-                {
-                    LOGGER.Log(Level.Info, "Handle context contol message");
-                    ContextMessageProto contextMessageProto = controlMessage.context_message;
-                    bool deliveredMessage = false;
-                    // Deliver to the first context on the stack whose id matches the message's context id.
-                    foreach (ContextRuntime context in _contextStack)
-                    {
-                        if (context.Id.Equals(contextMessageProto.context_id))
-                        {
-                            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message));
-                            context.HandleContextMessaage(controlMessage.context_message.message);
-                            deliveredMessage = true;
-                            break;
-                        }
-                    }
-                    // No context with that id exists on the stack.
-                    if (!deliveredMessage)
-                    {
-                        InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id));
-                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
-                    }
-                }
-                else
-                {
-                    // None of the known fields is set.
-                    InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString()));
-                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
-                } 
-            }
-            catch (Exception e)
-            {
-                // Client-code failures are reported through their dedicated handlers first.
-                if (e is TaskClientCodeException)
-                {
-                    HandleTaskException((TaskClientCodeException)e);
-                }
-                else if (e is ContextClientCodeException)
-                {
-                    HandlContextException((ContextClientCodeException)e);
-                }
-                // Runs for every caught exception, including the two kinds handled above.
-                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER);
-            }  
-        }
-
-        /// <summary>
-        /// Reports the task status known to the context at the top of the stack.
-        /// </summary>
-        /// <returns>
-        /// An empty optional when no context is on the stack; otherwise the value
-        /// returned by the top context's own GetTaskStatus().
-        /// </returns>
-        public Optional<TaskStatusProto> GetTaskStatus()
-        {
-            if (_contextStack.Count > 0)
-            {
-                return _contextStack.Peek().GetTaskStatus();
-            }
-
-            // No context is running: answer with an empty optional instead of throwing.
-            return Optional<TaskStatusProto>.Empty();
-        }
-
-        /// <summary>
-        /// Collects the status of every context currently on the stack.
-        /// </summary>
-        /// <returns>one status entry per context, in the stack's enumeration order.</returns>
-        public ICollection<ContextStatusProto> GetContextStatusCollection()
-        {
-            var statuses = new Collection<ContextStatusProto>();
-            foreach (ContextRuntime context in _contextStack)
-            {
-                ContextStatusProto status = context.GetContextStatus();
-                string logLine = string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", status);
-                LOGGER.Log(Level.Verbose, logLine);
-                statuses.Add(status);
-            }
-            return statuses;
-        }
-
-        /// <summary>
-        /// Shuts down. This forcefully kills the Task if there is one and then shuts down all Contexts on the stack,
-        /// starting at the top.
-        /// </summary>
-        /// <remarks>
-        /// Does nothing when the context stack is null or empty. Only the context returned by
-        /// <c>_contextStack.Last()</c> is disposed directly here.
-        /// NOTE(review): if the stack is a <c>Stack&lt;T&gt;</c>, enumeration starts at the top, so
-        /// <c>Last()</c> yields the bottom-most context; closing the remaining contexts then depends on
-        /// that context's own Dispose() -- confirm against ContextRuntime.
-        /// </remarks>
-        public void Dispose()
-        {
-            // The null guard must come before the lock: lock(null) throws
-            // ArgumentNullException, so a null check made inside the lock is never effective.
-            if (_contextStack == null)
-            {
-                return;
-            }
-
-            lock (_contextStack)
-            {
-                if (_contextStack.Any())
-                {
-                    LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime.");
-                    ContextRuntime runtime = _contextStack.Last();
-                    if (runtime != null)
-                    {
-                        runtime.Dispose();
-                    }
-                }
-            }
-        }
-
-        /// <summary>
-        /// Spawns a child of the current top context and pushes it onto the stack.
-        /// </summary>
-        /// <param name="addContextProto">
-        /// request carrying the expected parent context id, the context configuration and,
-        /// optionally, a service configuration for the new context
-        /// </param>
-        private void AddContext(AddContextProto addContextProto)
-        {
-            lock (_contextStack)
-            {
-                ContextRuntime parent = _contextStack.Peek();
-
-                // The requested parent must be the context currently on top of the stack.
-                bool parentIsTop = parent.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase);
-                if (!parentIsTop)
-                {
-                    string reason = string.Format(
-                        CultureInfo.InvariantCulture,
-                        "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}",
-                        addContextProto.parent_context_id,
-                        parent.Id);
-                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(reason), LOGGER);
-                }
-
-                ContextConfiguration childConfiguration = new ContextConfiguration(addContextProto.context_configuration);
-
-                // The service configuration is optional; pick the matching overload.
-                ContextRuntime child = addContextProto.service_configuration == null
-                    ? parent.SpawnChildContext(childConfiguration)
-                    : parent.SpawnChildContext(childConfiguration, new ServiceConfiguration(addContextProto.service_configuration).TangConfig);
-
-                _contextStack.Push(child);
-            }
-        }
-
-        /// <summary>
-        /// Disposes the top context and pops it from the stack, provided its id matches the given one.
-        /// </summary>
-        /// <param name="contextId">id of the context to remove; must match the id of the top context</param>
-        private void RemoveContext(string contextId)
-        {
-            lock (_contextStack)
-            {
-                ContextRuntime top = _contextStack.Peek();
-                string topId = top.Id;
-                if (!contextId.Equals(topId, StringComparison.OrdinalIgnoreCase))
-                {
-                    string reason = string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, topId);
-                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(reason), LOGGER);
-                }
-
-                top.Dispose();
-
-                // A non-root context was closed, so the driver has to be told explicitly.
-                // For the root context the notification is implicit in the Evaluator
-                // close/done notification. The heartbeat goes out before the pop.
-                bool closedNonRootContext = _contextStack.Count > 1;
-                if (closedNonRootContext)
-                {
-                    _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state
-                }
-
-                _contextStack.Pop();
-            }
-        }
-
-        /// <summary>
-        /// Starts a task on the context at the top of the stack.
-        /// </summary>
-        /// <param name="startTaskProto">
-        /// request carrying the task configuration and the id of the context the task expects to run on
-        /// </param>
-        private void StartTask(StartTaskProto startTaskProto)
-        {
-            lock (_contextStack)
-            {
-                ContextRuntime activeContext = _contextStack.Peek();
-                string requestedContextId = startTaskProto.context_id;
-
-                // The task may only be started on the context that is currently active.
-                bool contextMatches = requestedContextId.Equals(activeContext.Id, StringComparison.OrdinalIgnoreCase);
-                if (!contextMatches)
-                {
-                    string reason = string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", requestedContextId, activeContext.Id);
-                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(reason), LOGGER);
-                }
-
-                activeContext.StartTask(new TaskConfiguration(startTaskProto.configuration), requestedContextId, _heartBeatManager);
-            }
-        }
-
-        /// <summary>
-        /// Reports a failed task to the heartbeat manager: logs the exception, builds a
-        /// TaskStatusProto in state FAILED whose result is the exception text, and sends it.
-        /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
-        /// </summary>
-        /// <param name="e">the task exception; supplies the context id, the task id and the error text</param>
-        private void HandleTaskException(TaskClientCodeException e)
-        {
-            LOGGER.Log(Level.Error, "TaskClientCodeException", e);
-
-            // The full exception text travels to the receiver as the task's result bytes.
-            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
-            TaskStatusProto taskStatus = new TaskStatusProto()
-            {
-                context_id = e.ContextId,
-                task_id = e.TaskId,
-                result = exception,
-                state = State.FAILED
-            };
-            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed task: {0}", taskStatus.ToString()));
-            _heartBeatManager.OnNext(taskStatus);
-        }
-
-        /// <summary>
-        /// Reports a failed context to the heartbeat manager: logs the exception, builds a
-        /// ContextStatusProto in state FAIL carrying the exception text, and sends it.
-        /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
-        /// </summary>
-        /// <param name="e">the context exception; supplies the context id, the optional parent id and the error text</param>
-        private void HandlContextException(ContextClientCodeException e)
-        {
-            LOGGER.Log(Level.Error, "ContextClientCodeException", e);
-
-            byte[] errorBytes = ByteUtilities.StringToByteArrays(e.ToString());
-
-            ContextStatusProto failedStatus = new ContextStatusProto();
-            failedStatus.context_id = e.ContextId;
-            failedStatus.context_state = ContextStatusProto.State.FAIL;
-            failedStatus.error = errorBytes;
-
-            // The parent id is only set when the exception carries one.
-            if (e.ParentId.IsPresent())
-            {
-                failedStatus.parent_id = e.ParentId.Value;
-            }
-
-            string logLine = string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", failedStatus.ToString());
-            LOGGER.Log(Level.Error, logLine);
-            _heartBeatManager.OnNext(failedStatus);
-        }
-    }
-}
\ No newline at end of file