You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:06:09 UTC

[48/51] [partial] incubator-reef git commit: [REEF-131] Towards the new .Net project structure This is to change .Net project structure for Tang, Wake, REEF utilities, Common and Driver:

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs
new file mode 100644
index 0000000..383467e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs
@@ -0,0 +1,393 @@
+//------------------------------------------------------------------------------
+// <auto-generated>
+//     This code was generated by a tool.
+//
+//     Changes to this file may cause incorrect behavior and will be lost if
+//     the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Generated from: reef_service_protos.proto
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto
+
+{
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"FileResourceProto")]
+  public partial class FileResourceProto : global::ProtoBuf.IExtensible
+  {
+    public FileResourceProto() {}
+    
+    private FileType _type;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"type", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public FileType type
+    {
+      get { return _type; }
+      set { _type = value; }
+    }
+    private string _name;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"name", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string name
+    {
+      get { return _name; }
+      set { _name = value; }
+    }
+    private string _path;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = true, Name=@"path", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string path
+    {
+      get { return _path; }
+      set { _path = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"RuntimeErrorProto")]
+  public partial class RuntimeErrorProto : global::ProtoBuf.IExtensible
+  {
+    public RuntimeErrorProto() {}
+    
+    private string _name;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"name", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string name
+    {
+      get { return _name; }
+      set { _name = value; }
+    }
+    private string _message;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string message
+    {
+      get { return _message; }
+      set { _message = value; }
+    }
+    private byte[] _exception = null;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"exception", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] exception
+    {
+      get { return _exception; }
+      set { _exception = value; }
+    }
+    private string _identifier = "";
+    [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"identifier", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue("")]
+    public string identifier
+    {
+      get { return _identifier; }
+      set { _identifier = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"JobStatusProto")]
+  public partial class JobStatusProto : global::ProtoBuf.IExtensible
+  {
+    public JobStatusProto() {}
+    
+    private string _identifier;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"identifier", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string identifier
+    {
+      get { return _identifier; }
+      set { _identifier = value; }
+    }
+    private State _state;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public State state
+    {
+      get { return _state; }
+      set { _state = value; }
+    }
+    private byte[] _message = null;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] message
+    {
+      get { return _message; }
+      set { _message = value; }
+    }
+    private byte[] _exception = null;
+    [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"exception", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] exception
+    {
+      get { return _exception; }
+      set { _exception = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextStatusProto")]
+  public partial class ContextStatusProto : global::ProtoBuf.IExtensible
+  {
+    public ContextStatusProto() {}
+    
+    private ContextStatusProto.State _context_state;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"context_state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public ContextStatusProto.State context_state
+    {
+      get { return _context_state; }
+      set { _context_state = value; }
+    }
+    private string _context_id;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string context_id
+    {
+      get { return _context_id; }
+      set { _context_id = value; }
+    }
+    private string _parent_id = "";
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"parent_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue("")]
+    public string parent_id
+    {
+      get { return _parent_id; }
+      set { _parent_id = value; }
+    }
+    private byte[] _error = null;
+    [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"error", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] error
+    {
+      get { return _error; }
+      set { _error = value; }
+    }
+    private bool _recovery = false;
+    [global::ProtoBuf.ProtoMember(6, IsRequired = false, Name = @"recovery", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(false)]
+    public bool recovery
+    {
+        get { return _recovery; }
+        set { _recovery = value; }
+    }
+    private readonly global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto> _context_message = new global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto>();
+    [global::ProtoBuf.ProtoMember(7, Name=@"context_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto> context_message
+    {
+      get { return _context_message; }
+    }
+  
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextMessageProto")]
+  public partial class ContextMessageProto : global::ProtoBuf.IExtensible
+  {
+    public ContextMessageProto() {}
+    
+    private string _source_id;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"source_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string source_id
+    {
+      get { return _source_id; }
+      set { _source_id = value; }
+    }
+    private byte[] _message;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public byte[] message
+    {
+      get { return _message; }
+      set { _message = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+    [global::ProtoBuf.ProtoContract(Name=@"State")]
+    public enum State
+    {
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"READY", Value=0)]
+      READY = 0,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"DONE", Value=1)]
+      DONE = 1,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"FAIL", Value=2)]
+      FAIL = 2
+    }
+  
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"TaskStatusProto")]
+  public partial class TaskStatusProto : global::ProtoBuf.IExtensible
+  {
+    public TaskStatusProto() {}
+    
+    private string _task_id;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"task_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string task_id
+    {
+      get { return _task_id; }
+      set { _task_id = value; }
+    }
+    private string _context_id;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string context_id
+    {
+      get { return _context_id; }
+      set { _context_id = value; }
+    }
+    private State _state;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public State state
+    {
+      get { return _state; }
+      set { _state = value; }
+    }
+    private byte[] _result = null;
+    [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"result", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] result
+    {
+      get { return _result; }
+      set { _result = value; }
+    }
+    private bool _recovery = false;
+    [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name = @"recovery", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(false)]
+    public bool recovery
+    {
+        get { return _recovery; }
+        set { _recovery = value; }
+    }
+    private readonly global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto> _task_message = new global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto>();
+    [global::ProtoBuf.ProtoMember(6, Name=@"task_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto> task_message
+    {
+      get { return _task_message; }
+    }
+  
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"TaskMessageProto")]
+  public partial class TaskMessageProto : global::ProtoBuf.IExtensible
+  {
+    public TaskMessageProto() {}
+    
+    private string _source_id;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"source_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string source_id
+    {
+      get { return _source_id; }
+      set { _source_id = value; }
+    }
+    private byte[] _message;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public byte[] message
+    {
+      get { return _message; }
+      set { _message = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+  [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"EvaluatorStatusProto")]
+  public partial class EvaluatorStatusProto : global::ProtoBuf.IExtensible
+  {
+    public EvaluatorStatusProto() {}
+    
+    private string _evaluator_id;
+    [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"evaluator_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    public string evaluator_id
+    {
+      get { return _evaluator_id; }
+      set { _evaluator_id = value; }
+    }
+    private State _state;
+    [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+    public State state
+    {
+      get { return _state; }
+      set { _state = value; }
+    }
+    private byte[] _error = null;
+    [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"error", DataFormat = global::ProtoBuf.DataFormat.Default)]
+    [global::System.ComponentModel.DefaultValue(null)]
+    public byte[] error
+    {
+      get { return _error; }
+      set { _error = value; }
+    }
+    private global::ProtoBuf.IExtension extensionObject;
+    global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+      { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+  }
+  
+    [global::ProtoBuf.ProtoContract(Name=@"State")]
+    public enum State
+    {
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"INIT", Value=0)]
+      INIT = 0,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"RUNNING", Value=1)]
+      RUNNING = 1,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"DONE", Value=2)]
+      DONE = 2,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"SUSPEND", Value=3)]
+      SUSPEND = 3,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"FAILED", Value=4)]
+      FAILED = 4,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"KILLED", Value=5)]
+      KILLED = 5
+    }
+  
+    [global::ProtoBuf.ProtoContract(Name=@"FileType")]
+    public enum FileType
+    {
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"PLAIN", Value=0)]
+      PLAIN = 0,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"LIB", Value=1)]
+      LIB = 1,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"ARCHIVE", Value=2)]
+      ARCHIVE = 2
+    }
+  
+    [global::ProtoBuf.ProtoContract(Name=@"SIZE")]
+    public enum SIZE
+    {
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"SMALL", Value=0)]
+      SMALL = 0,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"MEDIUM", Value=1)]
+      MEDIUM = 1,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"LARGE", Value=2)]
+      LARGE = 2,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"XLARGE", Value=3)]
+      XLARGE = 3
+    }
+  
+    [global::ProtoBuf.ProtoContract(Name=@"ProcessType")]
+    public enum ProcessType
+    {
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"JVM", Value=0)]
+      JVM = 0,
+            
+      [global::ProtoBuf.ProtoEnum(Name=@"CLR", Value=1)]
+      CLR = 1
+    }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs
new file mode 100644
index 0000000..6bd90e8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Utilities;
+
+using ProtoBuf;
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.IO;
+using System.Text;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1403:FileMayOnlyContainASingleNamespace", Justification = "Serializers for all protobuf types")]
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "Serializers for all protobuf types")]
+
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefProtocol
+{
+    /// <summary>
+    /// Protobuf serialization helpers and a heartbeat-wrapping constructor for REEFMessage.
+    /// </summary>
+    public partial class REEFMessage
+    {
+        /// <summary>Creates a message and stores the given heartbeat in it.</summary>
+        /// <param name="evaluatorHeartbeatProto">Value assigned to the _evaluatorHeartBeat field.</param>
+        public REEFMessage(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
+        {
+            _evaluatorHeartBeat = evaluatorHeartbeatProto;
+        }
+
+        /// <summary>Reads a REEFMessage from a byte array using the protobuf Serializer.</summary>
+        /// <param name="bytes">The encoded message; wrapped in a MemoryStream for reading.</param>
+        public static REEFMessage Deserialize(byte[] bytes)
+        {
+            using (var input = new MemoryStream(bytes))
+            {
+                return Serializer.Deserialize<REEFMessage>(input);
+            }
+        }
+
+        /// <summary>Writes this message through the protobuf Serializer.</summary>
+        /// <returns>A new array holding the bytes the serializer wrote.</returns>
+        public byte[] Serialize()
+        {
+            using (var output = new MemoryStream())
+            {
+                Serializer.Serialize<REEFMessage>(output, this);
+                return output.ToArray();
+            }
+        }
+    }
+}
+
+namespace Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto
+{
+    /// <summary>
+    /// Adds protobuf serialization helpers and a diagnostic ToString to EvaluatorHeartbeatProto.
+    /// </summary>
+    public partial class EvaluatorHeartbeatProto
+    {
+        /// <summary>Reads an EvaluatorHeartbeatProto from a byte array using the protobuf Serializer.</summary>
+        public static EvaluatorHeartbeatProto Deserialize(byte[] bytes)
+        {
+            using (var input = new MemoryStream(bytes))
+            {
+                return Serializer.Deserialize<EvaluatorHeartbeatProto>(input);
+            }
+        }
+
+        /// <summary>Writes this heartbeat through the protobuf Serializer into a new byte array.</summary>
+        public byte[] Serialize()
+        {
+            using (var output = new MemoryStream())
+            {
+                Serializer.Serialize<EvaluatorHeartbeatProto>(output, this);
+                return output.ToArray();
+            }
+        }
+
+        /// <summary>
+        /// One-line description of the heartbeat; an unset task_status or evaluator_status prints as empty.
+        /// </summary>
+        public override string ToString()
+        {
+            string evaluatorId = evaluator_status == null ? string.Empty : evaluator_status.evaluator_id;
+            var contextStatus = new StringBuilder();
+            foreach (ContextStatusProto contextStatusProto in context_status)
+            {
+                contextStatus.AppendFormat(CultureInfo.InvariantCulture, "evaluator {0} has context {1} in state {2} with recovery flag {3}",
+                                           evaluatorId,
+                                           contextStatusProto.context_id,
+                                           contextStatusProto.context_state,
+                                           contextStatusProto.recovery);
+            }
+            var taskStatusMessage = new StringBuilder();
+            if (task_status != null && task_status.task_message != null)
+            {
+                foreach (TaskStatusProto.TaskMessageProto taskMessageProto in task_status.task_message)
+                {
+                    taskStatusMessage.Append(ByteUtilities.ByteArrarysToString(taskMessageProto.message));
+                }
+            }
+            return string.Format(CultureInfo.InvariantCulture, "EvaluatorHeartbeatProto: task_id=[{0}], task_status=[{1}], task_message=[{2}], evaluator_status=[{3}], context_status=[{4}], timestamp=[{5}], recoveryFlag =[{6}]",
+                task_status == null ? string.Empty : task_status.task_id,
+                task_status == null ? string.Empty : task_status.state.ToString(),
+                taskStatusMessage,
+                evaluator_status == null ? string.Empty : evaluator_status.state.ToString(),
+                contextStatus,
+                timestamp,
+                recovery);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
new file mode 100644
index 0000000..0f997a4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefProtocol
+{
+    /// <summary>ICodec implementation for EvaluatorHeartbeatProto.</summary>
+    public class EvaluatorHeartbeatProtoCodec : ICodec<EvaluatorHeartbeatProto>
+    {
+        // NOTE(review): only evaluator_status is copied; other fields of obj are not encoded - confirm intended.
+        public byte[] Encode(EvaluatorHeartbeatProto obj)
+        {
+            var statusOnly = new EvaluatorHeartbeatProto { evaluator_status = obj.evaluator_status };
+            return statusOnly.Serialize();
+        }
+
+        /// <summary>Returns the result of EvaluatorHeartbeatProto.Deserialize for the given bytes.</summary>
+        public EvaluatorHeartbeatProto Decode(byte[] data)
+        {
+            return EvaluatorHeartbeatProto.Deserialize(data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs
new file mode 100644
index 0000000..41109e3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefProtocol
+{
+    /// <summary>ICodec implementation that delegates to REEFMessage.Serialize and REEFMessage.Deserialize.</summary>
+    public class REEFMessageCodec : ICodec<REEFMessage>
+    {
+        public byte[] Encode(REEFMessage obj)
+        {
+            return obj.Serialize();
+        }
+
+        public REEFMessage Decode(byte[] data)
+        {
+            return REEFMessage.Deserialize(data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto
new file mode 100644
index 0000000..3d1f927
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "ClientRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "reef_service_protos.proto";
+
+// Messages from REEF Client -> Driver Runtime
+
+message JobSubmissionProto {
+	required string identifier     = 1; // the job identifier
+	required string remote_id      = 2; // the remote identifier
+	required string configuration  = 5; // the runtime configuration
+	required string user_name      = 6; // the user name
+
+  optional SIZE   driver_size    = 7;
+  optional int32  driver_memory  = 8;
+  optional int32  priority       = 9;
+  optional string queue          = 10;
+
+	repeated FileResourceProto global_file = 11; // files that should be placed on the driver and all subsequent evaluators
+	repeated FileResourceProto local_File  = 12; // files that should be placed on the driver only
+
+}
+
+enum Signal {
+	SIG_TERMINATE = 1;
+	SIG_SUSPEND   = 2;
+	SIG_RESUME    = 3;
+}
+
+message JobControlProto {
+	required string identifier = 1;
+	optional Signal signal     = 2;
+	optional bytes message     = 3;
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto
new file mode 100644
index 0000000..2b21ac7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "DriverRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+
+import "reef_service_protos.proto";
+
+// Messages from Driver Runtime -> Driver Process
+
+message DriverProcessRegistrationProto {
+	required string remote_identifier = 1;
+}
+
+
+message NodeDescriptorProto {
+	required string identifier = 1;
+	required string host_name  = 2; // e.g., IP address
+	required int32 port        = 3; // e.g., IP port
+	required int32 memory_size = 4;
+	optional string rack_name  = 5; // e.g., /default-rack
+}
+
+message ResourceAllocationProto {
+	required string identifier     = 1; // e.g., the container id, or the thread id
+	required int32 resource_memory = 2; // megabytes
+	required string node_id        = 3;
+}
+
+message ResourceStatusProto {
+	required string identifier   = 1;
+	required State  state        = 2;
+	optional string diagnostics  = 3;
+	optional int32  exit_code    = 4;
+	optional bool is_from_previous_driver = 5;
+}
+
+message RuntimeStatusProto {
+   required string name  = 1;   // e.g., local, yarn21
+   required State  state = 2;
+   optional RuntimeErrorProto error = 3; // runtime (e.g., YARN) error
+
+   optional int32 outstanding_container_requests = 5;
+   repeated string container_allocation = 6;
+}
+
+//////////////////////////////////////////////////////
+// Messages from Driver Process -> Driver Runtime
+
+message ResourceRequestProto {
+	// optional SIZE resource_size   = 1; // Removed in REEF 0.3 in favor of memory_size.
+    optional int32 memory_size = 2; // Memory size of the evaluator in MB
+    optional int32 priority       = 3;
+
+    required int32 resource_count = 5;
+	repeated string node_name     = 6; // a list of specific nodes
+	repeated string rack_name     = 7; // a list of specific racks
+
+    optional bool relax_locality = 10;
+}
+
+message ResourceReleaseProto {
+	required string identifier = 1;
+}
+
+message ResourceLaunchProto {
+	required string identifier      = 1;
+	required string remote_id       = 2;
+	required string evaluator_conf  = 3;
+    required ProcessType type       = 4;
+	repeated FileResourceProto file = 10;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto
new file mode 100644
index 0000000..1415e5c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// The java_* options below only influence Java code generation.
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "EvaluatorRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// Declares EvaluatorStatusProto, ContextStatusProto and TaskStatusProto, which EvaluatorHeartbeatProto references.
+import "reef_service_protos.proto";
+
+// Stop the evaluator
+// Declares no fields.
+message StopEvaluatorProto {
+}
+
+// Kill the evaluator
+// Declares no fields; it is carried as the optional kill_evaluator field of EvaluatorControlProto.
+message KillEvaluatorProto {
+}
+
+// Start a task
+// Carried as the optional start_task field of ContextControlProto.
+message StartTaskProto {
+    required string context_id = 1;
+    required string configuration = 2; // NOTE(review): presumably the serialized task configuration -- confirm
+}
+
+// Add a context under the context named by parent_context_id.
+// Carried as the optional add_context field of ContextControlProto.
+message AddContextProto {
+    required string parent_context_id = 1;
+    required string context_configuration = 2;
+    optional string service_configuration = 3;
+}
+
+// Remove the context named by context_id.
+// Carried as the optional remove_context field of ContextControlProto.
+message RemoveContextProto {
+    required string context_id = 1;
+}
+
+// Stop the task
+// Declares no fields; it is carried as the optional stop_task field of ContextControlProto.
+message StopTaskProto {
+}
+
+// Suspend the task
+// Declares no fields; it is carried as the optional suspend_task field of ContextControlProto.
+message SuspendTaskProto {
+}
+
+/////////////////////////////////////////
+// Message aggregators
+
+// An opaque byte payload addressed to the context named by context_id.
+// This is a different type from the nested ContextStatusProto.ContextMessageProto
+// in reef_service_protos.proto, which carries a source_id instead.
+message ContextMessageProto {
+    required string context_id = 1;
+    required bytes message = 2;
+}
+
+// Bundles the context- and task-level control messages; every field is optional.
+// Field numbers 3 and 4 are not used by this message.
+message ContextControlProto {
+    optional bytes task_message = 1;
+    optional ContextMessageProto context_message = 2;
+
+    optional AddContextProto    add_context    = 5;
+    optional RemoveContextProto remove_context = 6;
+    optional StartTaskProto     start_task     = 7;
+    optional StopTaskProto      stop_task      = 8;
+    optional SuspendTaskProto   suspend_task   = 9;
+}
+
+// Heartbeat carrying the evaluator status plus optional context and task statuses.
+// The status message types are declared in reef_service_protos.proto.
+message EvaluatorHeartbeatProto {
+    required int64 timestamp = 1;
+    required EvaluatorStatusProto evaluator_status = 2;
+    repeated ContextStatusProto   context_status   = 3;
+    optional TaskStatusProto      task_status      = 4;
+    optional bool                 recovery         = 5;  
+}
+
+// Control message addressed to one evaluator.
+// EvaluatorRuntime.Handle compares identifier with its own evaluator id and treats a mismatch as a failure.
+message EvaluatorControlProto {
+    required int64 timestamp = 1;
+    required string identifier = 2;
+
+    optional ContextControlProto context_control = 3;
+    optional KillEvaluatorProto kill_evaluator = 4;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto
new file mode 100644
index 0000000..6b99415
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import "client_runtime.proto";
+
+import "evaluator_runtime.proto";
+
+import "reef_service_protos.proto";
+
+
+// The java_* options below only influence Java code generation.
+// The package uses the same lower-case name that evaluator_runtime.proto declares;
+// the earlier value, "com.Org.Apache.REEF.proto", was not a conventional Java package name.
+option java_package = "org.apache.reef.proto";
+
+option java_generic_services = true;
+
+option java_generate_equals_and_hash = true;
+
+option java_outer_classname = "REEFProtocol";
+
+// Envelope message: each optional field holds one of the message types declared in the imported files.
+// EvaluatorRuntime.OnNext(REEFMessage) acts only when evaluatorControl is non-null.
+message REEFMessage {
+    // Messages defined in client_runtime.proto
+    optional JobSubmissionProto jobSubmission = 1;
+    optional JobControlProto jobControl = 2;
+    // Messages defined in reef_service_protos.proto
+    optional RuntimeErrorProto runtimeError = 3;
+    optional JobStatusProto jobStatus = 4;
+    // Messages from evaluator_runtime.proto
+    optional EvaluatorControlProto evaluatorControl = 5;
+    optional EvaluatorHeartbeatProto evaluatorHeartBeat = 6;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto
new file mode 100644
index 0000000..a553ca9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// The java_* options below only influence Java code generation.
+// The package uses the same name that evaluator_runtime.proto declares;
+// the earlier value, "org.apache.reef.reef.proto", repeated the "reef" segment.
+option java_package = "org.apache.reef.proto";
+
+option java_outer_classname = "ReefServiceProtos";
+
+option java_generic_services = true;
+
+option java_generate_equals_and_hash = true;
+
+// Top-level state enum used by JobStatusProto, TaskStatusProto and EvaluatorStatusProto in this file.
+// ContextStatusProto declares its own nested State and does not use this one.
+enum State {
+    INIT = 0;
+    RUNNING = 1;
+    DONE = 2;
+    SUSPEND = 3;
+    FAILED = 4;
+    KILLED = 5;
+}
+
+// Kind of file described by a FileResourceProto (its required "type" field).
+enum FileType {
+    PLAIN = 0;
+    LIB = 1;
+    ARCHIVE = 2;
+}
+
+// Removed in REEF 0.3 in favor of explicit memory sizes.
+// enum SIZE {
+//    SMALL = 0;
+//    MEDIUM = 1;
+//    LARGE = 2;
+//    XLARGE = 3;
+//}
+
+// Process kind; used as the required "type" field of ResourceLaunchProto.
+enum ProcessType {
+    JVM = 0;
+    CLR = 1;
+}
+
+// Describes one file by type, name and path; ResourceLaunchProto carries a repeated list of these.
+message FileResourceProto {
+    required FileType type = 1;
+    required string name = 2;
+    required string path = 3;
+}
+
+// Error reported by a runtime: runtime name, message text and an optional exception payload.
+// Field number 4 is not used by this message.
+message RuntimeErrorProto {
+    required string name = 1; // e.g., local, yarn21
+    required string message = 2;
+    optional bytes exception = 3; // NOTE(review): presumably a serialized exception -- encoding not shown here
+
+    optional string identifier = 5; // e.g., evaluator id
+}
+
+// Status of the job named by identifier, with optional message and exception payloads.
+message JobStatusProto {
+    required string identifier = 1;
+    required State state = 2; // top-level State enum
+    optional bytes message = 3;
+    optional bytes exception = 4;
+}
+
+// Status of one context: its state, id, optional parent id, optional error and attached context messages.
+message ContextStatusProto {
+    // Nested enum: inside this message the name State resolves to this type
+    // (READY/DONE/FAIL), not to the top-level State enum of this file.
+    enum State {
+        READY = 0;
+        DONE = 1;
+        FAIL = 2;
+    }
+    required State context_state = 1;
+
+    required string context_id = 2;
+    optional string parent_id = 3;
+
+    // Field number 4 is not used by this message.
+    optional bytes error = 5; // when creating the context
+
+    optional bool recovery = 6;
+    // Context messages
+    // Nested type; distinct from the top-level ContextMessageProto in evaluator_runtime.proto.
+    message ContextMessageProto {
+        required string source_id = 1;
+        required bytes message = 2;
+    }
+    repeated ContextMessageProto context_message = 7;
+}
+
+// Status of one task: its id, the id of its context, its state, an optional result and attached task messages.
+message TaskStatusProto {
+    required string task_id = 1;
+    required string context_id = 2;
+    required State state = 3; // top-level State enum (this message declares no nested State)
+    optional bytes result = 4; // e.g., return value from Task.call()
+    optional bool  recovery = 5;
+
+    // TaskMessageSource messages
+    message TaskMessageProto {
+        required string source_id = 1;
+        required bytes message = 2;
+    }
+    repeated TaskMessageProto task_message = 6;
+}
+
+// Status of one evaluator. EvaluatorRuntime fills evaluator_id and state, and on failure
+// sets error to the bytes of a formatted error string.
+message EvaluatorStatusProto {
+    required string evaluator_id = 1;
+    required State state = 2;
+    optional bytes error = 3;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs b/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs
new file mode 100644
index 0000000..54aca4c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Globalization;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime
+{
+    /// <summary>
+    /// Reports CPU and memory figures for the current node and the current process as
+    /// display strings. Node figures are read from <see cref="PerformanceCounter"/>
+    /// instances; process memory is read from <see cref="Process"/>.
+    /// </summary>
+    public class MachineStatus
+    {
+        // Divisor that turns a byte count into the "MB" figure reported below (decimal megabytes).
+        private const double BytesPerMegabyte = 1000000.0;
+
+        private static readonly PerformanceCounter _cpuCounter;
+
+        private static readonly PerformanceCounter _ramCounter;
+
+        private static readonly PerformanceCounter _processCpuCounter;
+
+        private static readonly Process _process;
+
+        // Cleared after the first failure in ToString() so the counters are not queried again.
+        private static bool _checkStatus;
+
+        static MachineStatus()
+        {
+            _checkStatus = true;
+            _process = Process.GetCurrentProcess();
+
+            _cpuCounter = new PerformanceCounter()
+            {
+                CategoryName = "Processor",
+                CounterName = "% Processor Time",
+                InstanceName = "_Total",
+            };
+
+            _ramCounter = new PerformanceCounter()
+            {
+                CategoryName = "Memory",
+                CounterName = "Available MBytes"
+            };
+
+            _processCpuCounter = new PerformanceCounter()
+            {
+                CategoryName = "Process",
+                CounterName = "% Processor Time",
+                InstanceName = _process.ProcessName
+            };
+        }
+
+        /// <summary>
+        /// Next sample of the node-wide "% Processor Time" counter, suffixed with "%".
+        /// </summary>
+        public static string CurrentNodeCpuUsage
+        {
+            get
+            {
+                // Format explicitly with the invariant culture, like the memory figures below,
+                // so the decimal separator does not depend on the machine's locale.
+                return _cpuCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "%";
+            }
+        }
+
+        /// <summary>
+        /// Next sample of the "Available MBytes" memory counter, suffixed with "MB".
+        /// </summary>
+        public static string AvailableMemory
+        {
+            get
+            {
+                return _ramCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "MB";
+            }
+        }
+
+        /// <summary>
+        /// Working set of the current process in decimal megabytes, suffixed with "MB".
+        /// Reflects the value captured at the last <see cref="Process.Refresh"/>.
+        /// </summary>
+        public static string CurrentProcessMemoryUsage
+        {
+            get
+            {
+                // Divide the 64-bit byte count as a double; narrowing it to float first loses precision.
+                return (_process.WorkingSet64 / BytesPerMegabyte).ToString(CultureInfo.InvariantCulture) + "MB";
+            }
+        }
+
+        /// <summary>
+        /// Peak working set of the current process in decimal megabytes, suffixed with "MB".
+        /// </summary>
+        public static string PeakProcessMemoryUsage
+        {
+            get
+            {
+                return (_process.PeakWorkingSet64 / BytesPerMegabyte).ToString(CultureInfo.InvariantCulture) + "MB";
+            }
+        }
+
+        // this may not be accurate if there are multiple evaluator processes running on a single machine
+        // NOTE(review): this reads the counter's RawValue (the uncalculated sample) scaled by 1e-6,
+        // not the calculated NextValue(); confirm the figure is really meant to be shown as a percentage.
+        public static string CurrentProcessCpuUsage
+        {
+            get
+            {
+                return (_processCpuCounter.RawValue / 1000000.0).ToString(CultureInfo.InvariantCulture) + "%";
+            }
+        }
+
+        /// <summary>
+        /// Builds a human-readable summary of node and process CPU/memory usage.
+        /// </summary>
+        /// <returns>
+        /// The summary; a message containing the error text if reading the figures throws;
+        /// or a fixed "no information" message once a previous call has failed.
+        /// </returns>
+        public override string ToString()
+        {
+            string info = "No machine status information retrieved. Could be due to lack of admin right to get the info.";
+            if (_checkStatus)
+            {
+                try
+                {
+                    // Refresh the cached process data so the memory figures are current.
+                    _process.Refresh();
+                    info = string.Format(
+                    CultureInfo.InvariantCulture,
+                    "current node is running at [{0}] CPU usage and with [{1}] memory available.{2}             current evaluator process is using [{3}] of CPU and [{4}] of memory, with a peak memory usage of [{5}]",
+                    CurrentNodeCpuUsage,
+                    AvailableMemory,
+                    Environment.NewLine,
+                    CurrentProcessCpuUsage,
+                    CurrentProcessMemoryUsage,
+                    PeakProcessMemoryUsage);
+                }
+                catch (Exception e)
+                {
+                    _checkStatus = false; // It only takes one exception to switch the checking off for good.
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot obtain machine status due to error", Logger.GetLogger(typeof(MachineStatus)));
+                    // we do not want to crash the evaluator just because we cannot get the information.
+                    info = "Cannot obtain machine status due to error " + e.Message;
+                }
+            }
+
+            return info;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs
new file mode 100644
index 0000000..97d705b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Evaluator
+{
+    /// <summary>
+    /// Evaluator-related constants: a set of string constants whose values equal their
+    /// own names, followed by default heartbeat settings.
+    /// </summary>
+    public class Constants
+    {
+        // ---- String constants (each value is identical to the constant's name) ----
+
+        public const string ApplicationIdentifier = "ApplicationIdentifier";
+
+        public const string EvaluatorIdentifier = "EvaluatorIdentifier";
+
+        public const string ContextIdentifier = "ContextIdentifier";
+
+        public const string RootContextConfiguration = "RootContextConfiguration";
+
+        public const string RootServiceConfiguration = "RootServiceConfiguration";
+
+        public const string TaskConfiguration = "TaskConfiguration";
+
+        // ---- Heartbeat defaults ----
+
+        /// <summary>Default evaluator heartbeat period, in milliseconds.</summary>
+        public const int DefaultEvaluatorHeartbeatPeriodInMs = 4000;
+
+        /// <summary>Default maximum number of evaluator heartbeat retries.</summary>
+        public const int DefaultEvaluatorHeartbeatMaxRetry = 3;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs
new file mode 100644
index 0000000..217e24d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Evaluator;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Common
+{
+    /// <summary>
+    /// Tracks the evaluator's <see cref="State"/> and reacts to three inputs: the runtime
+    /// start event, the runtime stop event, and <c>REEFMessage</c>s that carry an
+    /// <c>EvaluatorControlProto</c>. State changes are reported through the heartbeat manager.
+    /// </summary>
+    /// <remarks>
+    /// The runtime-start handler and both <c>Handle</c> overloads synchronize on the heartbeat
+    /// manager instance, so the state transitions they perform do not interleave.
+    /// </remarks>
+    public class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime));
+
+        private readonly string _evaluatorId;
+
+        private readonly ContextManager _contextManager;
+
+        private readonly HeartBeatManager _heartBeatManager;
+
+        private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+        private readonly IClock _clock;
+
+        // Registration handle returned by the remote manager; disposed on runtime stop.
+        private readonly IDisposable _evaluatorControlChannel;
+
+        private State _state = State.INIT;
+
+        /// <summary>
+        /// Wires the runtime to the remote manager and schedules the first heartbeat alarm.
+        /// The clock, evaluator id and remote manager are taken from the heartbeat manager's settings.
+        /// </summary>
+        /// <param name="contextManager">Receives context/task control messages and is started/disposed by this class.</param>
+        /// <param name="heartBeatManager">Receives status updates; also used as the lock object of this class.</param>
+        [Inject]
+        public EvaluatorRuntime(
+            ContextManager contextManager,
+            HeartBeatManager heartBeatManager)
+        {
+            using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime"))
+            {
+                _clock = heartBeatManager.EvaluatorSettings.RuntimeClock;
+                _heartBeatManager = heartBeatManager;
+                _contextManager = contextManager;
+                _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId;
+                _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager;
+
+                ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver();
+
+                // subscribe to driver proto message
+                driverObserver.Subscribe(o => OnNext(o.Message));
+
+                // register the driver observer
+                _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver);
+
+                // start the heartbeat
+                _clock.ScheduleAlarm(0, heartBeatManager);
+            }
+        }
+
+        /// <summary>Current evaluator state.</summary>
+        public State State
+        {
+            get
+            {
+                return _state;
+            }
+        }
+
+        /// <summary>
+        /// Processes a control message. A message addressed to another evaluator id, or one
+        /// received while the state is not RUNNING, is routed to the failure path. Otherwise the
+        /// context control part (if present) is passed to the context manager and the kill part
+        /// (if present) moves the evaluator to KILLED.
+        /// </summary>
+        /// <param name="message">The control message to process.</param>
+        public void Handle(EvaluatorControlProto message)
+        {
+            lock (_heartBeatManager)
+            {
+                LOGGER.Log(Level.Info, "Handle Evaluator control message");
+                if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase))
+                {
+                    Handle(new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId)));
+                }
+                else if (_state != State.RUNNING)
+                {
+                    Handle(new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state)));
+                }
+                else
+                {
+                    if (message.context_control != null)
+                    {
+                        LOGGER.Log(Level.Info, "Send task control message to ContextManager");
+                        try
+                        {
+                            _contextManager.HandleTaskControl(message.context_control);
+
+                            // An empty context stack while still RUNNING means the evaluator is done:
+                            // report DONE and dispose the clock.
+                            if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING)
+                            {
+                                LOGGER.Log(Level.Info, "Context stack is empty, done");
+                                _state = State.DONE;
+                                _heartBeatManager.OnNext(GetEvaluatorStatus());
+                                _clock.Dispose();
+                            }
+                        }
+                        catch (Exception e)
+                        {
+                            // Record the failure (state FAILED + status report) before rethrowing it wrapped.
+                            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                            Handle(e);
+                            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER);
+                        }
+                    }
+                    if (message.kill_evaluator != null)
+                    {
+                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId));
+                        _state = State.KILLED;
+                        _clock.Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds a status proto holding this evaluator's id and current state.
+        /// </summary>
+        /// <returns>A new <c>EvaluatorStatusProto</c>; its error field is left unset.</returns>
+        public EvaluatorStatusProto GetEvaluatorStatus()
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state));
+            EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+            {
+                evaluator_id = _evaluatorId,
+                state = _state
+            };
+            return evaluatorStatusProto;
+        }
+
+        /// <summary>
+        /// Handles runtime start: requires state INIT, moves to RUNNING, starts the context
+        /// manager and triggers a heartbeat. Any exception is routed to the failure path.
+        /// </summary>
+        /// <param name="runtimeStart">The start event (not inspected).</param>
+        public void OnNext(RuntimeStart runtimeStart)
+        {
+            // Use the same monitor as Handle(...). Locking on the evaluator id string would
+            // not exclude those methods, and a string is an unsafe lock object in any case.
+            lock (_heartBeatManager)
+            {
+                try
+                {
+                    LOGGER.Log(Level.Info, "Runtime start");
+                    if (_state != State.INIT)
+                    {
+                        var e = new InvalidOperationException("State should be init.");
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                    }
+                    _state = State.RUNNING;
+                    _contextManager.Start();
+                    _heartBeatManager.OnNext();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                    Handle(e);
+                }
+            }
+        }
+
+        // The OnError/OnCompleted members of all three observer interfaces are not implemented.
+
+        void IObserver<RuntimeStart>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<REEFMessage>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<REEFMessage>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStart>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Handles runtime stop: disposes the context manager, reports DONE if the evaluator was
+        /// still RUNNING, and disposes the observer registration obtained in the constructor.
+        /// </summary>
+        /// <param name="runtimeStop">The stop event (not inspected).</param>
+        public void OnNext(RuntimeStop runtimeStop)
+        {
+            LOGGER.Log(Level.Info, "Runtime stop");
+            _contextManager.Dispose();
+
+            if (_state == State.RUNNING)
+            {
+                _state = State.DONE;
+                _heartBeatManager.OnNext();
+            }
+            try
+            {
+                _evaluatorControlChannel.Dispose();
+            }
+            catch (Exception e)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER);
+            }
+            LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete");
+        }
+
+        /// <summary>
+        /// Dispatches an incoming message; only messages carrying an evaluator control part are
+        /// handled, everything else (including null) is ignored.
+        /// </summary>
+        /// <param name="value">The received message; may be null.</param>
+        public void OnNext(REEFMessage value)
+        {
+            if (value != null && value.evaluatorControl != null)
+            {
+                LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl");
+                Handle(value.evaluatorControl);
+            }
+        }
+
+        /// <summary>
+        /// Failure path: moves to FAILED, reports a status proto that carries the formatted
+        /// error text, and disposes the context manager.
+        /// </summary>
+        /// <param name="e">The exception that caused the failure.</param>
+        private void Handle(Exception e)
+        {
+            lock (_heartBeatManager)
+            {
+                LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e);
+                _state = State.FAILED;
+                string errorMessage = string.Format(
+                        CultureInfo.InvariantCulture,
+                        "failed with error [{0}] with mesage [{1}] and stack trace [{2}]",
+                        e,
+                        e.Message,
+                        e.StackTrace);
+                EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+                {
+                    evaluator_id = _evaluatorId,
+                    error = ByteUtilities.StringToByteArrays(errorMessage),
+                    state = _state
+                };
+                _heartBeatManager.OnNext(evaluatorStatusProto);
+                _contextManager.Dispose();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs
new file mode 100644
index 0000000..bc939d9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.Evaluator.Context;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using System;
+
+namespace Org.Apache.REEF.Evaluator
+{
+    // TODO: merge with EvaluatorConfigurations class
+    /// <summary>
+    /// Holds the settings an evaluator is constructed with (ids, heartbeat parameters, root
+    /// context configuration, clock, remote manager, injector) together with two mutable
+    /// values: the operation state and the name client.
+    /// </summary>
+    public class EvaluatorSettings
+    {
+        /// <summary>
+        /// Validates the mandatory arguments and stores all values; the operation state
+        /// starts as <c>OPERATIONAL</c>.
+        /// </summary>
+        /// <param name="applicationId">Application id; stored without validation.</param>
+        /// <param name="evaluatorId">Evaluator id; must not be null, empty or whitespace.</param>
+        /// <param name="heartbeatPeriodInMs">Heartbeat period in milliseconds.</param>
+        /// <param name="maxHeartbeatRetries">Value exposed as <see cref="MaxHeartbeatFailures"/>.</param>
+        /// <param name="rootContextConfig">Root context configuration; must not be null.</param>
+        /// <param name="clock">Runtime clock; must not be null.</param>
+        /// <param name="remoteManager">Remote manager; must not be null.</param>
+        /// <param name="injecor">Injector; must not be null.</param>
+        /// <exception cref="ArgumentNullException">A mandatory argument is missing.</exception>
+        public EvaluatorSettings(
+            string applicationId,
+            string evaluatorId,
+            int heartbeatPeriodInMs,
+            int maxHeartbeatRetries,
+            ContextConfiguration rootContextConfig,
+            IClock clock,
+            IRemoteManager<REEFMessage> remoteManager,
+            IInjector injecor)
+        {
+            // Guard clauses, checked in declaration order of the mandatory arguments.
+            if (string.IsNullOrWhiteSpace(evaluatorId))
+            {
+                throw new ArgumentNullException("evaluatorId");
+            }
+            if (rootContextConfig == null)
+            {
+                throw new ArgumentNullException("rootContextConfig");
+            }
+            if (clock == null)
+            {
+                throw new ArgumentNullException("clock");
+            }
+            if (remoteManager == null)
+            {
+                throw new ArgumentNullException("remoteManager");
+            }
+            if (injecor == null)
+            {
+                throw new ArgumentNullException("injecor");
+            }
+
+            ApplicationId = applicationId;
+            EvalutorId = evaluatorId;
+            HeartBeatPeriodInMs = heartbeatPeriodInMs;
+            MaxHeartbeatFailures = maxHeartbeatRetries;
+            RootContextConfig = rootContextConfig;
+            RuntimeClock = clock;
+            RemoteManager = remoteManager;
+            Injector = injecor;
+            OperationState = EvaluatorOperationState.OPERATIONAL;
+        }
+
+        /// <summary>Current operation state; readable and writable.</summary>
+        public EvaluatorOperationState OperationState { get; set; }
+
+        /// <summary>Evaluator id passed to the constructor.</summary>
+        public string EvalutorId { get; private set; }
+
+        /// <summary>Heartbeat period in milliseconds.</summary>
+        public int HeartBeatPeriodInMs { get; private set; }
+
+        /// <summary>Application id passed to the constructor.</summary>
+        public string ApplicationId { get; private set; }
+
+        /// <summary>The <c>maxHeartbeatRetries</c> value passed to the constructor.</summary>
+        public int MaxHeartbeatFailures { get; private set; }
+
+        /// <summary>Root context configuration.</summary>
+        public ContextConfiguration RootContextConfig { get; private set; }
+
+        /// <summary>Clock passed to the constructor.</summary>
+        public IClock RuntimeClock { get; private set; }
+
+        /// <summary>Name client; not set by the constructor, readable and writable.</summary>
+        public INameClient NameClient { get; set; }
+
+        /// <summary>Remote manager passed to the constructor.</summary>
+        public IRemoteManager<REEFMessage> RemoteManager { get; private set; }
+
+        /// <summary>Injector passed to the constructor.</summary>
+        public IInjector Injector { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs
new file mode 100644
index 0000000..6d2121e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Common.Runtime;
+using Org.Apache.REEF.Evaluator;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Time;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.Linq;
+using System.Net;
+using System.Threading;
+
+namespace Org.Apache.REEF.Common
+{
+    /// <summary>
+    /// Assembles evaluator heartbeats and sends them to the driver through a remote observer.
+    /// </summary>
+    /// <remarks>
+    /// Heartbeats are triggered either by a clock <see cref="Alarm"/> (regular heartbeats) or by one of
+    /// the event-specific <c>OnNext</c> overloads. When sending fails repeatedly the evaluator settings
+    /// are switched to <see cref="EvaluatorOperationState.RECOVERY"/>; undelivered heartbeats are queued
+    /// and replayed once driver information can be retrieved again.
+    /// </remarks>
+    public class HeartBeatManager : IObserver<Alarm>
+    {
+        // Pause, in milliseconds, between two queued heartbeats replayed while leaving RECOVERY mode.
+        private const int RecoveryResendIntervalInMs = 500;
+
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager));
+
+        private static readonly MachineStatus MachineStatus = new MachineStatus();
+
+        // Heartbeat timestamps are milliseconds elapsed since Jan 1st, 1970 (UTC);
+        // this origin is chosen to be compatible with the Java implementation.
+        private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+
+        // Private gate serializing the OnNext overloads. A dedicated object is used instead of
+        // lock (this) so that code outside this class can never contend on (or deadlock with) the same monitor.
+        private readonly object _heartbeatLock = new object();
+
+        private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+        private readonly IClock _clock;
+
+        private readonly int _heartBeatPeriodInMillSeconds;
+
+        private readonly int _maxHeartbeatRetries;
+
+        private readonly string _evaluatorId;
+
+        // the queue can only contains the following:
+        // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state
+        // 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeat)
+        // The queue instance doubles as the lock guarding its own content, hence it is readonly.
+        private readonly Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>();
+
+        private IRemoteIdentifier _remoteId;
+
+        private IObserver<REEFMessage> _observer;
+
+        private int _heartbeatFailures;
+
+        // Stays null until RECOVERY mode is entered for the first time (see Send).
+        private IDriverConnection _driverConnection;
+
+        private EvaluatorSettings _evaluatorSettings;
+
+        /// <summary>
+        /// Creates a heartbeat manager that reports to the driver identified by <paramref name="remoteId"/>.
+        /// </summary>
+        /// <param name="settings">Evaluator settings supplying the remote manager, clock, heartbeat period and failure limit.</param>
+        /// <param name="remoteId">Remote identifier of the driver the heartbeats are sent to.</param>
+        public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId)
+        {
+            using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager"))
+            {
+                _remoteManager = settings.RemoteManager;
+                _remoteId = remoteId;
+                _evaluatorId = settings.EvalutorId;
+                _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+                _clock = settings.RuntimeClock;
+                _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
+                _maxHeartbeatRetries = settings.MaxHeartbeatFailures;
+                EvaluatorSettings = settings;
+                MachineStatus.ToString(); // kick start the CPU perf counter
+            }
+        }
+
+        /// <summary>
+        /// Evaluator runtime queried for the evaluator status and state; must be assigned before any heartbeat is built.
+        /// </summary>
+        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+        public EvaluatorRuntime _evaluatorRuntime { get; set; }
+
+        /// <summary>
+        /// Context manager queried for context and task status; must be assigned before any heartbeat is built.
+        /// </summary>
+        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+        public ContextManager _contextManager { get; set; }
+
+        /// <summary>
+        /// Gets the evaluator settings this manager was created with.
+        /// </summary>
+        public EvaluatorSettings EvaluatorSettings
+        {
+            get
+            {
+                return _evaluatorSettings;
+            }
+
+            private set
+            {
+                _evaluatorSettings = value;
+            }
+        }
+
+        /// <summary>
+        /// Sends the given heartbeat to the driver, or queues it when the evaluator is in RECOVERY mode
+        /// or the send attempt fails.
+        /// </summary>
+        /// <remarks>
+        /// A failed send is only tolerated while a task is RUNNING; otherwise the failure is handed to
+        /// <c>Exceptions.Throw</c> (presumably rethrown - confirm against that helper). Once the number of
+        /// consecutive failures reaches the configured maximum, the driver connection implementation is
+        /// obtained from the injector and the operation state is switched to RECOVERY.
+        /// </remarks>
+        /// <param name="evaluatorHeartbeatProto">The heartbeat to deliver.</param>
+        public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
+        {
+            lock (_queuedHeartbeats)
+            {
+                if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
+                {
+                    LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. ", evaluatorHeartbeatProto));
+                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+                    return;
+                }
+
+                // NOT during recovery, try to send
+                REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto);
+                try
+                {
+                    _observer.OnNext(payload);
+                    _heartbeatFailures = 0; // reset failure counts if we are having intermittent (not continuous) failures
+                }
+                catch (Exception e)
+                {
+                    if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING)
+                    {
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER);
+                    }
+
+                    _heartbeatFailures++;
+
+                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+                    LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
+
+                    if (_heartbeatFailures >= _maxHeartbeatRetries)
+                    {
+                        LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures));
+                        try
+                        {
+                            _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>();
+                        }
+                        catch (Exception ex)
+                        {
+                            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER);
+                        }
+                        LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
+                        _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
+
+                        // clean heartbeat failure
+                        _heartbeatFailures = 0;
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Assemble a complete new heartbeat and send it out.
+        /// </summary>
+        public void OnNext()
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()");
+                TriggerHeartbeat(GetEvaluatorHeartbeatProto());
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific TaskStatus that must be delivered to the driver
+        /// </summary>
+        /// <param name="taskStatusProto">Task status placed in the heartbeat instead of the one reported by the context manager.</param>
+        public void OnNext(TaskStatusProto taskStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)");
+                TriggerHeartbeat(GetEvaluatorHeartbeatProto(
+                    _evaluatorRuntime.GetEvaluatorStatus(),
+                    _contextManager.GetContextStatusCollection(),
+                    Optional<TaskStatusProto>.Of(taskStatusProto)));
+            }
+        }
+
+        /// <summary>
+        ///  Called with a specific ContextStatusProto that must be delivered to the driver
+        /// </summary>
+        /// <param name="contextStatusProto">Context status listed first in the heartbeat, ahead of the statuses reported by the context manager.</param>
+        public void OnNext(ContextStatusProto contextStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)");
+                List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto>();
+                contextStatusProtos.Add(contextStatusProto);
+                contextStatusProtos.AddRange(_contextManager.GetContextStatusCollection());
+                TriggerHeartbeat(GetEvaluatorHeartbeatProto(
+                    _evaluatorRuntime.GetEvaluatorStatus(),
+                    contextStatusProtos,
+                    Optional<TaskStatusProto>.Empty()));
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific EvaluatorStatus that must be delivered to the driver
+        /// </summary>
+        /// <param name="evaluatorStatusProto">Evaluator status to report; the heartbeat carries no context or task status.</param>
+        public void OnNext(EvaluatorStatusProto evaluatorStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)");
+                EvaluatorHeartbeatProto heartbeatProto = new EvaluatorHeartbeatProto()
+                {
+                    timestamp = CurrentTimeMilliSeconds(),
+                    evaluator_status = evaluatorStatusProto
+                };
+                TriggerHeartbeat(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Clock callback for the regular heartbeat. Sends a heartbeat when the evaluator is OPERATIONAL
+        /// and its runtime is RUNNING; otherwise tries to locate the driver again. In both cases the next
+        /// alarm is scheduled afterwards.
+        /// </summary>
+        /// <param name="value">The alarm that fired; its content is not inspected.</param>
+        public void OnNext(Alarm value)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)");
+                if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING)
+                {
+                    EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString()));
+                    Send(evaluatorHeartbeatProto);
+                }
+                else
+                {
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState,  _evaluatorRuntime.State));
+                    TryReconnectToDriver();
+                }
+
+                // Reached only when the branch above did not throw, exactly as when each branch scheduled its own alarm.
+                _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
+            }
+        }
+
+        /// <summary>
+        /// Not supported by this observer.
+        /// </summary>
+        /// <param name="error">Ignored.</param>
+        /// <exception cref="NotImplementedException">Always thrown.</exception>
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Not supported by this observer.
+        /// </summary>
+        /// <exception cref="NotImplementedException">Always thrown.</exception>
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Returns the current UTC time as milliseconds elapsed since Jan 1st, 1970.
+        /// </summary>
+        private static long CurrentTimeMilliSeconds()
+        {
+            return (long)(DateTime.UtcNow - UnixEpoch).TotalMilliseconds;
+        }
+
+        /// <summary>
+        /// Logs the event-triggered heartbeat and hands it to <see cref="Send"/>.
+        /// </summary>
+        private void TriggerHeartbeat(EvaluatorHeartbeatProto heartbeatProto)
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+            Send(heartbeatProto);
+        }
+
+        /// <summary>
+        /// Queries the driver connection for driver information and starts recovery when it is available.
+        /// Any exception is logged and swallowed so that the periodic query keeps running.
+        /// </summary>
+        private void TryReconnectToDriver()
+        {
+            // The connection only exists after RECOVERY mode has been entered at least once; without this
+            // guard every alarm in that situation would raise (and swallow) a NullReferenceException.
+            if (_driverConnection == null)
+            {
+                LOGGER.Log(Level.Verbose, "Driver connection has not been instantiated, skipping the query for driver information.");
+                return;
+            }
+
+            try
+            {
+                DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId);
+                if (driverInformation == null)
+                {
+                    LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
+                }
+                else
+                {
+                    LOGGER.Log(
+                        Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
+                    Recover(driverInformation);
+                }
+            }
+            catch (Exception e)
+            {
+                // we do not want any exception to stop the query for driver status
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Re-targets the remote observer at the driver described by <paramref name="driverInformation"/>,
+        /// restarts the name client (when one is configured), replays all queued heartbeats and finally
+        /// switches the operation state back to OPERATIONAL.
+        /// </summary>
+        /// <param name="driverInformation">Endpoint and name server of the driver to reconnect to.</param>
+        private void Recover(DriverInformation driverInformation)
+        {
+            IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier);
+            _remoteId = new SocketRemoteIdentifier(driverEndpoint);
+            _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+            lock (_evaluatorSettings)
+            {
+                if (_evaluatorSettings.NameClient != null)
+                {
+                    try
+                    {
+                        LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server" + driverInformation.NameServerId);
+                        _evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId));
+                        LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId);
+                    }
+                    catch (Exception e)
+                    {
+                        // A name server failure is logged but does not abort the recovery.
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                    }
+                }
+            }
+
+            lock (_queuedHeartbeats)
+            {
+                bool firstHeartbeatInQueue = true;
+                while (_queuedHeartbeats.Count > 0)
+                {
+                    LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId);
+                    try
+                    {
+                        // Peek rather than dequeue: the heartbeat leaves the queue only after it has been
+                        // handed to the observer, so a failed send does not silently drop it.
+                        EvaluatorHeartbeatProto heartbeat = _queuedHeartbeats.Peek();
+                        if (firstHeartbeatInQueue)
+                        {
+                            // first heartbeat is specially constructed to include the recovery flag
+                            heartbeat = ConstructRecoveryHeartBeat(heartbeat);
+                            LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + heartbeat);
+                        }
+
+                        _observer.OnNext(new REEFMessage(heartbeat));
+                        _queuedHeartbeats.Dequeue();
+                        firstHeartbeatInQueue = false;
+                    }
+                    catch (Exception e)
+                    {
+                        // we do not handle failures during RECOVERY
+                        // NOTE(review): relies on CaughtAndThrow rethrowing to leave this loop - confirm against that helper.
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(
+                            e,
+                            Level.Error,
+                            string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId),
+                            LOGGER);
+                    }
+                    Thread.Sleep(RecoveryResendIntervalInMs);
+                }
+            }
+            _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
+            LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
+        }
+
+        /// <summary>
+        /// Marks the heartbeat, its context statuses and its task status (when present) as recovery messages.
+        /// The heartbeat is modified in place and returned.
+        /// </summary>
+        /// <param name="heartbeat">The heartbeat to flag.</param>
+        /// <returns>The same <paramref name="heartbeat"/> instance.</returns>
+        private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
+        {
+            heartbeat.recovery = true;
+            if (heartbeat.context_status != null)
+            {
+                heartbeat.context_status.ForEach(c => c.recovery = true);
+            }
+
+            // Heartbeats queued while in RECOVERY mode may carry no task status at all.
+            if (heartbeat.task_status != null)
+            {
+                heartbeat.task_status.recovery = true;
+            }
+            return heartbeat;
+        }
+
+        /// <summary>
+        /// Builds a heartbeat from the current evaluator, context and task status.
+        /// </summary>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto()
+        {
+            return GetEvaluatorHeartbeatProto(
+                _evaluatorRuntime.GetEvaluatorStatus(),
+                _contextManager.GetContextStatusCollection(),
+                _contextManager.GetTaskStatus());
+        }
+
+        /// <summary>
+        /// Builds a heartbeat stamped with the current time from the given status information.
+        /// </summary>
+        /// <param name="evaluatorStatusProto">Evaluator status to report.</param>
+        /// <param name="contextStatusProtos">Context statuses to report, added in iteration order.</param>
+        /// <param name="taskStatusProto">Task status to report; left unset in the heartbeat when empty.</param>
+        /// <returns>The assembled heartbeat.</returns>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto(
+            EvaluatorStatusProto evaluatorStatusProto,
+            ICollection<ContextStatusProto> contextStatusProtos,
+            Optional<TaskStatusProto> taskStatusProto)
+        {
+            EvaluatorHeartbeatProto evaluatorHeartbeatProto = new EvaluatorHeartbeatProto()
+            {
+                timestamp = CurrentTimeMilliSeconds(),
+                evaluator_status = evaluatorStatusProto
+            };
+            foreach (ContextStatusProto contextStatusProto in contextStatusProtos)
+            {
+                evaluatorHeartbeatProto.context_status.Add(contextStatusProto);
+            }
+            if (taskStatusProto.IsPresent())
+            {
+                evaluatorHeartbeatProto.task_status = taskStatusProto.Value;
+            }
+            return evaluatorHeartbeatProto;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs
new file mode 100644
index 0000000..5593e08
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using System;
+using System.Globalization;
+using System.Threading;
+
+namespace Org.Apache.REEF.Common
+{
+    /// <summary>
+    /// Receives REEF messages, logs a short description of any context-control request they carry and
+    /// forwards each message to the single subscribed observer, if there is one.
+    /// </summary>
+    /// <remarks>
+    /// At most one observer can be subscribed at a time; disposing this object removes it.
+    /// </remarks>
+    public class ReefMessageProtoObserver :
+        IObserver<IRemoteMessage<REEFMessage>>,
+        IObservable<IRemoteMessage<REEFMessage>>,
+        IDisposable
+    {
+        // Number of received messages that makes up one bookkeeping batch.
+        private const int MessageBatchSize = 100000;
+
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver));
+        private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null;
+
+        // Message counter and batch timestamps. They are maintained here but not read anywhere in
+        // this class; no throughput figure is reported.
+        private long _count = 0;
+        private DateTime _begin;
+        private DateTime _origBegin;
+
+        /// <summary>
+        /// Completion notifications are ignored.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Error notifications are ignored.
+        /// </summary>
+        /// <param name="error">Ignored.</param>
+        public void OnError(Exception error)
+        {
+        }
+
+        /// <summary>
+        /// Logs the incoming message, updates the message counter and passes the message on to the
+        /// subscribed observer when one is present.
+        /// </summary>
+        /// <param name="value">The remote message together with the identifier of its sender.</param>
+        public void OnNext(IRemoteMessage<REEFMessage> value)
+        {
+            REEFMessage remoteEvent = value.Message;
+            IRemoteIdentifier id = value.Identifier;
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id));
+
+            if (remoteEvent.evaluatorControl != null && remoteEvent.evaluatorControl.context_control != null)
+            {
+                var contextControl = remoteEvent.evaluatorControl.context_control;
+                string context_message = null;
+                string task_message = null;
+
+                if (contextControl.context_message != null)
+                {
+                    context_message = contextControl.context_message.ToString();
+                }
+                if (contextControl.task_message != null)
+                {
+                    task_message = ByteUtilities.ByteArrarysToString(contextControl.task_message);
+                }
+
+                // Only the first matching kind of request is logged.
+                if (!(string.IsNullOrEmpty(context_message) && string.IsNullOrEmpty(task_message)))
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", context_message, task_message));
+                }
+                else if (contextControl.remove_context != null)
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", contextControl.remove_context.context_id));
+                }
+                else if (contextControl.add_context != null)
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", contextControl.add_context.parent_context_id));
+                }
+                else if (contextControl.start_task != null)
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", contextControl.start_task.context_id));
+                }
+                else if (contextControl.stop_task != null)
+                {
+                    LOGGER.Log(Level.Info, "Control protobuf to stop task");
+                }
+                else if (contextControl.suspend_task != null)
+                {
+                    LOGGER.Log(Level.Info, "Control protobuf to suspend task");
+                }
+            }
+
+            // Use the value returned by the atomic increment instead of a separate, unsynchronized
+            // read of _count to recognise the first message and the batch boundaries.
+            long count = Interlocked.Increment(ref _count);
+            if (count == 1)
+            {
+                _begin = DateTime.UtcNow;
+                _origBegin = _begin;
+            }
+            if (count % MessageBatchSize == 0)
+            {
+                // A batch is complete: restart the batch window.
+                _begin = DateTime.UtcNow;
+            }
+
+            // Copy the volatile field once so that a concurrent Dispose cannot null it between check and call.
+            var observer = _observer;
+            if (observer != null)
+            {
+                observer.OnNext(value);
+            }
+        }
+
+        /// <summary>
+        /// Registers the observer that received messages are forwarded to.
+        /// </summary>
+        /// <param name="observer">The observer to forward messages to.</param>
+        /// <returns>
+        /// This instance, whose <see cref="Dispose"/> removes the observer; or <c>null</c> when an
+        /// observer is already registered, in which case the new one is not accepted.
+        /// </returns>
+        public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer)
+        {
+            if (_observer != null)
+            {
+                return null;
+            }
+            _observer = observer;
+            return this;
+        }
+
+        /// <summary>
+        /// Removes the subscribed observer; subsequent messages are logged but no longer forwarded.
+        /// </summary>
+        public void Dispose()
+        {
+            _observer = null;
+        }
+    }
+}