You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:43:12 UTC
[28/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code
base
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/EvaluatorRunTime.pb.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/EvaluatorRunTime.pb.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/EvaluatorRunTime.pb.cs
new file mode 100644
index 0000000..5e6007f
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/EvaluatorRunTime.pb.cs
@@ -0,0 +1,305 @@
+//------------------------------------------------------------------------------
+// <auto-generated>
+// This code was generated by a tool.
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+// Generated from: evaluator_runtime.proto
+// Note: requires additional types generated from: reef_service_protos.proto
+namespace Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto
+{
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"StopEvaluatorProto")]
+ public partial class StopEvaluatorProto : global::ProtoBuf.IExtensible // Protobuf contract "StopEvaluatorProto"; declares no ProtoMember properties.
+ {
+ public StopEvaluatorProto() {}
+
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"KillEvaluatorProto")]
+ public partial class KillEvaluatorProto : global::ProtoBuf.IExtensible // Protobuf contract "KillEvaluatorProto"; declares no ProtoMember properties.
+ {
+ public KillEvaluatorProto() {}
+
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"StartTaskProto")]
+ public partial class StartTaskProto : global::ProtoBuf.IExtensible // Protobuf contract "StartTaskProto": two required string members.
+ {
+ public StartTaskProto() {}
+
+ private string _context_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string context_id // Required member, tag 1.
+ {
+ get { return _context_id; }
+ set { _context_id = value; }
+ }
+ private string _configuration; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"configuration", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string configuration // Required member, tag 2.
+ {
+ get { return _configuration; }
+ set { _configuration = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"AddContextProto")]
+ public partial class AddContextProto : global::ProtoBuf.IExtensible // Protobuf contract "AddContextProto": two required strings plus one optional string.
+ {
+ public AddContextProto() {}
+
+ private string _parent_context_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"parent_context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string parent_context_id // Required member, tag 1.
+ {
+ get { return _parent_context_id; }
+ set { _parent_context_id = value; }
+ }
+ private string _context_configuration; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"context_configuration", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string context_configuration // Required member, tag 2.
+ {
+ get { return _context_configuration; }
+ set { _context_configuration = value; }
+ }
+ private string _service_configuration = ""; // Initializer matches the DefaultValue("") attribute on the property.
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"service_configuration", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string service_configuration // Optional member, tag 3; declared default is the empty string.
+ {
+ get { return _service_configuration; }
+ set { _service_configuration = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"RemoveContextProto")]
+ public partial class RemoveContextProto : global::ProtoBuf.IExtensible // Protobuf contract "RemoveContextProto": a single required string member.
+ {
+ public RemoveContextProto() {}
+
+ private string _context_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string context_id // Required member, tag 1.
+ {
+ get { return _context_id; }
+ set { _context_id = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"StopTaskProto")]
+ public partial class StopTaskProto : global::ProtoBuf.IExtensible // Protobuf contract "StopTaskProto"; declares no ProtoMember properties.
+ {
+ public StopTaskProto() {}
+
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"SuspendTaskProto")]
+ public partial class SuspendTaskProto : global::ProtoBuf.IExtensible // Protobuf contract "SuspendTaskProto"; declares no ProtoMember properties.
+ {
+ public SuspendTaskProto() {}
+
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextMessageProto")]
+ public partial class ContextMessageProto : global::ProtoBuf.IExtensible // Protobuf contract "ContextMessageProto": a required context id plus a required byte payload.
+ {
+ public ContextMessageProto() {}
+
+ private string _context_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string context_id // Required member, tag 1.
+ {
+ get { return _context_id; }
+ set { _context_id = value; }
+ }
+ private byte[] _message; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public byte[] message // Required member, tag 2; the getter returns the stored array itself (no copy).
+ {
+ get { return _message; }
+ set { _message = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextControlProto")]
+ public partial class ContextControlProto : global::ProtoBuf.IExtensible // Protobuf contract "ContextControlProto": seven optional members using tags 1, 2 and 5-9 (tags 3 and 4 are not declared here).
+ {
+ public ContextControlProto() {}
+
+ private byte[] _task_message = null;
+ [global::ProtoBuf.ProtoMember(1, IsRequired = false, Name=@"task_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] task_message // Optional member, tag 1; null by default.
+ {
+ get { return _task_message; }
+ set { _task_message = value; }
+ }
+ private ContextMessageProto _context_message = null;
+ [global::ProtoBuf.ProtoMember(2, IsRequired = false, Name=@"context_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public ContextMessageProto context_message // Optional member, tag 2; null by default.
+ {
+ get { return _context_message; }
+ set { _context_message = value; }
+ }
+ private AddContextProto _add_context = null;
+ [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"add_context", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public AddContextProto add_context // Optional member, tag 5; null by default.
+ {
+ get { return _add_context; }
+ set { _add_context = value; }
+ }
+ private RemoveContextProto _remove_context = null;
+ [global::ProtoBuf.ProtoMember(6, IsRequired = false, Name=@"remove_context", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public RemoveContextProto remove_context // Optional member, tag 6; null by default.
+ {
+ get { return _remove_context; }
+ set { _remove_context = value; }
+ }
+ private StartTaskProto _start_task = null;
+ [global::ProtoBuf.ProtoMember(7, IsRequired = false, Name=@"start_task", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public StartTaskProto start_task // Optional member, tag 7; null by default.
+ {
+ get { return _start_task; }
+ set { _start_task = value; }
+ }
+ private StopTaskProto _stop_task = null;
+ [global::ProtoBuf.ProtoMember(8, IsRequired = false, Name=@"stop_task", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public StopTaskProto stop_task // Optional member, tag 8; null by default.
+ {
+ get { return _stop_task; }
+ set { _stop_task = value; }
+ }
+ private SuspendTaskProto _suspend_task = null;
+ [global::ProtoBuf.ProtoMember(9, IsRequired = false, Name=@"suspend_task", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public SuspendTaskProto suspend_task // Optional member, tag 9; null by default.
+ {
+ get { return _suspend_task; }
+ set { _suspend_task = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"EvaluatorHeartbeatProto")]
+ public partial class EvaluatorHeartbeatProto : global::ProtoBuf.IExtensible // Protobuf contract "EvaluatorHeartbeatProto": timestamp, evaluator status, a list of context statuses, an optional task status and a recovery flag.
+ {
+ public EvaluatorHeartbeatProto() {}
+
+ private long _timestamp;
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"timestamp", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public long timestamp // Required member, tag 1; unit/epoch is not stated in this file.
+ {
+ get { return _timestamp; }
+ set { _timestamp = value; }
+ }
+ private EvaluatorStatusProto _evaluator_status; // Type comes from the ReefServiceProto namespace imported at the top of the file.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"evaluator_status", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public EvaluatorStatusProto evaluator_status // Required member, tag 2.
+ {
+ get { return _evaluator_status; }
+ set { _evaluator_status = value; }
+ }
+ private readonly global::System.Collections.Generic.List<ContextStatusProto> _context_status = new global::System.Collections.Generic.List<ContextStatusProto>(); // Created once per instance; never null.
+ [global::ProtoBuf.ProtoMember(3, Name=@"context_status", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public global::System.Collections.Generic.List<ContextStatusProto> context_status // List member, tag 3; getter-only, so callers add to the returned list rather than replacing it.
+ {
+ get { return _context_status; }
+ }
+
+ private TaskStatusProto _task_status = null;
+ [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"task_status", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public TaskStatusProto task_status // Optional member, tag 4; null by default.
+ {
+ get { return _task_status; }
+ set { _task_status = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+
+ private bool _recovery; // No explicit initializer; defaults to false.
+ [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name = @"recovery", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public bool recovery // Optional member, tag 5. NOTE(review): unlike the other optional members of this class it has no DefaultValue attribute and sits after the extension accessor - looks hand-added; confirm against evaluator_runtime.proto.
+ {
+ get { return _recovery; }
+ set { _recovery = value; }
+ }
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"EvaluatorControlProto")]
+ public partial class EvaluatorControlProto : global::ProtoBuf.IExtensible // Protobuf contract "EvaluatorControlProto": required timestamp and identifier plus two optional sub-messages.
+ {
+ public EvaluatorControlProto() {}
+
+ private long _timestamp;
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"timestamp", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public long timestamp // Required member, tag 1; unit/epoch is not stated in this file.
+ {
+ get { return _timestamp; }
+ set { _timestamp = value; }
+ }
+ private string _identifier; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"identifier", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string identifier // Required member, tag 2.
+ {
+ get { return _identifier; }
+ set { _identifier = value; }
+ }
+ private ContextControlProto _context_control = null;
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"context_control", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public ContextControlProto context_control // Optional member, tag 3; null by default.
+ {
+ get { return _context_control; }
+ set { _context_control = value; }
+ }
+ private KillEvaluatorProto _kill_evaluator = null;
+ [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"kill_evaluator", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public KillEvaluatorProto kill_evaluator // Optional member, tag 4; null by default.
+ {
+ get { return _kill_evaluator; }
+ set { _kill_evaluator = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/ReefProtocol.pb.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/ReefProtocol.pb.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/ReefProtocol.pb.cs
new file mode 100644
index 0000000..b5a9daa
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/ReefProtocol.pb.cs
@@ -0,0 +1,78 @@
+//------------------------------------------------------------------------------
+// <auto-generated>
+// This code was generated by a tool.
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Generated from: reef_protocol.proto
+// Note: requires additional types generated from: client_runtime.proto
+// Note: requires additional types generated from: evaluator_runtime.proto
+// Note: requires additional types generated from: reef_service_protos.proto
+using Org.Apache.Reef.Common.ProtoBuf.ClienRuntimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+
+namespace Org.Apache.Reef.Common.ProtoBuf.ReefProtocol
+{
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"REEFMessage")]
+ public partial class REEFMessage : global::ProtoBuf.IExtensible // Protobuf contract "REEFMessage": six optional sub-message members (tags 1-6), all null by default. The class is partial, so further members may be declared in other files.
+ {
+ public REEFMessage() {}
+
+ private JobSubmissionProto _jobSubmission = null;
+ [global::ProtoBuf.ProtoMember(1, IsRequired = false, Name=@"jobSubmission", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public JobSubmissionProto jobSubmission // Optional member, tag 1.
+ {
+ get { return _jobSubmission; }
+ set { _jobSubmission = value; }
+ }
+ private JobControlProto _jobControl = null;
+ [global::ProtoBuf.ProtoMember(2, IsRequired = false, Name=@"jobControl", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public JobControlProto jobControl // Optional member, tag 2.
+ {
+ get { return _jobControl; }
+ set { _jobControl = value; }
+ }
+ private RuntimeErrorProto _runtimeError = null;
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"runtimeError", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public RuntimeErrorProto runtimeError // Optional member, tag 3.
+ {
+ get { return _runtimeError; }
+ set { _runtimeError = value; }
+ }
+ private JobStatusProto _jobStatus = null;
+ [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"jobStatus", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public JobStatusProto jobStatus // Optional member, tag 4.
+ {
+ get { return _jobStatus; }
+ set { _jobStatus = value; }
+ }
+ private EvaluatorControlProto _evaluatorControl = null;
+ [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"evaluatorControl", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public EvaluatorControlProto evaluatorControl // Optional member, tag 5.
+ {
+ get { return _evaluatorControl; }
+ set { _evaluatorControl = value; }
+ }
+ private EvaluatorHeartbeatProto _evaluatorHeartBeat = null; // Note the capital B in the field/property name versus the type name EvaluatorHeartbeatProto.
+ [global::ProtoBuf.ProtoMember(6, IsRequired = false, Name=@"evaluatorHeartBeat", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public EvaluatorHeartbeatProto evaluatorHeartBeat // Optional member, tag 6.
+ {
+ get { return _evaluatorHeartBeat; }
+ set { _evaluatorHeartBeat = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/ReefService.pb.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/ReefService.pb.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/ReefService.pb.cs
new file mode 100644
index 0000000..d0dd8df
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/ReefService.pb.cs
@@ -0,0 +1,393 @@
+//------------------------------------------------------------------------------
+// <auto-generated>
+// This code was generated by a tool.
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Generated from: reef_service_protos.proto
+namespace Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto
+
+{
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"FileResourceProto")]
+ public partial class FileResourceProto : global::ProtoBuf.IExtensible // Protobuf contract "FileResourceProto": required file type, name and path.
+ {
+ public FileResourceProto() {}
+
+ private FileType _type; // No initializer, so it starts at the enum's zero value (PLAIN = 0 in this file).
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"type", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public FileType type // Required member, tag 1; uses the FileType enum declared in this namespace.
+ {
+ get { return _type; }
+ set { _type = value; }
+ }
+ private string _name; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"name", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string name // Required member, tag 2.
+ {
+ get { return _name; }
+ set { _name = value; }
+ }
+ private string _path; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(3, IsRequired = true, Name=@"path", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string path // Required member, tag 3.
+ {
+ get { return _path; }
+ set { _path = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"RuntimeErrorProto")]
+ public partial class RuntimeErrorProto : global::ProtoBuf.IExtensible // Protobuf contract "RuntimeErrorProto": members use tags 1, 2, 3 and 5 (tag 4 is not declared here).
+ {
+ public RuntimeErrorProto() {}
+
+ private string _name; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"name", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string name // Required member, tag 1.
+ {
+ get { return _name; }
+ set { _name = value; }
+ }
+ private string _message; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string message // Required member, tag 2.
+ {
+ get { return _message; }
+ set { _message = value; }
+ }
+ private byte[] _exception = null;
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"exception", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] exception // Optional member, tag 3; raw bytes whose encoding is not specified in this file.
+ {
+ get { return _exception; }
+ set { _exception = value; }
+ }
+ private string _identifier = ""; // Initializer matches the DefaultValue("") attribute on the property.
+ [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"identifier", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string identifier // Optional member, tag 5; declared default is the empty string.
+ {
+ get { return _identifier; }
+ set { _identifier = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"JobStatusProto")]
+ public partial class JobStatusProto : global::ProtoBuf.IExtensible // Protobuf contract "JobStatusProto": required identifier and state plus optional message and exception bytes.
+ {
+ public JobStatusProto() {}
+
+ private string _identifier; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"identifier", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string identifier // Required member, tag 1.
+ {
+ get { return _identifier; }
+ set { _identifier = value; }
+ }
+ private State _state; // No initializer, so it starts at the enum's zero value.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public State state // Required member, tag 2; this class has no nested State, so the name binds to the namespace-level State enum.
+ {
+ get { return _state; }
+ set { _state = value; }
+ }
+ private byte[] _message = null;
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] message // Optional member, tag 3; null by default.
+ {
+ get { return _message; }
+ set { _message = value; }
+ }
+ private byte[] _exception = null;
+ [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"exception", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] exception // Optional member, tag 4; null by default.
+ {
+ get { return _exception; }
+ set { _exception = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextStatusProto")]
+ public partial class ContextStatusProto : global::ProtoBuf.IExtensible // Protobuf contract "ContextStatusProto": members use tags 1, 2, 3, 5, 6 and 7 (tag 4 is not declared here); also declares a nested message type and a nested State enum.
+ {
+ public ContextStatusProto() {}
+
+ private ContextStatusProto.State _context_state; // No initializer, so it starts at the nested enum's zero value (READY = 0).
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"context_state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public ContextStatusProto.State context_state // Required member, tag 1; explicitly qualified to the nested State enum declared further down.
+ {
+ get { return _context_state; }
+ set { _context_state = value; }
+ }
+ private string _context_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string context_id // Required member, tag 2.
+ {
+ get { return _context_id; }
+ set { _context_id = value; }
+ }
+ private string _parent_id = ""; // Initializer matches the DefaultValue("") attribute on the property.
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"parent_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string parent_id // Optional member, tag 3; declared default is the empty string.
+ {
+ get { return _parent_id; }
+ set { _parent_id = value; }
+ }
+ private byte[] _error = null;
+ [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"error", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] error // Optional member, tag 5; null by default.
+ {
+ get { return _error; }
+ set { _error = value; }
+ }
+ private bool _recovery = false;
+ [global::ProtoBuf.ProtoMember(6, IsRequired = false, Name = @"recovery", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool recovery // Optional member, tag 6; declared default is false.
+ {
+ get { return _recovery; }
+ set { _recovery = value; }
+ }
+ private readonly global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto> _context_message = new global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto>(); // Created once per instance; never null.
+ [global::ProtoBuf.ProtoMember(7, Name=@"context_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto> context_message // List member, tag 7; getter-only, so callers add to the returned list rather than replacing it.
+ {
+ get { return _context_message; }
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextMessageProto")]
+ public partial class ContextMessageProto : global::ProtoBuf.IExtensible // Nested contract: required source id plus required byte payload; element type of context_message above.
+ {
+ public ContextMessageProto() {}
+
+ private string _source_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"source_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string source_id // Required member, tag 1.
+ {
+ get { return _source_id; }
+ set { _source_id = value; }
+ }
+ private byte[] _message; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public byte[] message // Required member, tag 2.
+ {
+ get { return _message; }
+ set { _message = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::ProtoBuf.ProtoContract(Name=@"State")]
+ public enum State // Nested enum used by context_state; distinct from the namespace-level State enum in this file.
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"READY", Value=0)]
+ READY = 0,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"DONE", Value=1)]
+ DONE = 1,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"FAIL", Value=2)]
+ FAIL = 2
+ }
+
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"TaskStatusProto")]
+ public partial class TaskStatusProto : global::ProtoBuf.IExtensible // Protobuf contract "TaskStatusProto": task id, context id, state, optional result bytes, recovery flag and a list of task messages (tags 1-6).
+ {
+ public TaskStatusProto() {}
+
+ private string _task_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"task_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string task_id // Required member, tag 1.
+ {
+ get { return _task_id; }
+ set { _task_id = value; }
+ }
+ private string _context_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string context_id // Required member, tag 2.
+ {
+ get { return _context_id; }
+ set { _context_id = value; }
+ }
+ private State _state; // No initializer, so it starts at the enum's zero value.
+ [global::ProtoBuf.ProtoMember(3, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public State state // Required member, tag 3; this class has no nested State, so the name binds to the namespace-level State enum.
+ {
+ get { return _state; }
+ set { _state = value; }
+ }
+ private byte[] _result = null;
+ [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"result", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] result // Optional member, tag 4; null by default.
+ {
+ get { return _result; }
+ set { _result = value; }
+ }
+ private bool _recovery = false;
+ [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name = @"recovery", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool recovery // Optional member, tag 5; declared default is false.
+ {
+ get { return _recovery; }
+ set { _recovery = value; }
+ }
+ private readonly global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto> _task_message = new global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto>(); // Created once per instance; never null.
+ [global::ProtoBuf.ProtoMember(6, Name=@"task_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto> task_message // List member, tag 6; getter-only, so callers add to the returned list rather than replacing it.
+ {
+ get { return _task_message; }
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"TaskMessageProto")]
+ public partial class TaskMessageProto : global::ProtoBuf.IExtensible // Nested contract: required source id plus required byte payload; element type of task_message above.
+ {
+ public TaskMessageProto() {}
+
+ private string _source_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"source_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string source_id // Required member, tag 1.
+ {
+ get { return _source_id; }
+ set { _source_id = value; }
+ }
+ private byte[] _message; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public byte[] message // Required member, tag 2.
+ {
+ get { return _message; }
+ set { _message = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"EvaluatorStatusProto")]
+ public partial class EvaluatorStatusProto : global::ProtoBuf.IExtensible // Protobuf contract "EvaluatorStatusProto": required evaluator id and state plus optional error bytes.
+ {
+ public EvaluatorStatusProto() {}
+
+ private string _evaluator_id; // No initializer, so null until assigned.
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"evaluator_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string evaluator_id // Required member, tag 1.
+ {
+ get { return _evaluator_id; }
+ set { _evaluator_id = value; }
+ }
+ private State _state; // No initializer, so it starts at the enum's zero value.
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public State state // Required member, tag 2; this class has no nested State, so the name binds to the namespace-level State enum.
+ {
+ get { return _state; }
+ set { _state = value; }
+ }
+ private byte[] _error = null;
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"error", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] error // Optional member, tag 3; null by default.
+ {
+ get { return _error; }
+ set { _error = value; }
+ }
+ private global::ProtoBuf.IExtension extensionObject; // Per-instance field passed by ref to the helper below.
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing) // Explicit IExtensible implementation (only reachable through the interface).
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); } // Delegates to the protobuf-net Extensible helper.
+ }
+
+ [global::ProtoBuf.ProtoContract(Name=@"State")]
+ public enum State // Namespace-level state enum (six values, 0-5); the type of the `state` members of JobStatusProto, TaskStatusProto and EvaluatorStatusProto in this file.
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"INIT", Value=0)]
+ INIT = 0, // Zero value, so also the default of an unassigned State field.
+
+ [global::ProtoBuf.ProtoEnum(Name=@"RUNNING", Value=1)]
+ RUNNING = 1,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"DONE", Value=2)]
+ DONE = 2, // Note: the nested ContextStatusProto.State also has DONE, but with value 1.
+
+ [global::ProtoBuf.ProtoEnum(Name=@"SUSPEND", Value=3)]
+ SUSPEND = 3,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"FAILED", Value=4)]
+ FAILED = 4,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"KILLED", Value=5)]
+ KILLED = 5
+ }
+
+ [global::ProtoBuf.ProtoContract(Name=@"FileType")]
+ public enum FileType // Enum used by FileResourceProto.type; each C# value equals the ProtoEnum Value on its attribute.
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"PLAIN", Value=0)]
+ PLAIN = 0, // Zero value, so also the default of an unassigned FileType field.
+
+ [global::ProtoBuf.ProtoEnum(Name=@"LIB", Value=1)]
+ LIB = 1,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"ARCHIVE", Value=2)]
+ ARCHIVE = 2
+ }
+
+ [global::ProtoBuf.ProtoContract(Name=@"SIZE")]
+ public enum SIZE // Four-value size enum; not referenced by any message class in this file, so presumably consumed by types generated from other .proto files - verify.
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"SMALL", Value=0)]
+ SMALL = 0, // Zero value, so also the default of an unassigned SIZE field.
+
+ [global::ProtoBuf.ProtoEnum(Name=@"MEDIUM", Value=1)]
+ MEDIUM = 1,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"LARGE", Value=2)]
+ LARGE = 2,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"XLARGE", Value=3)]
+ XLARGE = 3
+ }
+
+ [global::ProtoBuf.ProtoContract(Name=@"ProcessType")]
+ public enum ProcessType // Two-value enum (JVM, CLR); not referenced by any message class in this file, so presumably consumed by types generated from other .proto files - verify.
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"JVM", Value=0)]
+ JVM = 0, // Zero value, so also the default of an unassigned ProcessType field.
+
+ [global::ProtoBuf.ProtoEnum(Name=@"CLR", Value=1)]
+ CLR = 1
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/Serializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/Serializer.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/Serializer.cs
new file mode 100644
index 0000000..1af9a62
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/Serializer.cs
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Utilities;
+
+using ProtoBuf;
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.IO;
+using System.Text;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1403:FileMayOnlyContainASingleNamespace", Justification = "Serializers for all protobuf types")]
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "Serializers for all protobuf types")]
+
+namespace Org.Apache.Reef.Common.ProtoBuf.ReefProtocol
+{
+ /// <summary>
+ /// Hand-written half of the generated REEFMessage class: a convenience constructor
+ /// and helpers that convert the message to and from a protobuf byte array.
+ /// </summary>
+ public partial class REEFMessage
+ {
+     /// <summary>
+     /// Creates a message whose evaluator heartbeat field is set to the given proto.
+     /// </summary>
+     /// <param name="evaluatorHeartbeatProto">the heartbeat to wrap</param>
+     public REEFMessage(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
+     {
+         _evaluatorHeartBeat = evaluatorHeartbeatProto;
+     }
+
+     /// <summary>
+     /// Parses a REEFMessage from its protobuf encoding.
+     /// </summary>
+     /// <param name="bytes">the encoded message</param>
+     /// <returns>the decoded message</returns>
+     public static REEFMessage Deserialize(byte[] bytes)
+     {
+         using (MemoryStream input = new MemoryStream(bytes))
+         {
+             return Serializer.Deserialize<REEFMessage>(input);
+         }
+     }
+
+     /// <summary>
+     /// Encodes this message with protobuf.
+     /// </summary>
+     /// <returns>a new array holding exactly the bytes written by the serializer</returns>
+     public byte[] Serialize()
+     {
+         using (MemoryStream output = new MemoryStream())
+         {
+             Serializer.Serialize<REEFMessage>(output, this);
+             return output.ToArray();
+         }
+     }
+ }
+}
+
+namespace Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto
+{
+ /// <summary>
+ /// Hand-written half of the generated EvaluatorHeartbeatProto class: byte-array
+ /// serialization helpers and a diagnostic <see cref="ToString"/>.
+ /// </summary>
+ public partial class EvaluatorHeartbeatProto
+ {
+     /// <summary>
+     /// Parses an EvaluatorHeartbeatProto from its protobuf encoding.
+     /// </summary>
+     /// <param name="bytes">the encoded message</param>
+     /// <returns>the decoded message</returns>
+     public static EvaluatorHeartbeatProto Deserialize(byte[] bytes)
+     {
+         using (var s = new MemoryStream(bytes))
+         {
+             return Serializer.Deserialize<EvaluatorHeartbeatProto>(s);
+         }
+     }
+
+     /// <summary>
+     /// Encodes this message with protobuf.
+     /// </summary>
+     /// <returns>a new array holding exactly the bytes written by the serializer</returns>
+     public byte[] Serialize()
+     {
+         using (var s = new MemoryStream())
+         {
+             Serializer.Serialize<EvaluatorHeartbeatProto>(s, this);
+             return s.ToArray();
+         }
+     }
+
+     /// <summary>
+     /// Builds a one-line, culture-invariant description of the heartbeat for logging.
+     /// Missing evaluator or task status is rendered as empty text instead of throwing.
+     /// </summary>
+     /// <returns>the formatted description</returns>
+     public override string ToString()
+     {
+         // evaluator_status may be unset on a locally constructed message; read it once,
+         // defensively, so that ToString() never throws a NullReferenceException.
+         string evaluatorId = evaluator_status == null ? string.Empty : evaluator_status.evaluator_id;
+         string evaluatorState = evaluator_status == null ? string.Empty : evaluator_status.state.ToString();
+
+         StringBuilder contextStatus = new StringBuilder();
+         if (context_status != null)
+         {
+             foreach (ContextStatusProto contextStatusProto in context_status)
+             {
+                 contextStatus.AppendFormat(
+                     CultureInfo.InvariantCulture,
+                     "evaluator {0} has context {1} in state {2} with recovery flag {3}",
+                     evaluatorId,
+                     contextStatusProto.context_id,
+                     contextStatusProto.context_state,
+                     contextStatusProto.recovery);
+             }
+         }
+
+         StringBuilder taskStatusMessage = new StringBuilder();
+         if (task_status != null && task_status.task_message != null)
+         {
+             foreach (TaskStatusProto.TaskMessageProto taskMessageProto in task_status.task_message)
+             {
+                 taskStatusMessage.Append(ByteUtilities.ByteArrarysToString(taskMessageProto.message));
+             }
+         }
+
+         return string.Format(CultureInfo.InvariantCulture, "EvaluatorHeartbeatProto: task_id=[{0}], task_status=[{1}], task_message=[{2}], evaluator_status=[{3}], context_status=[{4}], timestamp=[{5}], recoveryFlag =[{6}]",
+             task_status == null ? string.Empty : task_status.task_id,
+             task_status == null ? string.Empty : task_status.state.ToString(),
+             taskStatusMessage,
+             evaluatorState,
+             contextStatus,
+             timestamp,
+             recovery);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
new file mode 100644
index 0000000..9dfe708
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Wake.Remote;
+
+namespace Org.Apache.Reef.Common.ProtoBuf.ReefProtocol
+{
+ /// <summary>
+ /// Codec that converts EvaluatorHeartbeatProto messages to and from protobuf bytes.
+ /// </summary>
+ public class EvaluatorHeartbeatProtoCodec : ICodec<EvaluatorHeartbeatProto>
+ {
+     /// <summary>
+     /// Encodes the complete heartbeat message.
+     /// </summary>
+     /// <param name="obj">the heartbeat to encode</param>
+     /// <returns>the protobuf encoding of <paramref name="obj"/></returns>
+     public byte[] Encode(EvaluatorHeartbeatProto obj)
+     {
+         // Serialize the message itself. Copying only evaluator_status into a fresh
+         // message would silently drop timestamp, context_status, task_status and
+         // recovery, so Decode(Encode(x)) would not round-trip.
+         return obj.Serialize();
+     }
+
+     /// <summary>
+     /// Decodes a heartbeat message from its protobuf encoding.
+     /// </summary>
+     /// <param name="data">the encoded message</param>
+     /// <returns>the decoded heartbeat</returns>
+     public EvaluatorHeartbeatProto Decode(byte[] data)
+     {
+         return EvaluatorHeartbeatProto.Deserialize(data);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/codec/REEFMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/codec/REEFMessageCodec.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/codec/REEFMessageCodec.cs
new file mode 100644
index 0000000..84d6d05
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/cs/codec/REEFMessageCodec.cs
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Wake.Remote;
+
+namespace Org.Apache.Reef.Common.ProtoBuf.ReefProtocol
+{
+ /// <summary>
+ /// Codec that converts REEFMessage instances to and from protobuf bytes by delegating
+ /// to REEFMessage.Serialize and REEFMessage.Deserialize.
+ /// </summary>
+ public class REEFMessageCodec : ICodec<REEFMessage>
+ {
+     /// <summary>
+     /// Encodes the message.
+     /// </summary>
+     /// <param name="obj">the message to encode</param>
+     /// <returns>the bytes produced by <c>obj.Serialize()</c></returns>
+     public byte[] Encode(REEFMessage obj)
+     {
+         byte[] encoded = obj.Serialize();
+         return encoded;
+     }
+
+     /// <summary>
+     /// Decodes a message.
+     /// </summary>
+     /// <param name="data">the encoded message</param>
+     /// <returns>the message produced by <c>REEFMessage.Deserialize(data)</c></returns>
+     public REEFMessage Decode(byte[] data)
+     {
+         return REEFMessage.Deserialize(data);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/client_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/client_runtime.proto b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/client_runtime.proto
new file mode 100644
index 0000000..3d1f927
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/client_runtime.proto
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "ClientRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "reef_service_protos.proto";
+
+// Messages from REEF Client -> Driver Runtime
+
+// Job submission message (this file's section: REEF Client -> Driver Runtime).
+message JobSubmissionProto {
+ required string identifier = 1; // the job identifier
+ required string remote_id = 2; // the remote identifier
+ required string configuration = 5; // the runtime configuration
+ required string user_name = 6; // the user name
+
+ // optional SIZE driver_size = 7; // Disabled: the SIZE enum is commented out in reef_service_protos.proto (removed in REEF 0.3), so this field no longer compiles. Do not reuse tag 7.
+ optional int32 driver_memory = 8;
+ optional int32 priority = 9;
+ optional string queue = 10;
+
+ repeated FileResourceProto global_file = 11; // files that should be placed on the driver and all subsequent evaluators
+ repeated FileResourceProto local_File = 12; // files that should be placed on the driver only
+
+}
+
+// Control signals that can be carried in JobControlProto.signal.
+// Numbering starts at 1; there is no zero value.
+enum Signal {
+ SIG_TERMINATE = 1;
+ SIG_SUSPEND = 2;
+ SIG_RESUME = 3;
+}
+
+// Control message addressed by identifier; optionally carries a Signal and an
+// opaque byte payload.
+message JobControlProto {
+ required string identifier = 1; // NOTE(review): presumably the job identifier - confirm
+ optional Signal signal = 2;
+ optional bytes message = 3;
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/driver_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/driver_runtime.proto b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/driver_runtime.proto
new file mode 100644
index 0000000..2b21ac7
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/driver_runtime.proto
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "DriverRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+
+import "reef_service_protos.proto";
+
+// Messages from Driver Runtime -> Driver Process
+
+// Registration message carrying a single remote identifier.
+// NOTE(review): presumably the remote id of the registering driver process - confirm.
+message DriverProcessRegistrationProto {
+ required string remote_identifier = 1;
+}
+
+
+// Describes a node: identifier, host, port, memory size and an optional rack name.
+message NodeDescriptorProto {
+ required string identifier = 1;
+ required string host_name = 2; // e.g., IP address
+ required int32 port = 3; // e.g., IP port
+ required int32 memory_size = 4; // NOTE(review): unit not stated here; presumably MB like the other memory fields - confirm
+ optional string rack_name = 5; // e.g., /default-rack
+}
+
+// Describes an allocated resource: its identifier, memory in megabytes and a node id.
+message ResourceAllocationProto {
+ required string identifier = 1; // e.g., the container id, or the thread id
+ required int32 resource_memory = 2; // megabytes
+ required string node_id = 3; // NOTE(review): presumably matches NodeDescriptorProto.identifier - confirm
+}
+
+// Status report for one resource: its State (enum from reef_service_protos.proto)
+// plus optional diagnostics, exit code and a previous-driver flag.
+message ResourceStatusProto {
+ required string identifier = 1;
+ required State state = 2;
+ optional string diagnostics = 3;
+ optional int32 exit_code = 4;
+ optional bool is_from_previous_driver = 5;
+}
+
+// Status report for a runtime: name, State, an optional error, and
+// container request/allocation information. Tag 4 is unused.
+message RuntimeStatusProto {
+ required string name = 1; // e.g., local, yarn21
+ required State state = 2;
+ optional RuntimeErrorProto error = 3; // runtime (e.g., YARN) error
+
+ optional int32 outstanding_container_requests = 5;
+ repeated string container_allocation = 6;
+}
+
+//////////////////////////////////////////////////////
+// Messages from Driver Process -> Driver Runtime
+
+// Request for resource_count resources, with optional memory size, priority and
+// node/rack placement constraints. Tag 1 belonged to the removed resource_size field.
+message ResourceRequestProto {
+ // optional SIZE resource_size = 1; // Removed in REEF 0.3 in favor of memory_size.
+ optional int32 memory_size = 2; // Memory size of the evaluator in MB
+ optional int32 priority = 3;
+
+ required int32 resource_count = 5;
+ repeated string node_name = 6; // a list of specific nodes
+ repeated string rack_name = 7; // a list of specific racks
+
+ optional bool relax_locality = 10;
+}
+
+// Release request naming a single resource by identifier.
+message ResourceReleaseProto {
+ required string identifier = 1;
+}
+
+// Launch request for a resource: identifier, remote id, evaluator configuration,
+// the ProcessType (JVM or CLR) to start, and files to ship along.
+message ResourceLaunchProto {
+ required string identifier = 1;
+ required string remote_id = 2;
+ required string evaluator_conf = 3;
+ required ProcessType type = 4;
+ repeated FileResourceProto file = 10;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/evaluator_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/evaluator_runtime.proto b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/evaluator_runtime.proto
new file mode 100644
index 0000000..1415e5c
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/evaluator_runtime.proto
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "EvaluatorRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "reef_service_protos.proto";
+
+// Stop the evaluator. The message has no fields; its presence is the request.
+message StopEvaluatorProto {
+}
+
+// Kill the evaluator. The message has no fields; it is carried in
+// EvaluatorControlProto.kill_evaluator.
+message KillEvaluatorProto {
+}
+
+// Start a task: names a context by id and carries a configuration string.
+message StartTaskProto {
+ required string context_id = 1;
+ required string configuration = 2; // NOTE(review): presumably the serialized task configuration - confirm
+}
+
+// Add a context under the given parent context, with a context configuration
+// and an optional service configuration.
+message AddContextProto {
+ required string parent_context_id = 1;
+ required string context_configuration = 2;
+ optional string service_configuration = 3;
+}
+
+// Remove the context with the given id.
+message RemoveContextProto {
+ required string context_id = 1;
+}
+
+// Stop the task. The message has no fields; it is carried in
+// ContextControlProto.stop_task.
+message StopTaskProto {
+}
+
+// Suspend the task. The message has no fields; it is carried in
+// ContextControlProto.suspend_task.
+message SuspendTaskProto {
+}
+
+/////////////////////////////////////////
+// Message aggregators
+
+// Opaque byte payload addressed to the context with the given id.
+// Distinct from the nested ContextStatusProto.ContextMessageProto in
+// reef_service_protos.proto, which is keyed by source_id.
+message ContextMessageProto {
+ required string context_id = 1;
+ required bytes message = 2;
+}
+
+// Aggregates the context- and task-level control requests; every field is optional.
+// NOTE(review): presumably only the fields relevant to one request are set - confirm.
+message ContextControlProto {
+ optional bytes task_message = 1;
+ optional ContextMessageProto context_message = 2;
+
+ optional AddContextProto add_context = 5;
+ optional RemoveContextProto remove_context = 6;
+ optional StartTaskProto start_task = 7;
+ optional StopTaskProto stop_task = 8;
+ optional SuspendTaskProto suspend_task = 9;
+}
+
+// Heartbeat message: a timestamp and the evaluator status (both required), the status
+// of zero or more contexts, an optional task status and an optional recovery flag.
+message EvaluatorHeartbeatProto {
+ required int64 timestamp = 1;
+ required EvaluatorStatusProto evaluator_status = 2;
+ repeated ContextStatusProto context_status = 3;
+ optional TaskStatusProto task_status = 4;
+ optional bool recovery = 5;
+}
+
+// Control message: timestamp and identifier are required; it optionally carries a
+// context control request and/or a kill request.
+message EvaluatorControlProto {
+ required int64 timestamp = 1;
+ required string identifier = 2;
+
+ optional ContextControlProto context_control = 3;
+ optional KillEvaluatorProto kill_evaluator = 4;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/reef_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/reef_protocol.proto b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/reef_protocol.proto
new file mode 100644
index 0000000..a442445
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/reef_protocol.proto
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import "client_runtime.proto";
+
+import "evaluator_runtime.proto";
+
+import "reef_service_protos.proto";
+
+
+// Java package for generated classes. Uses the same lower-case package as
+// client_runtime.proto, driver_runtime.proto and evaluator_runtime.proto.
+option java_package = "org.apache.reef.proto";
+
+option java_generic_services = true;
+
+option java_generate_equals_and_hash = true;
+
+option java_outer_classname = "REEFProtocol";
+
+// Envelope message: each field is optional and holds one of the message types
+// defined in the imported .proto files.
+message REEFMessage {
+ // Messages defined in client_runtime.proto
+ optional JobSubmissionProto jobSubmission = 1;
+ optional JobControlProto jobControl = 2;
+ // Messages defined in reef_service_protos.proto
+ optional RuntimeErrorProto runtimeError = 3;
+ optional JobStatusProto jobStatus = 4;
+ // Messages defined in evaluator_runtime.proto
+ optional EvaluatorControlProto evaluatorControl = 5;
+ optional EvaluatorHeartbeatProto evaluatorHeartBeat = 6;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/reef_service_protos.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/reef_service_protos.proto b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/reef_service_protos.proto
new file mode 100644
index 0000000..a553ca9
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/protobuf/proto/reef_service_protos.proto
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// NOTE(review): client_runtime.proto, driver_runtime.proto and evaluator_runtime.proto
+// use "org.apache.reef.proto"; the doubled "reef.reef" here looks unintentional - confirm.
+option java_package = "org.apache.reef.reef.proto";
+
+option java_outer_classname = "ReefServiceProtos";
+
+option java_generic_services = true;
+
+option java_generate_equals_and_hash = true;
+
+// Top-level state enum used by JobStatusProto, TaskStatusProto and
+// EvaluatorStatusProto in this file. ContextStatusProto declares its own nested State.
+enum State {
+ INIT = 0;
+ RUNNING = 1;
+ DONE = 2;
+ SUSPEND = 3;
+ FAILED = 4;
+ KILLED = 5;
+}
+
+// Kind of file described by a FileResourceProto.
+enum FileType {
+ PLAIN = 0;
+ LIB = 1;
+ ARCHIVE = 2;
+}
+
+// Removed in REEF 0.3 in favor of explicit memory sizes.
+// enum SIZE {
+// SMALL = 0;
+// MEDIUM = 1;
+// LARGE = 2;
+// XLARGE = 3;
+//}
+
+// Kind of process: JVM or CLR.
+enum ProcessType {
+ JVM = 0;
+ CLR = 1;
+}
+
+// Describes one file resource by type, name and path.
+message FileResourceProto {
+ required FileType type = 1;
+ required string name = 2;
+ required string path = 3;
+}
+
+// Error report: the reporting runtime's name, a message, an optional exception
+// as bytes, and an optional identifier. Tag 4 is unused.
+message RuntimeErrorProto {
+ required string name = 1; // e.g., local, yarn21
+ required string message = 2;
+ optional bytes exception = 3;
+
+ optional string identifier = 5; // e.g., evaluator id
+}
+
+// Status of the job with the given identifier: its State plus an optional
+// message and an optional exception, both as bytes.
+message JobStatusProto {
+ required string identifier = 1;
+ required State state = 2;
+ optional bytes message = 3;
+ optional bytes exception = 4;
+}
+
+// Status of one context, including any context messages it carries.
+message ContextStatusProto {
+ // Context-specific state. Inside this message the name State refers to this
+ // nested enum, not to the top-level State enum of this file.
+ enum State {
+ READY = 0;
+ DONE = 1;
+ FAIL = 2;
+ }
+ required State context_state = 1;
+
+ required string context_id = 2;
+ optional string parent_id = 3;
+
+ optional bytes error = 5; // when creating the context
+
+ optional bool recovery = 6;
+ // Context messages: an opaque payload tagged with the id of its source
+ message ContextMessageProto {
+ required string source_id = 1;
+ required bytes message = 2;
+ }
+ repeated ContextMessageProto context_message = 7;
+}
+
+// Status of one task: its id, the id of a context, its State (the top-level enum),
+// an optional result, a recovery flag and any task messages.
+message TaskStatusProto {
+ required string task_id = 1;
+ required string context_id = 2;
+ required State state = 3;
+ optional bytes result = 4; // e.g., return value from Task.call()
+ optional bool recovery = 5;
+
+ // TaskMessageSource messages: an opaque payload tagged with the id of its source
+ message TaskMessageProto {
+ required string source_id = 1;
+ required bytes message = 2;
+ }
+ repeated TaskMessageProto task_message = 6;
+}
+
+// Status of one evaluator: its id, its State (the top-level enum) and an
+// optional error as bytes.
+message EvaluatorStatusProto {
+ required string evaluator_id = 1;
+ required State state = 2;
+ optional bytes error = 3;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/MachineStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/MachineStatus.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/MachineStatus.cs
new file mode 100644
index 0000000..548888c
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/MachineStatus.cs
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Globalization;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Common.Runtime
+{
+ /// <summary>
+ /// Reports CPU and memory figures for the local machine and for the current process,
+ /// read through System.Diagnostics performance counters and the Process class, and
+ /// formats them for logging.
+ /// </summary>
+ public class MachineStatus
+ {
+     private static readonly PerformanceCounter _cpuCounter;
+
+     private static readonly PerformanceCounter _ramCounter;
+
+     private static readonly PerformanceCounter _processCpuCounter;
+
+     private static readonly Process _process;
+
+     // Switched off for good by ToString() after the first failure to read the counters.
+     private static bool _checkStatus;
+
+     static MachineStatus()
+     {
+         _checkStatus = true;
+         _process = Process.GetCurrentProcess();
+         string processName = _process.ProcessName;
+
+         _cpuCounter = new PerformanceCounter()
+         {
+             CategoryName = "Processor",
+             CounterName = "% Processor Time",
+             InstanceName = "_Total",
+         };
+
+         _ramCounter = new PerformanceCounter()
+         {
+             CategoryName = "Memory",
+             CounterName = "Available MBytes"
+         };
+
+         _processCpuCounter = new PerformanceCounter()
+         {
+             CategoryName = "Process",
+             CounterName = "% Processor Time",
+             InstanceName = processName
+         };
+     }
+
+     /// <summary>
+     /// Calculated value of the "Processor / % Processor Time / _Total" counter, suffixed with "%".
+     /// </summary>
+     public static string CurrentNodeCpuUsage
+     {
+         get
+         {
+             // Invariant culture, to match the memory properties below.
+             return _cpuCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "%";
+         }
+     }
+
+     /// <summary>
+     /// Calculated value of the "Memory / Available MBytes" counter, suffixed with "MB".
+     /// </summary>
+     public static string AvailableMemory
+     {
+         get
+         {
+             return _ramCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "MB";
+         }
+     }
+
+     /// <summary>
+     /// Working set of the current process in units of 1,000,000 bytes, suffixed with "MB".
+     /// </summary>
+     public static string CurrentProcessMemoryUsage
+     {
+         get
+         {
+             return FormatMegabytes(_process.WorkingSet64);
+         }
+     }
+
+     /// <summary>
+     /// Peak working set of the current process in units of 1,000,000 bytes, suffixed with "MB".
+     /// </summary>
+     public static string PeakProcessMemoryUsage
+     {
+         get
+         {
+             return FormatMegabytes(_process.PeakWorkingSet64);
+         }
+     }
+
+     /// <summary>
+     /// Calculated value of the "Process / % Processor Time" counter for the instance named
+     /// after the current process, suffixed with "%".
+     /// </summary>
+     // this may not be accurate if there are multiple evaluator processes running on a single machine
+     public static string CurrentProcessCpuUsage
+     {
+         get
+         {
+             // Use the calculated sample: RawValue is the uncalculated counter value and is
+             // not a percentage, so scaling it and appending "%" produced a meaningless figure.
+             // NOTE(review): for the Process category this value is presumably summed over all
+             // cores and may exceed 100 - confirm whether to divide by Environment.ProcessorCount.
+             return _processCpuCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "%";
+         }
+     }
+
+     /// <summary>
+     /// Builds a human-readable summary of machine and process usage. Never throws: the
+     /// first failure is logged as a warning, reported in the returned text, and disables
+     /// all further attempts.
+     /// </summary>
+     /// <returns>the summary, or an explanatory message when no data could be read</returns>
+     public override string ToString()
+     {
+         string info = "No machine status information retrieved. Could be due to lack of admin right to get the info.";
+         if (_checkStatus)
+         {
+             try
+             {
+                 // Refresh the cached process data so the memory figures are current.
+                 _process.Refresh();
+                 info = string.Format(
+                     CultureInfo.InvariantCulture,
+                     "current node is running at [{0}] CPU usage and with [{1}] memory available.{2} current evaluator process is using [{3}] of CPU and [{4}] of memory, with a peak memory usage of [{5}]",
+                     CurrentNodeCpuUsage,
+                     AvailableMemory,
+                     Environment.NewLine,
+                     CurrentProcessCpuUsage,
+                     CurrentProcessMemoryUsage,
+                     PeakProcessMemoryUsage);
+             }
+             catch (Exception e)
+             {
+                 _checkStatus = false; // It only takes one exception to switch the checking off for good.
+                 Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot obtain machine status due to error", Logger.GetLogger(typeof(MachineStatus)));
+                 // we do not want to crash the evaluator just because we cannot get the information.
+                 info = "Cannot obtain machine status due to error " + e.Message;
+             }
+         }
+
+         return info;
+     }
+
+     // Formats a byte count as units of 1,000,000 bytes with an "MB" suffix.
+     private static string FormatMegabytes(long bytes)
+     {
+         return ((float)bytes / 1000000.0).ToString(CultureInfo.InvariantCulture) + "MB";
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/Constants.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/Constants.cs
new file mode 100644
index 0000000..dc4ee0a
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/Constants.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Evaluator
+{
+ /// <summary>
+ /// String and numeric constants for the evaluator.
+ /// NOTE(review): each string value equals its own identifier; presumably these are
+ /// configuration/parameter keys - confirm against callers.
+ /// </summary>
+ public class Constants
+ {
+ public const string RootContextConfiguration = "RootContextConfiguration";
+
+ public const string EvaluatorIdentifier = "EvaluatorIdentifier";
+
+ public const string RootServiceConfiguration = "RootServiceConfiguration";
+
+ public const string TaskConfiguration = "TaskConfiguration";
+
+ public const string ContextIdentifier = "ContextIdentifier";
+
+ public const string ApplicationIdentifier = "ApplicationIdentifier";
+
+ /// <summary>Default evaluator heartbeat period; milliseconds, per the InMs suffix.</summary>
+ public const int DefaultEvaluatorHeartbeatPeriodInMs = 4000;
+
+ /// <summary>Default maximum number of evaluator heartbeat retries.</summary>
+ public const int DefaultEvaluatorHeartbeatMaxRetry = 3;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs
new file mode 100644
index 0000000..052764d
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Context;
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Evaluator;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Time;
+using Org.Apache.Reef.Wake.Time.Runtime.Event;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Common
+{
+ /// <summary>
+ /// Tracks the evaluator state (INIT, RUNNING, DONE, KILLED, FAILED) and reacts to
+ /// runtime start/stop events and to REEFMessages that carry an evaluator control payload.
+ /// State changes are reported through the heartbeat manager.
+ /// </summary>
+ public class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage>
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime));
+
+     // Serializes OnNext(RuntimeStart). A private gate object is used rather than the
+     // evaluator id string: strings can be interned, so unrelated code locking on an
+     // equal string could end up sharing the same monitor.
+     private readonly object _startLock = new object();
+
+     private readonly string _evaluatorId;
+
+     private readonly ContextManager _contextManager;
+
+     private readonly HeartBeatManager _heartBeatManager;
+
+     private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+     private readonly IClock _clock;
+
+     private State _state = State.INIT;
+
+     // Handle returned when the driver observer is registered; disposed on runtime stop.
+     private readonly IDisposable _evaluatorControlChannel;
+
+     /// <summary>
+     /// Wires the runtime to the remote manager and clock found in the heartbeat
+     /// manager's evaluator settings, and schedules the heartbeat manager on the clock.
+     /// </summary>
+     /// <param name="contextManager">Receives context control messages; started on runtime start.</param>
+     /// <param name="heartBeatManager">Source of the evaluator settings and sink for status updates.</param>
+     [Inject]
+     public EvaluatorRuntime(
+         ContextManager contextManager,
+         HeartBeatManager heartBeatManager)
+     {
+         using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime"))
+         {
+             _clock = heartBeatManager.EvaluatorSettings.RuntimeClock;
+             _heartBeatManager = heartBeatManager;
+             _contextManager = contextManager;
+             _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId;
+             _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager;
+
+             ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver();
+
+             // subscribe to driver proto message
+             driverObserver.Subscribe(o => OnNext(o.Message));
+
+             // register the driver observer
+             _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver);
+
+             // start the heartbeat
+             _clock.ScheduleAlarm(0, heartBeatManager);
+         }
+     }
+
+     /// <summary>
+     /// Current evaluator state.
+     /// </summary>
+     public State State
+     {
+         get
+         {
+             return _state;
+         }
+     }
+
+     /// <summary>
+     /// Processes an evaluator control message. A message addressed to a different
+     /// evaluator id, or one arriving while the state is not RUNNING, is reported as a
+     /// failure. Otherwise the context control part (if any) is forwarded to the context
+     /// manager and a kill request (if any) moves the evaluator to KILLED.
+     /// </summary>
+     /// <param name="message">The control message; must not be null.</param>
+     /// <exception cref="ArgumentNullException">If <paramref name="message"/> is null.</exception>
+     public void Handle(EvaluatorControlProto message)
+     {
+         if (message == null)
+         {
+             throw new ArgumentNullException("message");
+         }
+
+         lock (_heartBeatManager)
+         {
+             LOGGER.Log(Level.Info, "Handle Evaluator control message");
+
+             // Static string.Equals is null-safe: a message without an identifier is
+             // treated as an identifier mismatch instead of raising NullReferenceException.
+             if (!string.Equals(message.identifier, _evaluatorId, StringComparison.OrdinalIgnoreCase))
+             {
+                 Handle(new InvalidOperationException(
+                     string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId)));
+             }
+             else if (_state != State.RUNNING)
+             {
+                 Handle(new InvalidOperationException(
+                     string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state)));
+             }
+             else
+             {
+                 if (message.context_control != null)
+                 {
+                     LOGGER.Log(Level.Info, "Send task control message to ContextManager");
+                     try
+                     {
+                         _contextManager.HandleTaskControl(message.context_control);
+
+                         // An empty context stack while still RUNNING means there is nothing
+                         // left to run: report DONE and dispose the clock.
+                         if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING)
+                         {
+                             LOGGER.Log(Level.Info, "Context stack is empty, done");
+                             _state = State.DONE;
+                             _heartBeatManager.OnNext(GetEvaluatorStatus());
+                             _clock.Dispose();
+                         }
+                     }
+                     catch (Exception e)
+                     {
+                         // Report the failure through the heartbeat first, then rethrow wrapped.
+                         Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                         Handle(e);
+                         Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER);
+                     }
+                 }
+                 if (message.kill_evaluator != null)
+                 {
+                     LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId));
+                     _state = State.KILLED;
+                     _clock.Dispose();
+                 }
+             }
+         }
+     }
+
+     /// <summary>
+     /// Builds a status proto carrying this evaluator's id and current state.
+     /// </summary>
+     /// <returns>A new <see cref="EvaluatorStatusProto"/>.</returns>
+     public EvaluatorStatusProto GetEvaluatorStatus()
+     {
+         LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state));
+         EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+         {
+             evaluator_id = _evaluatorId,
+             state = _state
+         };
+         return evaluatorStatusProto;
+     }
+
+     /// <summary>
+     /// Handles runtime start: requires the INIT state, moves to RUNNING, starts the
+     /// context manager and triggers a heartbeat. Any exception (including the wrong-state
+     /// one raised here) is routed to the failure handler instead of propagating.
+     /// </summary>
+     /// <param name="runtimeStart">The start event (not inspected).</param>
+     public void OnNext(RuntimeStart runtimeStart)
+     {
+         lock (_startLock)
+         {
+             try
+             {
+                 LOGGER.Log(Level.Info, "Runtime start");
+                 if (_state != State.INIT)
+                 {
+                     var e = new InvalidOperationException("State should be init.");
+                     Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                 }
+                 _state = State.RUNNING;
+                 _contextManager.Start();
+                 _heartBeatManager.OnNext();
+             }
+             catch (Exception e)
+             {
+                 Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                 Handle(e);
+             }
+         }
+     }
+
+     // The OnError/OnCompleted members of all three observer interfaces are not implemented.
+
+     void IObserver<RuntimeStart>.OnError(Exception error)
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<REEFMessage>.OnCompleted()
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<REEFMessage>.OnError(Exception error)
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<RuntimeStop>.OnCompleted()
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<RuntimeStop>.OnError(Exception error)
+     {
+         throw new NotImplementedException();
+     }
+
+     void IObserver<RuntimeStart>.OnCompleted()
+     {
+         throw new NotImplementedException();
+     }
+
+     /// <summary>
+     /// Handles runtime stop: disposes the context manager, reports DONE if the evaluator
+     /// was still RUNNING, and disposes the driver control channel registration.
+     /// </summary>
+     /// <param name="runtimeStop">The stop event (not inspected).</param>
+     public void OnNext(RuntimeStop runtimeStop)
+     {
+         LOGGER.Log(Level.Info, "Runtime stop");
+         _contextManager.Dispose();
+
+         if (_state == State.RUNNING)
+         {
+             _state = State.DONE;
+             _heartBeatManager.OnNext();
+         }
+         try
+         {
+             _evaluatorControlChannel.Dispose();
+         }
+         catch (Exception e)
+         {
+             Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER);
+         }
+         LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete");
+     }
+
+     /// <summary>
+     /// Dispatches a REEFMessage to the control handler when it carries an evaluator
+     /// control payload; null messages and messages without that payload are ignored.
+     /// </summary>
+     /// <param name="value">The incoming message; may be null.</param>
+     public void OnNext(REEFMessage value)
+     {
+         if (value != null && value.evaluatorControl != null)
+         {
+             LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl");
+             Handle(value.evaluatorControl);
+         }
+     }
+
+     /// <summary>
+     /// Failure path: marks the evaluator FAILED, sends a status heartbeat whose error
+     /// field holds the formatted exception details, and disposes the context manager.
+     /// </summary>
+     /// <param name="e">The exception that caused the failure.</param>
+     private void Handle(Exception e)
+     {
+         lock (_heartBeatManager)
+         {
+             LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e);
+             _state = State.FAILED;
+             string errorMessage = string.Format(
+                 CultureInfo.InvariantCulture,
+                 "failed with error [{0}] with mesage [{1}] and stack trace [{2}]",
+                 e,
+                 e.Message,
+                 e.StackTrace);
+             EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+             {
+                 evaluator_id = _evaluatorId,
+                 error = ByteUtilities.StringToByteArrays(errorMessage),
+                 state = _state
+             };
+             _heartBeatManager.OnNext(evaluatorStatusProto);
+             _contextManager.Dispose();
+         }
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs
new file mode 100644
index 0000000..067a0a0
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Evaluator;
+using Org.Apache.Reef.Common.Evaluator.Context;
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Time;
+using System;
+
+namespace Org.Apache.Reef.Evaluator
+{
+ // TODO: merge with EvaluatorConfigurations class
+ /// <summary>
+ /// Bundle of the values and services an evaluator is constructed with: ids, heartbeat
+ /// settings, root context configuration, clock, remote manager and injector.
+ /// The operation state and the name client can be changed after construction;
+ /// everything else is fixed by the constructor.
+ /// </summary>
+ public class EvaluatorSettings
+ {
+     /// <summary>
+     /// Validates and stores the settings. The operation state starts as OPERATIONAL
+     /// and the name client is left unset.
+     /// </summary>
+     /// <exception cref="ArgumentNullException">
+     /// If <paramref name="evaluatorId"/> is null, empty or whitespace, or if
+     /// <paramref name="rootContextConfig"/>, <paramref name="clock"/>,
+     /// <paramref name="remoteManager"/> or <paramref name="injecor"/> is null.
+     /// </exception>
+     public EvaluatorSettings(
+         string applicationId,
+         string evaluatorId,
+         int heartbeatPeriodInMs,
+         int maxHeartbeatRetries,
+         ContextConfiguration rootContextConfig,
+         IClock clock,
+         IRemoteManager<REEFMessage> remoteManager,
+         IInjector injecor)
+     {
+         // Checks run in parameter order so the first offending argument is the one reported.
+         if (string.IsNullOrWhiteSpace(evaluatorId))
+         {
+             throw new ArgumentNullException("evaluatorId");
+         }
+         ThrowIfNull(rootContextConfig, "rootContextConfig");
+         ThrowIfNull(clock, "clock");
+         ThrowIfNull(remoteManager, "remoteManager");
+         ThrowIfNull(injecor, "injecor");
+
+         ApplicationId = applicationId;
+         EvalutorId = evaluatorId;
+         HeartBeatPeriodInMs = heartbeatPeriodInMs;
+         MaxHeartbeatFailures = maxHeartbeatRetries;
+         RootContextConfig = rootContextConfig;
+         RuntimeClock = clock;
+         RemoteManager = remoteManager;
+         Injector = injecor;
+         OperationState = EvaluatorOperationState.OPERATIONAL;
+     }
+
+     /// <summary>Operation state; readable and writable.</summary>
+     public EvaluatorOperationState OperationState { get; set; }
+
+     /// <summary>The evaluator id passed to the constructor.</summary>
+     public string EvalutorId { get; private set; }
+
+     /// <summary>The heartbeat period passed to the constructor.</summary>
+     public int HeartBeatPeriodInMs { get; private set; }
+
+     /// <summary>The application id passed to the constructor (not validated).</summary>
+     public string ApplicationId { get; private set; }
+
+     /// <summary>The value of the constructor's maxHeartbeatRetries argument.</summary>
+     public int MaxHeartbeatFailures { get; private set; }
+
+     /// <summary>The root context configuration passed to the constructor.</summary>
+     public ContextConfiguration RootContextConfig { get; private set; }
+
+     /// <summary>The clock passed to the constructor.</summary>
+     public IClock RuntimeClock { get; private set; }
+
+     /// <summary>Name client; not set by the constructor, readable and writable.</summary>
+     public INameClient NameClient { get; set; }
+
+     /// <summary>The remote manager passed to the constructor.</summary>
+     public IRemoteManager<REEFMessage> RemoteManager { get; private set; }
+
+     /// <summary>The injector passed to the constructor.</summary>
+     public IInjector Injector { get; private set; }
+
+     // Throws ArgumentNullException naming the parameter when the argument is null.
+     private static void ThrowIfNull(object argument, string parameterName)
+     {
+         if (argument == null)
+         {
+             throw new ArgumentNullException(parameterName);
+         }
+     }
+ }
+}