You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:06:09 UTC
[48/51] [partial] incubator-reef git commit: [REEF-131] Towards the
new .Net project structure. This changes the .Net project structure for Tang,
Wake, REEF utilities, Common and Driver:
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs
new file mode 100644
index 0000000..383467e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/ReefService.pb.cs
@@ -0,0 +1,393 @@
+//------------------------------------------------------------------------------
+// <auto-generated>
+// This code was generated by a tool.
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+// </auto-generated>
+//------------------------------------------------------------------------------
+
+// Generated from: reef_service_protos.proto
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto
+
+{
+ /// <summary>
+ /// protobuf-net contract for the <c>FileResourceProto</c> message (generated from reef_service_protos.proto).
+ /// Referenced by JobSubmissionProto.global_file / local_File (client_runtime.proto)
+ /// and ResourceLaunchProto.file (driver_runtime.proto).
+ /// </summary>
+ /// <remarks>
+ /// Generated code: hand edits are lost on regeneration. The class is [Serializable],
+ /// so its private field names are part of its runtime-serialization shape.
+ /// </remarks>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"FileResourceProto")]
+ public partial class FileResourceProto : global::ProtoBuf.IExtensible
+ {
+ public FileResourceProto() {}
+
+ private FileType _type;
+ /// <summary>Tag 1, required: the file type (see <see cref="FileType"/>).</summary>
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"type", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public FileType type
+ {
+ get { return _type; }
+ set { _type = value; }
+ }
+ private string _name;
+ /// <summary>Tag 2, required: the file name.</summary>
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"name", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string name
+ {
+ get { return _name; }
+ set { _name = value; }
+ }
+ private string _path;
+ /// <summary>Tag 3, required: the file path.</summary>
+ [global::ProtoBuf.ProtoMember(3, IsRequired = true, Name=@"path", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string path
+ {
+ get { return _path; }
+ set { _path = value; }
+ }
+ // IExtensible support: storage is obtained through ProtoBuf.Extensible.GetExtensionObject.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ /// <summary>
+ /// protobuf-net contract for the <c>RuntimeErrorProto</c> message (generated from reef_service_protos.proto).
+ /// Carried by RuntimeStatusProto.error ("runtime (e.g., YARN) error") and REEFMessage.runtimeError.
+ /// </summary>
+ /// <remarks>
+ /// Generated code: hand edits are lost on regeneration. Tag 4 is not used by this contract.
+ /// </remarks>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"RuntimeErrorProto")]
+ public partial class RuntimeErrorProto : global::ProtoBuf.IExtensible
+ {
+ public RuntimeErrorProto() {}
+
+ private string _name;
+ /// <summary>Tag 1, required: error name.</summary>
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"name", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string name
+ {
+ get { return _name; }
+ set { _name = value; }
+ }
+ private string _message;
+ /// <summary>Tag 2, required: error message text.</summary>
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string message
+ {
+ get { return _message; }
+ set { _message = value; }
+ }
+ private byte[] _exception = null;
+ /// <summary>
+ /// Tag 3, optional (default null): exception payload as opaque bytes.
+ /// NOTE(review): the byte encoding is not visible in this file — confirm with the producer.
+ /// </summary>
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"exception", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] exception
+ {
+ get { return _exception; }
+ set { _exception = value; }
+ }
+ private string _identifier = "";
+ /// <summary>Tag 5, optional (default ""): identifier associated with the error.</summary>
+ [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"identifier", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string identifier
+ {
+ get { return _identifier; }
+ set { _identifier = value; }
+ }
+ // IExtensible support: storage is obtained through ProtoBuf.Extensible.GetExtensionObject.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ /// <summary>
+ /// protobuf-net contract for the <c>JobStatusProto</c> message (generated from reef_service_protos.proto).
+ /// Carried by REEFMessage.jobStatus.
+ /// </summary>
+ /// <remarks>Generated code: hand edits are lost on regeneration.</remarks>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"JobStatusProto")]
+ public partial class JobStatusProto : global::ProtoBuf.IExtensible
+ {
+ public JobStatusProto() {}
+
+ private string _identifier;
+ /// <summary>Tag 1, required: job identifier.</summary>
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"identifier", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string identifier
+ {
+ get { return _identifier; }
+ set { _identifier = value; }
+ }
+ private State _state;
+ /// <summary>
+ /// Tag 2, required: job state. This class has no nested State, so the unqualified
+ /// name binds to the namespace-level State enum (INIT/RUNNING/DONE/SUSPEND/FAILED/KILLED).
+ /// </summary>
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public State state
+ {
+ get { return _state; }
+ set { _state = value; }
+ }
+ private byte[] _message = null;
+ /// <summary>Tag 3, optional (default null): opaque message bytes.</summary>
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] message
+ {
+ get { return _message; }
+ set { _message = value; }
+ }
+ private byte[] _exception = null;
+ /// <summary>Tag 4, optional (default null): opaque exception bytes.</summary>
+ [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"exception", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] exception
+ {
+ get { return _exception; }
+ set { _exception = value; }
+ }
+ // IExtensible support: storage is obtained through ProtoBuf.Extensible.GetExtensionObject.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ /// <summary>
+ /// protobuf-net contract for the <c>ContextStatusProto</c> message (generated from reef_service_protos.proto).
+ /// Repeated in EvaluatorHeartbeatProto.context_status.
+ /// </summary>
+ /// <remarks>
+ /// Generated code: hand edits are lost on regeneration. Tag 4 is not used by this contract.
+ /// Unlike the other status messages, this one has its own nested <see cref="ContextStatusProto.State"/>.
+ /// </remarks>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextStatusProto")]
+ public partial class ContextStatusProto : global::ProtoBuf.IExtensible
+ {
+ public ContextStatusProto() {}
+
+ private ContextStatusProto.State _context_state;
+ /// <summary>Tag 1, required: context state (nested READY/DONE/FAIL enum).</summary>
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"context_state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public ContextStatusProto.State context_state
+ {
+ get { return _context_state; }
+ set { _context_state = value; }
+ }
+ private string _context_id;
+ /// <summary>Tag 2, required: context identifier.</summary>
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string context_id
+ {
+ get { return _context_id; }
+ set { _context_id = value; }
+ }
+ private string _parent_id = "";
+ /// <summary>Tag 3, optional (default ""): parent context identifier.</summary>
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"parent_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue("")]
+ public string parent_id
+ {
+ get { return _parent_id; }
+ set { _parent_id = value; }
+ }
+ private byte[] _error = null;
+ /// <summary>Tag 5, optional (default null): opaque error bytes.</summary>
+ [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name=@"error", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] error
+ {
+ get { return _error; }
+ set { _error = value; }
+ }
+ private bool _recovery = false;
+ /// <summary>Tag 6, optional (default false): recovery flag.</summary>
+ [global::ProtoBuf.ProtoMember(6, IsRequired = false, Name = @"recovery", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool recovery
+ {
+ get { return _recovery; }
+ set { _recovery = value; }
+ }
+ private readonly global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto> _context_message = new global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto>();
+ /// <summary>
+ /// Tag 7, repeated: messages from the context. Getter-only; the list instance is
+ /// created with the object and never replaced.
+ /// </summary>
+ [global::ProtoBuf.ProtoMember(7, Name=@"context_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public global::System.Collections.Generic.List<ContextStatusProto.ContextMessageProto> context_message
+ {
+ get { return _context_message; }
+ }
+
+ /// <summary>
+ /// Nested contract for one context message: a required source_id (tag 1) and a
+ /// required opaque byte payload (tag 2).
+ /// </summary>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"ContextMessageProto")]
+ public partial class ContextMessageProto : global::ProtoBuf.IExtensible
+ {
+ public ContextMessageProto() {}
+
+ private string _source_id;
+ /// <summary>Tag 1, required: identifier of the message source.</summary>
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"source_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string source_id
+ {
+ get { return _source_id; }
+ set { _source_id = value; }
+ }
+ private byte[] _message;
+ /// <summary>Tag 2, required: opaque message bytes.</summary>
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public byte[] message
+ {
+ get { return _message; }
+ set { _message = value; }
+ }
+ // IExtensible support: storage is obtained through ProtoBuf.Extensible.GetExtensionObject.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ /// <summary>Context-specific state enum (distinct from the namespace-level State).</summary>
+ [global::ProtoBuf.ProtoContract(Name=@"State")]
+ public enum State
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"READY", Value=0)]
+ READY = 0,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"DONE", Value=1)]
+ DONE = 1,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"FAIL", Value=2)]
+ FAIL = 2
+ }
+
+ // IExtensible support: storage is obtained through ProtoBuf.Extensible.GetExtensionObject.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ /// <summary>
+ /// protobuf-net contract for the <c>TaskStatusProto</c> message (generated from reef_service_protos.proto).
+ /// Carried by EvaluatorHeartbeatProto.task_status.
+ /// </summary>
+ /// <remarks>Generated code: hand edits are lost on regeneration.</remarks>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"TaskStatusProto")]
+ public partial class TaskStatusProto : global::ProtoBuf.IExtensible
+ {
+ public TaskStatusProto() {}
+
+ private string _task_id;
+ /// <summary>Tag 1, required: task identifier.</summary>
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"task_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string task_id
+ {
+ get { return _task_id; }
+ set { _task_id = value; }
+ }
+ private string _context_id;
+ /// <summary>Tag 2, required: identifier of the context the task belongs to.</summary>
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"context_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string context_id
+ {
+ get { return _context_id; }
+ set { _context_id = value; }
+ }
+ private State _state;
+ /// <summary>
+ /// Tag 3, required: task state. This class has no nested State, so the unqualified
+ /// name binds to the namespace-level State enum.
+ /// </summary>
+ [global::ProtoBuf.ProtoMember(3, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public State state
+ {
+ get { return _state; }
+ set { _state = value; }
+ }
+ private byte[] _result = null;
+ /// <summary>Tag 4, optional (default null): opaque task result bytes.</summary>
+ [global::ProtoBuf.ProtoMember(4, IsRequired = false, Name=@"result", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] result
+ {
+ get { return _result; }
+ set { _result = value; }
+ }
+ private bool _recovery = false;
+ /// <summary>Tag 5, optional (default false): recovery flag.</summary>
+ [global::ProtoBuf.ProtoMember(5, IsRequired = false, Name = @"recovery", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(false)]
+ public bool recovery
+ {
+ get { return _recovery; }
+ set { _recovery = value; }
+ }
+ private readonly global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto> _task_message = new global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto>();
+ /// <summary>
+ /// Tag 6, repeated: messages from the task. Getter-only; the list instance is
+ /// created with the object and never replaced.
+ /// </summary>
+ [global::ProtoBuf.ProtoMember(6, Name=@"task_message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public global::System.Collections.Generic.List<TaskStatusProto.TaskMessageProto> task_message
+ {
+ get { return _task_message; }
+ }
+
+ /// <summary>
+ /// Nested contract for one task message: a required source_id (tag 1) and a
+ /// required opaque byte payload (tag 2).
+ /// </summary>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"TaskMessageProto")]
+ public partial class TaskMessageProto : global::ProtoBuf.IExtensible
+ {
+ public TaskMessageProto() {}
+
+ private string _source_id;
+ /// <summary>Tag 1, required: identifier of the message source.</summary>
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"source_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string source_id
+ {
+ get { return _source_id; }
+ set { _source_id = value; }
+ }
+ private byte[] _message;
+ /// <summary>Tag 2, required: opaque message bytes.</summary>
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"message", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public byte[] message
+ {
+ get { return _message; }
+ set { _message = value; }
+ }
+ // IExtensible support: storage is obtained through ProtoBuf.Extensible.GetExtensionObject.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ // IExtensible support: storage is obtained through ProtoBuf.Extensible.GetExtensionObject.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ /// <summary>
+ /// protobuf-net contract for the <c>EvaluatorStatusProto</c> message (generated from reef_service_protos.proto).
+ /// Carried by EvaluatorHeartbeatProto.evaluator_status.
+ /// </summary>
+ /// <remarks>Generated code: hand edits are lost on regeneration.</remarks>
+ [global::System.Serializable, global::ProtoBuf.ProtoContract(Name=@"EvaluatorStatusProto")]
+ public partial class EvaluatorStatusProto : global::ProtoBuf.IExtensible
+ {
+ public EvaluatorStatusProto() {}
+
+ private string _evaluator_id;
+ /// <summary>Tag 1, required: evaluator identifier.</summary>
+ [global::ProtoBuf.ProtoMember(1, IsRequired = true, Name=@"evaluator_id", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ public string evaluator_id
+ {
+ get { return _evaluator_id; }
+ set { _evaluator_id = value; }
+ }
+ private State _state;
+ /// <summary>Tag 2, required: evaluator state (namespace-level State enum).</summary>
+ [global::ProtoBuf.ProtoMember(2, IsRequired = true, Name=@"state", DataFormat = global::ProtoBuf.DataFormat.TwosComplement)]
+ public State state
+ {
+ get { return _state; }
+ set { _state = value; }
+ }
+ private byte[] _error = null;
+ /// <summary>Tag 3, optional (default null): opaque error bytes.</summary>
+ [global::ProtoBuf.ProtoMember(3, IsRequired = false, Name=@"error", DataFormat = global::ProtoBuf.DataFormat.Default)]
+ [global::System.ComponentModel.DefaultValue(null)]
+ public byte[] error
+ {
+ get { return _error; }
+ set { _error = value; }
+ }
+ // IExtensible support: storage is obtained through ProtoBuf.Extensible.GetExtensionObject.
+ private global::ProtoBuf.IExtension extensionObject;
+ global::ProtoBuf.IExtension global::ProtoBuf.IExtensible.GetExtensionObject(bool createIfMissing)
+ { return global::ProtoBuf.Extensible.GetExtensionObject(ref extensionObject, createIfMissing); }
+ }
+
+ /// <summary>
+ /// Namespace-level State enum from reef_service_protos.proto. It is the type of
+ /// JobStatusProto.state, TaskStatusProto.state and EvaluatorStatusProto.state
+ /// (those classes use the unqualified name). ContextStatusProto uses its own nested State instead.
+ /// The .proto files ResourceStatusProto and RuntimeStatusProto also reference State.
+ /// </summary>
+ [global::ProtoBuf.ProtoContract(Name=@"State")]
+ public enum State
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"INIT", Value=0)]
+ INIT = 0,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"RUNNING", Value=1)]
+ RUNNING = 1,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"DONE", Value=2)]
+ DONE = 2,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"SUSPEND", Value=3)]
+ SUSPEND = 3,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"FAILED", Value=4)]
+ FAILED = 4,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"KILLED", Value=5)]
+ KILLED = 5
+ }
+
+ /// <summary>
+ /// Type of FileResourceProto.type. NOTE(review): how each value is handled (e.g. what
+ /// distinguishes LIB from PLAIN) is not visible here — confirm with the resource launcher.
+ /// </summary>
+ [global::ProtoBuf.ProtoContract(Name=@"FileType")]
+ public enum FileType
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"PLAIN", Value=0)]
+ PLAIN = 0,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"LIB", Value=1)]
+ LIB = 1,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"ARCHIVE", Value=2)]
+ ARCHIVE = 2
+ }
+
+ /// <summary>
+ /// Coarse size class. Used by JobSubmissionProto.driver_size (client_runtime.proto).
+ /// ResourceRequestProto's former SIZE field was removed in REEF 0.3 in favor of memory_size.
+ /// </summary>
+ [global::ProtoBuf.ProtoContract(Name=@"SIZE")]
+ public enum SIZE
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"SMALL", Value=0)]
+ SMALL = 0,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"MEDIUM", Value=1)]
+ MEDIUM = 1,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"LARGE", Value=2)]
+ LARGE = 2,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"XLARGE", Value=3)]
+ XLARGE = 3
+ }
+
+ /// <summary>
+ /// Type of ResourceLaunchProto.type (driver_runtime.proto); presumably selects the
+ /// runtime (JVM or CLR) that hosts the launched process — confirm with the launcher.
+ /// </summary>
+ [global::ProtoBuf.ProtoContract(Name=@"ProcessType")]
+ public enum ProcessType
+ {
+
+ [global::ProtoBuf.ProtoEnum(Name=@"JVM", Value=0)]
+ JVM = 0,
+
+ [global::ProtoBuf.ProtoEnum(Name=@"CLR", Value=1)]
+ CLR = 1
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs
new file mode 100644
index 0000000..6bd90e8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/Serializer.cs
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Utilities;
+
+using ProtoBuf;
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.IO;
+using System.Text;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1403:FileMayOnlyContainASingleNamespace", Justification = "Serializers for all protobuf types")]
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "Serializers for all protobuf types")]
+
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefProtocol
+{
+    /// <summary>
+    /// Hand-written half of the generated <c>REEFMessage</c> partial class.
+    /// It adds a heartbeat constructor and byte-array (de)serialization through protobuf-net.
+    /// </summary>
+    public partial class REEFMessage
+    {
+        /// <summary>
+        /// Creates a message whose evaluator-heartbeat slot holds the given proto.
+        /// </summary>
+        /// <param name="evaluatorHeartbeatProto">Heartbeat to wrap; stored by reference.</param>
+        public REEFMessage(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
+        {
+            this._evaluatorHeartBeat = evaluatorHeartbeatProto;
+        }
+
+        /// <summary>
+        /// Parses a protobuf-encoded <c>REEFMessage</c>.
+        /// </summary>
+        /// <param name="bytes">The encoded message.</param>
+        /// <returns>The decoded message.</returns>
+        public static REEFMessage Deserialize(byte[] bytes)
+        {
+            using (MemoryStream input = new MemoryStream(bytes))
+            {
+                return Serializer.Deserialize<REEFMessage>(input);
+            }
+        }
+
+        /// <summary>
+        /// Encodes this message with protobuf-net.
+        /// </summary>
+        /// <returns>Exactly the bytes written by the serializer.</returns>
+        public byte[] Serialize()
+        {
+            using (MemoryStream output = new MemoryStream())
+            {
+                Serializer.Serialize<REEFMessage>(output, this);
+
+                // The backing buffer can be larger than what was written; keep only the written prefix.
+                byte[] encoded = new byte[output.Position];
+                Buffer.BlockCopy(output.GetBuffer(), 0, encoded, 0, encoded.Length);
+                return encoded;
+            }
+        }
+    }
+}
+
+namespace Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto
+{
+    /// <summary>
+    /// Add serializer/deserializer and a diagnostic ToString to the generated EvaluatorHeartbeatProto.
+    /// </summary>
+    public partial class EvaluatorHeartbeatProto
+    {
+        /// <summary>
+        /// Parses a protobuf-encoded EvaluatorHeartbeatProto.
+        /// </summary>
+        /// <param name="bytes">The encoded heartbeat.</param>
+        /// <returns>The decoded heartbeat.</returns>
+        public static EvaluatorHeartbeatProto Deserialize(byte[] bytes)
+        {
+            using (var s = new MemoryStream(bytes))
+            {
+                return Serializer.Deserialize<EvaluatorHeartbeatProto>(s);
+            }
+        }
+
+        /// <summary>
+        /// Encodes this heartbeat with protobuf-net.
+        /// </summary>
+        /// <returns>Exactly the bytes written by the serializer.</returns>
+        public byte[] Serialize()
+        {
+            using (var s = new MemoryStream())
+            {
+                Serializer.Serialize<EvaluatorHeartbeatProto>(s, this);
+
+                // GetBuffer() may be longer than the data written; copy only the written prefix.
+                byte[] b = new byte[s.Position];
+                Array.Copy(s.GetBuffer(), b, b.Length);
+                return b;
+            }
+        }
+
+        /// <summary>
+        /// Human-readable summary of the heartbeat for logging.
+        /// </summary>
+        /// <remarks>
+        /// Never throws for partially populated instances. evaluator_status is a required proto
+        /// field, but an in-memory instance may not have it set yet. Individual context entries
+        /// are separated by "; ".
+        /// </remarks>
+        public override string ToString()
+        {
+            // Guard against an unset evaluator_status instead of throwing NullReferenceException.
+            string evaluatorId = evaluator_status == null ? string.Empty : evaluator_status.evaluator_id;
+            string evaluatorState = evaluator_status == null ? string.Empty : evaluator_status.state.ToString();
+
+            StringBuilder contextStatus = new StringBuilder();
+            if (context_status != null)
+            {
+                foreach (ContextStatusProto contextStatusProto in context_status)
+                {
+                    if (contextStatus.Length > 0)
+                    {
+                        // Without a separator, multiple context entries ran together into one unreadable string.
+                        contextStatus.Append("; ");
+                    }
+                    contextStatus.AppendFormat(
+                        CultureInfo.InvariantCulture,
+                        "evaluator {0} has context {1} in state {2} with recovery flag {3}",
+                        evaluatorId,
+                        contextStatusProto.context_id,
+                        contextStatusProto.context_state,
+                        contextStatusProto.recovery);
+                }
+            }
+
+            StringBuilder taskStatusMessage = new StringBuilder();
+            if (task_status != null && task_status.task_message != null)
+            {
+                foreach (TaskStatusProto.TaskMessageProto taskMessageProto in task_status.task_message)
+                {
+                    taskStatusMessage.Append(ByteUtilities.ByteArrarysToString(taskMessageProto.message));
+                }
+            }
+
+            return string.Format(CultureInfo.InvariantCulture, "EvaluatorHeartbeatProto: task_id=[{0}], task_status=[{1}], task_message=[{2}], evaluator_status=[{3}], context_status=[{4}], timestamp=[{5}], recoveryFlag =[{6}]",
+                task_status == null ? string.Empty : task_status.task_id,
+                task_status == null ? string.Empty : task_status.state.ToString(),
+                taskStatusMessage.ToString(),
+                evaluatorState,
+                contextStatus.ToString(),
+                timestamp,
+                recovery);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
new file mode 100644
index 0000000..0f997a4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/EvaluatorHeartbeatProtoCodec.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefProtocol
+{
+    /// <summary>
+    /// Wake codec that converts an EvaluatorHeartbeatProto to and from its protobuf bytes.
+    /// </summary>
+    public class EvaluatorHeartbeatProtoCodec : ICodec<EvaluatorHeartbeatProto>
+    {
+        /// <summary>
+        /// Serializes the complete heartbeat.
+        /// </summary>
+        /// <param name="obj">The heartbeat to encode.</param>
+        /// <returns>The protobuf encoding of <paramref name="obj"/>.</returns>
+        /// <exception cref="System.ArgumentNullException"><paramref name="obj"/> is null.</exception>
+        public byte[] Encode(EvaluatorHeartbeatProto obj)
+        {
+            if (obj == null)
+            {
+                throw new System.ArgumentNullException("obj");
+            }
+
+            // Serialize the message itself. The previous version copied only evaluator_status into
+            // a fresh proto, which silently dropped timestamp, context_status, task_status and
+            // recovery, so Decode(Encode(x)) did not round-trip. This matches REEFMessageCodec.Encode.
+            return obj.Serialize();
+        }
+
+        /// <summary>
+        /// Parses protobuf bytes into a heartbeat.
+        /// </summary>
+        /// <param name="data">The encoded heartbeat.</param>
+        /// <returns>The decoded heartbeat.</returns>
+        public EvaluatorHeartbeatProto Decode(byte[] data)
+        {
+            return EvaluatorHeartbeatProto.Deserialize(data);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs
new file mode 100644
index 0000000..41109e3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/cs/codec/REEFMessageCodec.cs
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Common.ProtoBuf.ReefProtocol
+{
+    /// <summary>
+    /// Wake codec for <c>REEFMessage</c>.
+    /// It delegates to the message's own Serialize / Deserialize helpers.
+    /// </summary>
+    public class REEFMessageCodec : ICodec<REEFMessage>
+    {
+        /// <summary>
+        /// Encodes a message into protobuf bytes.
+        /// </summary>
+        /// <param name="obj">The message to encode.</param>
+        /// <returns>The encoded bytes.</returns>
+        public byte[] Encode(REEFMessage obj)
+        {
+            byte[] encoded = obj.Serialize();
+            return encoded;
+        }
+
+        /// <summary>
+        /// Decodes protobuf bytes into a message.
+        /// </summary>
+        /// <param name="data">The encoded bytes.</param>
+        /// <returns>The decoded message.</returns>
+        public REEFMessage Decode(byte[] data)
+        {
+            return REEFMessage.Deserialize(data);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto
new file mode 100644
index 0000000..3d1f927
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/client_runtime.proto
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "ClientRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "reef_service_protos.proto";
+
+// Messages from REEF Client -> Driver Runtime
+
+// Job submission sent from the REEF client to the driver runtime.
+// Tags 3 and 4 are not used.
+message JobSubmissionProto {
+ required string identifier = 1; // the job identifier
+ required string remote_id = 2; // the remote identifier
+ required string configuration = 5; // the runtime configuration
+ required string user_name = 6; // the user name
+
+ // Optional driver resource hints. SIZE is defined in reef_service_protos.proto.
+ // NOTE(review): units of driver_memory are not stated here; ResourceRequestProto.memory_size
+ // is documented as MB — confirm the same applies.
+ optional SIZE driver_size = 7;
+ optional int32 driver_memory = 8;
+ optional int32 priority = 9;
+ optional string queue = 10;
+
+ // NOTE(review): "local_File" is capitalized unlike "global_file"; renaming it would change
+ // the generated accessor names, so confirm with all consumers before changing it.
+ repeated FileResourceProto global_file = 11; // files that should be placed on the driver and all subsequent evaluators
+ repeated FileResourceProto local_File = 12; // files that should be placed on the driver only
+
+}
+
+// Control signal carried by JobControlProto.signal.
+// NOTE(review): no value 0 is defined; confirm how consumers treat an unset optional `signal`.
+enum Signal {
+ SIG_TERMINATE = 1;
+ SIG_SUSPEND = 2;
+ SIG_RESUME = 3;
+}
+
+// Client -> driver runtime control message for the job named by `identifier`.
+// Both the signal and the opaque message payload are optional.
+message JobControlProto {
+ required string identifier = 1;
+ optional Signal signal = 2;
+ optional bytes message = 3;
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto
new file mode 100644
index 0000000..2b21ac7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/driver_runtime.proto
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "DriverRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+
+import "reef_service_protos.proto";
+
+// Messages from Driver Runtime -> Driver Process
+
+// Driver runtime -> driver process: carries only the remote identifier to register.
+message DriverProcessRegistrationProto {
+ required string remote_identifier = 1;
+}
+
+
+// Describes a node: identifier, host/port, memory size and optional rack.
+// NOTE(review): memory_size units are not stated here — confirm (MB elsewhere in this file).
+message NodeDescriptorProto {
+ required string identifier = 1;
+ required string host_name = 2; // e.g., IP address
+ required int32 port = 3; // e.g., IP port
+ required int32 memory_size = 4;
+ optional string rack_name = 5; // e.g., /default-rack
+}
+
+// Reports an allocated resource: its identifier, memory in megabytes, and the node it was placed on.
+// node_id presumably matches a NodeDescriptorProto.identifier — confirm.
+message ResourceAllocationProto {
+ required string identifier = 1; // e.g., the container id, or the thread id
+ required int32 resource_memory = 2; // megabytes
+ required string node_id = 3;
+}
+
+// Status update for a resource. `state` uses the shared State enum from reef_service_protos.proto.
+message ResourceStatusProto {
+ required string identifier = 1;
+ required State state = 2;
+ optional string diagnostics = 3;
+ optional int32 exit_code = 4;
+ optional bool is_from_previous_driver = 5;
+}
+
+// Status of the resource runtime itself (e.g. local or YARN). Tag 4 is not used.
+message RuntimeStatusProto {
+ required string name = 1; // e.g., local, yarn21
+ required State state = 2;
+ optional RuntimeErrorProto error = 3; // runtime (e.g., YARN) error
+
+ optional int32 outstanding_container_requests = 5;
+ repeated string container_allocation = 6;
+}
+
+//////////////////////////////////////////////////////
+// Messages from Driver Process -> Driver Runtime
+
+// Driver process -> driver runtime: request for `resource_count` resources,
+// optionally constrained to specific nodes/racks.
+// Tag 1 previously held resource_size; avoid reusing it. Tags 4, 8 and 9 are not used.
+message ResourceRequestProto {
+ // optional SIZE resource_size = 1; // Removed in REEF 0.3 in favor of memory_size.
+ optional int32 memory_size = 2; // Memory size of the evaluator in MB
+ optional int32 priority = 3;
+
+ required int32 resource_count = 5;
+ repeated string node_name = 6; // a list of specific nodes
+ repeated string rack_name = 7; // a list of specific racks
+
+ optional bool relax_locality = 10;
+}
+
+// Driver process -> driver runtime: release the resource named by `identifier`.
+message ResourceReleaseProto {
+ required string identifier = 1;
+}
+
+// Driver process -> driver runtime: launch an evaluator on resource `identifier`.
+// `type` is a ProcessType and `file` lists FileResourceProto entries (both from
+// reef_service_protos.proto). Tags 5-9 are not used.
+message ResourceLaunchProto {
+ required string identifier = 1;
+ required string remote_id = 2;
+ required string evaluator_conf = 3;
+ required ProcessType type = 4;
+ repeated FileResourceProto file = 10;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto
new file mode 100644
index 0000000..1415e5c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+option java_package = "org.apache.reef.proto";
+option java_outer_classname = "EvaluatorRuntimeProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "reef_service_protos.proto";
+
+// Stop the evaluator
+// Field-less message. Not referenced by any other message in this file.
+message StopEvaluatorProto {
+}
+
+// Kill the evaluator
+// Field-less message; sent as EvaluatorControlProto.kill_evaluator.
+message KillEvaluatorProto {
+}
+
+// Start a task
+// Sent as ContextControlProto.start_task. `configuration` is a string whose format is not
+// visible here (presumably a serialized task configuration — confirm).
+message StartTaskProto {
+ required string context_id = 1;
+ required string configuration = 2;
+}
+
+// Sent as ContextControlProto.add_context: adds a context under `parent_context_id`,
+// with an optional service configuration.
+message AddContextProto {
+ required string parent_context_id = 1;
+ required string context_configuration = 2;
+ optional string service_configuration = 3;
+}
+
+// Sent as ContextControlProto.remove_context: names the context to remove.
+message RemoveContextProto {
+ required string context_id = 1;
+}
+
+// Stop the task
+// Field-less message; sent as ContextControlProto.stop_task.
+message StopTaskProto {
+}
+
+// Suspend the task
+// Field-less message; sent as ContextControlProto.suspend_task.
+message SuspendTaskProto {
+}
+
+/////////////////////////////////////////
+// Message aggregators
+
+// Message addressed to a context (ContextControlProto.context_message).
+// Not the same type as the ContextStatusProto.ContextMessageProto status entry, which uses source_id.
+message ContextMessageProto {
+ required string context_id = 1;
+ required bytes message = 2;
+}
+
+// Aggregates context-level control commands; every field is optional.
+// Tags 3 and 4 are not used.
+message ContextControlProto {
+ optional bytes task_message = 1;
+ optional ContextMessageProto context_message = 2;
+
+ optional AddContextProto add_context = 5;
+ optional RemoveContextProto remove_context = 6;
+ optional StartTaskProto start_task = 7;
+ optional StopTaskProto stop_task = 8;
+ optional SuspendTaskProto suspend_task = 9;
+}
+
+// Periodic evaluator heartbeat. The C# partial class adds Serialize/Deserialize/ToString
+// (Serializer.cs), and it is also carried as REEFMessage.evaluatorHeartBeat.
+// NOTE(review): timestamp units/epoch are not stated here — confirm with the sender.
+message EvaluatorHeartbeatProto {
+ required int64 timestamp = 1;
+ required EvaluatorStatusProto evaluator_status = 2;
+ repeated ContextStatusProto context_status = 3;
+ optional TaskStatusProto task_status = 4;
+ optional bool recovery = 5;
+}
+
+// Control message for the evaluator named by `identifier`. It carries either context-level
+// control or a kill request; both are optional.
+message EvaluatorControlProto {
+ required int64 timestamp = 1;
+ required string identifier = 2;
+
+ optional ContextControlProto context_control = 3;
+ optional KillEvaluatorProto kill_evaluator = 4;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto
new file mode 100644
index 0000000..6b99415
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import "client_runtime.proto";
+
+import "evaluator_runtime.proto";
+
+import "reef_service_protos.proto";
+
+
+// Java code-generation options; they do not affect the C# classes generated from this file.
+// Java package names are lowercase by convention (the previous "com.Org.Apache.REEF.proto"
+// was a rename artifact).
+option java_package = "org.apache.reef.proto";
+
+option java_generic_services = true;
+
+option java_generate_equals_and_hash = true;
+
+option java_outer_classname = "REEFProtocol";
+
+// Envelope for messages on the REEF remote channel. The C# EvaluatorRuntime reads
+// evaluatorControl and the C# HeartBeatManager sends evaluatorHeartBeat.
+// NOTE(review): presumably exactly one field is set per message - confirm.
+message REEFMessage {
+ // Messages defined in client_runtime.proto
+ optional JobSubmissionProto jobSubmission = 1;
+ optional JobControlProto jobControl = 2;
+ // Messages defined in reef_service_protos.proto
+ optional RuntimeErrorProto runtimeError = 3;
+ optional JobStatusProto jobStatus = 4;
+ // Messages from evaluator_runtime.proto
+ optional EvaluatorControlProto evaluatorControl = 5;
+ optional EvaluatorHeartbeatProto evaluatorHeartBeat = 6;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto
new file mode 100644
index 0000000..a553ca9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Java code-generation options; they do not affect the C# classes generated from this file.
+// The package drops the duplicated "reef" segment left by the rename ("org.apache.reef.reef.proto").
+option java_package = "org.apache.reef.proto";
+
+option java_outer_classname = "ReefServiceProtos";
+
+option java_generic_services = true;
+
+option java_generate_equals_and_hash = true;
+
+// Lifecycle state used by JobStatusProto, TaskStatusProto and EvaluatorStatusProto.
+// ContextStatusProto declares its own nested State enum, which shadows this one inside it.
+enum State {
+ INIT = 0;
+ RUNNING = 1;
+ DONE = 2;
+ SUSPEND = 3;
+ FAILED = 4;
+ KILLED = 5;
+}
+
+// Kind of file described by FileResourceProto.type.
+enum FileType {
+ PLAIN = 0;
+ LIB = 1;
+ ARCHIVE = 2;
+}
+
+// Removed in REEF 0.3 in favor of explicit memory sizes.
+// enum SIZE {
+// SMALL = 0;
+// MEDIUM = 1;
+// LARGE = 2;
+// XLARGE = 3;
+//}
+
+// Runtime a process runs on. Not referenced by any message in this file.
+enum ProcessType {
+ JVM = 0;
+ CLR = 1;
+}
+
+// A file resource: its kind, a name and a path. How name and path are interpreted is not
+// defined here.
+message FileResourceProto {
+ required FileType type = 1;
+ required string name = 2;
+ required string path = 3;
+}
+
+// Error report from a runtime.
+message RuntimeErrorProto {
+ required string name = 1; // e.g., local, yarn21
+ required string message = 2;
+ // Serialized exception, if available; the encoding is not defined here.
+ optional bytes exception = 3;
+
+ // Tag 4 is not used in this definition; check history before reusing it.
+ optional string identifier = 5; // e.g., evaluator id
+}
+
+// Status of the job named by identifier.
+message JobStatusProto {
+ required string identifier = 1;
+ required State state = 2;
+ // Opaque payloads; their encoding is not defined here.
+ optional bytes message = 3;
+ optional bytes exception = 4;
+}
+
+// Status of one context on an evaluator.
+message ContextStatusProto {
+ // Context-specific states; inside this message this enum shadows the top-level State.
+ enum State {
+ READY = 0;
+ DONE = 1;
+ FAIL = 2;
+ }
+ required State context_state = 1;
+
+ required string context_id = 2;
+ optional string parent_id = 3;
+
+ // Tag 4 is not used in this definition; check history before reusing it.
+ optional bytes error = 5; // when creating the context
+
+ optional bool recovery = 6;
+ // Context messages, each tagged with the id of its source. This nested type is distinct
+ // from the top-level ContextMessageProto, which carries a context_id instead.
+ message ContextMessageProto {
+ required string source_id = 1;
+ required bytes message = 2;
+ }
+ repeated ContextMessageProto context_message = 7;
+}
+
+// Status of a task, identified by task_id, running in context context_id.
+message TaskStatusProto {
+ required string task_id = 1;
+ required string context_id = 2;
+ required State state = 3;
+ optional bytes result = 4; // e.g., return value from Task.call()
+ optional bool recovery = 5;
+
+ // TaskMessageSource messages, each tagged with the id of its source.
+ message TaskMessageProto {
+ required string source_id = 1;
+ required bytes message = 2;
+ }
+ repeated TaskMessageProto task_message = 6;
+}
+
+// Status of the evaluator as a whole. The C# EvaluatorRuntime fills error with a textual
+// failure description when it switches to FAILED.
+message EvaluatorStatusProto {
+ required string evaluator_id = 1;
+ required State state = 2;
+ optional bytes error = 3;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs b/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs
new file mode 100644
index 0000000..54aca4c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Globalization;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime
+{
+ /// <summary>
+ /// Best-effort snapshot of machine-wide and current-process resource usage, read from
+ /// performance counters and <see cref="Process"/>. Intended for diagnostic logging only.
+ /// </summary>
+ public class MachineStatus
+ {
+ private static PerformanceCounter _cpuCounter;
+
+ private static PerformanceCounter _ramCounter;
+
+ private static PerformanceCounter _processCpuCounter;
+
+ private static Process _process;
+
+ // Cleared after the first failure in ToString() so that status collection is switched off for good.
+ private static bool _checkStatus;
+
+ static MachineStatus()
+ {
+ _checkStatus = true;
+ _process = Process.GetCurrentProcess();
+ string processName = _process.ProcessName;
+
+ _cpuCounter = new PerformanceCounter()
+ {
+ CategoryName = "Processor",
+ CounterName = "% Processor Time",
+ InstanceName = "_Total",
+ };
+
+ _ramCounter = new PerformanceCounter()
+ {
+ CategoryName = "Memory",
+ CounterName = "Available MBytes"
+ };
+
+ // The process counter instance is selected by process name, so the reading may not be accurate
+ // if several processes with the same name (e.g. multiple evaluators) run on one machine.
+ _processCpuCounter = new PerformanceCounter()
+ {
+ CategoryName = "Process",
+ CounterName = "% Processor Time",
+ InstanceName = processName
+ };
+ }
+
+ /// <summary>
+ /// Machine-wide CPU usage, formatted like "12.5%". This is a sampled rate counter: the first
+ /// read returns 0, which is why callers prime it with an initial read.
+ /// </summary>
+ public static string CurrentNodeCpuUsage
+ {
+ get
+ {
+ return _cpuCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "%";
+ }
+ }
+
+ /// <summary>
+ /// Memory available on the machine as reported by the "Memory\Available MBytes" counter,
+ /// formatted like "2048MB".
+ /// </summary>
+ public static string AvailableMemory
+ {
+ get
+ {
+ return _ramCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "MB";
+ }
+ }
+
+ /// <summary>
+ /// Working set of the current process in units of 10^6 bytes, as of the last
+ /// <see cref="Process.Refresh"/>.
+ /// </summary>
+ public static string CurrentProcessMemoryUsage
+ {
+ get
+ {
+ return ((float)_process.WorkingSet64 / 1000000.0).ToString(CultureInfo.InvariantCulture) + "MB";
+ }
+ }
+
+ /// <summary>
+ /// Peak working set of the current process in units of 10^6 bytes, as of the last
+ /// <see cref="Process.Refresh"/>.
+ /// </summary>
+ public static string PeakProcessMemoryUsage
+ {
+ get
+ {
+ return ((float)_process.PeakWorkingSet64 / 1000000.0).ToString(CultureInfo.InvariantCulture) + "MB";
+ }
+ }
+
+ /// <summary>
+ /// CPU usage of the current process as a percentage of total machine capacity, e.g. "3.2%".
+ /// </summary>
+ /// <remarks>
+ /// The "Process\% Processor Time" counter sums usage over all cores, so it can exceed 100.
+ /// The sampled value is therefore divided by the core count, which makes it comparable with
+ /// <see cref="CurrentNodeCpuUsage"/>. The previous implementation reported RawValue, which is a
+ /// raw tick count, not a percentage. As with the node counter, the first read returns 0.
+ /// </remarks>
+ public static string CurrentProcessCpuUsage
+ {
+ get
+ {
+ float usage = _processCpuCounter.NextValue() / Environment.ProcessorCount;
+ return usage.ToString(CultureInfo.InvariantCulture) + "%";
+ }
+ }
+
+ /// <summary>
+ /// Formats a one-line status report. Any failure is logged and permanently disables further
+ /// status collection, because diagnostics must never crash the evaluator.
+ /// </summary>
+ public override string ToString()
+ {
+ string info = "No machine status information retrieved. Could be due to lack of admin right to get the info.";
+ if (_checkStatus)
+ {
+ try
+ {
+ // Process caches its memory figures; refresh so the report is current.
+ _process.Refresh();
+ info = string.Format(
+ CultureInfo.InvariantCulture,
+ "current node is running at [{0}] CPU usage and with [{1}] memory available.{2} current evaluator process is using [{3}] of CPU and [{4}] of memory, with a peak memory usage of [{5}]",
+ CurrentNodeCpuUsage,
+ AvailableMemory,
+ Environment.NewLine,
+ CurrentProcessCpuUsage,
+ CurrentProcessMemoryUsage,
+ PeakProcessMemoryUsage);
+ }
+ catch (Exception e)
+ {
+ _checkStatus = false; // It only takes one exception to switch the checking off for good.
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot obtain machine status due to error", Logger.GetLogger(typeof(MachineStatus)));
+ // We do not want to crash the evaluator just because we cannot get the information.
+ info = "Cannot obtain machine status due to error " + e.Message;
+ }
+ }
+
+ return info;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs
new file mode 100644
index 0000000..97d705b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Evaluator
+{
+ /// <summary>
+ /// Shared string names and heartbeat defaults for the evaluator runtime.
+ /// </summary>
+ public class Constants
+ {
+ /// <summary>Default period between evaluator heartbeats, in milliseconds.</summary>
+ public const int DefaultEvaluatorHeartbeatPeriodInMs = 4000;
+
+ /// <summary>Default maximum number of evaluator heartbeat retries.</summary>
+ public const int DefaultEvaluatorHeartbeatMaxRetry = 3;
+
+ // Each name below is a string equal to its own identifier; presumably used as lookup keys
+ // for configurations and identifiers - confirm against callers.
+ public const string ApplicationIdentifier = "ApplicationIdentifier";
+
+ public const string EvaluatorIdentifier = "EvaluatorIdentifier";
+
+ public const string ContextIdentifier = "ContextIdentifier";
+
+ public const string TaskConfiguration = "TaskConfiguration";
+
+ public const string RootContextConfiguration = "RootContextConfiguration";
+
+ public const string RootServiceConfiguration = "RootServiceConfiguration";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs
new file mode 100644
index 0000000..217e24d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Evaluator;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.REEF.Common
+{
+ /// <summary>
+ /// Evaluator-side runtime. Tracks the evaluator's <c>State</c>, reacts to clock start/stop
+ /// events, and applies <see cref="EvaluatorControlProto"/> commands from the driver to the
+ /// <see cref="ContextManager"/>. Status is reported to the driver through the
+ /// <see cref="HeartBeatManager"/>.
+ /// </summary>
+ /// <remarks>
+ /// Control handling and failure reporting lock on <c>_heartBeatManager</c>. HeartBeatManager
+ /// locks on itself, so both classes share one monitor.
+ /// </remarks>
+ public class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage>
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime));
+
+ private readonly string _evaluatorId;
+
+ private readonly ContextManager _contextManager;
+
+ private readonly HeartBeatManager _heartBeatManager;
+
+ private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+ private readonly IClock _clock;
+
+ private State _state = State.INIT;
+
+ // Registration of the driver-message observer; disposed in OnNext(RuntimeStop).
+ private IDisposable _evaluatorControlChannel;
+
+ /// <summary>
+ /// Takes the clock, evaluator id and remote manager from the heartbeat manager's settings.
+ /// Registers for driver messages and schedules the first heartbeat alarm with a delay of 0.
+ /// </summary>
+ [Inject]
+ public EvaluatorRuntime(
+ ContextManager contextManager,
+ HeartBeatManager heartBeatManager)
+ {
+ using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime"))
+ {
+ _clock = heartBeatManager.EvaluatorSettings.RuntimeClock;
+ _heartBeatManager = heartBeatManager;
+ _contextManager = contextManager;
+ _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId;
+ _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager;
+
+ ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver();
+
+ // subscribe to driver proto message
+ driverObserver.Subscribe(o => OnNext(o.Message));
+
+ // register the driver observer
+ _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver);
+
+ // start the heartbeat
+ _clock.ScheduleAlarm(0, heartBeatManager);
+ }
+ }
+
+ /// <summary>
+ /// Current lifecycle state of this evaluator (starts at INIT).
+ /// </summary>
+ public State State
+ {
+ get
+ {
+ return _state;
+ }
+ }
+
+ /// <summary>
+ /// Applies a control message from the driver.
+ /// </summary>
+ /// <remarks>
+ /// A message for another evaluator id (compared case-insensitively), or one received while
+ /// not RUNNING, is reported as a failure through <c>Handle(Exception)</c>. Otherwise the
+ /// context-control part is forwarded to the ContextManager, and a kill request switches the
+ /// evaluator to KILLED. An exception from context handling is reported and then passed to
+ /// Exceptions.Throw wrapped in an InvalidOperationException (presumably rethrown - confirm).
+ /// </remarks>
+ public void Handle(EvaluatorControlProto message)
+ {
+ lock (_heartBeatManager)
+ {
+ LOGGER.Log(Level.Info, "Handle Evaluator control message");
+ if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase))
+ {
+ Handle(new InvalidOperationException(
+ string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId)));
+ }
+ else if (_state != State.RUNNING)
+ {
+ Handle(new InvalidOperationException(
+ string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state)));
+ }
+ else
+ {
+ if (message.context_control != null)
+ {
+ LOGGER.Log(Level.Info, "Send task control message to ContextManager");
+ try
+ {
+ _contextManager.HandleTaskControl(message.context_control);
+ // Once the last context is gone the evaluator is done: report DONE and stop the clock.
+ if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING)
+ {
+ LOGGER.Log(Level.Info, "Context stack is empty, done");
+ _state = State.DONE;
+ _heartBeatManager.OnNext(GetEvaluatorStatus());
+ _clock.Dispose();
+ }
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+ Handle(e);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER);
+ }
+ }
+ if (message.kill_evaluator != null)
+ {
+ // NOTE(review): unlike the DONE path above, no final status heartbeat is sent here - confirm intended.
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId));
+ _state = State.KILLED;
+ _clock.Dispose();
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Builds a status snapshot (evaluator id and current state, no error) for a heartbeat.
+ /// Logs the state at Info level on every call.
+ /// </summary>
+ public EvaluatorStatusProto GetEvaluatorStatus()
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state));
+ EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+ {
+ evaluator_id = _evaluatorId,
+ state = _state
+ };
+ return evaluatorStatusProto;
+ }
+
+ /// <summary>
+ /// Clock start: switches INIT to RUNNING, starts the ContextManager and sends a heartbeat.
+ /// Any failure, including being started from a state other than INIT, is reported through
+ /// <c>Handle(Exception)</c> instead of propagating.
+ /// </summary>
+ public void OnNext(RuntimeStart runtimeStart)
+ {
+ // NOTE(review): this locks the evaluator id string, while the other handlers lock
+ // _heartBeatManager; a string instance can be shared elsewhere. Consider a dedicated lock,
+ // but check ContextManager.Start() for lock-ordering issues before changing it.
+ lock (_evaluatorId)
+ {
+ try
+ {
+ LOGGER.Log(Level.Info, "Runtime start");
+ if (_state != State.INIT)
+ {
+ var e = new InvalidOperationException("State should be init.");
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+ }
+ _state = State.RUNNING;
+ _contextManager.Start();
+ _heartBeatManager.OnNext();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+ Handle(e);
+ }
+ }
+ }
+
+ // OnError/OnCompleted notifications of the observed streams are not supported.
+ void IObserver<RuntimeStart>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<REEFMessage>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<REEFMessage>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<RuntimeStop>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<RuntimeStop>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<RuntimeStart>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Clock stop: disposes the ContextManager, switches RUNNING to DONE and sends a heartbeat,
+ /// then unregisters from driver messages. A failure to unregister goes to
+ /// Exceptions.CaughtAndThrow wrapped in an InvalidOperationException.
+ /// </summary>
+ public void OnNext(RuntimeStop runtimeStop)
+ {
+ LOGGER.Log(Level.Info, "Runtime stop");
+ _contextManager.Dispose();
+
+ if (_state == State.RUNNING)
+ {
+ _state = State.DONE;
+ _heartBeatManager.OnNext();
+ }
+ try
+ {
+ _evaluatorControlChannel.Dispose();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER);
+ }
+ LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete");
+ }
+
+ /// <summary>
+ /// Entry point for messages from the driver. Only messages carrying evaluatorControl are
+ /// handled; null messages and any other content are ignored.
+ /// </summary>
+ public void OnNext(REEFMessage value)
+ {
+ if (value != null && value.evaluatorControl != null)
+ {
+ LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl");
+ Handle(value.evaluatorControl);
+ }
+ }
+
+ /// <summary>
+ /// Marks the evaluator FAILED and sends a status heartbeat. The heartbeat carries the error
+ /// text (exception, message and stack trace). Then disposes the ContextManager.
+ /// </summary>
+ private void Handle(Exception e)
+ {
+ lock (_heartBeatManager)
+ {
+ LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e);
+ _state = State.FAILED;
+ string errorMessage = string.Format(
+ CultureInfo.InvariantCulture,
+ "failed with error [{0}] with mesage [{1}] and stack trace [{2}]",
+ e,
+ e.Message,
+ e.StackTrace);
+ EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+ {
+ evaluator_id = _evaluatorId,
+ error = ByteUtilities.StringToByteArrays(errorMessage),
+ state = _state
+ };
+ _heartBeatManager.OnNext(evaluatorStatusProto);
+ _contextManager.Dispose();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs
new file mode 100644
index 0000000..bc939d9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.Evaluator.Context;
+using Org.Apache.REEF.Common.io;
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using System;
+
+namespace Org.Apache.REEF.Evaluator
+{
+ // TODO: merge with EvaluatorConfigurations class
+ /// <summary>
+ /// Settings an evaluator runs with: identifiers, heartbeat parameters, the root context
+ /// configuration and runtime services (clock, remote manager, injector).
+ /// </summary>
+ public class EvaluatorSettings
+ {
+ /// <summary>
+ /// Creates the settings. <paramref name="applicationId"/> is stored as given (it may be null);
+ /// the operation state starts as OPERATIONAL and <see cref="NameClient"/> starts unset.
+ /// </summary>
+ /// <exception cref="ArgumentNullException">
+ /// <paramref name="evaluatorId"/> is null, empty or whitespace, or any of
+ /// <paramref name="rootContextConfig"/>, <paramref name="clock"/>,
+ /// <paramref name="remoteManager"/> or <paramref name="injecor"/> is null.
+ /// </exception>
+ public EvaluatorSettings(
+ string applicationId,
+ string evaluatorId,
+ int heartbeatPeriodInMs,
+ int maxHeartbeatRetries,
+ ContextConfiguration rootContextConfig,
+ IClock clock,
+ IRemoteManager<REEFMessage> remoteManager,
+ IInjector injecor)
+ {
+ if (string.IsNullOrWhiteSpace(evaluatorId))
+ {
+ throw new ArgumentNullException("evaluatorId");
+ }
+ RequireNonNull(rootContextConfig, "rootContextConfig");
+ RequireNonNull(clock, "clock");
+ RequireNonNull(remoteManager, "remoteManager");
+ RequireNonNull(injecor, "injecor");
+
+ ApplicationId = applicationId;
+ EvalutorId = evaluatorId;
+ HeartBeatPeriodInMs = heartbeatPeriodInMs;
+ MaxHeartbeatFailures = maxHeartbeatRetries;
+ RootContextConfig = rootContextConfig;
+ RuntimeClock = clock;
+ RemoteManager = remoteManager;
+ Injector = injecor;
+ OperationState = EvaluatorOperationState.OPERATIONAL;
+ }
+
+ /// <summary>Current operation state (e.g. OPERATIONAL or RECOVERY); mutable.</summary>
+ public EvaluatorOperationState OperationState { get; set; }
+
+ /// <summary>Evaluator identifier. The property name's spelling is kept for compatibility.</summary>
+ public string EvalutorId { get; private set; }
+
+ /// <summary>Heartbeat period, in milliseconds.</summary>
+ public int HeartBeatPeriodInMs { get; private set; }
+
+ /// <summary>Application identifier, as passed to the constructor.</summary>
+ public string ApplicationId { get; private set; }
+
+ /// <summary>Value of the constructor's maxHeartbeatRetries argument.</summary>
+ public int MaxHeartbeatFailures { get; private set; }
+
+ /// <summary>Configuration of the root context.</summary>
+ public ContextConfiguration RootContextConfig { get; private set; }
+
+ /// <summary>Clock that drives the evaluator runtime.</summary>
+ public IClock RuntimeClock { get; private set; }
+
+ /// <summary>Name client; not set by the constructor, so null until assigned.</summary>
+ public INameClient NameClient { get; set; }
+
+ /// <summary>Remote manager used to exchange <see cref="REEFMessage"/>s.</summary>
+ public IRemoteManager<REEFMessage> RemoteManager { get; private set; }
+
+ /// <summary>Injector supplied at construction.</summary>
+ public IInjector Injector { get; private set; }
+
+ // Throws ArgumentNullException naming paramName when value is null.
+ private static void RequireNonNull(object value, string paramName)
+ {
+ if (value == null)
+ {
+ throw new ArgumentNullException(paramName);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs
new file mode 100644
index 0000000..6d2121e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Common.Runtime;
+using Org.Apache.REEF.Evaluator;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Time;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.Linq;
+using System.Net;
+using System.Threading;
+
+namespace Org.Apache.REEF.Common
+{
+ public class HeartBeatManager : IObserver<Alarm>
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager));
+
+ // Shared resource-usage sampler; its report is logged with regular (alarm-driven) heartbeats.
+ private static readonly MachineStatus MachineStatus = new MachineStatus();
+
+ private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+ private readonly IClock _clock;
+
+ // Period between regular (alarm-driven) heartbeats, in milliseconds.
+ private readonly int _heartBeatPeriodInMillSeconds;
+
+ // Number of consecutive send failures at which Send() enters RECOVERY; set from the settings.
+ private readonly int _maxHeartbeatRetries = 0;
+
+ private readonly string _evaluatorId;
+
+ // Driver endpoint that heartbeats are sent to.
+ private IRemoteIdentifier _remoteId;
+
+ // Remote observer for _remoteId; heartbeats are pushed through it.
+ private IObserver<REEFMessage> _observer;
+
+ // Consecutive failed sends; reset on a successful send and when entering RECOVERY.
+ private int _heartbeatFailures = 0;
+
+ // Obtained from the injector when entering RECOVERY.
+ private IDriverConnection _driverConnection;
+
+ private EvaluatorSettings _evaluatorSettings;
+
+ // The queue can only contain the following:
+ // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state
+ // 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeat)
+ private Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>();
+
+ /// <summary>
+ /// Creates a heartbeat manager that reports to the driver at <paramref name="remoteId"/>,
+ /// using the clock, heartbeat period, retry limit and remote manager from <paramref name="settings"/>.
+ /// </summary>
+ public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId)
+ {
+ using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager"))
+ {
+ EvaluatorSettings = settings;
+ _evaluatorId = settings.EvalutorId;
+ _clock = settings.RuntimeClock;
+ _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
+ _maxHeartbeatRetries = settings.MaxHeartbeatFailures;
+
+ _remoteId = remoteId;
+ _remoteManager = settings.RemoteManager;
+ _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+
+ // Take an initial sample so that later CPU readings are meaningful.
+ MachineStatus.ToString();
+ }
+ }
+
+ /// <summary>
+ /// Runtime whose evaluator status goes into assembled heartbeats. Public only until it can be injected.
+ /// </summary>
+ [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+ public EvaluatorRuntime _evaluatorRuntime { get; set; }
+
+ /// <summary>
+ /// Context manager whose context statuses go into assembled heartbeats. Public only until it can be injected.
+ /// </summary>
+ [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+ public ContextManager _contextManager { get; set; }
+
+ /// <summary>
+ /// Settings this manager was created with; assigned by the constructor.
+ /// </summary>
+ public EvaluatorSettings EvaluatorSettings
+ {
+ get { return _evaluatorSettings; }
+ private set { _evaluatorSettings = value; }
+ }
+
+ /// <summary>
+ /// Sends a heartbeat to the driver, or queues it while the evaluator is in RECOVERY.
+ /// </summary>
+ /// <remarks>
+ /// On a send failure with no RUNNING task, the failure is passed to Exceptions.Throw, because
+ /// recovery is not supported in that case. Otherwise the heartbeat is queued and the failure
+ /// counted. Once <c>_maxHeartbeatRetries</c> consecutive failures are reached, an
+ /// <see cref="IDriverConnection"/> is obtained from the injector, the operation state switches
+ /// to RECOVERY and the failure count is reset.
+ /// </remarks>
+ public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
+ {
+ lock (_queuedHeartbeats)
+ {
+ if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
+ {
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. ", evaluatorHeartbeatProto));
+ _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+ return;
+ }
+
+ // NOT during recovery, try to send
+ REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto);
+ try
+ {
+ _observer.OnNext(payload);
+ _heartbeatFailures = 0; // reset failure counts if we are having intermittent (not continuous) failures
+ }
+ catch (Exception e)
+ {
+ if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER);
+ }
+
+ _heartbeatFailures++;
+
+ _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+ LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
+
+ if (_heartbeatFailures >= _maxHeartbeatRetries)
+ {
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures));
+ try
+ {
+ _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>();
+ }
+ catch (Exception ex)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER);
+ }
+ LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
+ _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
+
+ // clean heartbeat failure
+ _heartbeatFailures = 0;
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Assembles a complete new heartbeat from the current state and sends it out.
+ /// </summary>
+ public void OnNext()
+ {
+ LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()");
+ lock (this)
+ {
+ LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()");
+ var heartbeat = GetEvaluatorHeartbeatProto();
+ string description = string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeat);
+ LOGGER.Log(Level.Info, description);
+ Send(heartbeat);
+ }
+ }
+
+ /// <summary>
+ /// Sends a heartbeat carrying the given task status together with the current evaluator
+ /// and context statuses.
+ /// </summary>
+ /// <param name="taskStatusProto">Task status that must be delivered to the driver.</param>
+ public void OnNext(TaskStatusProto taskStatusProto)
+ {
+ LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)");
+ lock (this)
+ {
+ LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)");
+ EvaluatorStatusProto evaluatorStatus = _evaluatorRuntime.GetEvaluatorStatus();
+ var contextStatuses = _contextManager.GetContextStatusCollection();
+ var taskStatus = Optional<TaskStatusProto>.Of(taskStatusProto);
+ var heartbeat = GetEvaluatorHeartbeatProto(evaluatorStatus, contextStatuses, taskStatus);
+ string description = string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeat);
+ LOGGER.Log(Level.Info, description);
+ Send(heartbeat);
+ }
+ }
+
+ /// <summary>
+ /// Sends a heartbeat carrying the given context status first, followed by the statuses of all
+ /// contexts tracked by the context manager, plus the current evaluator status and no task status.
+ /// </summary>
+ /// <param name="contextStatusProto">Context status that must be delivered to the driver.</param>
+ public void OnNext(ContextStatusProto contextStatusProto)
+ {
+ LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)");
+ lock (this)
+ {
+ LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)");
+ var contextStatuses = new List<ContextStatusProto> { contextStatusProto };
+ contextStatuses.AddRange(_contextManager.GetContextStatusCollection());
+ EvaluatorStatusProto evaluatorStatus = _evaluatorRuntime.GetEvaluatorStatus();
+ var heartbeat = GetEvaluatorHeartbeatProto(evaluatorStatus, contextStatuses, Optional<TaskStatusProto>.Empty());
+ string description = string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeat);
+ LOGGER.Log(Level.Info, description);
+ Send(heartbeat);
+ }
+ }
+
+ /// <summary>
+ /// Sends a heartbeat carrying only the given evaluator status and a timestamp; no context
+ /// or task status is attached.
+ /// </summary>
+ /// <param name="evaluatorStatusProto">Evaluator status that must be delivered to the driver.</param>
+ public void OnNext(EvaluatorStatusProto evaluatorStatusProto)
+ {
+ LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)");
+ lock (this)
+ {
+ LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)");
+ var heartbeat = new EvaluatorHeartbeatProto();
+ heartbeat.timestamp = CurrentTimeMilliSeconds();
+ heartbeat.evaluator_status = evaluatorStatusProto;
+ string description = string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeat);
+ LOGGER.Log(Level.Info, description);
+ Send(heartbeat);
+ }
+ }
+
+ /// <summary>
+ /// Handles the periodic heartbeat alarm.
+ /// If the evaluator is OPERATIONAL and its runtime is RUNNING, a regular heartbeat is built and sent.
+ /// Otherwise the driver information is polled; when it is available, the connection to the
+ /// restarted driver is re-established via <see cref="Recover"/>. Failures while polling or
+ /// recovering are logged and swallowed so they never stop the polling.
+ /// The next alarm is scheduled at the end of every invocation that does not throw.
+ /// </summary>
+ /// <param name="value">The alarm that fired; its contents are not used.</param>
+ public void OnNext(Alarm value)
+ {
+     LOGGER.Log(Level.Verbose, "Before acquiring lock: HeartbeatManager::OnNext(Alarm)");
+     lock (this)
+     {
+         LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)");
+         if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING)
+         {
+             EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
+             LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString()));
+             Send(evaluatorHeartbeatProto);
+         }
+         else
+         {
+             LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState, _evaluatorRuntime.State));
+             try
+             {
+                 DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId);
+                 if (driverInformation == null)
+                 {
+                     LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
+                 }
+                 else
+                 {
+                     LOGGER.Log(
+                         Level.Info,
+                         string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
+                     Recover(driverInformation);
+                 }
+             }
+             catch (Exception e)
+             {
+                 // we do not want any exception to stop the query for driver status
+                 Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER);
+             }
+         }
+
+         // Re-arm the timer on both paths. If building or sending a regular heartbeat throws,
+         // this line is not reached and no further alarm is scheduled.
+         _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
+     }
+ }
+
+ /// <summary>
+ /// Error notifications are not supported by the heartbeat manager.
+ /// </summary>
+ /// <param name="error">Ignored.</param>
+ /// <exception cref="NotImplementedException">Always thrown.</exception>
+ public void OnError(Exception error)
+ {
+     var unsupported = new NotImplementedException();
+     throw unsupported;
+ }
+
+ /// <summary>
+ /// Completion notifications are not supported by the heartbeat manager.
+ /// </summary>
+ /// <exception cref="NotImplementedException">Always thrown.</exception>
+ public void OnCompleted()
+ {
+     var unsupported = new NotImplementedException();
+     throw unsupported;
+ }
+
+ /// <summary>
+ /// Returns the number of whole milliseconds elapsed since 1970-01-01T00:00:00Z (fractions are
+ /// truncated). The Unix epoch is used so timestamps are compatible with the Java implementation.
+ /// </summary>
+ /// <returns>Milliseconds since the Unix epoch, in UTC.</returns>
+ private static long CurrentTimeMilliSeconds()
+ {
+     var unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+     TimeSpan sinceEpoch = DateTime.UtcNow.Subtract(unixEpoch);
+     return (long)sinceEpoch.TotalMilliseconds;
+ }
+
+ /// <summary>
+ /// Re-establishes communication with a restarted driver and leaves RECOVERY mode:
+ /// (1) points the remote observer at the driver's new endpoint;
+ /// (2) if a name client is configured, restarts it against the driver's name server
+ ///     (a failure here is logged via Exceptions.Caught and recovery continues);
+ /// (3) replays every queued heartbeat, the first one marked via ConstructRecoveryHeartBeat,
+ ///     sleeping 500 ms after each one;
+ /// (4) sets the operation state to OPERATIONAL.
+ /// </summary>
+ /// <param name="driverInformation">Endpoint and name-server information of the restarted driver.</param>
+ /// <remarks>
+ /// A failure while replaying a heartbeat is passed to Exceptions.CaughtAndThrow, which presumably
+ /// rethrows (as its name suggests); in that case step (4) is not reached.
+ /// </remarks>
+ private void Recover(DriverInformation driverInformation)
+ {
+     // Redirect further messages to the driver's new endpoint.
+     IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier);
+     _remoteId = new SocketRemoteIdentifier(driverEndpoint);
+     _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+
+     lock (_evaluatorSettings)
+     {
+         if (_evaluatorSettings.NameClient != null)
+         {
+             try
+             {
+                 LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server: " + driverInformation.NameServerId);
+                 _evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId));
+                 LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId);
+             }
+             catch (Exception e)
+             {
+                 Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+             }
+         }
+     }
+
+     lock (_queuedHeartbeats)
+     {
+         bool firstHeartbeatInQueue = true;
+         while (_queuedHeartbeats.Any())
+         {
+             LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId);
+             try
+             {
+                 if (firstHeartbeatInQueue)
+                 {
+                     // the first heartbeat is specially constructed to include the recovery flag
+                     EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
+                     LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
+                     _observer.OnNext(new REEFMessage(recoveryHeartbeat));
+                     firstHeartbeatInQueue = false;
+                 }
+                 else
+                 {
+                     _observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue()));
+                 }
+             }
+             catch (Exception e)
+             {
+                 // we do not handle failures during RECOVERY
+                 Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(
+                     e,
+                     Level.Error,
+                     string.Format(CultureInfo.InvariantCulture, "Heartbeat attempt failed in RECOVERY mode to Driver {0}, giving up...", _remoteId),
+                     LOGGER);
+             }
+
+             // NOTE(review): pauses after every replayed heartbeat, including the last one,
+             // while the queue lock is held.
+             Thread.Sleep(500);
+         }
+     }
+     _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
+     LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
+ }
+
+ /// <summary>
+ /// Marks a heartbeat as a recovery heartbeat by setting the recovery flag on the heartbeat
+ /// itself, on every context status it carries and, if present, on its task status.
+ /// The heartbeat is modified in place.
+ /// </summary>
+ /// <param name="heartbeat">The heartbeat to mark; mutated by this method.</param>
+ /// <returns>The same <paramref name="heartbeat"/> instance.</returns>
+ private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
+ {
+     heartbeat.recovery = true;
+     heartbeat.context_status.ForEach(c => c.recovery = true);
+
+     // task_status is optional: GetEvaluatorHeartbeatProto only sets it when a task status is
+     // present, and OnNext(EvaluatorStatusProto) never sets it. Dereferencing it unconditionally
+     // would throw a NullReferenceException and abort the whole recovery.
+     if (heartbeat.task_status != null)
+     {
+         heartbeat.task_status.recovery = true;
+     }
+     return heartbeat;
+ }
+
+ /// <summary>
+ /// Builds a regular heartbeat from a snapshot of the evaluator status, the context statuses
+ /// and the task status currently reported by the evaluator runtime and the context manager.
+ /// </summary>
+ /// <returns>A newly created heartbeat message.</returns>
+ private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto()
+ {
+     var evaluatorStatus = _evaluatorRuntime.GetEvaluatorStatus();
+     var contextStatuses = _contextManager.GetContextStatusCollection();
+     var taskStatus = _contextManager.GetTaskStatus();
+     return GetEvaluatorHeartbeatProto(evaluatorStatus, contextStatuses, taskStatus);
+ }
+
+ /// <summary>
+ /// Assembles a heartbeat stamped with the current time from CurrentTimeMilliSeconds().
+ /// </summary>
+ /// <param name="evaluatorStatusProto">Evaluator status to embed.</param>
+ /// <param name="contextStatusProtos">Context statuses to append, in enumeration order.</param>
+ /// <param name="taskStatusProto">Task status to embed; task_status is left unset when empty.</param>
+ /// <returns>The newly created heartbeat.</returns>
+ private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto(
+     EvaluatorStatusProto evaluatorStatusProto,
+     ICollection<ContextStatusProto> contextStatusProtos,
+     Optional<TaskStatusProto> taskStatusProto)
+ {
+     var heartbeat = new EvaluatorHeartbeatProto();
+     heartbeat.timestamp = CurrentTimeMilliSeconds();
+     heartbeat.evaluator_status = evaluatorStatusProto;
+
+     foreach (var status in contextStatusProtos)
+     {
+         heartbeat.context_status.Add(status);
+     }
+
+     if (taskStatusProto.IsPresent())
+     {
+         heartbeat.task_status = taskStatusProto.Value;
+     }
+
+     return heartbeat;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs
new file mode 100644
index 0000000..5593e08
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using System;
+using System.Globalization;
+using System.Threading;
+
+namespace Org.Apache.REEF.Common
+{
+ /// <summary>
+ /// Receives REEFMessages from the driver. Each message is logged, including a summary of any
+ /// context-control command it carries. The observer also tracks receive throughput and forwards
+ /// each message to the single subscribed observer, if there is one.
+ /// </summary>
+ public class ReefMessageProtoObserver :
+     IObserver<IRemoteMessage<REEFMessage>>,
+     IObservable<IRemoteMessage<REEFMessage>>,
+     IDisposable
+ {
+     private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver));
+
+     // Number of received messages between two throughput log lines.
+     private const int PrintBatchSize = 100000;
+
+     // The single downstream observer. OnNext reads it without locking while Subscribe and
+     // Dispose write it, hence volatile.
+     private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null;
+
+     // Total number of messages received so far.
+     private long _count = 0;
+
+     // Start (UTC) of the current throughput measurement window.
+     private DateTime _begin;
+
+     /// <summary>
+     /// Completion notifications are ignored.
+     /// </summary>
+     public void OnCompleted()
+     {
+     }
+
+     /// <summary>
+     /// Logs the error. It is not propagated to the subscribed observer.
+     /// </summary>
+     /// <param name="error">The error reported by the upstream source.</param>
+     public void OnError(Exception error)
+     {
+         LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "ReefMessageProtoObserver received an error: {0}", error));
+     }
+
+     /// <summary>
+     /// Logs an incoming message, records it for throughput statistics and forwards it to the
+     /// subscribed observer, if there is one.
+     /// </summary>
+     /// <param name="value">The received message together with the sender's identifier.</param>
+     public void OnNext(IRemoteMessage<REEFMessage> value)
+     {
+         REEFMessage remoteEvent = value.Message;
+         IRemoteIdentifier id = value.Identifier;
+         LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id));
+
+         if (remoteEvent.evaluatorControl != null && remoteEvent.evaluatorControl.context_control != null)
+         {
+             LogContextControl(remoteEvent);
+         }
+
+         RecordThroughput();
+
+         // Read the volatile field once so a concurrent Dispose cannot null it between check and call.
+         var observer = _observer;
+         if (observer != null)
+         {
+             observer.OnNext(value);
+         }
+     }
+
+     /// <summary>
+     /// Registers the downstream observer. Only a single subscription is supported.
+     /// </summary>
+     /// <param name="observer">Observer that receives every message passed to OnNext.</param>
+     /// <returns>
+     /// This instance (disposing it removes the subscription), or null if an observer is already subscribed.
+     /// </returns>
+     /// <exception cref="ArgumentNullException">If <paramref name="observer"/> is null.</exception>
+     public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer)
+     {
+         if (observer == null)
+         {
+             throw new ArgumentNullException("observer");
+         }
+         if (_observer != null)
+         {
+             LOGGER.Log(Level.Warning, "ReefMessageProtoObserver already has a subscriber; ignoring additional subscription.");
+             return null;
+         }
+         _observer = observer;
+         return this;
+     }
+
+     /// <summary>
+     /// Removes the current subscription, if any.
+     /// </summary>
+     public void Dispose()
+     {
+         _observer = null;
+     }
+
+     /// <summary>
+     /// Logs a one-line summary of the context-control command carried by the message.
+     /// Only the first matching kind of command is logged.
+     /// </summary>
+     /// <param name="remoteEvent">A message whose evaluatorControl.context_control is non-null.</param>
+     private static void LogContextControl(REEFMessage remoteEvent)
+     {
+         var contextControl = remoteEvent.evaluatorControl.context_control;
+         string contextMessage = null;
+         string taskMessage = null;
+
+         if (contextControl.context_message != null)
+         {
+             // NOTE(review): context_message is a proto object; confirm ToString() yields useful content.
+             contextMessage = contextControl.context_message.ToString();
+         }
+         if (contextControl.task_message != null)
+         {
+             taskMessage = ByteUtilities.ByteArrarysToString(contextControl.task_message);
+         }
+
+         if (!(string.IsNullOrEmpty(contextMessage) && string.IsNullOrEmpty(taskMessage)))
+         {
+             LOGGER.Log(Level.Info,
+                 string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", contextMessage, taskMessage));
+         }
+         else if (contextControl.remove_context != null)
+         {
+             LOGGER.Log(Level.Info,
+                 string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", contextControl.remove_context.context_id));
+         }
+         else if (contextControl.add_context != null)
+         {
+             LOGGER.Log(Level.Info,
+                 string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", contextControl.add_context.parent_context_id));
+         }
+         else if (contextControl.start_task != null)
+         {
+             LOGGER.Log(Level.Info,
+                 string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", contextControl.start_task.context_id));
+         }
+         else if (contextControl.stop_task != null)
+         {
+             LOGGER.Log(Level.Info, "Control protobuf to stop task");
+         }
+         else if (contextControl.suspend_task != null)
+         {
+             LOGGER.Log(Level.Info, "Control protobuf to suspend task");
+         }
+     }
+
+     /// <summary>
+     /// Counts a received message. Every PrintBatchSize messages, it logs the receive rate over
+     /// the last batch at Verbose level and starts a new measurement window.
+     /// </summary>
+     private void RecordThroughput()
+     {
+         // The first message opens the first measurement window.
+         if (Interlocked.Read(ref _count) == 0)
+         {
+             _begin = DateTime.UtcNow;
+         }
+         long count = Interlocked.Increment(ref _count);
+
+         if (count % PrintBatchSize == 0)
+         {
+             DateTime end = DateTime.UtcNow;
+             double seconds = (end - _begin).TotalSeconds;
+
+             // A zero-length window (coarse clock resolution) would otherwise yield an
+             // infinite rate whose conversion to long is undefined.
+             if (seconds > 0)
+             {
+                 long eventsPerSecond = (long)(PrintBatchSize / seconds);
+                 LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Received {0} REEFMessages in total; current rate is {1} messages/second.", count, eventsPerSecond));
+             }
+             _begin = end;
+         }
+     }
+ }
+}